diff --git a/db.go b/db.go index 50ec011..adfd03e 100644 --- a/db.go +++ b/db.go @@ -33,6 +33,8 @@ type DB struct { rtx *sql.Tx // long running read transaction pageSize int // page size, in bytes + notify chan struct{} // closes on WAL change + ctx context.Context cancel func() wg sync.WaitGroup @@ -61,7 +63,9 @@ type DB struct { // NewDB returns a new instance of DB for a given path. func NewDB(path string) *DB { db := &DB{ - path: path, + path: path, + notify: make(chan struct{}), + MinCheckpointPageN: DefaultMinCheckpointPageN, MonitorInterval: DefaultMonitorInterval, } @@ -128,7 +132,23 @@ func (db *DB) CurrentShadowWALPath(generation string) (string, error) { return filepath.Join(dir, max), nil } +// Notify returns a channel that closes when the shadow WAL changes. +func (db *DB) Notify() <-chan struct{} { + db.mu.RLock() + defer db.mu.RUnlock() + return db.notify +} + +// PageSize returns the page size of the underlying database. +// Only valid after database exists & Init() has successfully run. +func (db *DB) PageSize() int { + db.mu.RLock() + defer db.mu.RUnlock() + return db.pageSize +} + func (db *DB) Open() (err error) { + // Start monitoring SQLite database in a separate goroutine. db.wg.Add(1) go func() { defer db.wg.Done(); db.monitor() }() @@ -215,6 +235,11 @@ func (db *DB) Init() (err error) { return fmt.Errorf("clean: %w", err) } + // Start replication. + for _, r := range db.Replicators { + r.Start(db.ctx) + } + return nil } @@ -253,6 +278,11 @@ func (db *DB) SoftClose() (err error) { db.cancel() db.wg.Wait() + // Ensure replicators all stop replicating. + for _, r := range db.Replicators { + r.Stop() + } + if db.rtx != nil { if e := db.releaseReadLock(); e != nil && err == nil { err = e @@ -344,13 +374,6 @@ func (db *DB) createGeneration() (string, error) { return "", fmt.Errorf("rename generation file: %w", err) } - // Issue snapshot by each replicator. - for _, r := range db.Replicators { - if err := r.BeginSnapshot(db.ctx); err != nil { - return "", fmt.Errorf("cannot snapshot %q replicator: %s", r.Name(), err) - } - } - // Remove old generations. if err := db.clean(); err != nil { return "", err @@ -361,8 +384,8 @@ func (db *DB) createGeneration() (string, error) { // Sync copies pending data from the WAL to the shadow WAL. func (db *DB) Sync() (err error) { - db.mu.RLock() - defer db.mu.RUnlock() + db.mu.Lock() + defer db.mu.Unlock() // No database exists, exit. if db.db == nil { @@ -400,12 +423,16 @@ func (db *DB) Sync() (err error) { // Verify our last sync matches the current state of the WAL. // This ensures that we have an existing generation & that the last sync // position of the real WAL hasn't been overwritten by another process. - // - // If we are unable to verify the WAL state then we start a new generation. info, err := db.verify() if err != nil { return fmt.Errorf("cannot verify wal state: %w", err) - } else if info.reason != "" { + } + + // Track if anything in the shadow WAL changes and then notify at the end. + changed := info.walSize != info.shadowWALSize || info.restart || info.reason != "" + + // If we are unable to verify the WAL state then we start a new generation. + if info.reason != "" { // Start new generation & notify user via log message. if info.generation, err = db.createGeneration(); err != nil { return fmt.Errorf("create generation: %w", err) @@ -417,6 +444,7 @@ func (db *DB) Sync() (err error) { info.shadowWALSize = WALHeaderSize info.restart = false info.reason = "" + } // Synchronize real WAL with current shadow WAL. @@ -441,11 +469,19 @@ func (db *DB) Sync() (err error) { // Issue the checkpoint. if checkpoint { + changed = true + if err := db.checkpoint(info, forceCheckpoint); err != nil { return fmt.Errorf("checkpoint: force=%v err=%w", err) } } + // Notify replicators of WAL changes. + if changed { + close(db.notify) + db.notify = make(chan struct{}) + } + return nil } @@ -496,15 +532,12 @@ func (db *DB) verify() (info syncInfo, err error) { if err != nil { return info, err } - info.shadowWALSize = fi.Size() + info.shadowWALSize = frameAlign(fi.Size(), db.pageSize) // Truncate shadow WAL if there is a partial page. // Exit if shadow WAL does not contain a full header. - frameSize := int64(db.pageSize) + WALFrameHeaderSize if info.shadowWALSize < WALHeaderSize { return info, fmt.Errorf("short shadow wal: %s", info.shadowWALPath) - } else if (info.shadowWALSize-WALHeaderSize)%frameSize != 0 { - info.shadowWALSize = ((info.shadowWALSize - WALHeaderSize) / frameSize) + WALHeaderSize } // If shadow WAL is larger than real WAL then the WAL has been truncated @@ -523,7 +556,12 @@ func (db *DB) verify() (info syncInfo, err error) { info.restart = !bytes.Equal(hdr0, hdr1) } - // TODO: Handle checkpoint sequence number rollover. + // If we only have a header then ensure header matches. + // Otherwise we need to start a new generation. + if info.shadowWALSize == WALHeaderSize && info.restart { + info.reason = "wal header only, mismatched" + return info, nil + } // Verify last page synced still matches. if info.shadowWALSize > WALHeaderSize { @@ -699,6 +737,114 @@ func (db *DB) copyToShadowWAL(filename string) (newSize int64, err error) { return newSize, nil } +// WALReader opens a reader for a shadow WAL file at a given position. +// If the reader is at the end of the file, it attempts to return the next file. +// +// The caller should check Pos() & Size() on the returned reader to check offset. +func (db *DB) WALReader(pos Pos) (r *WALReader, err error) { + // Fetch reader for the requested position. Return if it has data. + r, err = db.walReader(pos) + if err != nil { + return nil, err + } else if r.N() > 0 { + return r, nil + } + + // Otherwise attempt to read the start of the next WAL file. + pos.Index, pos.Offset = pos.Index+1, 0 + + r, err = db.walReader(pos) + if os.IsNotExist(err) { + return nil, io.EOF + } + return r, err +} + +// walReader opens a file reader for a shadow WAL file at a given position. +func (db *DB) walReader(pos Pos) (r *WALReader, err error) { + filename := db.ShadowWALPath(pos.Generation, pos.Index) + + f, err := os.Open(filename) + if err != nil { + return nil, err + } + + // Ensure file is closed if any error occurs. + defer func() { + if err != nil { + r.Close() + } + }() + + // Fetch frame-aligned file size and ensure requested offset is not past EOF. + fi, err := f.Stat() + if err != nil { + return nil, err + } + + fileSize := frameAlign(fi.Size(), db.pageSize) + if pos.Offset > fileSize { + return nil, fmt.Errorf("wal reader offset too high: %d > %d", pos.Offset, fi.Size()) + } + + // Move file handle to offset position. + if _, err := f.Seek(pos.Offset, io.SeekStart); err != nil { + return nil, err + } + + return &WALReader{ + f: f, + n: fileSize - pos.Offset, + pos: pos, + }, nil +} + +// frameAlign returns a frame-aligned offset. +// Returns zero if offset is less than the WAL header size. +func frameAlign(offset int64, pageSize int) int64 { + assert(offset >= 0, "frameAlign(): offset must be non-negative") + assert(pageSize >= 0, "frameAlign(): page size must be non-negative") + + if offset < WALHeaderSize { + return 0 + } + + frameSize := WALFrameHeaderSize + int64(pageSize) + frameN := (offset - WALHeaderSize) / frameSize + return (frameN * frameSize) + WALHeaderSize +} + +// WALReader represents a reader for a WAL file that tracks WAL position. +type WALReader struct { + f *os.File + n int64 + pos Pos +} + +// Close closes the underlying WAL file handle. +func (r *WALReader) Close() error { return r.f.Close() } + +// N returns the remaining bytes in the reader. +func (r *WALReader) N() int64 { return r.n } + +// Pos returns the current WAL position. +func (r *WALReader) Pos() Pos { return r.pos } + +// Read reads bytes into p, updates the position, and returns the bytes read. +// Returns io.EOF at the end of the available section of the WAL. +func (r *WALReader) Read(p []byte) (n int, err error) { + if r.n <= 0 { + return 0, io.EOF + } + if int64(len(p)) > r.n { + p = p[0:r.n] + } + n, err = r.f.Read(p) + r.n -= int64(n) + r.pos.Offset += int64(n) + return n, err +} + const WALHeaderChecksumOffset = 24 const WALFrameHeaderChecksumOffset = 16 @@ -770,6 +916,11 @@ func (db *DB) checkpoint(info syncInfo, force bool) error { return nil } + // Copy the end of the previous WAL before starting a new shadow WAL. + if _, err := db.copyToShadowWAL(info.shadowWALPath); err != nil { + return fmt.Errorf("cannot copy to end of shadow wal: %w", err) + } + // Parse index of current shadow WAL file. dir, base := filepath.Split(info.shadowWALPath) index, err := ParseWALFilename(base) diff --git a/litestream.go b/litestream.go index 0272c8d..0350c75 100644 --- a/litestream.go +++ b/litestream.go @@ -22,6 +22,26 @@ const ( GenerationNameLen = 16 ) +// Pos is a position in the WAL for a generation. +type Pos struct { + Generation string // generation name + Index int // wal file index + Offset int64 // offset within wal file +} + +// String returns a string representation. +func (p Pos) String() string { + if p.IsZero() { + return "<>" + } + return fmt.Sprintf("<%s,%d,%d>", p.Generation, p.Index, p.Offset) +} + +// IsZero returns true if p is the zero value. +func (p Pos) IsZero() bool { + return p == (Pos{}) +} + // Checksum computes a running SQLite checksum over a byte slice. func Checksum(bo binary.ByteOrder, s0, s1 uint32, b []byte) (uint32, uint32) { assert(len(b)%8 == 0, "misaligned checksum byte slice") diff --git a/replicator.go b/replicator.go index 2656557..e3f4baa 100644 --- a/replicator.go +++ b/replicator.go @@ -2,6 +2,14 @@ package litestream import ( "context" + "fmt" + "io" + "io/ioutil" + "log" + "os" + "path/filepath" + "strings" + "sync" ) // Replicator represents a method for replicating the snapshot & WAL data to @@ -9,7 +17,9 @@ import ( type Replicator interface { Name() string Type() string - BeginSnapshot(ctx context.Context) error + Snapshotting() bool + Start(ctx context.Context) + Stop() } var _ Replicator = (*FileReplicator)(nil) @@ -19,14 +29,22 @@ type FileReplicator struct { db *DB // source database name string // replicator name, optional dst string // destination path + + mu sync.RWMutex + wg sync.WaitGroup + snapshotting bool // if true, currently copying database + + ctx context.Context + cancel func() } // NewFileReplicator returns a new instance of FileReplicator. func NewFileReplicator(db *DB, name, dst string) *FileReplicator { return &FileReplicator{ - db: db, - name: name, - dst: dst, + db: db, + name: name, + dst: dst, + cancel: func() {}, } } @@ -43,10 +61,211 @@ func (r *FileReplicator) Type() string { return "file" } -// -func (r *FileReplicator) BeginSnapshot(ctx context.Context) error { - // TODO: Set snapshotting state to true. - // TODO: Read current generation. - // TODO: Copy database to destination. +// SnapshotPath returns the path to a snapshot file. +func (r *FileReplicator) SnapshotPath(generation string, index int) string { + return filepath.Join(r.dst, "generations", generation, "snapshots", fmt.Sprintf("%016x.snapshot", index)) +} + +// WALPath returns the path to a WAL file. +func (r *FileReplicator) WALPath(pos Pos) string { + return filepath.Join(r.dst, "generations", pos.Generation, "wal", fmt.Sprintf("%016x.wal", pos.Index)) +} + +// Snapshotting returns true if replicator is current snapshotting. +func (r *FileReplicator) Snapshotting() bool { + r.mu.RLock() + defer r.mu.RLock() + return r.snapshotting +} + +// Start starts replication for a given generation. +func (r *FileReplicator) Start(ctx context.Context) { + // Stop previous replication. + r.Stop() + + r.mu.Lock() + defer r.mu.Unlock() + + // Set snapshotting state. + r.snapshotting = true + + // Wrap context with cancelation. + ctx, r.cancel = context.WithCancel(ctx) + + // Start goroutine to replicate data. + r.wg.Add(1) + go func() { defer r.wg.Done(); r.monitor(ctx) }() +} + +// Stop cancels any outstanding replication and blocks until finished. +func (r *FileReplicator) Stop() { + r.cancel() + r.wg.Wait() +} + +// monitor runs in a separate goroutine and continuously replicates the DB. +func (r *FileReplicator) monitor(ctx context.Context) { + // Continuously check for new data to replicate. + ch := make(chan struct{}) + close(ch) + var notify <-chan struct{} = ch + + var pos Pos + var err error + for { + select { + case <-ctx.Done(): + return + case <-notify: + } + + // Fetch new notify channel before replicating data. + notify = r.db.Notify() + + // Determine position, if necessary. + if pos.IsZero() { + if pos, err = r.pos(); err != nil { + log.Printf("%s(%s): cannot determine position: %w", r.db.Path(), r.Name(), err) + continue + } else if pos.IsZero() { + log.Printf("%s(%s): no generation, waiting for data", r.db.Path(), r.Name()) + continue + } + } + + // Synchronize the shadow wal into the replication directory. + if pos, err = r.sync(ctx, pos); err != nil { + log.Printf("%s(%s): sync error: %w", r.db.Path(), r.Name(), err) + continue + } + } +} + +// pos returns the position for the replicator for the current generation. +// Returns a zero value if there is no active generation. +func (r *FileReplicator) pos() (pos Pos, err error) { + // Find the current generation from the DB. Return zero pos if no generation. + generation, err := r.db.CurrentGeneration() + if err != nil { + return pos, err + } else if generation == "" { + return pos, nil // empty position + } + pos.Generation = generation + + // Find the max WAL file. + walDir := filepath.Join(r.dst, "generations", generation, "wal") + fis, err := ioutil.ReadDir(walDir) + if os.IsNotExist(err) { + return pos, nil // no replicated wal, start at beginning of generation + } else if err != nil { + return pos, err + } + + index := -1 + for _, fi := range fis { + if !strings.HasSuffix(fi.Name(), ".wal") { + continue + } + + if v, err := ParseWALFilename(filepath.Base(fi.Name())); err != nil { + continue // invalid wal filename + } else if index == -1 || v > index { + index = v + } + } + if index == -1 { + return pos, nil // wal directory exists but no wal files, return beginning pos + } + pos.Index = index + + // Determine current offset. + fi, err := os.Stat(filepath.Join(walDir, FormatWALFilename(pos.Index))) + if err != nil { + return pos, err + } + pos.Offset = fi.Size() + + return pos, nil +} + +// snapshot copies the entire database to the replica path. +func (r *FileReplicator) snapshot(ctx context.Context, pos Pos) error { + rd, err := os.Open(r.db.Path()) + if err != nil { + return err + } + defer rd.Close() + + snapshotPath := r.SnapshotPath(pos.Generation, pos.Index) + if err := os.MkdirAll(snapshotPath, 0700); err != nil { + return err + } + + w, err := os.Create(snapshotPath) + if err != nil { + return err + } + defer w.Close() + + if _, err := io.Copy(w, rd); err != nil { + return err + } else if err := w.Sync(); err != nil { + return err + } else if err := w.Close(); err != nil { + return err + } + + r.mu.Lock() + r.snapshotting = false + r.mu.Unlock() + return nil } + +func (r *FileReplicator) sync(ctx context.Context, pos Pos) (_ Pos, err error) { + for { + if pos, err = r.syncNext(ctx, pos); err == io.EOF { + return pos, nil + } else if err != nil { + return pos, err + } + } +} + +func (r *FileReplicator) syncNext(ctx context.Context, pos Pos) (_ Pos, err error) { + rd, err := r.db.WALReader(pos) + if err == io.EOF { + return pos, err + } else if err != nil { + return pos, fmt.Errorf("wal reader: %w", err) + } + defer rd.Close() + + // Ensure parent directory exists for WAL file. + filename := r.WALPath(rd.Pos()) + if err := os.MkdirAll(filepath.Dir(filename), 0700); err != nil { + return pos, err + } + + // Create a temporary file to write into so we don't have partial writes. + w, err := os.OpenFile(filename, os.O_RDWR|os.O_CREATE, 0600) + if err != nil { + return pos, err + } + defer w.Close() + + // Seek, copy & sync WAL contents. + if _, err := w.Seek(rd.Pos().Offset, io.SeekStart); err != nil { + return pos, err + } else if _, err := io.Copy(w, rd); err != nil { + return pos, err + } else if err := w.Sync(); err != nil { + return pos, err + } else if err := w.Close(); err != nil { + return pos, err + } + + // Return ending position of the reader. + return rd.Pos(), nil +}