From 25fec29e1a0434c35264d8c45cb4196769ea374a Mon Sep 17 00:00:00 2001 From: Ben Johnson Date: Sat, 16 Jan 2021 07:45:08 -0700 Subject: [PATCH] Clear last position on replica sync error --- db.go | 12 ++++++------ replica.go | 11 ++++++++++- s3/s3.go | 10 ++++++++++ 3 files changed, 26 insertions(+), 7 deletions(-) diff --git a/db.go b/db.go index e32b640..ee776be 100644 --- a/db.go +++ b/db.go @@ -713,7 +713,7 @@ func (db *DB) Sync() (err error) { if checkpoint { changed = true - if err := db.checkpointAndInit(info, checkpointMode); err != nil { + if err := db.checkpointAndInit(info.shadowWALPath, checkpointMode); err != nil { return fmt.Errorf("checkpoint: mode=%v err=%w", checkpointMode, err) } } @@ -1199,7 +1199,7 @@ func (db *DB) checkpoint(mode string) (err error) { // checkpointAndInit performs a checkpoint on the WAL file and initializes a // new shadow WAL file. -func (db *DB) checkpointAndInit(info syncInfo, mode string) error { +func (db *DB) checkpointAndInit(shadowWALPath string, mode string) error { // Read WAL header before checkpoint to check if it has been restarted. hdr, err := readWALHeader(db.WALPath()) if err != nil { @@ -1222,18 +1222,18 @@ func (db *DB) checkpointAndInit(info syncInfo, mode string) error { } // Copy the end of the previous WAL before starting a new shadow WAL. - if _, err := db.copyToShadowWAL(info.shadowWALPath); err != nil { + if _, err := db.copyToShadowWAL(shadowWALPath); err != nil { return fmt.Errorf("cannot copy to end of shadow wal: %w", err) } // Parse index of current shadow WAL file. - index, _, _, _, err := ParseWALPath(info.shadowWALPath) + index, _, _, _, err := ParseWALPath(shadowWALPath) if err != nil { - return fmt.Errorf("cannot parse shadow wal filename: %s", info.shadowWALPath) + return fmt.Errorf("cannot parse shadow wal filename: %s", shadowWALPath) } // Start a new shadow WAL file with next index. 
- newShadowWALPath := filepath.Join(filepath.Dir(info.shadowWALPath), FormatWALPath(index+1)) + newShadowWALPath := filepath.Join(filepath.Dir(shadowWALPath), FormatWALPath(index+1)) if err := db.initShadowWALFile(newShadowWALPath); err != nil { return fmt.Errorf("cannot init shadow wal file: name=%s err=%w", newShadowWALPath, err) } diff --git a/replica.go b/replica.go index 070591a..d6e53f4 100644 --- a/replica.go +++ b/replica.go @@ -566,6 +566,15 @@ func (r *FileReplica) snapshotN(generation string) (int, error) { } func (r *FileReplica) Sync(ctx context.Context) (err error) { + // Clear last position if an error occurs during sync. + defer func() { + if err != nil { + r.mu.Lock() + r.pos = Pos{} + r.mu.Unlock() + } + }() + // Find current position of database. dpos, err := r.db.Pos() if err != nil { @@ -588,7 +597,7 @@ func (r *FileReplica) Sync(ctx context.Context) (err error) { } // Determine position, if necessary. - if r.LastPos().IsZero() { + if r.LastPos().Generation != generation { pos, err := r.CalcPos(ctx, generation) if err != nil { return fmt.Errorf("cannot determine replica position: %s", err) diff --git a/s3/s3.go b/s3/s3.go index 4d5c846..c75d5ca 100644 --- a/s3/s3.go +++ b/s3/s3.go @@ -678,6 +678,16 @@ func (r *Replica) findBucketRegion(ctx context.Context, bucket string) (string, } func (r *Replica) Sync(ctx context.Context) (err error) { + // Clear last position if an error occurs during sync. + defer func() { + if err != nil { + r.mu.Lock() + r.pos = litestream.Pos{} + r.mu.Unlock() + } + }() + + // Connect to S3, if necessary. if err := r.Init(ctx); err != nil { return err }