From 4e469f8b02a399d9ce1f77a1fb6fe86505e3be85 Mon Sep 17 00:00:00 2001 From: Ben Johnson Date: Sun, 31 Jan 2021 08:47:06 -0700 Subject: [PATCH] Persist primary/replica copies after validation mismatch This commit changes `ValidateReplica()` to persist copies of the primary & replica databases for inspection if a validation mismatch occurs. --- db.go | 36 ++++++++++++++++++++++++++++++++++-- db_test.go | 8 ++++---- replica.go | 30 +++++++++++++++++++++--------- 3 files changed, 59 insertions(+), 15 deletions(-) diff --git a/db.go b/db.go index 8277e5d..8a4acde 100644 --- a/db.go +++ b/db.go @@ -1604,7 +1604,9 @@ func restoreWAL(ctx context.Context, r Replica, generation string, index int, db // This function obtains a read lock so it prevents syncs from occurring until // the operation is complete. The database will still be usable but it will be // unable to checkpoint during this time. -func (db *DB) CRC64() (uint64, Pos, error) { +// +// If dst is set, the database file is copied to that location before checksum. +func (db *DB) CRC64(dst string) (uint64, Pos, error) { db.mu.Lock() defer db.mu.Unlock() @@ -1634,7 +1636,16 @@ func (db *DB) CRC64() (uint64, Pos, error) { } pos.Offset = 0 - chksum, err := checksumFile(db.Path()) + // Copy file, if dst specified. + checksumPath := db.Path() + if dst != "" { + if err := copyFile(dst, db.Path()); err != nil { + return 0, Pos{}, err + } + checksumPath = dst + } + + chksum, err := checksumFile(checksumPath) if err != nil { return 0, pos, err } @@ -1770,3 +1781,24 @@ func headerByteOrder(hdr []byte) (binary.ByteOrder, error) { return nil, fmt.Errorf("invalid wal header magic: %x", magic) } } + +func copyFile(dst, src string) error { + r, err := os.Open(src) + if err != nil { + return err + } + defer r.Close() + + w, err := os.Create(dst) + if err != nil { + return err + } + defer w.Close() + + if _, err := io.Copy(w, r); err != nil { + return err + } else if err := w.Sync(); err != nil { + return err + } + return nil +} diff --git a/db_test.go b/db_test.go index 7227cfb..717682d 100644 --- a/db_test.go +++ b/db_test.go @@ -118,7 +118,7 @@ func TestDB_CRC64(t *testing.T) { t.Run("ErrNotExist", func(t *testing.T) { db := MustOpenDB(t) defer MustCloseDB(t, db) - if _, _, err := db.CRC64(); !os.IsNotExist(err) { + if _, _, err := db.CRC64(""); !os.IsNotExist(err) { t.Fatalf("unexpected error: %#v", err) } }) @@ -131,7 +131,7 @@ func TestDB_CRC64(t *testing.T) { t.Fatal(err) } - chksum0, _, err := db.CRC64() + chksum0, _, err := db.CRC64("") if err != nil { t.Fatal(err) } @@ -139,7 +139,7 @@ func TestDB_CRC64(t *testing.T) { // Issue change that is applied to the WAL. Checksum should not change. if _, err := sqldb.Exec(`CREATE TABLE t (id INT);`); err != nil { t.Fatal(err) - } else if chksum1, _, err := db.CRC64(); err != nil { + } else if chksum1, _, err := db.CRC64(""); err != nil { t.Fatal(err) } else if chksum0 == chksum1 { t.Fatal("expected different checksum event after WAL change") @@ -150,7 +150,7 @@ func TestDB_CRC64(t *testing.T) { t.Fatal(err) } - if chksum2, _, err := db.CRC64(); err != nil { + if chksum2, _, err := db.CRC64(""); err != nil { t.Fatal(err) } else if chksum0 == chksum2 { t.Fatal("expected different checksums after checkpoint") diff --git a/replica.go b/replica.go index 1bcee27..8987485 100644 --- a/replica.go +++ b/replica.go @@ -1003,9 +1003,16 @@ func compressFile(src, dst string, uid, gid int) error { func ValidateReplica(ctx context.Context, r Replica) error { db := r.DB() + // Restore replica to a temporary directory. + tmpdir, err := ioutil.TempDir("", "*-litestream") + if err != nil { + return err + } + // Compute checksum of primary database under lock. This prevents a // sync from occurring and the database will not be written. - chksum0, pos, err := db.CRC64() + primaryPath := filepath.Join(tmpdir, "primary") + chksum0, pos, err := db.CRC64(primaryPath) if err != nil { return fmt.Errorf("cannot compute checksum: %w", err) } @@ -1015,14 +1022,7 @@ func ValidateReplica(ctx context.Context, r Replica) error { return fmt.Errorf("cannot wait for replica: %w", err) } - // Restore replica to a temporary directory. - tmpdir, err := ioutil.TempDir("", "*-litestream") - if err != nil { - return err - } - defer os.RemoveAll(tmpdir) - - restorePath := filepath.Join(tmpdir, "db") + restorePath := filepath.Join(tmpdir, "replica") if err := RestoreReplica(ctx, r, RestoreOptions{ OutputPath: restorePath, ReplicaName: r.Name(), @@ -1049,11 +1049,23 @@ func ValidateReplica(ctx context.Context, r Replica) error { // Validate checksums match. if mismatch { internal.ReplicaValidationTotalCounterVec.WithLabelValues(db.Path(), r.Name(), "error").Inc() + + // Compress mismatched databases and report temporary path for investigation. + if err := compressFile(primaryPath, primaryPath+".lz4", db.uid, db.gid); err != nil { + return fmt.Errorf("cannot compress primary db: %w", err) + } else if err := compressFile(restorePath, restorePath+".lz4", db.uid, db.gid); err != nil { + return fmt.Errorf("cannot compress replica db: %w", err) + } + log.Printf("%s(%s): validator: mismatch files @ %s", db.Path(), r.Name(), tmpdir) + return ErrChecksumMismatch } internal.ReplicaValidationTotalCounterVec.WithLabelValues(db.Path(), r.Name(), "ok").Inc() + if err := os.RemoveAll(tmpdir); err != nil { + return fmt.Errorf("cannot remove temporary validation directory: %w", err) + } return nil }