Add some DB.Sync() tests
This commit is contained in:
25
db.go
25
db.go
@@ -332,12 +332,9 @@ func (db *DB) WALs(ctx context.Context) ([]*WALInfo, error) {
|
|||||||
return infos, nil
|
return infos, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Init initializes the connection to the database.
|
// init initializes the connection to the database.
|
||||||
// Skipped if already initialized or if the database file does not exist.
|
// Skipped if already initialized or if the database file does not exist.
|
||||||
func (db *DB) Init() (err error) {
|
func (db *DB) init() (err error) {
|
||||||
db.mu.Lock()
|
|
||||||
defer db.mu.Unlock()
|
|
||||||
|
|
||||||
// Exit if already initialized.
|
// Exit if already initialized.
|
||||||
if db.db != nil {
|
if db.db != nil {
|
||||||
return nil
|
return nil
|
||||||
@@ -357,7 +354,7 @@ func (db *DB) Init() (err error) {
|
|||||||
return fmt.Errorf("enable wal: %w", err)
|
return fmt.Errorf("enable wal: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Disable autocheckpoint.
|
// Disable autocheckpoint for litestream's connection.
|
||||||
if _, err := db.db.ExecContext(db.ctx, `PRAGMA wal_autocheckpoint = 0;`); err != nil {
|
if _, err := db.db.ExecContext(db.ctx, `PRAGMA wal_autocheckpoint = 0;`); err != nil {
|
||||||
return fmt.Errorf("disable autocheckpoint: %w", err)
|
return fmt.Errorf("disable autocheckpoint: %w", err)
|
||||||
}
|
}
|
||||||
@@ -601,8 +598,10 @@ func (db *DB) Sync() (err error) {
|
|||||||
db.mu.Lock()
|
db.mu.Lock()
|
||||||
defer db.mu.Unlock()
|
defer db.mu.Unlock()
|
||||||
|
|
||||||
// No database exists, exit.
|
// Initialize database, if necessary. Exit if no DB exists.
|
||||||
if db.db == nil {
|
if err := db.init(); err != nil {
|
||||||
|
return err
|
||||||
|
} else if db.db == nil {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -616,8 +615,6 @@ func (db *DB) Sync() (err error) {
|
|||||||
db.syncSecondsCounter.Add(float64(time.Since(t).Seconds()))
|
db.syncSecondsCounter.Add(float64(time.Since(t).Seconds()))
|
||||||
}()
|
}()
|
||||||
|
|
||||||
// TODO: Force "-wal" file if it doesn't exist.
|
|
||||||
|
|
||||||
// Ensure WAL has at least one frame in it.
|
// Ensure WAL has at least one frame in it.
|
||||||
if err := db.ensureWALExists(); err != nil {
|
if err := db.ensureWALExists(); err != nil {
|
||||||
return fmt.Errorf("ensure wal exists: %w", err)
|
return fmt.Errorf("ensure wal exists: %w", err)
|
||||||
@@ -685,7 +682,7 @@ func (db *DB) Sync() (err error) {
|
|||||||
checkpoint = true
|
checkpoint = true
|
||||||
} else if db.MaxCheckpointPageN > 0 && newWALSize >= calcWALSize(db.pageSize, db.MaxCheckpointPageN) {
|
} else if db.MaxCheckpointPageN > 0 && newWALSize >= calcWALSize(db.pageSize, db.MaxCheckpointPageN) {
|
||||||
checkpoint, checkpointMode = true, CheckpointModeRestart
|
checkpoint, checkpointMode = true, CheckpointModeRestart
|
||||||
} else if db.CheckpointInterval > 0 && time.Since(db.lastCheckpointAt) > db.CheckpointInterval && newWALSize > calcWALSize(db.pageSize, 1) {
|
} else if db.CheckpointInterval > 0 && !db.lastCheckpointAt.IsZero() && time.Since(db.lastCheckpointAt) > db.CheckpointInterval && newWALSize > calcWALSize(db.pageSize, 1) {
|
||||||
checkpoint = true
|
checkpoint = true
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -1221,12 +1218,6 @@ func (db *DB) monitor() {
|
|||||||
case <-ticker.C:
|
case <-ticker.C:
|
||||||
}
|
}
|
||||||
|
|
||||||
// Ensure the database is initialized.
|
|
||||||
if err := db.Init(); err != nil {
|
|
||||||
log.Printf("%s: init error: %s", db.path, err)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
// Sync the database to the shadow WAL.
|
// Sync the database to the shadow WAL.
|
||||||
if err := db.Sync(); err != nil && !errors.Is(err, context.Canceled) {
|
if err := db.Sync(); err != nil && !errors.Is(err, context.Canceled) {
|
||||||
log.Printf("%s: sync error: %s", db.path, err)
|
log.Printf("%s: sync error: %s", db.path, err)
|
||||||
|
|||||||
94
db_test.go
94
db_test.go
@@ -150,6 +150,100 @@ func TestDB_CRC64(t *testing.T) {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Ensure we can sync the real WAL to the shadow WAL.
|
||||||
|
func TestDB_Sync(t *testing.T) {
|
||||||
|
// Ensure sync is skipped if no database exists.
|
||||||
|
t.Run("NoDB", func(t *testing.T) {
|
||||||
|
db := MustOpenDB(t)
|
||||||
|
defer MustCloseDB(t, db)
|
||||||
|
if err := db.Sync(); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
// Ensure sync can successfully run on the initial sync.
|
||||||
|
t.Run("Initial", func(t *testing.T) {
|
||||||
|
db, sqldb := MustOpenDBs(t)
|
||||||
|
defer MustCloseDBs(t, db, sqldb)
|
||||||
|
|
||||||
|
if err := db.Sync(); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Verify page size if now available.
|
||||||
|
if db.PageSize() == 0 {
|
||||||
|
t.Fatal("expected page size after initial sync")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Obtain real WAL size.
|
||||||
|
fi, err := os.Stat(db.WALPath())
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Ensure position now available.
|
||||||
|
if pos, err := db.Pos(); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
} else if pos.Generation == "" {
|
||||||
|
t.Fatal("expected generation")
|
||||||
|
} else if got, want := pos.Index, 0; got != want {
|
||||||
|
t.Fatalf("pos.Index=%v, want %v", got, want)
|
||||||
|
} else if got, want := pos.Offset, fi.Size(); got != want {
|
||||||
|
t.Fatalf("pos.Offset=%v, want %v", got, want)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
// Ensure a WAL file is created if one does not already exist.
|
||||||
|
t.Run("NoWAL", func(t *testing.T) {
|
||||||
|
t.Skip()
|
||||||
|
})
|
||||||
|
|
||||||
|
// Ensure DB can keep in sync across multiple Sync() invocations.
|
||||||
|
t.Run("MultiSync", func(t *testing.T) {
|
||||||
|
t.Skip()
|
||||||
|
})
|
||||||
|
|
||||||
|
// Ensure DB can start new generation if it detects it cannot verify last position.
|
||||||
|
t.Run("NewGeneration", func(t *testing.T) {
|
||||||
|
t.Skip()
|
||||||
|
})
|
||||||
|
|
||||||
|
// Ensure DB can handle partial shadow WAL header write.
|
||||||
|
t.Run("PartialShadowWALHeader", func(t *testing.T) {
|
||||||
|
t.Skip()
|
||||||
|
})
|
||||||
|
|
||||||
|
// Ensure DB can handle partial shadow WAL writes.
|
||||||
|
t.Run("PartialShadowWALFrame", func(t *testing.T) {
|
||||||
|
t.Skip()
|
||||||
|
})
|
||||||
|
|
||||||
|
// Ensure DB can handle a generation directory with a missing shadow WAL.
|
||||||
|
t.Run("NoShadowWAL", func(t *testing.T) {
|
||||||
|
t.Skip()
|
||||||
|
})
|
||||||
|
|
||||||
|
// Ensure DB can handle a mismatched header-only and start new generation.
|
||||||
|
t.Run("MismatchHeaderOnly", func(t *testing.T) {
|
||||||
|
t.Skip()
|
||||||
|
})
|
||||||
|
|
||||||
|
// Ensure DB checkpoints after minimum number of pages.
|
||||||
|
t.Run("MinCheckpointPageN", func(t *testing.T) {
|
||||||
|
t.Skip()
|
||||||
|
})
|
||||||
|
|
||||||
|
// Ensure DB forces checkpoint after maximum number of pages.
|
||||||
|
t.Run("MaxCheckpointPageN", func(t *testing.T) {
|
||||||
|
t.Skip()
|
||||||
|
})
|
||||||
|
|
||||||
|
// Ensure DB checkpoints after interval.
|
||||||
|
t.Run("CheckpointInterval", func(t *testing.T) {
|
||||||
|
t.Skip()
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
// MustOpenDBs returns a new instance of a DB & associated SQL DB.
|
// MustOpenDBs returns a new instance of a DB & associated SQL DB.
|
||||||
func MustOpenDBs(tb testing.TB) (*litestream.DB, *sql.DB) {
|
func MustOpenDBs(tb testing.TB) (*litestream.DB, *sql.DB) {
|
||||||
db := MustOpenDB(tb)
|
db := MustOpenDB(tb)
|
||||||
|
|||||||
Reference in New Issue
Block a user