diff --git a/cmd/litestream/main.go b/cmd/litestream/main.go index 81068bb..89c62c5 100644 --- a/cmd/litestream/main.go +++ b/cmd/litestream/main.go @@ -52,6 +52,8 @@ func (m *Main) Run(ctx context.Context, args []string) (err error) { return (&GenerationsCommand{}).Run(ctx, args) case "replicate": return (&ReplicateCommand{}).Run(ctx, args) + case "restore": + return (&RestoreCommand{}).Run(ctx, args) case "version": return (&VersionCommand{}).Run(ctx, args) default: @@ -73,8 +75,10 @@ Usage: The commands are: - replicate runs a server to replicate databases - version prints the version + generations list available generations across all dbs & replicas + replicate runs a server to replicate databases + restore recovers database backup from a replica + version prints the version `[1:]) } diff --git a/cmd/litestream/restore.go b/cmd/litestream/restore.go new file mode 100644 index 0000000..9eee7dc --- /dev/null +++ b/cmd/litestream/restore.go @@ -0,0 +1,143 @@ +package main + +import ( + "context" + "errors" + "flag" + "fmt" + "log" + "os" + "path/filepath" + "text/tabwriter" + "time" +) + +type RestoreCommand struct { + DBPath string +} + +func NewRestoreCommand() *RestoreCommand { + return &RestoreCommand{} +} + +func (c *RestoreCommand) Run(ctx context.Context, args []string) (err error) { + var configPath string + var opt litestream.RestoreOptions + fs := flag.NewFlagSet("litestream-restore", flag.ContinueOnError) + registerConfigFlag(fs, &configPath) + fs.StringVar(&opt.OutputPath, "o", "", "output path") + fs.StringVar(&opt.Replica, "replica", "", "replica name") + fs.StringVar(&opt.Generation, "generation", "", "generation name") + fs.StringVar(&opt.DryRun, "dry-run", "", "dry run") + timestampStr := fs.String("timestamp", "", "timestamp") + verbose := fs.Bool("v", false, "verbose output") + fs.Usage = c.Usage + if err := fs.Parse(args); err != nil { + return err + } else if fs.NArg() == 0 || fs.Arg(0) == "" { + return fmt.Errorf("database path required") + } else if fs.NArg() > 1 { + return fmt.Errorf("too many arguments") + } + + // Load configuration. + if configPath == "" { + return errors.New("-config required") + } + config, err := ReadConfigFile(configPath) + if err != nil { + return err + } + + // Parse timestamp, if specified. + if *timestampStr != "" { + if opts.Timestamp, err = time.Parse(time.RFC3339, *timestampStr); err != nil { + return errors.New("invalid -timestamp, must specify in ISO 8601 format (e.g. 2000-01-01T00:00:00Z)") + } + } + + // Verbose output is automatically enabled if dry run is specified. + if opt.DryRun { + *verbose = true + } + + // Instantiate logger if verbose output is enabled. + if *verbose { + opt.Logger = log.New(os.Stderr, "", log.LstdFlags) + } + + // Determine absolute path for database, if specified. + if c.DBPath, err = filepath.Abs(fs.Arg(0)); err != nil { + return err + } + + // Instantiate DB. + dbConfig := config.DBConfig(c.DBPath) + if dbConfig == nil { + return fmt.Errorf("database not found in config: %s", c.DBPath) + } + db, err := newDBFromConfig(dbConfig) + if err != nil { + return err + } + + return db.Restore(opt) +} + +func (c *RestoreCommand) Usage() { + fmt.Printf(` +The restore command recovers a database from a previous snapshot and WAL. + +Usage: + + litestream restore [arguments] DB + +Arguments: + + -config PATH + Specifies the configuration file. Defaults to %s + + -replica NAME + Restore from a specific replica. + Defaults to replica with latest data. + + -generation NAME + Restore from a specific generation. + Defaults to generation with latest data. + + -timestamp TIMESTAMP + Restore to a specific point-in-time. + Defaults to use the latest available backup. + + -o PATH + Output path of the restored database. + Defaults to original DB path. + + -dry-run + Prints all log output as if it were running but does + not perform actual restore. + + -v + Verbose output. + + +Examples: + + # Restore latest replica for database to original location. + $ litestream restore /path/to/db + + # Restore replica for database to a given point in time. + $ litestream restore -timestamp 2020-01-01T00:00:00Z /path/to/db + + # Restore latest replica for database to new /tmp directory + $ litestream restore -o /tmp/db /path/to/db + + # Restore database from latest generation on S3. + $ litestream restore -replica s3 /path/to/db + + # Restore database from specific generation on S3. + $ litestream restore -replica s3 -generation xxxxxxxx /path/to/db +`[1:], + DefaultConfigPath, + ) +} diff --git a/db.go b/db.go index 78b90f2..88a8e31 100644 --- a/db.go +++ b/db.go @@ -1003,6 +1003,170 @@ func (db *DB) monitor() { } } +// Restore restores the database from a replica based on the options given. +// This method will restore into opt.OutputPath, if specified, or into the +// DB's original database path. It can optionally restore from a specific +// replica or generation or it will automatically choose the best one. Finally, +// a timestamp can be specified to restore the database to a specific +// point-in-time. +func (db *DB) Restore(opt RestoreOptions) { + // Ensure logger exists. + logger := opt.Logger + if logger == nil { + logger = log.New(ioutil.Discard, "", 0) + } + + // Determine the correct output path. + outputPath := opt.OutputPath + if outputPath == "" { + outputPath = db.Path + } + + // Ensure output path does not already exist (unless this is a dry run). + if !opt.DryRun { + if _, err := os.Stat(outputPath); err == nil { + return fmt.Errorf("cannot restore, output path already exists: %s", outputPath) + } else if err != nil && !os.IsNotExist(err) { + return outputPath + } + } + + // Determine target replica & generation to restore from. + r, generation, err := db.restoreTarget(opt, logger) + if err != nil { + return err + } + + // Determine manifest to restore from. + snapshotPath, walPaths, err := opt.determineRestoreManifest(r, generation, opt.Timestamp, logger) + if err != nil { + return err + } + + // Copy snapshot to output path. + logger.Printf("restoring snapshot from %s://%s/%s to %s.tmp", r.Name(), generation, snapshotPath, outputPath) + if !opt.DryRun { + if f, err := os.Create(outputPath + ".tmp"); err != nil { + return err + } else if err := r.RestoreSnapshot(f, snapshotPath); err != nil { + f.Close() + return err + } else if err := f.Sync(); err != nil { + f.Close() + return err + } else if err := f.Close(); err != nil { + return err + } + } + + // Restore each WAL file. + for _, walPath := range walPaths { + logger.Printf("restoring wal from %s://%s/%s to %s.tmp-wal", r.Name(), generation, snapshotPath, outputPath) + if opt.DryRun { + continue + } + + // Copy WAL from replica. + if f, err := os.Create(outputPath + ".tmp-wal"); err != nil { + return err + } else if err := r.RestoreWAL(f, walPath); err != nil { + f.Close() + return err + } else if err := f.Sync(); err != nil { + f.Close() + return err + } else if err := f.Close(); err != nil { + return err + } + + // TODO: Open database with SQLite and force a truncated checkpoint. + } + + // Copy file to final location. + logger.Printf("renaming database from temporary location") + if !opt.DryRun { + if err := os.Rename(outputPath+".tmp", outputPath); err != nil { + return err + } + } + + return nil +} + +func (db *DB) restoreTarget(opt RestoreOptions, logger *log.Logger) (Replicator, string, error) { + var target struct { + replicator Replicator + generation string + stats GenerationStats + } + + for _, r := range db.Replicators { + // Skip replica if it does not match filter. + if opt.ReplicaName != "" && r.Name() != opt.ReplicaName { + continue + } + + // Search generations for one that contains the requested timestamp. + for _, generation := range r.Generations() { + // Skip generation if it does not match filter. + if opt.Generation != "" && generation != opt.Generation { + continue + } + + // Fetch stats for generation. + stats, err := r.GenerationStats(generation) + if err != nil { + return nil, "", fmt.Errorf("cannot determine stats for generation (%s/%s): %s", r.Name(), generation, err) + } + + // Skip if it does not contain timestamp. + if !opt.Timestamp.IsZero() { + if opt.Timestamp.Before(stats.CreatedAt) || opt.Timestamp.After(stats.UpdatedAt) { + continue + } + } + + // Use the latest replica if we have multiple candidates. + if !stats.UpdatedAt.After(target.stats.UpdatedAt) { + continue + } + + target.replicator = r + target.generation = generation + target.stats = stats + } + } + + // Return an error if no matching targets found. + if target.generation == "" { + return nil, "", fmt.Errorf("no matching backups found") + } + + return target.replicator, target.generation, nil +} + +// RestoreOptions represents options for DB.Restore(). +type RestoreOptions struct { + // Target path to restore into. + // If blank, the original DB path is used. + OutputPath string + + // Specific replica to restore from. + // If blank, all replicas are considered. + ReplicaName string + + // Specific generation to restore from. + // If blank, all generations considered. + Generation string + + // Point-in-time to restore database. + // If zero, database restore to most recent state available. + Timestamp time.Time + + // Logger used to print status to. + Logger log.Logger +} + func headerByteOrder(hdr []byte) (binary.ByteOrder, error) { magic := binary.BigEndian.Uint32(hdr[0:]) switch magic {