Fix checksum logging

This commit is contained in:
Ben Johnson
2021-01-17 10:19:39 -07:00
parent 90a1d959d4
commit d259d9b9e3
2 changed files with 23 additions and 39 deletions

45
db.go
View File

@@ -1319,34 +1319,27 @@ func (db *DB) Restore(ctx context.Context, opt RestoreOptions) error {
tmpPath := outputPath + ".tmp"
// Copy snapshot to output path.
log.Printf("%s(%s): restoring snapshot %s/%08x to %s", db.path, r.Name(), generation, minWALIndex, tmpPath)
var chksum uint64
if !opt.DryRun {
if err := db.restoreSnapshot(ctx, r, pos.Generation, pos.Index, tmpPath); err != nil {
return fmt.Errorf("cannot restore snapshot: %w", err)
}
chksum, err := checksumFile(tmpPath)
if err != nil {
log.Printf("%s(%s): cannot checksum snapshot: %s", db.path, r.Name(), err)
} else {
log.Printf("%s(%s): restored snapshot checksum: %016x", db.path, r.Name(), chksum)
}
chksum, _ = checksumFile(tmpPath)
}
log.Printf("%s(%s): restoring snapshot %s/%08x to %s, checksum=%016x", db.path, r.Name(), generation, minWALIndex, tmpPath, chksum)
// Restore each WAL file until we reach our maximum index.
for index := minWALIndex; index <= maxWALIndex; index++ {
log.Printf("%s(%s): restoring wal %s/%08x", db.path, r.Name(), generation, index)
if opt.DryRun {
continue
}
if err := db.restoreWAL(ctx, r, generation, index, tmpPath); os.IsNotExist(err) && index == minWALIndex && index == maxWALIndex {
if !opt.DryRun {
if chksum, err = db.restoreWAL(ctx, r, generation, index, tmpPath); os.IsNotExist(err) && index == minWALIndex && index == maxWALIndex {
log.Printf("%s(%s): no wal available, snapshot only", db.path, r.Name())
break // snapshot file only, ignore error
} else if err != nil {
return fmt.Errorf("cannot restore wal: %w", err)
}
}
log.Printf("%s(%s): restored wal %s/%08x, checksum=%016x", db.path, r.Name(), generation, index, chksum)
}
// Copy file to final location.
log.Printf("%s(%s): renaming database from temporary location", db.path, r.Name())
@@ -1459,49 +1452,45 @@ func (db *DB) restoreSnapshot(ctx context.Context, r Replica, generation string,
}
// restoreWAL copies a WAL file from the replica to the local WAL and forces checkpoint.
func (db *DB) restoreWAL(ctx context.Context, r Replica, generation string, index int, dbPath string) error {
func (db *DB) restoreWAL(ctx context.Context, r Replica, generation string, index int, dbPath string) (uint64, error) {
// Open WAL file from replica.
rd, err := r.WALReader(ctx, generation, index)
if err != nil {
return err
return 0, err
}
defer rd.Close()
// Open handle to destination WAL path.
f, err := createFile(dbPath+"-wal", db.uid, db.gid)
if err != nil {
return err
return 0, err
}
defer f.Close()
// Copy WAL to target path.
if _, err := io.Copy(f, rd); err != nil {
return err
return 0, err
} else if err := f.Close(); err != nil {
return err
return 0, err
}
chksum, err := checksumFile(dbPath + "-wal")
if err != nil {
log.Printf("%s(%s): cannot checksum wal: %s", db.path, r.Name(), err)
} else {
chksum, _ := checksumFile(dbPath + "-wal")
log.Printf("%s(%s): restored wal checksum: %016x", db.path, r.Name(), chksum)
}
// Open SQLite database and force a truncating checkpoint.
d, err := sql.Open("sqlite3", dbPath)
if err != nil {
return err
return 0, err
}
defer d.Close()
if _, err := d.Exec(`PRAGMA wal_checkpoint(TRUNCATE);`); err != nil {
return err
return 0, err
} else if err := d.Close(); err != nil {
return err
return 0, err
}
return nil
return chksum, nil
}
// CRC64 returns a CRC-64 ISO checksum of the database and its current position.

View File

@@ -988,8 +988,6 @@ func compressFile(src, dst string, uid, gid int) error {
func ValidateReplica(ctx context.Context, r Replica) error {
db := r.DB()
log.Printf("%s(%s): computing primary checksum", db.Path(), r.Name())
// Compute checksum of primary database under lock. This prevents a
// sync from occurring and the database will not be written.
chksum0, pos, err := db.CRC64()
@@ -1003,7 +1001,6 @@ func ValidateReplica(ctx context.Context, r Replica) error {
if err := waitForReplica(ctx, r, pos); err != nil {
return fmt.Errorf("cannot wait for replica: %w", err)
}
log.Printf("%s(%s): replica ready, restoring", db.Path(), r.Name())
// Restore replica to a temporary directory.
tmpdir, err := ioutil.TempDir("", "*-litestream")
@@ -1023,8 +1020,6 @@ func ValidateReplica(ctx context.Context, r Replica) error {
return fmt.Errorf("cannot restore: %w", err)
}
log.Printf("%s(%s): restore complete, computing checksum", db.Path(), r.Name())
// Open file handle for restored database.
f, err := os.Open(db.Path())
if err != nil {
@@ -1039,7 +1034,7 @@ func ValidateReplica(ctx context.Context, r Replica) error {
}
chksum1 := h.Sum64()
log.Printf("%s(%s): replica checksum computed: %016x", db.Path(), r.Name(), chksum1)
log.Printf("%s(%s): restore complete, replica checksum=%016x", db.Path(), r.Name(), chksum1)
// Validate checksums match.
if chksum0 != chksum1 {