diff --git a/db.go b/db.go index 84acd61..5ebacc2 100644 --- a/db.go +++ b/db.go @@ -8,7 +8,6 @@ import ( "encoding/hex" "errors" "fmt" - "hash/crc64" "io" "io/ioutil" "log" @@ -1421,50 +1420,6 @@ func ApplyWAL(ctx context.Context, dbPath, walPath string) error { return d.Close() } -// CRC64 returns a CRC-64 ISO checksum of the database and its current position. -// -// This function obtains a read lock so it prevents syncs from occurring until -// the operation is complete. The database will still be usable but it will be -// unable to checkpoint during this time. -// -// If dst is set, the database file is copied to that location before checksum. -func (db *DB) CRC64(ctx context.Context) (uint64, Pos, error) { - db.mu.Lock() - defer db.mu.Unlock() - - if err := db.init(); err != nil { - return 0, Pos{}, err - } else if db.db == nil { - return 0, Pos{}, os.ErrNotExist - } - - generation, err := db.CurrentGeneration() - if err != nil { - return 0, Pos{}, fmt.Errorf("cannot find current generation: %w", err) - } else if generation == "" { - return 0, Pos{}, fmt.Errorf("no current generation") - } - - // Force a RESTART checkpoint to ensure the database is at the start of the WAL. - if err := db.checkpoint(ctx, generation, CheckpointModeRestart); err != nil { - return 0, Pos{}, err - } - - // Obtain current position. Clear the offset since we are only reading the - // DB and not applying the current WAL. - pos := db.pos - pos.Offset = 0 - - // Seek to the beginning of the db file descriptor and checksum whole file. - h := crc64.New(crc64.MakeTable(crc64.ISO)) - if _, err := db.f.Seek(0, io.SeekStart); err != nil { - return 0, pos, err - } else if _, err := io.Copy(h, db.f); err != nil { - return 0, pos, err - } - return h.Sum64(), pos, nil -} - // ReadWALFields iterates over the header & frames in the WAL data in r. // Returns salt, checksum, byte order & the last frame. WAL data must start // from the beginning of the WAL header and must end on either the WAL header diff --git a/db_test.go b/db_test.go index f424fc4..6f54ea8 100644 --- a/db_test.go +++ b/db_test.go @@ -112,51 +112,6 @@ func TestDB_UpdatedAt(t *testing.T) { }) } -// Ensure we can compute a checksum on the real database. -func TestDB_CRC64(t *testing.T) { - t.Run("ErrNotExist", func(t *testing.T) { - db := MustOpenDB(t) - defer MustCloseDB(t, db) - if _, _, err := db.CRC64(context.Background()); !os.IsNotExist(err) { - t.Fatalf("unexpected error: %#v", err) - } - }) - - t.Run("DB", func(t *testing.T) { - db, sqldb := MustOpenDBs(t) - defer MustCloseDBs(t, db, sqldb) - - if err := db.Sync(context.Background()); err != nil { - t.Fatal(err) - } - - chksum0, _, err := db.CRC64(context.Background()) - if err != nil { - t.Fatal(err) - } - - // Issue change that is applied to the WAL. Checksum should not change. - if _, err := sqldb.Exec(`CREATE TABLE t (id INT);`); err != nil { - t.Fatal(err) - } else if chksum1, _, err := db.CRC64(context.Background()); err != nil { - t.Fatal(err) - } else if chksum0 == chksum1 { - t.Fatal("expected different checksum event after WAL change") - } - - // Checkpoint change into database. Checksum should change. - if err := db.Checkpoint(context.Background(), litestream.CheckpointModeTruncate); err != nil { - t.Fatal(err) - } - - if chksum2, _, err := db.CRC64(context.Background()); err != nil { - t.Fatal(err) - } else if chksum0 == chksum2 { - t.Fatal("expected different checksums after checkpoint") - } - }) -} - // Ensure we can sync the real WAL to the shadow WAL. func TestDB_Sync(t *testing.T) { // Ensure sync is skipped if no database exists. diff --git a/replica.go b/replica.go index e401d8c..b92b4b4 100644 --- a/replica.go +++ b/replica.go @@ -3,12 +3,10 @@ package litestream import ( "context" "fmt" - "hash/crc64" "io" "io/ioutil" "log" "os" - "path/filepath" "sort" "sync" "time" @@ -118,11 +116,10 @@ func (r *Replica) Start(ctx context.Context) { ctx, r.cancel = context.WithCancel(ctx) // Start goroutine to replicate data. - r.wg.Add(4) + r.wg.Add(3) go func() { defer r.wg.Done(); r.monitor(ctx) }() go func() { defer r.wg.Done(); r.retainer(ctx) }() go func() { defer r.wg.Done(); r.snapshotter(ctx) }() - go func() { defer r.wg.Done(); r.validator(ctx) }() } // Stop cancels any outstanding replication and blocks until finished. @@ -144,15 +141,6 @@ func (r *Replica) Stop(hard bool) (err error) { return err } -// logPrefix returns the prefix used when logging from the replica. -// This includes the replica name as well as the database path, if available. -func (r *Replica) logPrefix() string { - if db := r.DB(); db != nil { - return fmt.Sprintf("%s(%s): ", db.Path(), r.Name()) - } - return r.Name() + ": " -} - // Sync copies new WAL frames from the shadow WAL to the replica client. func (r *Replica) Sync(ctx context.Context) (err error) { // Clear last position if if an error occurs during sync. @@ -723,160 +711,6 @@ func (r *Replica) snapshotter(ctx context.Context) { } } -// validator runs in a separate goroutine and handles periodic validation. -func (r *Replica) validator(ctx context.Context) { - // Initialize counters since validation occurs infrequently. - for _, status := range []string{"ok", "error"} { - replicaValidationTotalCounterVec.WithLabelValues(r.db.Path(), r.Name(), status).Add(0) - } - - // Exit validation if interval is not set. - if r.ValidationInterval <= 0 { - return - } - - ticker := time.NewTicker(r.ValidationInterval) - defer ticker.Stop() - - for { - select { - case <-ctx.Done(): - return - case <-ticker.C: - if err := r.Validate(ctx); err != nil { - r.Logger.Printf("validation error: %s", err) - continue - } - } - } -} - -// Validate restores the most recent data from a replica and validates -// that the resulting database matches the current database. -func (r *Replica) Validate(ctx context.Context) error { - db := r.DB() - - // Restore replica to a temporary directory. - tmpdir, err := ioutil.TempDir("", "*-litestream") - if err != nil { - return err - } - defer os.RemoveAll(tmpdir) - - // Compute checksum of primary database under lock. This prevents a - // sync from occurring and the database will not be written. - chksum0, pos, err := db.CRC64(ctx) - if err != nil { - return fmt.Errorf("cannot compute checksum: %w", err) - } - - // Wait until replica catches up to position. - if err := r.waitForReplica(ctx, pos); err != nil { - return fmt.Errorf("cannot wait for replica: %w", err) - } - - // Find lastest snapshot that occurs before the index. - snapshotIndex, err := FindSnapshotForIndex(ctx, r.client, pos.Generation, pos.Index-1) - if err != nil { - return fmt.Errorf("cannot find snapshot index: %w", err) - } - - restorePath := filepath.Join(tmpdir, "replica") - opt := RestoreOptions{ - Logger: log.New(os.Stderr, "", 0), - LogPrefix: r.logPrefix(), - } - if err := Restore(ctx, r.client, restorePath, pos.Generation, snapshotIndex, pos.Index-1, opt); err != nil { - return fmt.Errorf("cannot restore: %w", err) - } - - // Open file handle for restored database. - // NOTE: This open is ok as the restored database is not managed by litestream. - f, err := os.Open(restorePath) - if err != nil { - return err - } - defer f.Close() - - // Read entire file into checksum. - h := crc64.New(crc64.MakeTable(crc64.ISO)) - if _, err := io.Copy(h, f); err != nil { - return err - } - chksum1 := h.Sum64() - - status := "ok" - mismatch := chksum0 != chksum1 - if mismatch { - status = "mismatch" - } - r.Logger.Printf("validator: status=%s db=%016x replica=%016x pos=%s", status, chksum0, chksum1, pos) - - // Validate checksums match. - if mismatch { - replicaValidationTotalCounterVec.WithLabelValues(r.db.Path(), r.Name(), "error").Inc() - return ErrChecksumMismatch - } - - replicaValidationTotalCounterVec.WithLabelValues(r.db.Path(), r.Name(), "ok").Inc() - - if err := os.RemoveAll(tmpdir); err != nil { - return fmt.Errorf("cannot remove temporary validation directory: %w", err) - } - return nil -} - -// waitForReplica blocks until replica reaches at least the given position. -func (r *Replica) waitForReplica(ctx context.Context, pos Pos) error { - ticker := time.NewTicker(500 * time.Millisecond) - defer ticker.Stop() - - timer := time.NewTicker(10 * time.Second) - defer ticker.Stop() - - once := make(chan struct{}, 1) - once <- struct{}{} - - for { - select { - case <-ctx.Done(): - return ctx.Err() - case <-timer.C: - return fmt.Errorf("replica wait exceeded timeout") - case <-ticker.C: - case <-once: // immediate on first check - } - - // Obtain current position of replica, check if past target position. - curr := r.Pos() - if curr.IsZero() { - r.Logger.Printf("validator: no replica position available") - continue - } - - // Exit if the generation has changed while waiting as there will be - // no further progress on the old generation. - if curr.Generation != pos.Generation { - return fmt.Errorf("generation changed") - } - - ready := true - if curr.Index < pos.Index { - ready = false - } else if curr.Index == pos.Index && curr.Offset < pos.Offset { - ready = false - } - - // If not ready, restart loop. - if !ready { - continue - } - - // Current position at or after target position. - return nil - } -} - // GenerationCreatedAt returns the earliest creation time of any snapshot. // Returns zero time if no snapshots exist. func (r *Replica) GenerationCreatedAt(ctx context.Context, generation string) (time.Time, error) { @@ -970,11 +804,4 @@ var ( Name: "wal_offset", Help: "The current WAL offset", }, []string{"db", "name"}) - - replicaValidationTotalCounterVec = promauto.NewCounterVec(prometheus.CounterOpts{ - Namespace: "litestream", - Subsystem: "replica", - Name: "validation_total", - Help: "The number of validations performed", - }, []string{"db", "name", "status"}) )