From 979cabcdb935fee75e611632de64cd5005aacefa Mon Sep 17 00:00:00 2001 From: Ben Johnson Date: Fri, 1 Jan 2021 10:02:03 -0700 Subject: [PATCH] Add some DB.Sync() tests --- db.go | 25 +++++---------- db_test.go | 94 ++++++++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 102 insertions(+), 17 deletions(-) diff --git a/db.go b/db.go index 308aa46..710d155 100644 --- a/db.go +++ b/db.go @@ -332,12 +332,9 @@ func (db *DB) WALs(ctx context.Context) ([]*WALInfo, error) { return infos, nil } -// Init initializes the connection to the database. +// init initializes the connection to the database. // Skipped if already initialized or if the database file does not exist. -func (db *DB) Init() (err error) { - db.mu.Lock() - defer db.mu.Unlock() - +func (db *DB) init() (err error) { // Exit if already initialized. if db.db != nil { return nil @@ -357,7 +354,7 @@ func (db *DB) Init() (err error) { return fmt.Errorf("enable wal: %w", err) } - // Disable autocheckpoint. + // Disable autocheckpoint for litestream's connection. if _, err := db.db.ExecContext(db.ctx, `PRAGMA wal_autocheckpoint = 0;`); err != nil { return fmt.Errorf("disable autocheckpoint: %w", err) } @@ -601,8 +598,10 @@ func (db *DB) Sync() (err error) { db.mu.Lock() defer db.mu.Unlock() - // No database exists, exit. - if db.db == nil { + // Initialize database, if necessary. Exit if no DB exists. + if err := db.init(); err != nil { + return err + } else if db.db == nil { return nil } @@ -616,8 +615,6 @@ func (db *DB) Sync() (err error) { db.syncSecondsCounter.Add(float64(time.Since(t).Seconds())) }() - // TODO: Force "-wal" file if it doesn't exist. - // Ensure WAL has at least one frame in it. if err := db.ensureWALExists(); err != nil { return fmt.Errorf("ensure wal exists: %w", err) @@ -685,7 +682,7 @@ func (db *DB) Sync() (err error) { checkpoint = true } else if db.MaxCheckpointPageN > 0 && newWALSize >= calcWALSize(db.pageSize, db.MaxCheckpointPageN) { checkpoint, checkpointMode = true, CheckpointModeRestart - } else if db.CheckpointInterval > 0 && time.Since(db.lastCheckpointAt) > db.CheckpointInterval && newWALSize > calcWALSize(db.pageSize, 1) { + } else if db.CheckpointInterval > 0 && !db.lastCheckpointAt.IsZero() && time.Since(db.lastCheckpointAt) > db.CheckpointInterval && newWALSize > calcWALSize(db.pageSize, 1) { checkpoint = true } @@ -1221,12 +1218,6 @@ func (db *DB) monitor() { case <-ticker.C: } - // Ensure the database is initialized. - if err := db.Init(); err != nil { - log.Printf("%s: init error: %s", db.path, err) - continue - } - // Sync the database to the shadow WAL. if err := db.Sync(); err != nil && !errors.Is(err, context.Canceled) { log.Printf("%s: sync error: %s", db.path, err) diff --git a/db_test.go b/db_test.go index 655abe7..ec0cf7c 100644 --- a/db_test.go +++ b/db_test.go @@ -150,6 +150,100 @@ func TestDB_CRC64(t *testing.T) { }) } +// Ensure we can sync the real WAL to the shadow WAL. +func TestDB_Sync(t *testing.T) { + // Ensure sync is skipped if no database exists. + t.Run("NoDB", func(t *testing.T) { + db := MustOpenDB(t) + defer MustCloseDB(t, db) + if err := db.Sync(); err != nil { + t.Fatal(err) + } + }) + + // Ensure sync can successfully run on the initial sync. + t.Run("Initial", func(t *testing.T) { + db, sqldb := MustOpenDBs(t) + defer MustCloseDBs(t, db, sqldb) + + if err := db.Sync(); err != nil { + t.Fatal(err) + } + + // Verify page size if now available. + if db.PageSize() == 0 { + t.Fatal("expected page size after initial sync") + } + + // Obtain real WAL size. + fi, err := os.Stat(db.WALPath()) + if err != nil { + t.Fatal(err) + } + + // Ensure position now available. + if pos, err := db.Pos(); err != nil { + t.Fatal(err) + } else if pos.Generation == "" { + t.Fatal("expected generation") + } else if got, want := pos.Index, 0; got != want { + t.Fatalf("pos.Index=%v, want %v", got, want) + } else if got, want := pos.Offset, fi.Size(); got != want { + t.Fatalf("pos.Offset=%v, want %v", got, want) + } + }) + + // Ensure a WAL file is created if one does not already exist. + t.Run("NoWAL", func(t *testing.T) { + t.Skip() + }) + + // Ensure DB can keep in sync across multiple Sync() invocations. + t.Run("MultiSync", func(t *testing.T) { + t.Skip() + }) + + // Ensure DB can start new generation if it detects it cannot verify last position. + t.Run("NewGeneration", func(t *testing.T) { + t.Skip() + }) + + // Ensure DB can handle partial shadow WAL header write. + t.Run("PartialShadowWALHeader", func(t *testing.T) { + t.Skip() + }) + + // Ensure DB can handle partial shadow WAL writes. + t.Run("PartialShadowWALFrame", func(t *testing.T) { + t.Skip() + }) + + // Ensure DB can handle a generation directory with a missing shadow WAL. + t.Run("NoShadowWAL", func(t *testing.T) { + t.Skip() + }) + + // Ensure DB can handle a mismatched header-only and start new generation. + t.Run("MismatchHeaderOnly", func(t *testing.T) { + t.Skip() + }) + + // Ensure DB checkpoints after minimum number of pages. + t.Run("MinCheckpointPageN", func(t *testing.T) { + t.Skip() + }) + + // Ensure DB forces checkpoint after maximum number of pages. + t.Run("MaxCheckpointPageN", func(t *testing.T) { + t.Skip() + }) + + // Ensure DB checkpoints after interval. + t.Run("CheckpointInterval", func(t *testing.T) { + t.Skip() + }) +} + // MustOpenDBs returns a new instance of a DB & associated SQL DB. func MustOpenDBs(tb testing.TB) (*litestream.DB, *sql.DB) { db := MustOpenDB(tb)