diff --git a/db.go b/db.go index 710d155..c08f541 100644 --- a/db.go +++ b/db.go @@ -219,7 +219,7 @@ func (db *DB) Pos() (Pos, error) { return Pos{}, err } - return Pos{Generation: generation, Index: index, Offset: fi.Size()}, nil + return Pos{Generation: generation, Index: index, Offset: frameAlign(fi.Size(), db.pageSize)}, nil } // Notify returns a channel that closes when the shadow WAL changes. @@ -764,16 +764,16 @@ func (db *DB) verify() (info syncInfo, err error) { // Open shadow WAL to copy append to. info.shadowWALPath, err = db.CurrentShadowWALPath(info.generation) - if info.shadowWALPath == "" { - info.reason = "no shadow wal" - return info, nil - } else if err != nil { + if err != nil { return info, fmt.Errorf("cannot determine shadow WAL: %w", err) } // Determine shadow WAL current size. fi, err = os.Stat(info.shadowWALPath) - if err != nil { + if os.IsNotExist(err) { + info.reason = "no shadow wal" + return info, nil + } else if err != nil { return info, err } info.shadowWALSize = frameAlign(fi.Size(), db.pageSize) @@ -781,7 +781,8 @@ func (db *DB) verify() (info syncInfo, err error) { // Truncate shadow WAL if there is a partial page. // Exit if shadow WAL does not contain a full header. if info.shadowWALSize < WALHeaderSize { - return info, fmt.Errorf("short shadow wal: %s", info.shadowWALPath) + info.reason = "short shadow wal" + return info, nil } // If shadow WAL is larger than real WAL then the WAL has been truncated @@ -907,6 +908,7 @@ func (db *DB) copyToShadowWAL(filename string) (newSize int64, err error) { if err != nil { return 0, err } + origSize := frameAlign(fi.Size(), db.pageSize) // Read shadow WAL header to determine byte order for checksum & salt. hdr := make([]byte, WALHeaderSize) @@ -928,16 +930,13 @@ func (db *DB) copyToShadowWAL(filename string) (newSize int64, err error) { } // Seek to correct position on both files. - if _, err := r.Seek(fi.Size(), io.SeekStart); err != nil { + if _, err := r.Seek(origSize, io.SeekStart); err != nil { return 0, fmt.Errorf("wal seek: %w", err) - } else if _, err := w.Seek(fi.Size(), io.SeekStart); err != nil { + } else if _, err := w.Seek(origSize, io.SeekStart); err != nil { return 0, fmt.Errorf("shadow wal seek: %w", err) } - // TODO: Optimize to use bufio on reader & writer to minimize syscalls. - // Loop over each page, verify checksum, & copy to writer. - origSize := fi.Size() newSize = origSize buf := make([]byte, db.pageSize+WALFrameHeaderSize) for { @@ -1103,8 +1102,8 @@ func readLastChecksumFrom(f *os.File, pageSize int) (uint32, uint32, error) { offset := int64(WALHeaderChecksumOffset) if fi, err := f.Stat(); err != nil { return 0, 0, err - } else if fi.Size() > WALHeaderSize { - offset = fi.Size() - int64(pageSize) - WALFrameHeaderSize + WALFrameHeaderChecksumOffset + } else if sz := frameAlign(fi.Size(), pageSize); fi.Size() > WALHeaderSize { + offset = sz - int64(pageSize) - WALFrameHeaderSize + WALFrameHeaderChecksumOffset } // Read big endian checksum. @@ -1117,8 +1116,19 @@ func readLastChecksumFrom(f *os.File, pageSize int) (uint32, uint32, error) { return binary.BigEndian.Uint32(b[0:]), binary.BigEndian.Uint32(b[4:]), nil } -// checkpoint performs a checkpoint on the WAL file. +// Checkpoint performs a checkpoint on the WAL file. +func (db *DB) Checkpoint(mode string) (err error) { + db.mu.Lock() + defer db.mu.Unlock() + return db.checkpoint(mode) +} + func (db *DB) checkpoint(mode string) (err error) { + // Ignore if there is no underlying database. + if db.db == nil { + return nil + } + // Track checkpoint metrics. t := time.Now() defer func() { diff --git a/db_test.go b/db_test.go index ec0cf7c..688c413 100644 --- a/db_test.go +++ b/db_test.go @@ -2,6 +2,7 @@ package litestream_test import ( "database/sql" + "io/ioutil" "os" "path/filepath" "testing" @@ -193,39 +194,327 @@ func TestDB_Sync(t *testing.T) { } }) - // Ensure a WAL file is created if one does not already exist. - t.Run("NoWAL", func(t *testing.T) { - t.Skip() - }) - // Ensure DB can keep in sync across multiple Sync() invocations. t.Run("MultiSync", func(t *testing.T) { - t.Skip() + db, sqldb := MustOpenDBs(t) + defer MustCloseDBs(t, db, sqldb) + + // Execute a query to force a write to the WAL. + if _, err := sqldb.Exec(`CREATE TABLE foo (bar TEXT);`); err != nil { + t.Fatal(err) + } + + // Perform initial sync & grab initial position. + if err := db.Sync(); err != nil { + t.Fatal(err) + } + + pos0, err := db.Pos() + if err != nil { + t.Fatal(err) + } + + // Insert into table. + if _, err := sqldb.Exec(`INSERT INTO foo (bar) VALUES ('baz');`); err != nil { + t.Fatal(err) + } + + // Sync to ensure position moves forward one page. + if err := db.Sync(); err != nil { + t.Fatal(err) + } else if pos1, err := db.Pos(); err != nil { + t.Fatal(err) + } else if pos0.Generation != pos1.Generation { + t.Fatal("expected the same generation") + } else if got, want := pos1.Index, pos0.Index; got != want { + t.Fatalf("Index=%v, want %v", got, want) + } else if got, want := pos1.Offset, pos0.Offset+4096+litestream.WALFrameHeaderSize; got != want { + t.Fatalf("Offset=%v, want %v", got, want) + } + }) + + // Ensure a WAL file is created if one does not already exist. + t.Run("NoWAL", func(t *testing.T) { + db, sqldb := MustOpenDBs(t) + defer MustCloseDBs(t, db, sqldb) + + // Issue initial sync and truncate WAL. + if err := db.Sync(); err != nil { + t.Fatal(err) + } + + // Obtain initial position. + pos0, err := db.Pos() + if err != nil { + t.Fatal(err) + } + + // Checkpoint & fully close which should close WAL file. + if err := db.Checkpoint(litestream.CheckpointModeTruncate); err != nil { + t.Fatal(err) + } else if err := db.Close(); err != nil { + t.Fatal(err) + } else if err := sqldb.Close(); err != nil { + t.Fatal(err) + } + + // Verify WAL does not exist. + if _, err := os.Stat(db.WALPath()); !os.IsNotExist(err) { + t.Fatal(err) + } + + // Reopen the managed database. + db = MustOpenDBAt(t, db.Path()) + defer MustCloseDB(t, db) + + // Re-sync and ensure new generation has been created. + if err := db.Sync(); err != nil { + t.Fatal(err) + } + + // Obtain initial position. + if pos1, err := db.Pos(); err != nil { + t.Fatal(err) + } else if pos0.Generation == pos1.Generation { + t.Fatal("expected new generation after truncation") + } }) // Ensure DB can start new generation if it detects it cannot verify last position. - t.Run("NewGeneration", func(t *testing.T) { - t.Skip() + t.Run("OverwritePrevPosition", func(t *testing.T) { + db, sqldb := MustOpenDBs(t) + defer MustCloseDBs(t, db, sqldb) + + // Execute a query to force a write to the WAL. + if _, err := sqldb.Exec(`CREATE TABLE foo (bar TEXT);`); err != nil { + t.Fatal(err) + } + + // Issue initial sync and truncate WAL. + if err := db.Sync(); err != nil { + t.Fatal(err) + } + + // Obtain initial position. + pos0, err := db.Pos() + if err != nil { + t.Fatal(err) + } + + // Fully close which should close WAL file. + if err := db.Close(); err != nil { + t.Fatal(err) + } else if err := sqldb.Close(); err != nil { + t.Fatal(err) + } + + // Verify WAL does not exist. + if _, err := os.Stat(db.WALPath()); !os.IsNotExist(err) { + t.Fatal(err) + } + + // Insert into table multiple times to move past old offset + sqldb = MustOpenSQLDB(t, db.Path()) + defer MustCloseSQLDB(t, sqldb) + for i := 0; i < 100; i++ { + if _, err := sqldb.Exec(`INSERT INTO foo (bar) VALUES ('baz');`); err != nil { + t.Fatal(err) + } + } + + // Reopen the managed database. + db = MustOpenDBAt(t, db.Path()) + defer MustCloseDB(t, db) + + // Re-sync and ensure new generation has been created. + if err := db.Sync(); err != nil { + t.Fatal(err) + } + + // Obtain initial position. + if pos1, err := db.Pos(); err != nil { + t.Fatal(err) + } else if pos0.Generation == pos1.Generation { + t.Fatal("expected new generation after truncation") + } + }) + + // Ensure DB can handle a mismatched header-only and start new generation. + t.Run("WALHeaderMismatch", func(t *testing.T) { + db, sqldb := MustOpenDBs(t) + defer MustCloseDBs(t, db, sqldb) + + // Execute a query to force a write to the WAL and then sync. + if _, err := sqldb.Exec(`CREATE TABLE foo (bar TEXT);`); err != nil { + t.Fatal(err) + } else if err := db.Sync(); err != nil { + t.Fatal(err) + } + + // Grab initial position & close. + pos0, err := db.Pos() + if err != nil { + t.Fatal(err) + } else if err := db.Close(); err != nil { + t.Fatal(err) + } + + // Read existing file, update header checksum, and write back only header + // to simulate a header with a mismatched checksum. + shadowWALPath := db.ShadowWALPath(pos0.Generation, pos0.Index) + if buf, err := ioutil.ReadFile(shadowWALPath); err != nil { + t.Fatal(err) + } else if ioutil.WriteFile(shadowWALPath, append(buf[:litestream.WALHeaderSize-8], 0, 0, 0, 0, 0, 0, 0, 0), 0600); err != nil { + t.Fatal(err) + } + + // Reopen managed database & ensure sync will still work. + db = MustOpenDBAt(t, db.Path()) + defer MustCloseDB(t, db) + if err := db.Sync(); err != nil { + t.Fatal(err) + } + + // Verify a new generation was started. + if pos1, err := db.Pos(); err != nil { + t.Fatal(err) + } else if pos0.Generation == pos1.Generation { + t.Fatal("expected new generation") + } }) // Ensure DB can handle partial shadow WAL header write. t.Run("PartialShadowWALHeader", func(t *testing.T) { - t.Skip() + db, sqldb := MustOpenDBs(t) + defer MustCloseDBs(t, db, sqldb) + + // Execute a query to force a write to the WAL and then sync. + if _, err := sqldb.Exec(`CREATE TABLE foo (bar TEXT);`); err != nil { + t.Fatal(err) + } else if err := db.Sync(); err != nil { + t.Fatal(err) + } + + pos0, err := db.Pos() + if err != nil { + t.Fatal(err) + } + + // Close & truncate shadow WAL to simulate a partial header write. + if err := db.Close(); err != nil { + t.Fatal(err) + } else if err := os.Truncate(db.ShadowWALPath(pos0.Generation, pos0.Index), litestream.WALHeaderSize-1); err != nil { + t.Fatal(err) + } + + // Reopen managed database & ensure sync will still work. + db = MustOpenDBAt(t, db.Path()) + defer MustCloseDB(t, db) + if err := db.Sync(); err != nil { + t.Fatal(err) + } + + // Verify a new generation was started. + if pos1, err := db.Pos(); err != nil { + t.Fatal(err) + } else if pos0.Generation == pos1.Generation { + t.Fatal("expected new generation") + } }) // Ensure DB can handle partial shadow WAL writes. t.Run("PartialShadowWALFrame", func(t *testing.T) { - t.Skip() + db, sqldb := MustOpenDBs(t) + defer MustCloseDBs(t, db, sqldb) + + // Execute a query to force a write to the WAL and then sync. + if _, err := sqldb.Exec(`CREATE TABLE foo (bar TEXT);`); err != nil { + t.Fatal(err) + } else if err := db.Sync(); err != nil { + t.Fatal(err) + } + + pos0, err := db.Pos() + if err != nil { + t.Fatal(err) + } + + // Obtain current shadow WAL size. + fi, err := os.Stat(db.ShadowWALPath(pos0.Generation, pos0.Index)) + if err != nil { + t.Fatal(err) + } + + // Close & truncate shadow WAL to simulate a partial frame write. + if err := db.Close(); err != nil { + t.Fatal(err) + } else if err := os.Truncate(db.ShadowWALPath(pos0.Generation, pos0.Index), fi.Size()-1); err != nil { + t.Fatal(err) + } + + // Reopen managed database & ensure sync will still work. + db = MustOpenDBAt(t, db.Path()) + defer MustCloseDB(t, db) + if err := db.Sync(); err != nil { + t.Fatal(err) + } + + // Verify same generation is kept. + if pos1, err := db.Pos(); err != nil { + t.Fatal(err) + } else if got, want := pos1, pos0; got != want { + t.Fatalf("Pos()=%s want %s", got, want) + } + + // Ensure shadow WAL has recovered. + if fi0, err := os.Stat(db.ShadowWALPath(pos0.Generation, pos0.Index)); err != nil { + t.Fatal(err) + } else if got, want := fi0.Size(), fi.Size(); got != want { + t.Fatalf("Size()=%v, want %v", got, want) + } }) // Ensure DB can handle a generation directory with a missing shadow WAL. t.Run("NoShadowWAL", func(t *testing.T) { - t.Skip() - }) + db, sqldb := MustOpenDBs(t) + defer MustCloseDBs(t, db, sqldb) - // Ensure DB can handle a mismatched header-only and start new generation. - t.Run("MismatchHeaderOnly", func(t *testing.T) { - t.Skip() + // Execute a query to force a write to the WAL and then sync. + if _, err := sqldb.Exec(`CREATE TABLE foo (bar TEXT);`); err != nil { + t.Fatal(err) + } else if err := db.Sync(); err != nil { + t.Fatal(err) + } + + pos0, err := db.Pos() + if err != nil { + t.Fatal(err) + } + + // Close & delete shadow WAL to simulate dir created but not WAL. + if err := db.Close(); err != nil { + t.Fatal(err) + } else if err := os.Remove(db.ShadowWALPath(pos0.Generation, pos0.Index)); err != nil { + t.Fatal(err) + } + + // Reopen managed database & ensure sync will still work. + db = MustOpenDBAt(t, db.Path()) + defer MustCloseDB(t, db) + if err := db.Sync(); err != nil { + t.Fatal(err) + } + + // Verify new generation created but index/offset the same. + if pos1, err := db.Pos(); err != nil { + t.Fatal(err) + } else if pos0.Generation == pos1.Generation { + t.Fatal("expected new generation") + } else if got, want := pos1.Index, pos0.Index; got != want { + t.Fatalf("Index=%v want %v", got, want) + } else if got, want := pos1.Offset, pos0.Offset; got != want { + t.Fatalf("Offset=%v want %v", got, want) + } }) // Ensure DB checkpoints after minimum number of pages. @@ -258,10 +547,14 @@ func MustCloseDBs(tb testing.TB, db *litestream.DB, sqldb *sql.DB) { // MustOpenDB returns a new instance of a DB. func MustOpenDB(tb testing.TB) *litestream.DB { - tb.Helper() - dir := tb.TempDir() - db := litestream.NewDB(filepath.Join(dir, "db")) + return MustOpenDBAt(tb, filepath.Join(dir, "db")) +} + +// MustOpenDBAt returns a new instance of a DB for a given path. +func MustOpenDBAt(tb testing.TB, path string) *litestream.DB { + tb.Helper() + db := litestream.NewDB(path) db.MonitorInterval = 0 // disable background goroutine if err := db.Open(); err != nil { tb.Fatal(err)