Remove debugging code
This commit is contained in:
81
db.go
81
db.go
@@ -1327,36 +1327,24 @@ func (db *DB) Restore(ctx context.Context, opt RestoreOptions) error {
|
|||||||
tmpPath := outputPath + ".tmp"
|
tmpPath := outputPath + ".tmp"
|
||||||
|
|
||||||
// Copy snapshot to output path.
|
// Copy snapshot to output path.
|
||||||
var dbChksum, dbChksumAlt uint64
|
|
||||||
if !opt.DryRun {
|
if !opt.DryRun {
|
||||||
if err := db.restoreSnapshot(ctx, r, pos.Generation, pos.Index, tmpPath); err != nil {
|
if err := db.restoreSnapshot(ctx, r, pos.Generation, pos.Index, tmpPath); err != nil {
|
||||||
return fmt.Errorf("cannot restore snapshot: %w", err)
|
return fmt.Errorf("cannot restore snapshot: %w", err)
|
||||||
}
|
}
|
||||||
if dbChksum, err = checksumFile(tmpPath); err != nil {
|
|
||||||
return fmt.Errorf("cannot compute db checksum: %w", err)
|
|
||||||
}
|
|
||||||
if dbChksumAlt, err = checksumFileAt(tmpPath, int64(db.pageSize)); err != nil {
|
|
||||||
return fmt.Errorf("cannot compute alternate db checksum: %w", err)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
log.Printf("%s(%s): restoring snapshot %s/%08x to %s, checksum=%016x alt=%016x", db.path, r.Name(), generation, minWALIndex, tmpPath, dbChksum, dbChksumAlt)
|
log.Printf("%s(%s): restoring snapshot %s/%08x to %s", db.path, r.Name(), generation, minWALIndex, tmpPath)
|
||||||
|
|
||||||
// Restore each WAL file until we reach our maximum index.
|
// Restore each WAL file until we reach our maximum index.
|
||||||
var walChksum uint64
|
|
||||||
for index := minWALIndex; index <= maxWALIndex; index++ {
|
for index := minWALIndex; index <= maxWALIndex; index++ {
|
||||||
if !opt.DryRun {
|
if !opt.DryRun {
|
||||||
if walChksum, err = db.restoreWAL(ctx, r, generation, index, tmpPath); os.IsNotExist(err) && index == minWALIndex && index == maxWALIndex {
|
if err = db.restoreWAL(ctx, r, generation, index, tmpPath); os.IsNotExist(err) && index == minWALIndex && index == maxWALIndex {
|
||||||
log.Printf("%s(%s): no wal available, snapshot only", db.path, r.Name())
|
log.Printf("%s(%s): no wal available, snapshot only", db.path, r.Name())
|
||||||
break // snapshot file only, ignore error
|
break // snapshot file only, ignore error
|
||||||
} else if err != nil {
|
} else if err != nil {
|
||||||
return fmt.Errorf("cannot restore wal: %w", err)
|
return fmt.Errorf("cannot restore wal: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if dbChksum, err = checksumFile(tmpPath); err != nil {
|
|
||||||
return fmt.Errorf("cannot compute db checksum after wal restore: %w", err)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
log.Printf("%s(%s): restored wal %s/%08x, checksum=%016x (db=%016x)", db.path, r.Name(), generation, index, walChksum, dbChksum)
|
log.Printf("%s(%s): restored wal %s/%08x", db.path, r.Name(), generation, index)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Copy file to final location.
|
// Copy file to final location.
|
||||||
@@ -1488,47 +1476,42 @@ func (db *DB) restoreSnapshot(ctx context.Context, r Replica, generation string,
|
|||||||
}
|
}
|
||||||
|
|
||||||
// restoreWAL copies a WAL file from the replica to the local WAL and forces checkpoint.
|
// restoreWAL copies a WAL file from the replica to the local WAL and forces checkpoint.
|
||||||
func (db *DB) restoreWAL(ctx context.Context, r Replica, generation string, index int, dbPath string) (uint64, error) {
|
func (db *DB) restoreWAL(ctx context.Context, r Replica, generation string, index int, dbPath string) error {
|
||||||
// Open WAL file from replica.
|
// Open WAL file from replica.
|
||||||
rd, err := r.WALReader(ctx, generation, index)
|
rd, err := r.WALReader(ctx, generation, index)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, err
|
return err
|
||||||
}
|
}
|
||||||
defer rd.Close()
|
defer rd.Close()
|
||||||
|
|
||||||
// Open handle to destination WAL path.
|
// Open handle to destination WAL path.
|
||||||
f, err := createFile(dbPath+"-wal", db.uid, db.gid)
|
f, err := createFile(dbPath+"-wal", db.uid, db.gid)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, err
|
return err
|
||||||
}
|
}
|
||||||
defer f.Close()
|
defer f.Close()
|
||||||
|
|
||||||
// Copy WAL to target path.
|
// Copy WAL to target path.
|
||||||
if _, err := io.Copy(f, rd); err != nil {
|
if _, err := io.Copy(f, rd); err != nil {
|
||||||
return 0, err
|
return err
|
||||||
} else if err := f.Close(); err != nil {
|
} else if err := f.Close(); err != nil {
|
||||||
return 0, err
|
return err
|
||||||
}
|
|
||||||
|
|
||||||
chksum, err := checksumFile(dbPath + "-wal")
|
|
||||||
if err != nil {
|
|
||||||
return 0, fmt.Errorf("cannot checksum wal: %w", err)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Open SQLite database and force a truncating checkpoint.
|
// Open SQLite database and force a truncating checkpoint.
|
||||||
d, err := sql.Open("sqlite3", dbPath)
|
d, err := sql.Open("sqlite3", dbPath)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, err
|
return err
|
||||||
}
|
}
|
||||||
defer d.Close()
|
defer d.Close()
|
||||||
|
|
||||||
if _, err := d.Exec(`PRAGMA wal_checkpoint(TRUNCATE);`); err != nil {
|
if _, err := d.Exec(`PRAGMA wal_checkpoint(TRUNCATE);`); err != nil {
|
||||||
return 0, err
|
return err
|
||||||
} else if err := d.Close(); err != nil {
|
} else if err := d.Close(); err != nil {
|
||||||
return 0, err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
return chksum, nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// CRC64 returns a CRC-64 ISO checksum of the database and its current position.
|
// CRC64 returns a CRC-64 ISO checksum of the database and its current position.
|
||||||
@@ -1536,61 +1519,41 @@ func (db *DB) restoreWAL(ctx context.Context, r Replica, generation string, inde
|
|||||||
// This function obtains a read lock so it prevents syncs from occuring until
|
// This function obtains a read lock so it prevents syncs from occuring until
|
||||||
// the operation is complete. The database will still be usable but it will be
|
// the operation is complete. The database will still be usable but it will be
|
||||||
// unable to checkpoint during this time.
|
// unable to checkpoint during this time.
|
||||||
func (db *DB) CRC64() (uint64, uint64, Pos, error) {
|
func (db *DB) CRC64() (uint64, Pos, error) {
|
||||||
db.mu.Lock()
|
db.mu.Lock()
|
||||||
defer db.mu.Unlock()
|
defer db.mu.Unlock()
|
||||||
|
|
||||||
if err := db.init(); err != nil {
|
if err := db.init(); err != nil {
|
||||||
return 0, 0, Pos{}, err
|
return 0, Pos{}, err
|
||||||
} else if db.db == nil {
|
} else if db.db == nil {
|
||||||
return 0, 0, Pos{}, os.ErrNotExist
|
return 0, Pos{}, os.ErrNotExist
|
||||||
}
|
}
|
||||||
|
|
||||||
generation, err := db.CurrentGeneration()
|
generation, err := db.CurrentGeneration()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, 0, Pos{}, fmt.Errorf("cannot find current generation: %w", err)
|
return 0, Pos{}, fmt.Errorf("cannot find current generation: %w", err)
|
||||||
} else if generation == "" {
|
} else if generation == "" {
|
||||||
return 0, 0, Pos{}, fmt.Errorf("no current generation")
|
return 0, Pos{}, fmt.Errorf("no current generation")
|
||||||
}
|
}
|
||||||
|
|
||||||
// Force a RESTART checkpoint to ensure the database is at the start of the WAL.
|
// Force a RESTART checkpoint to ensure the database is at the start of the WAL.
|
||||||
if err := db.checkpointAndInit(generation, CheckpointModeRestart); err != nil {
|
if err := db.checkpointAndInit(generation, CheckpointModeRestart); err != nil {
|
||||||
return 0, 0, Pos{}, err
|
return 0, Pos{}, err
|
||||||
}
|
}
|
||||||
|
|
||||||
// Obtain current position. Clear the offset since we are only reading the
|
// Obtain current position. Clear the offset since we are only reading the
|
||||||
// DB and not applying the current WAL.
|
// DB and not applying the current WAL.
|
||||||
pos, err := db.Pos()
|
pos, err := db.Pos()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, 0, pos, err
|
return 0, pos, err
|
||||||
}
|
}
|
||||||
pos.Offset = 0
|
pos.Offset = 0
|
||||||
|
|
||||||
var tmpfile string
|
chksum, err := checksumFile(db.Path())
|
||||||
if src, err := os.Open(db.Path()); err != nil {
|
|
||||||
return 0, 0, pos, err
|
|
||||||
} else if dst, err := ioutil.TempFile("", "*-litestream.db"); err != nil {
|
|
||||||
return 0, 0, pos, err
|
|
||||||
} else if _, err := io.Copy(dst, src); err != nil {
|
|
||||||
return 0, 0, pos, err
|
|
||||||
} else if err := src.Close(); err != nil {
|
|
||||||
return 0, 0, pos, err
|
|
||||||
} else if err := dst.Close(); err != nil {
|
|
||||||
return 0, 0, pos, err
|
|
||||||
} else {
|
|
||||||
tmpfile = dst.Name()
|
|
||||||
}
|
|
||||||
log.Printf("db.crc64: copied database to temporary location: %s", tmpfile)
|
|
||||||
|
|
||||||
chksum, err := checksumFile(tmpfile)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, 0, pos, err
|
return 0, pos, err
|
||||||
}
|
}
|
||||||
chksumAlt, err := checksumFileAt(tmpfile, int64(db.pageSize))
|
return chksum, pos, nil
|
||||||
if err != nil {
|
|
||||||
return 0, 0, pos, err
|
|
||||||
}
|
|
||||||
return chksum, chksumAlt, pos, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// RestoreOptions represents options for DB.Restore().
|
// RestoreOptions represents options for DB.Restore().
|
||||||
|
|||||||
@@ -127,6 +127,10 @@ func TestDB_CRC64(t *testing.T) {
|
|||||||
db, sqldb := MustOpenDBs(t)
|
db, sqldb := MustOpenDBs(t)
|
||||||
defer MustCloseDBs(t, db, sqldb)
|
defer MustCloseDBs(t, db, sqldb)
|
||||||
|
|
||||||
|
if err := db.Sync(); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
chksum0, _, err := db.CRC64()
|
chksum0, _, err := db.CRC64()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
|
|||||||
10
replica.go
10
replica.go
@@ -989,11 +989,11 @@ func ValidateReplica(ctx context.Context, r Replica) error {
|
|||||||
|
|
||||||
// Compute checksum of primary database under lock. This prevents a
|
// Compute checksum of primary database under lock. This prevents a
|
||||||
// sync from occurring and the database will not be written.
|
// sync from occurring and the database will not be written.
|
||||||
chksum0, chksum0alt, pos, err := db.CRC64()
|
chksum0, pos, err := db.CRC64()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("cannot compute checksum: %w", err)
|
return fmt.Errorf("cannot compute checksum: %w", err)
|
||||||
}
|
}
|
||||||
log.Printf("%s(%s): primary checksum computed: %016x @ %s (alt=%016x)", db.Path(), r.Name(), chksum0, pos, chksum0alt)
|
log.Printf("%s(%s): primary checksum computed: %016x @ %s", db.Path(), r.Name(), chksum0, pos)
|
||||||
|
|
||||||
// Wait until replica catches up to position.
|
// Wait until replica catches up to position.
|
||||||
log.Printf("%s(%s): waiting for replica", db.Path(), r.Name())
|
log.Printf("%s(%s): waiting for replica", db.Path(), r.Name())
|
||||||
@@ -1024,12 +1024,8 @@ func ValidateReplica(ctx context.Context, r Replica) error {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
chksum1alt, err := checksumFileAt(restorePath, int64(db.pageSize))
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
log.Printf("%s(%s): restore complete, replica checksum=%016x (alt=%016x)", db.Path(), r.Name(), chksum1, chksum1alt)
|
log.Printf("%s(%s): restore complete, replica checksum=%016x", db.Path(), r.Name(), chksum1)
|
||||||
|
|
||||||
// Validate checksums match.
|
// Validate checksums match.
|
||||||
if chksum0 != chksum1 {
|
if chksum0 != chksum1 {
|
||||||
|
|||||||
Reference in New Issue
Block a user