Persist primary/replica copies after validation mismatch

This commit changes `ValidateReplica()` to persist copies of the
primary & replica databases for inspection if a validation mismatch
occurs.
This commit is contained in:
Ben Johnson
2021-01-31 08:47:06 -07:00
parent 3f268b70f8
commit 4e469f8b02
3 changed files with 59 additions and 15 deletions

View File

@@ -1003,9 +1003,16 @@ func compressFile(src, dst string, uid, gid int) error {
func ValidateReplica(ctx context.Context, r Replica) error {
db := r.DB()
// Restore replica to a temporary directory.
tmpdir, err := ioutil.TempDir("", "*-litestream")
if err != nil {
return err
}
// Compute checksum of primary database under lock. This prevents a
// sync from occurring and the database will not be written.
chksum0, pos, err := db.CRC64()
primaryPath := filepath.Join(tmpdir, "primary")
chksum0, pos, err := db.CRC64(primaryPath)
if err != nil {
return fmt.Errorf("cannot compute checksum: %w", err)
}
@@ -1015,14 +1022,7 @@ func ValidateReplica(ctx context.Context, r Replica) error {
return fmt.Errorf("cannot wait for replica: %w", err)
}
// Restore replica to a temporary directory.
tmpdir, err := ioutil.TempDir("", "*-litestream")
if err != nil {
return err
}
defer os.RemoveAll(tmpdir)
restorePath := filepath.Join(tmpdir, "db")
restorePath := filepath.Join(tmpdir, "replica")
if err := RestoreReplica(ctx, r, RestoreOptions{
OutputPath: restorePath,
ReplicaName: r.Name(),
@@ -1049,11 +1049,23 @@ func ValidateReplica(ctx context.Context, r Replica) error {
// Validate checksums match.
if mismatch {
internal.ReplicaValidationTotalCounterVec.WithLabelValues(db.Path(), r.Name(), "error").Inc()
// Compress mismatched databases and report temporary path for investigation.
if err := compressFile(primaryPath, primaryPath+".lz4", db.uid, db.gid); err != nil {
return fmt.Errorf("cannot compress primary db: %w", err)
} else if err := compressFile(restorePath, restorePath+".lz4", db.uid, db.gid); err != nil {
return fmt.Errorf("cannot compress replica db: %w", err)
}
log.Printf("%s(%s): validator: mismatch files @ %s", db.Path(), r.Name(), tmpdir)
return ErrChecksumMismatch
}
internal.ReplicaValidationTotalCounterVec.WithLabelValues(db.Path(), r.Name(), "ok").Inc()
if err := os.RemoveAll(tmpdir); err != nil {
return fmt.Errorf("cannot remove temporary validation directory: %w", err)
}
return nil
}