From 80280fce5454e3ea9245e99d960f0998e6b940ad Mon Sep 17 00:00:00 2001 From: Ben Johnson Date: Fri, 1 Mar 2024 10:05:58 -0700 Subject: [PATCH] Only update litestream_seq if size is below WAL header size --- db.go | 16 ++++++++++------ db_test.go | 16 ++-------------- replica_test.go | 16 +++++++++------- 3 files changed, 21 insertions(+), 27 deletions(-) diff --git a/db.go b/db.go index 0d00168..bd58a6d 100644 --- a/db.go +++ b/db.go @@ -629,7 +629,7 @@ func (db *DB) acquireReadLock() error { } // Execute read query to obtain read lock. - if _, err := tx.ExecContext(db.ctx, `SELECT COUNT(1) FROM _litestream_seq;`); err != nil { + if _, err := tx.ExecContext(context.Background(), `SELECT COUNT(1) FROM _litestream_seq;`); err != nil { _ = tx.Rollback() return err } @@ -649,6 +649,10 @@ func (db *DB) releaseReadLock() error { // Rollback & clear read transaction. err := db.rtx.Rollback() db.rtx = nil + + if errors.Is(err, context.Canceled) { + err = nil + } return err } @@ -693,7 +697,7 @@ func (db *DB) createGeneration() (string, error) { // Atomically write generation name as current generation. generationNamePath := db.GenerationNamePath() - mode := os.FileMode(0600) + mode := os.FileMode(0o600) if db.fileInfo != nil { mode = db.fileInfo.Mode() } @@ -986,7 +990,7 @@ func (db *DB) initShadowWALFile(filename string) (int64, error) { } // Write header to new WAL shadow file. - mode := os.FileMode(0600) + mode := os.FileMode(0o600) if fi := db.fileInfo; fi != nil { mode = fi.Mode() } @@ -1022,7 +1026,7 @@ func (db *DB) copyToShadowWAL(filename string) (origWalSize int64, newSize int64 } origWalSize = frameAlign(fi.Size(), db.pageSize) - w, err := os.OpenFile(filename, os.O_RDWR, 0666) + w, err := os.OpenFile(filename, os.O_RDWR, 0o666) if err != nil { return 0, 0, err } @@ -1334,7 +1338,7 @@ func (db *DB) checkpoint(ctx context.Context, generation, mode string) error { // a new page is written. if err := db.execCheckpoint(mode); err != nil { return err - } else if _, err = db.db.Exec(`INSERT INTO _litestream_seq (id, seq) VALUES (1, 1) ON CONFLICT (id) DO UPDATE SET seq = seq + 1`); err != nil { + } else if err := db.ensureWALExists(); err != nil { return err } @@ -1424,7 +1428,7 @@ func (db *DB) execCheckpoint(mode string) (err error) { // Reacquire the read lock immediately after the checkpoint. if err := db.acquireReadLock(); err != nil { - return fmt.Errorf("release read lock: %w", err) + return fmt.Errorf("acquire read lock: %w", err) } return nil diff --git a/db_test.go b/db_test.go index 4e1c494..4ad8149 100644 --- a/db_test.go +++ b/db_test.go @@ -375,7 +375,7 @@ func TestDB_Sync(t *testing.T) { shadowWALPath := db.ShadowWALPath(pos0.Generation, pos0.Index) if buf, err := os.ReadFile(shadowWALPath); err != nil { t.Fatal(err) - } else if err := os.WriteFile(shadowWALPath, append(buf[:litestream.WALHeaderSize-8], 0, 0, 0, 0, 0, 0, 0, 0), 0600); err != nil { + } else if err := os.WriteFile(shadowWALPath, append(buf[:litestream.WALHeaderSize-8], 0, 0, 0, 0, 0, 0, 0, 0), 0o600); err != nil { t.Fatal(err) } @@ -552,12 +552,7 @@ func TestDB_Sync(t *testing.T) { t.Fatal(err) } - // Ensure position is now on the second index. - if pos, err := db.Pos(); err != nil { - t.Fatal(err) - } else if got, want := pos.Index, 1; got != want { - t.Fatalf("Index=%v, want %v", got, want) - } + // NOTE: The minimum checkpoint may only do a PASSIVE checkpoint so we can't guarantee a rollover. }) // Ensure DB checkpoints after interval. @@ -581,13 +576,6 @@ func TestDB_Sync(t *testing.T) { } else if err := db.Sync(context.Background()); err != nil { t.Fatal(err) } - - // Ensure position is now on the second index. - if pos, err := db.Pos(); err != nil { - t.Fatal(err) - } else if got, want := pos.Index, 1; got != want { - t.Fatalf("Index=%v, want %v", got, want) - } }) } diff --git a/replica_test.go b/replica_test.go index 78ba620..c63981a 100644 --- a/replica_test.go +++ b/replica_test.go @@ -50,6 +50,10 @@ func TestReplica_Sync(t *testing.T) { t.Fatal(err) } + if err := db.Checkpoint(context.Background(), litestream.CheckpointModeTruncate); err != nil { + t.Fatal(err) + } + c := file.NewReplicaClient(t.TempDir()) r := litestream.NewReplica(db, "") c.Replica, r.Client = r, c @@ -142,7 +146,7 @@ func TestReplica_Snapshot(t *testing.T) { t.Fatal(err) } else if info, err := r.Snapshot(context.Background()); err != nil { t.Fatal(err) - } else if got, want := info.Pos(), nextIndex(pos0); got != want { + } else if got, want := info.Pos(), pos0.Truncate(); got != want { t.Fatalf("pos=%s, want %s", got, want) } @@ -166,20 +170,18 @@ func TestReplica_Snapshot(t *testing.T) { t.Fatal(err) } else if info, err := r.Snapshot(context.Background()); err != nil { t.Fatal(err) - } else if got, want := info.Pos(), nextIndex(pos1); got != want { + } else if got, want := info.Pos(), pos1.Truncate(); got != want { t.Fatalf("pos=%v, want %v", got, want) } - // Verify three snapshots exist. + // Verify snapshots exist. if infos, err := r.Snapshots(context.Background()); err != nil { t.Fatal(err) - } else if got, want := len(infos), 3; got != want { + } else if got, want := len(infos), 2; got != want { t.Fatalf("len=%v, want %v", got, want) } else if got, want := infos[0].Pos(), pos0.Truncate(); got != want { t.Fatalf("info[0]=%s, want %s", got, want) - } else if got, want := infos[1].Pos(), nextIndex(pos0); got != want { + } else if got, want := infos[1].Pos(), pos1.Truncate(); got != want { t.Fatalf("info[1]=%s, want %s", got, want) - } else if got, want := infos[2].Pos(), nextIndex(pos1); got != want { - t.Fatalf("info[2]=%s, want %s", got, want) } }