diff --git a/db.go b/db.go
index 5ebacc2..791d80d 100644
--- a/db.go
+++ b/db.go
@@ -34,7 +34,7 @@ const (
 
 // MonitorDelayInterval is the time Litestream will wait after receiving a file
 // change notification before processing the WAL file for changes.
-const MonitorDelayInterval = 100 * time.Millisecond
+const MonitorDelayInterval = 10 * time.Millisecond
 
 // MaxIndex is the maximum possible WAL index.
 // If this index is reached then a new generation will be started.
@@ -422,14 +422,20 @@ func (db *DB) Close() (err error) {
 		}
 	}
 
-	// Ensure replicas perform a final sync and stop replicating.
+	// Ensure replicas stop replicating and perform a final sync.
 	for _, r := range db.Replicas {
+		// Stop normal background sync.
+		r.Stop()
+
+		// Force one final sync if DB is open.
 		if db.db != nil {
 			if e := r.Sync(ctx); e != nil && err == nil {
 				err = e
 			}
 		}
-		if e := r.Stop(true); e != nil && err == nil {
+
+		// Close out replica.
+		if e := r.Close(); e != nil && err == nil {
 			err = e
 		}
 	}
@@ -795,10 +801,25 @@ func (db *DB) createGeneration(ctx context.Context) (string, error) {
 }
 
 // Sync copies pending data from the WAL to the shadow WAL.
-func (db *DB) Sync(ctx context.Context) (err error) {
-	db.mu.Lock()
-	defer db.mu.Unlock()
+func (db *DB) Sync(ctx context.Context) error {
+	const retryN = 5
+	for i := 0; i < retryN; i++ {
+		if err := func() error {
+			db.mu.Lock()
+			defer db.mu.Unlock()
+			return db.sync(ctx)
+		}(); err != nil {
+			db.Logger.Printf("sync error, retrying: %s", err)
+		} else {
+			break
+		}
+	}
+	return nil
+
+}
+
+func (db *DB) sync(ctx context.Context) (err error) {
 
 	// Initialize database, if necessary. Exit if no DB exists.
 	if err := db.init(); err != nil {
 		return err
@@ -889,7 +910,16 @@ func (db *DB) Sync(ctx context.Context) (err error) {
 
 	// Issue the checkpoint.
 	if checkpoint {
-		if err := db.checkpoint(ctx, info.generation, checkpointMode); err != nil {
+		// Under rare circumstances, a checkpoint can be unable to verify WAL
+		// continuity and a new generation must be started.
+		if err := db.checkpoint(ctx, info.generation, checkpointMode); errors.Is(err, errRestartGeneration) {
+			generation, err := db.createGeneration(ctx)
+			if err != nil {
+				return fmt.Errorf("create generation: %w", err)
+			}
+			db.Logger.Printf("sync: new generation %q, possible WAL overrun occurred", generation)
+
+		} else if err != nil {
 			return fmt.Errorf("checkpoint: mode=%v err=%w", checkpointMode, err)
 		}
 	}
@@ -1174,6 +1204,37 @@ func (db *DB) copyToShadowWAL(ctx context.Context) error {
 	return nil
 }
 
+// verifyLastShadowFrame re-reads the last frame copied during the shadow copy.
+// This ensures that the frame has not been overrun after a checkpoint occurs
+// but before the new write lock has been obtained to initialize the new WAL index.
+func (db *DB) verifyLastShadowFrame(ctx context.Context) error {
+	// Skip if we don't have a previous frame to verify.
+	if db.frame == nil {
+		return nil
+	}
+
+	r, err := os.Open(db.WALPath())
+	if err != nil {
+		return err
+	}
+	defer r.Close()
+
+	// Seek to the position where the last frame was read.
+	buf := make([]byte, len(db.frame))
+	if _, err := r.Seek(db.pos.Offset-int64(len(db.frame)), io.SeekStart); err != nil {
+		return fmt.Errorf("seek to last frame: %w", err)
+	} else if _, err := io.ReadFull(r, buf); err != nil {
+		return fmt.Errorf("read last frame: %w", err)
+	}
+
+	// Return a marker error if frames do not match.
+	if !bytes.Equal(db.frame, buf) {
+		return errRestartGeneration
+	}
+
+	return nil
+}
+
 // WALSegmentReader returns a reader for a section of WAL data at the given position.
 // Returns os.ErrNotExist if no matching index/offset is found.
 func (db *DB) WALSegmentReader(ctx context.Context, pos Pos) (io.ReadCloser, error) {
@@ -1304,6 +1365,16 @@ func (db *DB) checkpoint(ctx context.Context, generation, mode string) error {
 		return fmt.Errorf("_litestream_lock: %w", err)
 	}
+	// Verify we can re-read the last frame copied to the shadow WAL.
+	// This ensures that another transaction has not overrun the WAL past the
+	// point of our previous copy, which would overwrite any unread frames
+	// between the checkpoint & the new write lock.
+	//
+	// This only occurs under high load with a short sync interval, so it is rare.
+	if err := db.verifyLastShadowFrame(ctx); err != nil {
+		return fmt.Errorf("cannot verify last frame copied from shadow wal: %w", err)
+	}
+
 	// Copy the end of the previous WAL before starting a new shadow WAL.
 	if err := db.copyToShadowWAL(ctx); err != nil {
 		return fmt.Errorf("cannot copy to end of shadow wal: %w", err)
 	}
@@ -1360,6 +1431,11 @@ func (db *DB) execCheckpoint(mode string) (err error) {
 	}
 	db.Logger.Printf("checkpoint(%s): [%d,%d,%d]", mode, row[0], row[1], row[2])
 
+	// Clear last read frame if we are truncating.
+	if mode == CheckpointModeTruncate {
+		db.frame = nil
+	}
+
 	// Reacquire the read lock immediately after the checkpoint.
 	if err := db.acquireReadLock(); err != nil {
 		return fmt.Errorf("reacquire read lock: %w", err)
@@ -1543,3 +1619,7 @@ func logPrefixPath(path string) string {
 	}
 	return path
 }
+
+// errRestartGeneration is a marker error indicating that a restart checkpoint
+// could not verify continuity between WAL indices and a new generation should be started.
+var errRestartGeneration = errors.New("restart generation")
diff --git a/litestream.go b/litestream.go
index 6cc3f16..5614457 100644
--- a/litestream.go
+++ b/litestream.go
@@ -302,7 +302,7 @@ func (p Pos) String() string {
 	if p.IsZero() {
 		return ""
 	}
-	return fmt.Sprintf("%s/%08x:%d", p.Generation, p.Index, p.Offset)
+	return fmt.Sprintf("%s/%08x:%08x", p.Generation, p.Index, p.Offset)
 }
 
 // IsZero returns true if p is the zero value.
diff --git a/replica.go b/replica.go
index b92b4b4..cc5652d 100644
--- a/replica.go
+++ b/replica.go
@@ -110,7 +110,7 @@ func (r *Replica) Start(ctx context.Context) {
 	}
 
 	// Stop previous replication.
-	_ = r.Stop(false)
+	r.Stop()
 
 	// Wrap context with cancelation.
 	ctx, r.cancel = context.WithCancel(ctx)
@@ -123,17 +123,17 @@
 }
 
 // Stop cancels any outstanding replication and blocks until finished.
-//
-// Performing a hard stop will close the DB file descriptor which could release
-// locks on per-process locks. Hard stops should only be performed when
-// stopping the entire process.
-func (r *Replica) Stop(hard bool) (err error) {
+func (r *Replica) Stop() {
 	r.cancel()
 	r.wg.Wait()
+}
 
+// Close closes the DB file descriptor, which may release per-process locks
+// on the database (e.g. on non-Linux OSes).
+func (r *Replica) Close() (err error) {
 	r.muf.Lock()
 	defer r.muf.Unlock()
-	if hard && r.f != nil {
+	if r.f != nil {
 		if e := r.f.Close(); e != nil && err == nil {
 			err = e
 		}
@@ -297,9 +297,9 @@ func (r *Replica) writeIndexSegments(ctx context.Context, segments []WALSegmentI
 
 	// Flush LZ4 writer, close pipe, and wait for write to finish.
 	if err := zw.Close(); err != nil {
-		return err
+		return fmt.Errorf("lz4 writer close: %w", err)
 	} else if err := pw.Close(); err != nil {
-		return err
+		return fmt.Errorf("pipe writer close: %w", err)
 	} else if err := g.Wait(); err != nil {
 		return err
 	}
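
The Stop/Close split above changes how a caller tears down replication outside of DB.Close. Below is a minimal sketch of the new ordering, mirroring what DB.Close does in this patch; the import path and the standalone helper function are illustrative assumptions, not part of the change.

// Illustrative only: a helper that mirrors DB.Close's new per-replica teardown order.
package example

import (
	"context"
	"log"

	"github.com/benbjohnson/litestream"
)

func shutdownReplicas(ctx context.Context, db *litestream.DB) error {
	var err error
	for _, r := range db.Replicas {
		// Stop the background replication goroutine first so the final
		// sync does not race the monitor.
		r.Stop()

		// Push any remaining shadow WAL segments before closing.
		if e := r.Sync(ctx); e != nil {
			log.Printf("final sync: %s", e)
		}

		// Release the file descriptor (and any per-process locks).
		if e := r.Close(); e != nil && err == nil {
			err = e
		}
	}
	return err
}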
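For the Pos.String change in litestream.go, the offset now renders as zero-padded hex like the index. A small standalone illustration of the old and new format strings (the values are arbitrary):

package main

import "fmt"

func main() {
	gen, index, offset := "b87288e8b9e57a24", 28, int64(8224)

	// Old format: offset printed in decimal.
	fmt.Printf("%s/%08x:%d\n", gen, index, offset) // b87288e8b9e57a24/0000001c:8224

	// New format: offset printed as zero-padded hex, matching the index width.
	fmt.Printf("%s/%08x:%08x\n", gen, index, offset) // b87288e8b9e57a24/0000001c:00002020
}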