Merge pull request #32 from benbjohnson/persist-mismatch-validation-data
Persist primary/replica copies after validation mismatch
This commit is contained in:
36
db.go
36
db.go
@@ -1604,7 +1604,9 @@ func restoreWAL(ctx context.Context, r Replica, generation string, index int, db
|
||||
// This function obtains a read lock so it prevents syncs from occurring until
|
||||
// the operation is complete. The database will still be usable but it will be
|
||||
// unable to checkpoint during this time.
|
||||
func (db *DB) CRC64() (uint64, Pos, error) {
|
||||
//
|
||||
// If dst is set, the database file is copied to that location before checksum.
|
||||
func (db *DB) CRC64(dst string) (uint64, Pos, error) {
|
||||
db.mu.Lock()
|
||||
defer db.mu.Unlock()
|
||||
|
||||
@@ -1634,7 +1636,16 @@ func (db *DB) CRC64() (uint64, Pos, error) {
|
||||
}
|
||||
pos.Offset = 0
|
||||
|
||||
chksum, err := checksumFile(db.Path())
|
||||
// Copy file, if dst specified.
|
||||
checksumPath := db.Path()
|
||||
if dst != "" {
|
||||
if err := copyFile(dst, db.Path()); err != nil {
|
||||
return 0, Pos{}, err
|
||||
}
|
||||
checksumPath = dst
|
||||
}
|
||||
|
||||
chksum, err := checksumFile(checksumPath)
|
||||
if err != nil {
|
||||
return 0, pos, err
|
||||
}
|
||||
@@ -1770,3 +1781,24 @@ func headerByteOrder(hdr []byte) (binary.ByteOrder, error) {
|
||||
return nil, fmt.Errorf("invalid wal header magic: %x", magic)
|
||||
}
|
||||
}
|
||||
|
||||
// copyFile copies the contents of the file at src into the file at dst.
//
// The destination is created if it does not exist and truncated if it does.
// Its contents are synced to disk before the function returns successfully.
//
// Any error from opening, copying, syncing, or closing the destination is
// returned. If the copy fails after dst has been created, the incomplete
// destination file is removed so callers never observe a partial copy.
func copyFile(dst, src string) (err error) {
	r, err := os.Open(src)
	if err != nil {
		return err
	}
	// Closing a read-only file cannot lose data, so its error is not reported.
	defer r.Close()

	w, err := os.Create(dst)
	if err != nil {
		return err
	}
	defer func() {
		// Surface a close failure unless an earlier error is already pending.
		if cerr := w.Close(); err == nil {
			err = cerr
		}
		// Best-effort cleanup: do not leave a truncated/partial file behind.
		if err != nil {
			_ = os.Remove(dst)
		}
	}()

	if _, err = io.Copy(w, r); err != nil {
		return err
	}
	return w.Sync()
}
|
||||
|
||||
@@ -118,7 +118,7 @@ func TestDB_CRC64(t *testing.T) {
|
||||
t.Run("ErrNotExist", func(t *testing.T) {
|
||||
db := MustOpenDB(t)
|
||||
defer MustCloseDB(t, db)
|
||||
if _, _, err := db.CRC64(); !os.IsNotExist(err) {
|
||||
if _, _, err := db.CRC64(""); !os.IsNotExist(err) {
|
||||
t.Fatalf("unexpected error: %#v", err)
|
||||
}
|
||||
})
|
||||
@@ -131,7 +131,7 @@ func TestDB_CRC64(t *testing.T) {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
chksum0, _, err := db.CRC64()
|
||||
chksum0, _, err := db.CRC64("")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@@ -139,7 +139,7 @@ func TestDB_CRC64(t *testing.T) {
|
||||
// Issue change that is applied to the WAL. Checksum should change.
|
||||
if _, err := sqldb.Exec(`CREATE TABLE t (id INT);`); err != nil {
|
||||
t.Fatal(err)
|
||||
} else if chksum1, _, err := db.CRC64(); err != nil {
|
||||
} else if chksum1, _, err := db.CRC64(""); err != nil {
|
||||
t.Fatal(err)
|
||||
} else if chksum0 == chksum1 {
|
||||
t.Fatal("expected different checksum event after WAL change")
|
||||
@@ -150,7 +150,7 @@ func TestDB_CRC64(t *testing.T) {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if chksum2, _, err := db.CRC64(); err != nil {
|
||||
if chksum2, _, err := db.CRC64(""); err != nil {
|
||||
t.Fatal(err)
|
||||
} else if chksum0 == chksum2 {
|
||||
t.Fatal("expected different checksums after checkpoint")
|
||||
|
||||
30
replica.go
30
replica.go
@@ -1003,9 +1003,16 @@ func compressFile(src, dst string, uid, gid int) error {
|
||||
func ValidateReplica(ctx context.Context, r Replica) error {
|
||||
db := r.DB()
|
||||
|
||||
// Restore replica to a temporary directory.
|
||||
tmpdir, err := ioutil.TempDir("", "*-litestream")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Compute checksum of primary database under lock. This prevents a
|
||||
// sync from occurring and the database will not be written.
|
||||
chksum0, pos, err := db.CRC64()
|
||||
primaryPath := filepath.Join(tmpdir, "primary")
|
||||
chksum0, pos, err := db.CRC64(primaryPath)
|
||||
if err != nil {
|
||||
return fmt.Errorf("cannot compute checksum: %w", err)
|
||||
}
|
||||
@@ -1015,14 +1022,7 @@ func ValidateReplica(ctx context.Context, r Replica) error {
|
||||
return fmt.Errorf("cannot wait for replica: %w", err)
|
||||
}
|
||||
|
||||
// Restore replica to a temporary directory.
|
||||
tmpdir, err := ioutil.TempDir("", "*-litestream")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer os.RemoveAll(tmpdir)
|
||||
|
||||
restorePath := filepath.Join(tmpdir, "db")
|
||||
restorePath := filepath.Join(tmpdir, "replica")
|
||||
if err := RestoreReplica(ctx, r, RestoreOptions{
|
||||
OutputPath: restorePath,
|
||||
ReplicaName: r.Name(),
|
||||
@@ -1049,11 +1049,23 @@ func ValidateReplica(ctx context.Context, r Replica) error {
|
||||
// Validate checksums match.
|
||||
if mismatch {
|
||||
internal.ReplicaValidationTotalCounterVec.WithLabelValues(db.Path(), r.Name(), "error").Inc()
|
||||
|
||||
// Compress mismatched databases and report temporary path for investigation.
|
||||
if err := compressFile(primaryPath, primaryPath+".lz4", db.uid, db.gid); err != nil {
|
||||
return fmt.Errorf("cannot compress primary db: %w", err)
|
||||
} else if err := compressFile(restorePath, restorePath+".lz4", db.uid, db.gid); err != nil {
|
||||
return fmt.Errorf("cannot compress replica db: %w", err)
|
||||
}
|
||||
log.Printf("%s(%s): validator: mismatch files @ %s", db.Path(), r.Name(), tmpdir)
|
||||
|
||||
return ErrChecksumMismatch
|
||||
}
|
||||
|
||||
internal.ReplicaValidationTotalCounterVec.WithLabelValues(db.Path(), r.Name(), "ok").Inc()
|
||||
|
||||
if err := os.RemoveAll(tmpdir); err != nil {
|
||||
return fmt.Errorf("cannot remove temporary validation directory: %w", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user