From 3b9275488daa175d0ebc07ef99a5c25b044b9aff Mon Sep 17 00:00:00 2001 From: Ben Johnson Date: Thu, 31 Dec 2020 10:36:48 -0700 Subject: [PATCH] Add 'validate' command --- cmd/litestream/main.go | 3 + cmd/litestream/restore.go | 17 ++-- cmd/litestream/validate.go | 136 +++++++++++++++++++++++++ db.go | 199 +++++++++++++++++++++++++++++++++++-- litestream.go | 3 +- replicator.go | 33 ++++-- 6 files changed, 364 insertions(+), 27 deletions(-) create mode 100644 cmd/litestream/validate.go diff --git a/cmd/litestream/main.go b/cmd/litestream/main.go index ddfc1d5..791798a 100644 --- a/cmd/litestream/main.go +++ b/cmd/litestream/main.go @@ -58,6 +58,8 @@ func (m *Main) Run(ctx context.Context, args []string) (err error) { return (&RestoreCommand{}).Run(ctx, args) case "snapshots": return (&SnapshotsCommand{}).Run(ctx, args) + case "validate": + return (&ValidateCommand{}).Run(ctx, args) case "version": return (&VersionCommand{}).Run(ctx, args) case "wal": @@ -85,6 +87,7 @@ The commands are: replicate runs a server to replicate databases restore recovers database backup from a replica snapshots list available snapshots for a database + validate checks replica to ensure a consistent state with primary version prints the version wal list available WAL files for a database `[1:]) diff --git a/cmd/litestream/restore.go b/cmd/litestream/restore.go index ebd01e4..1387e93 100644 --- a/cmd/litestream/restore.go +++ b/cmd/litestream/restore.go @@ -14,17 +14,17 @@ import ( ) type RestoreCommand struct { - DBPath string } func (c *RestoreCommand) Run(ctx context.Context, args []string) (err error) { var configPath string - var opt litestream.RestoreOptions + opt := litestream.NewRestoreOptions() fs := flag.NewFlagSet("litestream-restore", flag.ContinueOnError) registerConfigFlag(fs, &configPath) fs.StringVar(&opt.OutputPath, "o", "", "output path") fs.StringVar(&opt.ReplicaName, "replica", "", "replica name") fs.StringVar(&opt.Generation, "generation", "", "generation name") + 
fs.IntVar(&opt.Index, "index", opt.Index, "wal index") fs.BoolVar(&opt.DryRun, "dry-run", false, "dry run") timestampStr := fs.String("timestamp", "", "timestamp") verbose := fs.Bool("v", false, "verbose output") @@ -63,15 +63,16 @@ func (c *RestoreCommand) Run(ctx context.Context, args []string) (err error) { opt.Logger = log.New(os.Stderr, "", log.LstdFlags) } - // Determine absolute path for database, if specified. - if c.DBPath, err = filepath.Abs(fs.Arg(0)); err != nil { + // Determine absolute path for database. + dbPath, err := filepath.Abs(fs.Arg(0)) + if err != nil { return err } // Instantiate DB. - dbConfig := config.DBConfig(c.DBPath) + dbConfig := config.DBConfig(dbPath) if dbConfig == nil { - return fmt.Errorf("database not found in config: %s", c.DBPath) + return fmt.Errorf("database not found in config: %s", dbPath) } db, err := newDBFromConfig(dbConfig) if err != nil { @@ -103,6 +104,10 @@ Arguments: Restore from a specific generation. Defaults to generation with latest data. + -index NUM + Restore up to a specific WAL index (inclusive). + Defaults to use the highest available index. + -timestamp TIMESTAMP Restore to a specific point-in-time. Defaults to use the latest available backup. 
diff --git a/cmd/litestream/validate.go b/cmd/litestream/validate.go
new file mode 100644
index 0000000..9dde971
--- /dev/null
+++ b/cmd/litestream/validate.go
@@ -0,0 +1,143 @@
+package main
+
+import (
+	"context"
+	"errors"
+	"flag"
+	"fmt"
+	"log"
+	"os"
+	"path/filepath"
+
+	"github.com/benbjohnson/litestream"
+)
+
+// ValidateCommand implements the "validate" subcommand.
+type ValidateCommand struct{}
+
+// Run parses the command line, loads the config file, and validates each
+// replica of the database (or only the one named by -replica). It returns
+// an error if any replica fails validation.
+func (c *ValidateCommand) Run(ctx context.Context, args []string) (err error) {
+	var configPath string
+	opt := litestream.NewRestoreOptions()
+	fs := flag.NewFlagSet("litestream-validate", flag.ContinueOnError)
+	registerConfigFlag(fs, &configPath)
+	fs.StringVar(&opt.ReplicaName, "replica", "", "replica name")
+	fs.BoolVar(&opt.DryRun, "dry-run", false, "dry run")
+	verbose := fs.Bool("v", false, "verbose output")
+	fs.Usage = c.Usage
+	if err := fs.Parse(args); err != nil {
+		return err
+	} else if fs.NArg() == 0 || fs.Arg(0) == "" {
+		return fmt.Errorf("database path required")
+	} else if fs.NArg() > 1 {
+		return fmt.Errorf("too many arguments")
+	}
+
+	// Load configuration.
+	if configPath == "" {
+		return errors.New("-config required")
+	}
+	config, err := ReadConfigFile(configPath)
+	if err != nil {
+		return err
+	}
+
+	// Verbose output is automatically enabled if dry run is specified.
+	if opt.DryRun {
+		*verbose = true
+	}
+
+	// Instantiate logger if verbose output is enabled.
+	if *verbose {
+		opt.Logger = log.New(os.Stderr, "", log.LstdFlags)
+	}
+
+	// Determine absolute path for database.
+	dbPath, err := filepath.Abs(fs.Arg(0))
+	if err != nil {
+		return err
+	}
+
+	// Instantiate DB.
+	dbConfig := config.DBConfig(dbPath)
+	if dbConfig == nil {
+		return fmt.Errorf("database not found in config: %s", dbPath)
+	}
+	db, err := newDBFromConfig(dbConfig)
+	if err != nil {
+		return err
+	}
+
+	// Ensure replica exists, if specified.
+	if opt.ReplicaName != "" && db.Replica(opt.ReplicaName) == nil {
+		return fmt.Errorf("replica not found: %s", opt.ReplicaName)
+	}
+
+	// Validate all matching replicas.
+	var hasInvalidReplica bool
+	for _, r := range db.Replicas {
+		if opt.ReplicaName != "" && opt.ReplicaName != r.Name() {
+			continue
+		}
+
+		if err := db.Validate(ctx, r.Name(), opt); err != nil {
+			// Report and remember the failure; keep checking remaining replicas.
+			fmt.Printf("%s: replica invalid: %s\n", r.Name(), err)
+			hasInvalidReplica = true
+		}
+	}
+
+	if hasInvalidReplica {
+		return fmt.Errorf("one or more invalid replicas found")
+	}
+
+	fmt.Println("ok")
+	return nil
+}
+
+// Usage prints the help message for the validate command to STDOUT.
+func (c *ValidateCommand) Usage() {
+	fmt.Printf(`
+The validate command compares a checksum of the primary database with a
+checksum of the replica at the same point in time. Returns an error if the
+databases are not equal.
+
+The restored database must be written to a temporary file so you must ensure
+you have enough disk space before performing this operation.
+
+Usage:
+
+	litestream validate [arguments] DB
+
+Arguments:
+
+	-config PATH
+	    Specifies the configuration file.
+	    Defaults to %s
+
+	-replica NAME
+	    Validate a specific replica.
+	    Defaults to validating all replicas.
+
+	-dry-run
+	    Prints all log output as if it were running but does
+	    not perform actual validation.
+
+	-v
+	    Verbose output.
+
+
+Examples:
+
+	# Validate all replicas for the given database.
+	$ litestream validate /path/to/db
+
+	# Validate only the S3 replica.
+	$ litestream validate -replica s3 /path/to/db
+
+`[1:],
+		DefaultConfigPath,
+	)
+}
diff --git a/db.go b/db.go
index 670eab2..384c27d 100644
--- a/db.go
+++ b/db.go
@@ -8,9 +8,11 @@ import (
 	"encoding/hex"
 	"errors"
 	"fmt"
+	"hash/crc32"
 	"io"
 	"io/ioutil"
 	"log"
+	"math"
 	"math/rand"
 	"os"
 	"path/filepath"
@@ -414,7 +416,7 @@ func (db *DB) cleanWAL() error {
 	// Determine lowest index that's been replicated to all replicas.
 	min := -1
 	for _, r := range db.Replicas {
-		pos := r.Pos()
+		pos := r.LastPos()
 		if pos.Generation != generation {
 			pos = Pos{} // different generation, reset index to zero
 		}
@@ -1165,6 +1167,13 @@ func (db *DB) monitor() {
 // a timestamp can be specified to restore the database to a specific
 // point-in-time.
func (db *DB) Restore(ctx context.Context, opt RestoreOptions) error { + // Validate options. + if opt.Generation == "" && opt.Index != math.MaxInt64 { + return fmt.Errorf("must specify generation when restoring to index") + } else if opt.Index != math.MaxInt64 && !opt.Timestamp.IsZero() { + return fmt.Errorf("cannot specify index & timestamp to restore") + } + // Ensure logger exists. logger := opt.Logger if logger == nil { @@ -1199,7 +1208,7 @@ func (db *DB) Restore(ctx context.Context, opt RestoreOptions) error { } // Find the maximum WAL index that occurs before timestamp. - maxWALIndex, err := r.WALIndexAt(ctx, generation, opt.Timestamp) + maxWALIndex, err := r.WALIndexAt(ctx, generation, opt.Index, opt.Timestamp) if err != nil { return fmt.Errorf("cannot find max wal index for restore: %w", err) } @@ -1326,6 +1335,13 @@ func (db *DB) restoreSnapshot(ctx context.Context, r Replica, generation string, // restoreWAL copies a WAL file from the replica to the local WAL and forces checkpoint. func (db *DB) restoreWAL(ctx context.Context, r Replica, generation string, index int, dbPath string) error { + // Open WAL file from replica. + rd, err := r.WALReader(ctx, generation, index) + if err != nil { + return err + } + defer rd.Close() + // Open handle to destination WAL path. f, err := os.Create(dbPath + "-wal") if err != nil { @@ -1333,13 +1349,7 @@ func (db *DB) restoreWAL(ctx context.Context, r Replica, generation string, inde } defer f.Close() - rd, err := r.WALReader(ctx, generation, index) - if err != nil { - return err - } - defer rd.Close() - - // + // Copy WAL to target path. if _, err := io.Copy(f, rd); err != nil { return err } else if err := f.Close(); err != nil { @@ -1362,6 +1372,167 @@ func (db *DB) restoreWAL(ctx context.Context, r Replica, generation string, inde return nil } +// Validate restores the most recent data from a replica and validates +// that the resulting database matches the current database. 
+func (db *DB) Validate(ctx context.Context, replicaName string, opt RestoreOptions) error {
+	if replicaName == "" {
+		return fmt.Errorf("replica name required")
+	}
+
+	// Look up replica by name.
+	r := db.Replica(replicaName)
+	if r == nil {
+		return fmt.Errorf("replica not found: %q", replicaName)
+	}
+
+	// Ensure logger exists.
+	logger := opt.Logger
+	if logger == nil {
+		logger = log.New(ioutil.Discard, "", 0)
+	}
+
+	logger.Printf("computing primary checksum")
+
+	// Compute checksum of primary database under read lock. This prevents a
+	// sync from occurring and the database will not be written.
+	chksum0, pos, err := db.CRC32C()
+	if err != nil {
+		return fmt.Errorf("cannot compute checksum: %w", err)
+	}
+	logger.Printf("primary checksum computed: %08x", chksum0)
+
+	// Wait until replica catches up to position.
+	logger.Printf("waiting for replica")
+	if err := db.waitForReplica(ctx, r, pos, logger); err != nil {
+		return fmt.Errorf("cannot wait for replica: %w", err)
+	}
+	logger.Printf("replica ready, restoring")
+
+	// Restore replica to a temporary directory.
+	tmpdir, err := ioutil.TempDir("", "*-litestream")
+	if err != nil {
+		return err
+	}
+	defer os.RemoveAll(tmpdir)
+
+	restorePath := filepath.Join(tmpdir, "db")
+	if err := db.Restore(ctx, RestoreOptions{
+		OutputPath:  restorePath,
+		ReplicaName: replicaName,
+		Generation:  pos.Generation,
+		Index:       pos.Index - 1,
+		DryRun:      opt.DryRun,
+		Logger:      opt.Logger,
+	}); err != nil {
+		return fmt.Errorf("cannot restore: %w", err)
+	}
+
+	// Skip remaining validation if this is just a dry run.
+	if opt.DryRun {
+		return fmt.Errorf("validation stopped, dry run only")
+	}
+
+	logger.Printf("restore complete, computing checksum")
+
+	// Open file handle for the restored copy (not the primary database).
+	f, err := os.Open(restorePath)
+	if err != nil {
+		return err
+	}
+	defer f.Close()
+
+	// Compute checksum.
+	h := crc32.New(crc32.MakeTable(crc32.Castagnoli))
+	if _, err := io.Copy(h, f); err != nil {
+		return err
+	}
+	chksum1 := h.Sum32()
+
+	logger.Printf("replica checksum computed: %08x", chksum1)
+
+	// Validate checksums match.
+	if chksum0 != chksum1 {
+		return ErrChecksumMismatch
+	}
+
+	return nil
+}
+
+// waitForReplica blocks until replica reaches at least the given position.
+func (db *DB) waitForReplica(ctx context.Context, r Replica, pos Pos, logger *log.Logger) error {
+	ticker := time.NewTicker(500 * time.Millisecond)
+	defer ticker.Stop()
+
+	once := make(chan struct{}, 1)
+	once <- struct{}{}
+
+	for {
+		select {
+		case <-ctx.Done():
+			return ctx.Err()
+		case <-ticker.C:
+		case <-once: // immediate on first check
+		}
+
+		// Obtain current position of replica, check if past target position.
+		curr, err := r.CalcPos(pos.Generation)
+		if err != nil {
+			logger.Printf("cannot obtain replica position: %s", err)
+			continue
+		}
+
+		ready := true
+		if curr.Generation != pos.Generation {
+			ready = false
+		} else if curr.Index < pos.Index {
+			ready = false
+		} else if curr.Index == pos.Index && curr.Offset < pos.Offset {
+			ready = false
+		}
+
+		// If not ready, restart loop.
+		if !ready {
+			logger.Printf("replica at %s, waiting for %s", curr, pos)
+			continue
+		}
+
+		// Current position at or after target position.
+		return nil
+	}
+}
+
+// CRC32C returns a CRC-32C checksum of the database and its current position.
+//
+// This function obtains a read lock so it prevents syncs from occurring until
+// the operation is complete. The database will still be usable but it will be
+// unable to checkpoint during this time.
+func (db *DB) CRC32C() (uint32, Pos, error) {
+	db.mu.RLock()
+	defer db.mu.RUnlock()
+
+	// Obtain current position. Clear the offset since we are only reading the
+	// DB and not applying the current WAL.
+	pos, err := db.Pos()
+	if err != nil {
+		return 0, pos, err
+	}
+	pos.Offset = 0
+
+	// Open file handle for database.
+ f, err := os.Open(db.Path()) + if err != nil { + return 0, pos, err + } + defer f.Close() + + // Compute checksum. + h := crc32.New(crc32.MakeTable(crc32.Castagnoli)) + if _, err := io.Copy(h, f); err != nil { + return 0, pos, err + } + return h.Sum32(), pos, nil +} + // RestoreOptions represents options for DB.Restore(). type RestoreOptions struct { // Target path to restore into. @@ -1376,6 +1547,10 @@ type RestoreOptions struct { // If blank, all generations considered. Generation string + // Highest WAL index to restore up to (inclusive). + // Set to math.MaxInt64 to ignore index. + Index int + // Point-in-time to restore database. // If zero, database restore to most recent state available. Timestamp time.Time @@ -1388,6 +1563,12 @@ type RestoreOptions struct { Logger *log.Logger } +func NewRestoreOptions() RestoreOptions { + return RestoreOptions{ + Index: math.MaxInt64, + } +} + func headerByteOrder(hdr []byte) (binary.ByteOrder, error) { magic := binary.BigEndian.Uint32(hdr[0:]) switch magic { diff --git a/litestream.go b/litestream.go index 577ff5d..8b4d8a4 100644 --- a/litestream.go +++ b/litestream.go @@ -38,7 +38,8 @@ const ( // Litestream errors. var ( - ErrNoSnapshots = errors.New("no snapshots available") + ErrNoSnapshots = errors.New("no snapshots available") + ErrChecksumMismatch = errors.New("invalid replica, checksum mismatch") ) // SnapshotInfo represents file information about a snapshot. diff --git a/replicator.go b/replicator.go index 96d7c77..58414a3 100644 --- a/replicator.go +++ b/replicator.go @@ -7,6 +7,7 @@ import ( "io" "io/ioutil" "log" + "math" "os" "path/filepath" "sort" @@ -30,7 +31,10 @@ type Replica interface { Stop() // Returns the last replication position. - Pos() Pos + LastPos() Pos + + // Returns the computed position of the replica for a given generation. + CalcPos(generation string) (Pos, error) // Returns a list of generation names for the replica. 
Generations(ctx context.Context) ([]string, error) @@ -51,7 +55,7 @@ type Replica interface { // Returns the highest index for a WAL file that occurs before timestamp. // If timestamp is zero, returns the highest WAL index. - WALIndexAt(ctx context.Context, generation string, timestamp time.Time) (int, error) + WALIndexAt(ctx context.Context, generation string, maxIndex int, timestamp time.Time) (int, error) // Returns a reader for snapshot data at the given generation/index. SnapshotReader(ctx context.Context, generation string, index int) (io.ReadCloser, error) @@ -110,8 +114,8 @@ func (r *FileReplica) Type() string { return "file" } -// Pos returns the last successfully replicated position. -func (r *FileReplica) Pos() Pos { +// LastPos returns the last successfully replicated position. +func (r *FileReplica) LastPos() Pos { r.mu.RLock() defer r.mu.RUnlock() return r.pos @@ -449,9 +453,9 @@ func (r *FileReplica) monitor(ctx context.Context) { } } -// calcPos returns the position for the replica for the current generation. +// CalcPos returns the position for the replica for the given generation. // Returns a zero value if there is no active generation. -func (r *FileReplica) calcPos(generation string) (pos Pos, err error) { +func (r *FileReplica) CalcPos(generation string) (pos Pos, err error) { pos.Generation = generation // Find maximum snapshot index. @@ -561,8 +565,8 @@ func (r *FileReplica) Sync(ctx context.Context) (err error) { } // Determine position, if necessary. 
- if r.Pos().IsZero() { - pos, err := r.calcPos(generation) + if r.LastPos().IsZero() { + pos, err := r.CalcPos(generation) if err != nil { return fmt.Errorf("cannot determine replica position: %s", err) } @@ -592,7 +596,7 @@ func (r *FileReplica) Sync(ctx context.Context) (err error) { } func (r *FileReplica) syncWAL(ctx context.Context) (err error) { - rd, err := r.db.ShadowWALReader(r.Pos()) + rd, err := r.db.ShadowWALReader(r.LastPos()) if err == io.EOF { return err } else if err != nil { @@ -697,9 +701,9 @@ func (r *FileReplica) SnapshotIndexAt(ctx context.Context, generation string, ti return index, nil } -// Returns the highest index for a WAL file that occurs before timestamp. +// Returns the highest index for a WAL file that is at or before maxIndex & timestamp. // If timestamp is zero, returns the highest WAL index. -func (r *FileReplica) WALIndexAt(ctx context.Context, generation string, timestamp time.Time) (int, error) { +func (r *FileReplica) WALIndexAt(ctx context.Context, generation string, maxIndex int, timestamp time.Time) (int, error) { names, err := r.WALSubdirNames(generation) if err != nil { return 0, err @@ -724,6 +728,8 @@ func (r *FileReplica) WALIndexAt(ctx context.Context, generation string, timesta continue // not a snapshot, skip } else if !timestamp.IsZero() && fi.ModTime().After(timestamp) { continue // after timestamp, skip + } else if idx > maxIndex { + continue // after max index, skip } else if idx < index { continue // earlier index, skip } @@ -732,6 +738,11 @@ func (r *FileReplica) WALIndexAt(ctx context.Context, generation string, timesta } } + // If max index is specified but not found, return an error. + if maxIndex != math.MaxInt64 && index != maxIndex { + return index, fmt.Errorf("unable to locate index %d in generation %q, highest index was %d", maxIndex, generation, index) + } + return index, nil }