Add WAL overrun validation

Under high write load, write transactions from another process can
overrun the WAL in the window between Litestream performing a
RESTART checkpoint and obtaining the write lock immediately
afterward. This change adds validation that no overrun has occurred;
if one has, Litestream starts a new generation.
Ben Johnson
2022-02-06 11:17:31 -07:00
parent 76e53dc6ea
commit 30a8d07a81
3 changed files with 97 additions and 17 deletions
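
The heart of the fix is in db.go below: after copying frames to the shadow WAL,
Litestream retains the bytes of the last frame it copied and, before restarting
the WAL, re-reads that frame from disk and compares it against the retained
copy. A mismatch means another writer overran the WAL in the meantime, so the
sync abandons continuity and starts a new generation. As a rough standalone
sketch of that comparison (simplified; the function name and signature here are
illustrative, not Litestream's actual verifyLastShadowFrame shown in the diff):

    package walcheck

    import (
        "bytes"
        "fmt"
        "os"
    )

    // frameUnchanged reports whether the WAL frame starting at offset off still
    // matches lastFrame, the copy retained from the previous shadow-WAL sync.
    // If it no longer matches, the WAL was overrun and continuity cannot be trusted.
    func frameUnchanged(walPath string, off int64, lastFrame []byte) (bool, error) {
        f, err := os.Open(walPath)
        if err != nil {
            return false, err
        }
        defer f.Close()

        buf := make([]byte, len(lastFrame))
        if _, err := f.ReadAt(buf, off); err != nil {
            return false, fmt.Errorf("read last frame: %w", err)
        }
        return bytes.Equal(buf, lastFrame), nil
    }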

db.go

@@ -34,7 +34,7 @@ const (
// MonitorDelayInterval is the time Litestream will wait after receiving a file
// change notification before processing the WAL file for changes.
-const MonitorDelayInterval = 100 * time.Millisecond
+const MonitorDelayInterval = 10 * time.Millisecond
// MaxIndex is the maximum possible WAL index.
// If this index is reached then a new generation will be started.
@@ -422,14 +422,20 @@ func (db *DB) Close() (err error) {
}
}
-// Ensure replicas perform a final sync and stop replicating.
+// Ensure replicas stop replicating and perform a final sync.
for _, r := range db.Replicas {
+// Stop normal background sync.
+r.Stop()
+// Force one final sync if DB is open.
if db.db != nil {
if e := r.Sync(ctx); e != nil && err == nil {
err = e
}
}
-if e := r.Stop(true); e != nil && err == nil {
+// Close out replica.
+if e := r.Close(); e != nil && err == nil {
err = e
}
}
@@ -795,10 +801,25 @@ func (db *DB) createGeneration(ctx context.Context) (string, error) {
}
// Sync copies pending data from the WAL to the shadow WAL.
-func (db *DB) Sync(ctx context.Context) (err error) {
+func (db *DB) Sync(ctx context.Context) error {
+const retryN = 5
+for i := 0; i < retryN; i++ {
+if err := func() error {
+db.mu.Lock()
+defer db.mu.Unlock()
+return db.sync(ctx)
+}(); err != nil {
+db.Logger.Printf("sync error, retrying: %s", err)
+} else {
+break
+}
+}
+return nil
+}
+func (db *DB) sync(ctx context.Context) (err error) {
// Initialize database, if necessary. Exit if no DB exists.
if err := db.init(); err != nil {
return err
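
Sync is now a thin retrying wrapper around the unexported sync, presumably
driven by Litestream's WAL monitor after the (now shorter) MonitorDelayInterval
debounce changed at the top of this diff. A rough sketch of such a caller,
assuming a hypothetical notification channel (the real monitor loop is not part
of this diff):

    package monitorexample

    import (
        "context"
        "log"
        "time"

        "github.com/benbjohnson/litestream"
    )

    // monitor debounces file-change notifications and then syncs the WAL to the
    // shadow WAL. DB.Sync retries internally, so a failed pass is only logged here.
    func monitor(ctx context.Context, db *litestream.DB, notify <-chan struct{}) {
        for {
            select {
            case <-ctx.Done():
                return
            case <-notify: // hypothetical "WAL file changed" signal
            }

            // Wait briefly so a burst of writes is handled by a single sync pass.
            time.Sleep(litestream.MonitorDelayInterval)

            if err := db.Sync(ctx); err != nil {
                log.Printf("monitor: sync error: %s", err)
            }
        }
    }
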
@@ -889,7 +910,16 @@ func (db *DB) Sync(ctx context.Context) (err error) {
// Issue the checkpoint.
if checkpoint {
-if err := db.checkpoint(ctx, info.generation, checkpointMode); err != nil {
+// Under rare circumstances, a checkpoint can be unable to verify continuity
+// and will require a restart.
+if err := db.checkpoint(ctx, info.generation, checkpointMode); errors.Is(err, errRestartGeneration) {
+generation, err := db.createGeneration(ctx)
+if err != nil {
+return fmt.Errorf("create generation: %w", err)
+}
+db.Logger.Printf("sync: new generation %q, possible WAL overrun occurred", generation)
+} else if err != nil {
return fmt.Errorf("checkpoint: mode=%v err=%w", checkpointMode, err)
}
}
@@ -1174,6 +1204,37 @@ func (db *DB) copyToShadowWAL(ctx context.Context) error {
return nil
}
+// verifyLastShadowFrame re-reads the last frame read during the shadow copy.
+// This ensures that the frame has not been overrun after a checkpoint occurs
+// but before the new write lock has been obtained to initialize the new WAL index.
+func (db *DB) verifyLastShadowFrame(ctx context.Context) error {
+// Skip if we don't have a previous frame to verify.
+if db.frame == nil {
+return nil
+}
+r, err := os.Open(db.WALPath())
+if err != nil {
+return err
+}
+defer r.Close()
+// Seek to the position where the last frame was read.
+buf := make([]byte, len(db.frame))
+if _, err := r.Seek(db.pos.Offset-int64(len(db.frame)), io.SeekStart); err != nil {
+return fmt.Errorf("seek to last frame: %w", err)
+} else if _, err := io.ReadFull(r, buf); err != nil {
+return fmt.Errorf("read last frame: %w", err)
+}
+// Return a marker error if frames do not match.
+if !bytes.Equal(db.frame, buf) {
+return errRestartGeneration
+}
+return nil
+}
// WALSegmentReader returns a reader for a section of WAL data at the given position.
// Returns os.ErrNotExist if no matching index/offset is found.
func (db *DB) WALSegmentReader(ctx context.Context, pos Pos) (io.ReadCloser, error) {
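
In SQLite's WAL format each frame is a 24-byte frame header followed by one
database page, and db.pos.Offset appears to track the byte just past the last
frame copied, which is why verifyLastShadowFrame seeks backwards by
len(db.frame) before re-reading. A small illustration of that arithmetic,
assuming the default 4096-byte page size (constants here are illustrative, not
Litestream identifiers):

    package walmath

    const (
        walFrameHeaderSize = 24                           // SQLite WAL frame header
        pageSize           = 4096                         // default SQLite page size
        frameSize          = walFrameHeaderSize + pageSize // 4120 bytes per frame
    )

    // lastFrameOffset returns the file offset of the start of the frame that
    // ends at pos, mirroring pos.Offset - int64(len(db.frame)) in the diff.
    func lastFrameOffset(pos int64) int64 {
        return pos - frameSize
    }
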
@@ -1304,6 +1365,16 @@ func (db *DB) checkpoint(ctx context.Context, generation, mode string) error {
return fmt.Errorf("_litestream_lock: %w", err)
}
+// Verify we can re-read the last frame copied to the shadow WAL.
+// This ensures that another transaction has not overrun the WAL past where
+// our previous copy ended, which would overwrite any additional unread
+// frames between the checkpoint & the new write lock.
+//
+// This only occurs with high load and a short sync frequency so it is rare.
+if err := db.verifyLastShadowFrame(ctx); err != nil {
+return fmt.Errorf("cannot verify last frame copied from shadow wal: %w", err)
+}
// Copy the end of the previous WAL before starting a new shadow WAL.
if err := db.copyToShadowWAL(ctx); err != nil {
return fmt.Errorf("cannot copy to end of shadow wal: %w", err)
@@ -1360,6 +1431,11 @@ func (db *DB) execCheckpoint(mode string) (err error) {
}
db.Logger.Printf("checkpoint(%s): [%d,%d,%d]", mode, row[0], row[1], row[2])
+// Clear last read frame if we are truncating.
+if mode == CheckpointModeTruncate {
+db.frame = nil
+}
// Reacquire the read lock immediately after the checkpoint.
if err := db.acquireReadLock(); err != nil {
return fmt.Errorf("reacquire read lock: %w", err)
@@ -1543,3 +1619,7 @@ func logPrefixPath(path string) string {
}
return path
}
+// A marker error to indicate that a restart checkpoint could not verify
+// continuity between WAL indices and that a new generation should be started.
+var errRestartGeneration = errors.New("restart generation")

View File

@@ -302,7 +302,7 @@ func (p Pos) String() string {
if p.IsZero() {
return ""
}
-return fmt.Sprintf("%s/%08x:%d", p.Generation, p.Index, p.Offset)
+return fmt.Sprintf("%s/%08x:%08x", p.Generation, p.Index, p.Offset)
}
// IsZero returns true if p is the zero value.
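
The change above is formatting only: the offset is now rendered as zero-padded
hexadecimal, matching the %08x already used for the index, presumably so that
positions align in log output and sort consistently. A small illustrative
example with made-up values:

    package main

    import "fmt"

    func main() {
        // Hypothetical generation name, WAL index, and byte offset.
        generation, index, offset := "b17917ca4a4c6423", 5, int64(4152)

        // Old format: decimal, variable-width offset.
        fmt.Printf("%s/%08x:%d\n", generation, index, offset) // b17917ca4a4c6423/00000005:4152

        // New format: zero-padded hexadecimal offset.
        fmt.Printf("%s/%08x:%08x\n", generation, index, offset) // b17917ca4a4c6423/00000005:00001038
    }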

View File

@@ -110,7 +110,7 @@ func (r *Replica) Start(ctx context.Context) {
}
// Stop previous replication.
-_ = r.Stop(false)
+r.Stop()
// Wrap context with cancelation.
ctx, r.cancel = context.WithCancel(ctx)
@@ -123,17 +123,17 @@ func (r *Replica) Start(ctx context.Context) {
}
// Stop cancels any outstanding replication and blocks until finished.
-//
-// Performing a hard stop will close the DB file descriptor which could release
-// locks on per-process locks. Hard stops should only be performed when
-// stopping the entire process.
-func (r *Replica) Stop(hard bool) (err error) {
+func (r *Replica) Stop() {
r.cancel()
r.wg.Wait()
+}
+// Close will close the DB file descriptor, which could release per-process
+// locks (e.g. on non-Linux OSes).
+func (r *Replica) Close() (err error) {
r.muf.Lock()
defer r.muf.Unlock()
-if hard && r.f != nil {
+if r.f != nil {
if e := r.f.Close(); e != nil && err == nil {
err = e
}
@@ -297,9 +297,9 @@ func (r *Replica) writeIndexSegments(ctx context.Context, segments []WALSegmentI
// Flush LZ4 writer, close pipe, and wait for write to finish.
if err := zw.Close(); err != nil {
-return err
+return fmt.Errorf("lz4 writer close: %w", err)
} else if err := pw.Close(); err != nil {
-return err
+return fmt.Errorf("pipe writer close: %w", err)
} else if err := g.Wait(); err != nil {
return err
}
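
For context, writeIndexSegments compresses WAL segment data through an LZ4
writer into an io.Pipe while a goroutine (waited on via g) drains the read
side, which is why the comment above insists on the order: flush/close the LZ4
writer, close the pipe writer, then wait. A minimal sketch of that shape (not
Litestream's actual implementation; the destination writer stands in for the
replica upload), using github.com/pierrec/lz4/v4 and golang.org/x/sync/errgroup:

    package main

    import (
        "bytes"
        "fmt"
        "io"
        "strings"

        "github.com/pierrec/lz4/v4"
        "golang.org/x/sync/errgroup"
    )

    // compressTo streams src through an LZ4 writer into dst via an io.Pipe,
    // mirroring the close ordering above: close the LZ4 writer first so its
    // final block is flushed, then the pipe writer, then wait for the reader.
    func compressTo(dst io.Writer, src io.Reader) error {
        pr, pw := io.Pipe()
        zw := lz4.NewWriter(pw)

        var g errgroup.Group
        g.Go(func() error {
            _, err := io.Copy(dst, pr) // consume compressed bytes as they are produced
            return err
        })

        if _, err := io.Copy(zw, src); err != nil {
            pw.CloseWithError(err) // unblock the reader goroutine on failure
            return err
        }

        // Flush LZ4 writer, close pipe, and wait for the read side to finish.
        if err := zw.Close(); err != nil {
            return fmt.Errorf("lz4 writer close: %w", err)
        } else if err := pw.Close(); err != nil {
            return fmt.Errorf("pipe writer close: %w", err)
        }
        return g.Wait()
    }

    func main() {
        var buf bytes.Buffer
        if err := compressTo(&buf, strings.NewReader("example WAL segment data")); err != nil {
            panic(err)
        }
        fmt.Println(buf.Len(), "compressed bytes")
    }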