From 1b8cfc8a418f4a7d049b71b453120a309134e8db Mon Sep 17 00:00:00 2001 From: Ben Johnson Date: Fri, 15 Jan 2021 16:04:25 -0700 Subject: [PATCH] Add validation interval --- cmd/litestream/main.go | 9 ++- cmd/litestream/validate.go | 136 --------------------------------- db.go | 138 ++------------------------------- internal/metrics.go | 7 ++ replica.go | 152 ++++++++++++++++++++++++++++++++++++- s3/s3.go | 33 +++++++- 6 files changed, 204 insertions(+), 271 deletions(-) delete mode 100644 cmd/litestream/validate.go diff --git a/cmd/litestream/main.go b/cmd/litestream/main.go index 73a88eb..5213f71 100644 --- a/cmd/litestream/main.go +++ b/cmd/litestream/main.go @@ -59,8 +59,6 @@ func (m *Main) Run(ctx context.Context, args []string) (err error) { return (&RestoreCommand{}).Run(ctx, args) case "snapshots": return (&SnapshotsCommand{}).Run(ctx, args) - case "validate": - return (&ValidateCommand{}).Run(ctx, args) case "version": return (&VersionCommand{}).Run(ctx, args) case "wal": @@ -190,6 +188,7 @@ type ReplicaConfig struct { Retention time.Duration `yaml:"retention"` RetentionCheckInterval time.Duration `yaml:"retention-check-interval"` SyncInterval time.Duration `yaml:"sync-interval"` // s3 only + ValidationInterval time.Duration `yaml:"validation-interval"` // S3 settings AccessKeyID string `yaml:"access-key-id"` @@ -292,6 +291,9 @@ func newFileReplicaFromConfig(db *litestream.DB, c *Config, dbc *DBConfig, rc *R if v := rc.RetentionCheckInterval; v > 0 { r.RetentionCheckInterval = v } + if v := rc.ValidationInterval; v > 0 { + r.ValidationInterval = v + } return r, nil } @@ -341,5 +343,8 @@ func newS3ReplicaFromConfig(db *litestream.DB, c *Config, dbc *DBConfig, rc *Rep if v := rc.SyncInterval; v > 0 { r.SyncInterval = v } + if v := rc.ValidationInterval; v > 0 { + r.ValidationInterval = v + } return r, nil } diff --git a/cmd/litestream/validate.go b/cmd/litestream/validate.go deleted file mode 100644 index 218f692..0000000 --- a/cmd/litestream/validate.go +++ 
/dev/null @@ -1,136 +0,0 @@ -package main - -import ( - "context" - "errors" - "flag" - "fmt" - "log" - "os" - "path/filepath" - - "github.com/benbjohnson/litestream" -) - -type ValidateCommand struct{} - -func (c *ValidateCommand) Run(ctx context.Context, args []string) (err error) { - var configPath string - opt := litestream.NewRestoreOptions() - fs := flag.NewFlagSet("litestream-validate", flag.ContinueOnError) - registerConfigFlag(fs, &configPath) - fs.StringVar(&opt.ReplicaName, "replica", "", "replica name") - fs.BoolVar(&opt.DryRun, "dry-run", false, "dry run") - verbose := fs.Bool("v", false, "verbose output") - fs.Usage = c.Usage - if err := fs.Parse(args); err != nil { - return err - } else if fs.NArg() == 0 || fs.Arg(0) == "" { - return fmt.Errorf("database path required") - } else if fs.NArg() > 1 { - return fmt.Errorf("too many arguments") - } - - // Load configuration. - if configPath == "" { - return errors.New("-config required") - } - config, err := ReadConfigFile(configPath) - if err != nil { - return err - } - - // Verbose output is automatically enabled if dry run is specified. - if opt.DryRun { - *verbose = true - } - - // Instantiate logger if verbose output is enabled. - if *verbose { - opt.Logger = log.New(os.Stderr, "", log.LstdFlags) - } - - // Determine absolute path for database. - dbPath, err := filepath.Abs(fs.Arg(0)) - if err != nil { - return err - } - - // Instantiate DB. - dbConfig := config.DBConfig(dbPath) - if dbConfig == nil { - return fmt.Errorf("database not found in config: %s", dbPath) - } - db, err := newDBFromConfig(&config, dbConfig) - if err != nil { - return err - } - - // Ensure replica exists, if specified. - if opt.ReplicaName != "" && db.Replica(opt.ReplicaName) == nil { - return fmt.Errorf("replica not found: %s", opt.ReplicaName) - } - - // Validate all matching replicas. 
- var hasInvalidReplica bool - for _, r := range db.Replicas { - if opt.ReplicaName != "" && opt.ReplicaName != r.Name() { - continue - } - - if err := db.Validate(ctx, r.Name(), opt); err != nil { - fmt.Printf("%s: replica invalid: %s\n", r.Name(), err) - } - } - - if hasInvalidReplica { - return fmt.Errorf("one or more invalid replicas found") - } - - fmt.Println("ok") - return nil -} - -func (c *ValidateCommand) Usage() { - fmt.Printf(` -The validate command compares a checksum of the primary database with a -checksum of the replica at the same point in time. Returns an error if the -databases are not equal. - -The restored database must be written to a temporary file so you must ensure -you have enough disk space before performing this operation. - -Usage: - - litestream validate [arguments] DB - -Arguments: - - -config PATH - Specifies the configuration file. - Defaults to %s - - -replica NAME - Validate a specific replica. - Defaults to validating all replicas. - - -dry-run - Prints all log output as if it were running but does - not perform actual validation. - - -v - Verbose output. - - -Examples: - - # Validate all replicas for the given database. - $ litestream validate /path/to/db - - # Validate only the S3 replica. - $ litestream restore -replica s3 /path/to/db - -`[1:], - DefaultConfigPath(), - ) -} diff --git a/db.go b/db.go index 772859b..8909816 100644 --- a/db.go +++ b/db.go @@ -1470,143 +1470,19 @@ func (db *DB) restoreWAL(ctx context.Context, r Replica, generation string, inde return nil } -// Validate restores the most recent data from a replica and validates -// that the resulting database matches the current database. -func (db *DB) Validate(ctx context.Context, replicaName string, opt RestoreOptions) error { - if replicaName == "" { - return fmt.Errorf("replica name required") - } - - // Look up replica by name. 
- r := db.Replica(replicaName) - if r == nil { - return fmt.Errorf("replica not found: %q", replicaName) - } - - // Ensure logger exists. - logger := opt.Logger - if logger == nil { - logger = log.New(ioutil.Discard, "", 0) - } - - logger.Printf("computing primary checksum") - - // Compute checksum of primary database under read lock. This prevents a - // sync from occurring and the database will not be written. - chksum0, pos, err := db.CRC64() - if err != nil { - return fmt.Errorf("cannot compute checksum: %w", err) - } - logger.Printf("primary checksum computed: %08x", chksum0) - - // Wait until replica catches up to position. - logger.Printf("waiting for replica") - if err := db.waitForReplica(ctx, r, pos, logger); err != nil { - return fmt.Errorf("cannot wait for replica: %w", err) - } - logger.Printf("replica ready, restoring") - - // Restore replica to a temporary directory. - tmpdir, err := ioutil.TempDir("", "*-litestream") - if err != nil { - return err - } - defer os.RemoveAll(tmpdir) - - restorePath := filepath.Join(tmpdir, "db") - if err := db.Restore(ctx, RestoreOptions{ - OutputPath: restorePath, - ReplicaName: replicaName, - Generation: pos.Generation, - Index: pos.Index - 1, - DryRun: opt.DryRun, - Logger: opt.Logger, - }); err != nil { - return fmt.Errorf("cannot restore: %w", err) - } - - // Skip remaining validation if this is just a dry run. - if opt.DryRun { - return fmt.Errorf("validation stopped, dry run only") - } - - logger.Printf("restore complete, computing checksum") - - // Open file handle for restored database. - f, err := os.Open(db.Path()) - if err != nil { - return err - } - defer f.Close() - - // Compute checksum. - h := crc64.New(crc64.MakeTable(crc64.ISO)) - if _, err := io.Copy(h, f); err != nil { - return err - } - chksum1 := h.Sum64() - - logger.Printf("replica checksum computed: %08x", chksum1) - - // Validate checksums match. 
- if chksum0 != chksum1 { - return ErrChecksumMismatch - } - - return nil -} - -// waitForReplica blocks until replica reaches at least the given position. -func (db *DB) waitForReplica(ctx context.Context, r Replica, pos Pos, logger *log.Logger) error { - ticker := time.NewTicker(500 * time.Millisecond) - defer ticker.Stop() - - once := make(chan struct{}, 1) - once <- struct{}{} - - for { - select { - case <-ctx.Done(): - return ctx.Err() - case <-ticker.C: - case <-once: // immediate on first check - } - - // Obtain current position of replica, check if past target position. - curr, err := r.CalcPos(ctx, pos.Generation) - if err != nil { - logger.Printf("cannot obtain replica position: %s", err) - continue - } - - ready := true - if curr.Generation != pos.Generation { - ready = false - } else if curr.Index < pos.Index { - ready = false - } else if curr.Index == pos.Index && curr.Offset < pos.Offset { - ready = false - } - - // If not ready, restart loop. - if !ready { - logger.Printf("replica at %s, waiting for %s", curr, pos) - continue - } - - // Current position at or after target position. - return nil - } -} - // CRC64 returns a CRC-64 ISO checksum of the database and its current position. // // This function obtains a read lock so it prevents syncs from occuring until // the operation is complete. The database will still be usable but it will be // unable to checkpoint during this time. func (db *DB) CRC64() (uint64, Pos, error) { - db.mu.RLock() - defer db.mu.RUnlock() + db.mu.Lock() + defer db.mu.Unlock() + + // Force a RESTART checkpoint to ensure the database is at the start of the WAL. + if err := db.checkpoint(CheckpointModeRestart); err != nil { + return 0, Pos{}, err + } // Obtain current position. Clear the offset since we are only reading the // DB and not applying the current WAL. 
diff --git a/internal/metrics.go b/internal/metrics.go index 2d14e6e..098f392 100644 --- a/internal/metrics.go +++ b/internal/metrics.go @@ -34,4 +34,11 @@ var ( Name: "wal_offset", Help: "The current WAL offset", }, []string{"db", "name"}) + + ReplicaValidationTotalCounterVec = promauto.NewCounterVec(prometheus.CounterOpts{ + Namespace: "litestream", + Subsystem: "replica", + Name: "validation_total", + Help: "The number of validations performed", + }, []string{"db", "name", "status"}) ) diff --git a/replica.go b/replica.go index 6eadf5d..070591a 100644 --- a/replica.go +++ b/replica.go @@ -4,6 +4,7 @@ import ( "compress/gzip" "context" "fmt" + "hash/crc64" "io" "io/ioutil" "log" @@ -26,6 +27,9 @@ type Replica interface { // String identifier for the type of replica ("file", "s3", etc). Type() string + // The parent database. + DB() *DB + // Starts replicating in a background goroutine. Start(ctx context.Context) @@ -102,6 +106,9 @@ type FileReplica struct { // Time between checks for retention. RetentionCheckInterval time.Duration + // Time between validation checks. + ValidationInterval time.Duration + // If true, replica monitors database for changes automatically. // Set to false if replica is being used synchronously (such as in tests). MonitorEnabled bool @@ -141,6 +148,11 @@ func (r *FileReplica) Type() string { return "file" } +// DB returns the parent database reference. +func (r *FileReplica) DB() *DB { + return r.db +} + // Path returns the path the replica was initialized with. func (r *FileReplica) Path() string { return r.dst @@ -387,9 +399,10 @@ func (r *FileReplica) Start(ctx context.Context) { ctx, r.cancel = context.WithCancel(ctx) // Start goroutine to replicate data. - r.wg.Add(2) + r.wg.Add(3) go func() { defer r.wg.Done(); r.monitor(ctx) }() go func() { defer r.wg.Done(); r.retainer(ctx) }() + go func() { defer r.wg.Done(); r.validator(ctx) }() } // Stop cancels any outstanding replication and blocks until finished. 
@@ -446,6 +459,28 @@ func (r *FileReplica) retainer(ctx context.Context) { } } +// validator runs in a separate goroutine and handles periodic validation. +func (r *FileReplica) validator(ctx context.Context) { + if r.ValidationInterval <= 0 { + return + } + + ticker := time.NewTicker(r.ValidationInterval) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + if err := ValidateReplica(ctx, r); err != nil { + log.Printf("%s(%s): validation error: %s", r.db.Path(), r.Name(), err) + continue + } + } + } +} + // CalcPos returns the position for the replica for the current generation. // Returns a zero value if there is no active generation. func (r *FileReplica) CalcPos(ctx context.Context, generation string) (pos Pos, err error) { @@ -932,3 +967,118 @@ func compressFile(src, dst string, uid, gid int) error { // Move compressed file to final location. return os.Rename(dst+".tmp", dst) } + +// ValidateReplica restores the most recent data from a replica and validates +// that the resulting database matches the current database. +func ValidateReplica(ctx context.Context, r Replica) error { + db := r.DB() + + log.Printf("%s(%s): computing primary checksum", db.Path(), r.Name()) + + // Compute checksum of primary database under lock. This prevents a + // sync from occurring and the database will not be written. + chksum0, pos, err := db.CRC64() + if err != nil { + return fmt.Errorf("cannot compute checksum: %w", err) + } + log.Printf("%s(%s): primary checksum computed: %08x", db.Path(), r.Name(), chksum0) + + // Wait until replica catches up to position. + log.Printf("%s(%s): waiting for replica", db.Path(), r.Name()) + if err := waitForReplica(ctx, r, pos); err != nil { + return fmt.Errorf("cannot wait for replica: %w", err) + } + log.Printf("%s(%s): replica ready, restoring", db.Path(), r.Name()) + + // Restore replica to a temporary directory. 
+ tmpdir, err := ioutil.TempDir("", "*-litestream") + if err != nil { + return err + } + defer os.RemoveAll(tmpdir) + + restorePath := filepath.Join(tmpdir, "db") + if err := db.Restore(ctx, RestoreOptions{ + OutputPath: restorePath, + ReplicaName: r.Name(), + Generation: pos.Generation, + Index: pos.Index - 1, + Logger: log.New(os.Stderr, "", 0), + }); err != nil { + return fmt.Errorf("cannot restore: %w", err) + } + + log.Printf("%s(%s): restore complete, computing checksum", db.Path(), r.Name()) + + // Open file handle for restored database. + f, err := os.Open(restorePath) + if err != nil { + return err + } + defer f.Close() + + // Compute checksum. + h := crc64.New(crc64.MakeTable(crc64.ISO)) + if _, err := io.Copy(h, f); err != nil { + return err + } + chksum1 := h.Sum64() + + log.Printf("%s(%s): replica checksum computed: %08x", db.Path(), r.Name(), chksum1) + + // Validate checksums match. + if chksum0 != chksum1 { + internal.ReplicaValidationTotalCounterVec.WithLabelValues(db.Path(), r.Name(), "error").Inc() + return ErrChecksumMismatch + } + + internal.ReplicaValidationTotalCounterVec.WithLabelValues(db.Path(), r.Name(), "ok").Inc() + log.Printf("%s(%s): replica ok", db.Path(), r.Name()) + + return nil +} + +// waitForReplica blocks until replica reaches at least the given position. +func waitForReplica(ctx context.Context, r Replica, pos Pos) error { + db := r.DB() + + ticker := time.NewTicker(500 * time.Millisecond) + defer ticker.Stop() + + once := make(chan struct{}, 1) + once <- struct{}{} + + for { + select { + case <-ctx.Done(): + return ctx.Err() + case <-ticker.C: + case <-once: // immediate on first check + } + + // Obtain current position of replica, check if past target position. 
+ curr, err := r.CalcPos(ctx, pos.Generation) + if err != nil { + log.Printf("%s(%s): cannot obtain replica position: %s", db.Path(), r.Name(), err) + continue + } + + ready := true + if curr.Generation != pos.Generation { + ready = false + } else if curr.Index < pos.Index { + ready = false + } else if curr.Index == pos.Index && curr.Offset < pos.Offset { + ready = false + } + + // If not ready, restart loop. + if !ready { + log.Printf("%s(%s): replica at %s, waiting for %s", db.Path(), r.Name(), curr, pos) + continue + } + + // Current position at or after target position. + return nil + } +} diff --git a/s3/s3.go b/s3/s3.go index 5217e54..4d5c846 100644 --- a/s3/s3.go +++ b/s3/s3.go @@ -82,6 +82,9 @@ type Replica struct { // Time between retention checks. RetentionCheckInterval time.Duration + // Time between validation checks. + ValidationInterval time.Duration + // If true, replica monitors database for changes automatically. // Set to false if replica is being used synchronously (such as in tests). MonitorEnabled bool @@ -128,6 +131,11 @@ func (r *Replica) Type() string { return "s3" } +// DB returns the parent database reference. +func (r *Replica) DB() *litestream.DB { + return r.db +} + // LastPos returns the last successfully replicated position. func (r *Replica) LastPos() litestream.Pos { r.mu.RLock() @@ -410,9 +418,10 @@ func (r *Replica) Start(ctx context.Context) { ctx, r.cancel = context.WithCancel(ctx) // Start goroutines to manage replica data. - r.wg.Add(2) + r.wg.Add(3) go func() { defer r.wg.Done(); r.monitor(ctx) }() go func() { defer r.wg.Done(); r.retainer(ctx) }() + go func() { defer r.wg.Done(); r.validator(ctx) }() } // Stop cancels any outstanding replication and blocks until finished. @@ -477,6 +486,28 @@ func (r *Replica) retainer(ctx context.Context) { } } +// validator runs in a separate goroutine and handles periodic validation. 
+func (r *Replica) validator(ctx context.Context) { + if r.ValidationInterval <= 0 { + return + } + + ticker := time.NewTicker(r.ValidationInterval) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + if err := litestream.ValidateReplica(ctx, r); err != nil { + log.Printf("%s(%s): validation error: %s", r.db.Path(), r.Name(), err) + continue + } + } + } +} + // CalcPos returns the position for the replica for the current generation. // Returns a zero value if there is no active generation. func (r *Replica) CalcPos(ctx context.Context, generation string) (pos litestream.Pos, err error) {