Add 'validate' command
This commit is contained in:
199
db.go
199
db.go
@@ -8,9 +8,11 @@ import (
|
||||
"encoding/hex"
|
||||
"errors"
|
||||
"fmt"
|
||||
"hash/crc32"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"log"
|
||||
"math"
|
||||
"math/rand"
|
||||
"os"
|
||||
"path/filepath"
|
||||
@@ -414,7 +416,7 @@ func (db *DB) cleanWAL() error {
|
||||
// Determine lowest index that's been replicated to all replicas.
|
||||
min := -1
|
||||
for _, r := range db.Replicas {
|
||||
pos := r.Pos()
|
||||
pos := r.LastPos()
|
||||
if pos.Generation != generation {
|
||||
pos = Pos{} // different generation, reset index to zero
|
||||
}
|
||||
@@ -1165,6 +1167,13 @@ func (db *DB) monitor() {
|
||||
// a timestamp can be specified to restore the database to a specific
|
||||
// point-in-time.
|
||||
func (db *DB) Restore(ctx context.Context, opt RestoreOptions) error {
|
||||
// Validate options.
|
||||
if opt.Generation == "" && opt.Index != math.MaxInt64 {
|
||||
return fmt.Errorf("must specify generation when restoring to index")
|
||||
} else if opt.Index != math.MaxInt64 && !opt.Timestamp.IsZero() {
|
||||
return fmt.Errorf("cannot specify index & timestamp to restore")
|
||||
}
|
||||
|
||||
// Ensure logger exists.
|
||||
logger := opt.Logger
|
||||
if logger == nil {
|
||||
@@ -1199,7 +1208,7 @@ func (db *DB) Restore(ctx context.Context, opt RestoreOptions) error {
|
||||
}
|
||||
|
||||
// Find the maximum WAL index that occurs before timestamp.
|
||||
maxWALIndex, err := r.WALIndexAt(ctx, generation, opt.Timestamp)
|
||||
maxWALIndex, err := r.WALIndexAt(ctx, generation, opt.Index, opt.Timestamp)
|
||||
if err != nil {
|
||||
return fmt.Errorf("cannot find max wal index for restore: %w", err)
|
||||
}
|
||||
@@ -1326,6 +1335,13 @@ func (db *DB) restoreSnapshot(ctx context.Context, r Replica, generation string,
|
||||
|
||||
// restoreWAL copies a WAL file from the replica to the local WAL and forces checkpoint.
|
||||
func (db *DB) restoreWAL(ctx context.Context, r Replica, generation string, index int, dbPath string) error {
|
||||
// Open WAL file from replica.
|
||||
rd, err := r.WALReader(ctx, generation, index)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer rd.Close()
|
||||
|
||||
// Open handle to destination WAL path.
|
||||
f, err := os.Create(dbPath + "-wal")
|
||||
if err != nil {
|
||||
@@ -1333,13 +1349,7 @@ func (db *DB) restoreWAL(ctx context.Context, r Replica, generation string, inde
|
||||
}
|
||||
defer f.Close()
|
||||
|
||||
rd, err := r.WALReader(ctx, generation, index)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer rd.Close()
|
||||
|
||||
//
|
||||
// Copy WAL to target path.
|
||||
if _, err := io.Copy(f, rd); err != nil {
|
||||
return err
|
||||
} else if err := f.Close(); err != nil {
|
||||
@@ -1362,6 +1372,167 @@ func (db *DB) restoreWAL(ctx context.Context, r Replica, generation string, inde
|
||||
return nil
|
||||
}
|
||||
|
||||
// Validate restores the most recent data from a replica and validates
|
||||
// that the resulting database matches the current database.
|
||||
func (db *DB) Validate(ctx context.Context, replicaName string, opt RestoreOptions) error {
|
||||
if replicaName == "" {
|
||||
return fmt.Errorf("replica name required")
|
||||
}
|
||||
|
||||
// Look up replica by name.
|
||||
r := db.Replica(replicaName)
|
||||
if r == nil {
|
||||
return fmt.Errorf("replica not found: %q", replicaName)
|
||||
}
|
||||
|
||||
// Ensure logger exists.
|
||||
logger := opt.Logger
|
||||
if logger == nil {
|
||||
logger = log.New(ioutil.Discard, "", 0)
|
||||
}
|
||||
|
||||
logger.Printf("computing primary checksum")
|
||||
|
||||
// Compute checksum of primary database under read lock. This prevents a
|
||||
// sync from occurring and the database will not be written.
|
||||
chksum0, pos, err := db.CRC32C()
|
||||
if err != nil {
|
||||
return fmt.Errorf("cannot compute checksum: %w", err)
|
||||
}
|
||||
logger.Printf("primary checksum computed: %08x", chksum0)
|
||||
|
||||
// Wait until replica catches up to position.
|
||||
logger.Printf("waiting for replica")
|
||||
if err := db.waitForReplica(ctx, r, pos, logger); err != nil {
|
||||
return fmt.Errorf("cannot wait for replica: %w", err)
|
||||
}
|
||||
logger.Printf("replica ready, restoring")
|
||||
|
||||
// Restore replica to a temporary directory.
|
||||
tmpdir, err := ioutil.TempDir("", "*-litestream")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer os.RemoveAll(tmpdir)
|
||||
|
||||
restorePath := filepath.Join(tmpdir, "db")
|
||||
if err := db.Restore(ctx, RestoreOptions{
|
||||
OutputPath: restorePath,
|
||||
ReplicaName: replicaName,
|
||||
Generation: pos.Generation,
|
||||
Index: pos.Index - 1,
|
||||
DryRun: opt.DryRun,
|
||||
Logger: opt.Logger,
|
||||
}); err != nil {
|
||||
return fmt.Errorf("cannot restore: %w", err)
|
||||
}
|
||||
|
||||
// Skip remaining validation if this is just a dry run.
|
||||
if opt.DryRun {
|
||||
return fmt.Errorf("validation stopped, dry run only")
|
||||
}
|
||||
|
||||
logger.Printf("restore complete, computing checksum")
|
||||
|
||||
// Open file handle for restored database.
|
||||
f, err := os.Open(db.Path())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer f.Close()
|
||||
|
||||
// Compute checksum.
|
||||
h := crc32.New(crc32.MakeTable(crc32.Castagnoli))
|
||||
if _, err := io.Copy(h, f); err != nil {
|
||||
return err
|
||||
}
|
||||
chksum1 := h.Sum32()
|
||||
|
||||
logger.Printf("replica checksum computed: %08x", chksum1)
|
||||
|
||||
// Validate checksums match.
|
||||
if chksum0 != chksum1 {
|
||||
return ErrChecksumMismatch
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// waitForReplica blocks until replica reaches at least the given position.
|
||||
func (db *DB) waitForReplica(ctx context.Context, r Replica, pos Pos, logger *log.Logger) error {
|
||||
ticker := time.NewTicker(500 * time.Millisecond)
|
||||
defer ticker.Stop()
|
||||
|
||||
once := make(chan struct{}, 1)
|
||||
once <- struct{}{}
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
case <-ticker.C:
|
||||
case <-once: // immediate on first check
|
||||
}
|
||||
|
||||
// Obtain current position of replica, check if past target position.
|
||||
curr, err := r.CalcPos(pos.Generation)
|
||||
if err != nil {
|
||||
logger.Printf("cannot obtain replica position: %w", err)
|
||||
continue
|
||||
}
|
||||
|
||||
ready := true
|
||||
if curr.Generation != pos.Generation {
|
||||
ready = false
|
||||
} else if curr.Index < pos.Index {
|
||||
ready = false
|
||||
} else if curr.Index == pos.Index && curr.Offset < pos.Offset {
|
||||
ready = false
|
||||
}
|
||||
|
||||
// If not ready, restart loop.
|
||||
if !ready {
|
||||
logger.Printf("replica at %s, waiting for %s", curr, pos)
|
||||
continue
|
||||
}
|
||||
|
||||
// Current position at or after target position.
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// CRC32C returns a CRC-32C checksum of the database and its current position.
|
||||
//
|
||||
// This function obtains a read lock so it prevents syncs from occuring until
|
||||
// the operation is complete. The database will still be usable but it will be
|
||||
// unable to checkpoint during this time.
|
||||
func (db *DB) CRC32C() (uint32, Pos, error) {
|
||||
db.mu.RLock()
|
||||
defer db.mu.RUnlock()
|
||||
|
||||
// Obtain current position. Clear the offset since we are only reading the
|
||||
// DB and not applying the current WAL.
|
||||
pos, err := db.Pos()
|
||||
if err != nil {
|
||||
return 0, pos, err
|
||||
}
|
||||
pos.Offset = 0
|
||||
|
||||
// Open file handle for database.
|
||||
f, err := os.Open(db.Path())
|
||||
if err != nil {
|
||||
return 0, pos, err
|
||||
}
|
||||
defer f.Close()
|
||||
|
||||
// Compute checksum.
|
||||
h := crc32.New(crc32.MakeTable(crc32.Castagnoli))
|
||||
if _, err := io.Copy(h, f); err != nil {
|
||||
return 0, pos, err
|
||||
}
|
||||
return h.Sum32(), pos, nil
|
||||
}
|
||||
|
||||
// RestoreOptions represents options for DB.Restore().
|
||||
type RestoreOptions struct {
|
||||
// Target path to restore into.
|
||||
@@ -1376,6 +1547,10 @@ type RestoreOptions struct {
|
||||
// If blank, all generations considered.
|
||||
Generation string
|
||||
|
||||
// Specific index to restore from.
|
||||
// Set to math.MaxInt64 to ignore index.
|
||||
Index int
|
||||
|
||||
// Point-in-time to restore database.
|
||||
// If zero, database restore to most recent state available.
|
||||
Timestamp time.Time
|
||||
@@ -1388,6 +1563,12 @@ type RestoreOptions struct {
|
||||
Logger *log.Logger
|
||||
}
|
||||
|
||||
func NewRestoreOptions() RestoreOptions {
|
||||
return RestoreOptions{
|
||||
Index: math.MaxInt64,
|
||||
}
|
||||
}
|
||||
|
||||
func headerByteOrder(hdr []byte) (binary.ByteOrder, error) {
|
||||
magic := binary.BigEndian.Uint32(hdr[0:])
|
||||
switch magic {
|
||||
|
||||
Reference in New Issue
Block a user