From 67eeb49101fcb73240c085d3431d51c675e03068 Mon Sep 17 00:00:00 2001 From: Ben Johnson Date: Mon, 25 Jan 2021 21:25:59 -0700 Subject: [PATCH] Allow replica URL to be used for commands This commit refactors the commands to allow a replica URL when restoring a database. If the first CLI arg is a URL with a scheme, the it is treated as a replica URL. --- README.md | 59 ++++++++++-- cmd/litestream/generations.go | 9 +- cmd/litestream/main.go | 159 ++++++++++++++++++++------------- cmd/litestream/replicate.go | 7 +- cmd/litestream/restore.go | 75 ++++++++++++---- cmd/litestream/snapshots.go | 66 ++++++++------ cmd/litestream/wal.go | 72 ++++++++------- db.go | 163 ++++++++++++++++++++-------------- litestream.go | 4 +- replica.go | 25 ++++-- s3/s3.go | 24 ++--- 11 files changed, 419 insertions(+), 244 deletions(-) diff --git a/README.md b/README.md index ce23c54..7c43fe4 100644 --- a/README.md +++ b/README.md @@ -50,12 +50,12 @@ $ sudo systemctl start litestream ### Release binaries You can also download the release binary for your system from the -[Releases page][releases] and run it as a standalone application. +[releases page][releases] and run it as a standalone application. ### Building from source -First, download and install the [Go toolchain](https://golang.org/). Then run: +Download and install the [Go toolchain](https://golang.org/) and then run: ```sh $ go install ./cmd/litestream @@ -64,11 +64,48 @@ $ go install ./cmd/litestream The `litestream` binary should be in your `$GOPATH/bin` folder. +## Quick Start + +Litestream provides a configuration file that can be used for production +deployments but you can also specify a single database and replica on the +command line for testing. + +First, you'll need to create an S3 bucket that we'll call `"mybkt"` in this +example. You'll also need to set your AWS credentials: + +```sh +$ export AWS_ACCESS_KEY_ID=AKIAxxxxxxxxxxxxxxxx +$ export AWS_SECRET_ACCESS_KEY=xxxxxxxxxxxxxxxxxxxxxxxxxxxxxx/xxxxxxxxx +``` + +Next you can run the `litestream replicate` command with the path to the +database you want to backup and the URL of your replica destination: + +```sh +$ litestream replicate /path/to/db s3://mybkt/db +``` + +If you make changes to your local database, those changes will be replicated +to S3 every 10 seconds. From another terminal window, you can restore your +database from your S3 replica: + +``` +$ litestream restore -v -o /path/to/restored/db s3://mybkt/db +``` + +Voila! 🎉 + +Your database should be restored to the last replicated state that +was sent to S3. You can adjust your replication frequency and other options by +using a configuration-based approach specified below. + + ## Configuration -Once installed locally, you'll need to create a config file. By default, the -config file lives at `/etc/litestream.yml` but you can pass in a different -path to any `litestream` command using the `-config PATH` flag. +A configuration-based install gives you more replication options. By default, +the config file lives at `/etc/litestream.yml` but you can pass in a different +path to any `litestream` command using the `-config PATH` flag. You can also +set the `LITESTREAM_CONFIG` environment variable to specify a new path. The configuration specifies one or more `dbs` and a list of one or more replica locations for each db. Below are some common configurations: @@ -85,7 +122,7 @@ secret-access-key: xxxxxxxxxxxxxxxxxxxxxxxxxxxxxx/xxxxxxxxx dbs: - path: /path/to/db replicas: - - path: s3://mybkt/db + - url: s3://mybkt/db ``` ### Replicate to another file path @@ -116,7 +153,7 @@ years are not. db: - path: /path/to/db replicas: - - path: s3://mybkt/db + - url: s3://mybkt/db retention: 1h # 1 hour retention ``` @@ -139,7 +176,8 @@ These are some additional configuration options available on replicas: - `type`—Specify the type of replica (`"file"` or `"s3"`). Derived from `"path"`. - `name`—Specify an optional name for the replica if you are using multiple replicas. -- `path`—File path or URL to the replica location. +- `path`—File path to the replica location. +- `url`—URL to the replica location. - `retention-check-interval`—Time between retention enforcement checks. Defaults to `1h`. - `validation-interval`—Interval between periodic checks to ensure restored backup matches current database. Disabled by default. @@ -193,7 +231,10 @@ to worry about accidentally overwriting your current database. $ litestream restore /path/to/db # Restore database to a new location. -$ litestream restore -o /tmp/mynewdb /path/to/db +$ litestream restore -o /path/to/restored/db /path/to/db + +# Restore from a replica URL. +$ litestream restore -o /path/to/restored/db s3://mybkt/db # Restore database to a specific point-in-time. $ litestream restore -timestamp 2020-01-01T00:00:00Z /path/to/db diff --git a/cmd/litestream/generations.go b/cmd/litestream/generations.go index b1c2d84..67e6acc 100644 --- a/cmd/litestream/generations.go +++ b/cmd/litestream/generations.go @@ -101,12 +101,15 @@ func (c *GenerationsCommand) Run(ctx context.Context, args []string) (err error) // Usage prints the help message to STDOUT. func (c *GenerationsCommand) Usage() { fmt.Printf(` -The generations command lists all generations for a database. It also lists -stats about their lag behind the primary database and the time range they cover. +The generations command lists all generations for a database or replica. It also +lists stats about their lag behind the primary database and the time range they +cover. Usage: - litestream generations [arguments] DB + litestream generations [arguments] DB_PATH + + litestream generations [arguments] REPLICA_URL Arguments: diff --git a/cmd/litestream/main.go b/cmd/litestream/main.go index b830014..c8a7850 100644 --- a/cmd/litestream/main.go +++ b/cmd/litestream/main.go @@ -87,12 +87,12 @@ Usage: The commands are: + databases list databases specified in config file generations list available generations for a database replicate runs a server to replicate databases restore recovers database backup from a replica snapshots list available snapshots for a database - validate checks replica to ensure a consistent state with primary - version prints the version + version prints the binary version wal list available WAL files for a database `[1:]) } @@ -112,16 +112,6 @@ type Config struct { Bucket string `yaml:"bucket"` } -// Normalize expands paths and parses URL-specified replicas. -func (c *Config) Normalize() error { - for i := range c.DBs { - if err := c.DBs[i].Normalize(); err != nil { - return err - } - } - return nil -} - // DefaultConfig returns a new instance of Config with defaults set. func DefaultConfig() Config { return Config{} @@ -156,9 +146,13 @@ func ReadConfigFile(filename string) (_ Config, err error) { return config, err } - if err := config.Normalize(); err != nil { - return config, err + // Normalize paths. + for _, dbConfig := range config.DBs { + if dbConfig.Path, err = expand(dbConfig.Path); err != nil { + return config, err + } } + return config, nil } @@ -168,26 +162,12 @@ type DBConfig struct { Replicas []*ReplicaConfig `yaml:"replicas"` } -// Normalize expands paths and parses URL-specified replicas. -func (c *DBConfig) Normalize() (err error) { - c.Path, err = expand(c.Path) - if err != nil { - return err - } - - for i := range c.Replicas { - if err := c.Replicas[i].Normalize(); err != nil { - return err - } - } - return nil -} - // ReplicaConfig represents the configuration for a single replica in a database. type ReplicaConfig struct { Type string `yaml:"type"` // "file", "s3" Name string `yaml:"name"` // name of replica, optional. Path string `yaml:"path"` + URL string `yaml:"url"` Retention time.Duration `yaml:"retention"` RetentionCheckInterval time.Duration `yaml:"retention-check-interval"` SyncInterval time.Duration `yaml:"sync-interval"` // s3 only @@ -200,35 +180,63 @@ type ReplicaConfig struct { Bucket string `yaml:"bucket"` } -// Normalize expands paths and parses URL-specified replicas. -func (c *ReplicaConfig) Normalize() error { - // Attempt to parse as URL. Ignore if it is not a URL or if there is no scheme. - u, err := url.Parse(c.Path) - if err != nil || u.Scheme == "" { - if c.Type == "" || c.Type == "file" { - c.Path, err = expand(c.Path) - return err - } - return nil +// NewReplicaFromURL returns a new Replica instance configured from a URL. +// The replica's database is not set. +func NewReplicaFromURL(s string) (litestream.Replica, error) { + scheme, host, path, err := ParseReplicaURL(s) + if err != nil { + return nil, err + } + + switch scheme { + case "file": + return litestream.NewFileReplica(nil, "", path), nil + case "s3": + r := s3.NewReplica(nil, "") + r.Bucket, r.Path = host, path + return r, nil + default: + return nil, fmt.Errorf("invalid replica url type: %s", s) + } +} + +// ParseReplicaURL parses a replica URL. +func ParseReplicaURL(s string) (scheme, host, urlpath string, err error) { + u, err := url.Parse(s) + if err != nil { + return "", "", "", err } switch u.Scheme { case "file": - c.Type, u.Scheme = u.Scheme, "" - c.Path = path.Clean(u.String()) - return nil + scheme, u.Scheme = u.Scheme, "" + return scheme, "", path.Clean(u.String()), nil - case "s3": - c.Type = u.Scheme - c.Path = strings.TrimPrefix(path.Clean(u.Path), "/") - c.Bucket = u.Host - return nil + case "": + return u.Scheme, u.Host, u.Path, fmt.Errorf("replica url scheme required: %s", s) default: - return fmt.Errorf("unrecognized replica type in path scheme: %s", c.Path) + return u.Scheme, u.Host, strings.TrimPrefix(path.Clean(u.Path), "/"), nil } } +// isURL returns true if s can be parsed and has a scheme. +func isURL(s string) bool { + u, err := url.Parse(s) + return err == nil && u.Scheme != "" +} + +// ReplicaType returns the type based on the type field or extracted from the URL. +func (c *ReplicaConfig) ReplicaType() string { + typ, _, _, _ := ParseReplicaURL(c.URL) + if typ != "" { + return typ + } else if c.Type != "" { + return c.Type + } + return "file" +} + // DefaultConfigPath returns the default config path. func DefaultConfigPath() string { if v := os.Getenv("LITESTREAM_CONFIG"); v != "" { @@ -243,8 +251,13 @@ func registerConfigFlag(fs *flag.FlagSet, p *string) { // newDBFromConfig instantiates a DB based on a configuration. func newDBFromConfig(c *Config, dbc *DBConfig) (*litestream.DB, error) { + path, err := expand(dbc.Path) + if err != nil { + return nil, err + } + // Initialize database with given path. - db := litestream.NewDB(dbc.Path) + db := litestream.NewDB(path) // Instantiate and attach replicas. for _, rc := range dbc.Replicas { @@ -260,8 +273,13 @@ func newDBFromConfig(c *Config, dbc *DBConfig) (*litestream.DB, error) { // newReplicaFromConfig instantiates a replica for a DB based on a config. func newReplicaFromConfig(db *litestream.DB, c *Config, dbc *DBConfig, rc *ReplicaConfig) (litestream.Replica, error) { - switch rc.Type { - case "", "file": + // Ensure user did not specify URL in path. + if isURL(rc.Path) { + return nil, fmt.Errorf("replica path cannot be a url, please use the 'url' field instead: %s", rc.Path) + } + + switch rc.ReplicaType() { + case "file": return newFileReplicaFromConfig(db, c, dbc, rc) case "s3": return newS3ReplicaFromConfig(db, c, dbc, rc) @@ -271,12 +289,24 @@ func newReplicaFromConfig(db *litestream.DB, c *Config, dbc *DBConfig, rc *Repli } // newFileReplicaFromConfig returns a new instance of FileReplica build from config. -func newFileReplicaFromConfig(db *litestream.DB, c *Config, dbc *DBConfig, rc *ReplicaConfig) (*litestream.FileReplica, error) { - if rc.Path == "" { +func newFileReplicaFromConfig(db *litestream.DB, c *Config, dbc *DBConfig, rc *ReplicaConfig) (_ *litestream.FileReplica, err error) { + path := rc.Path + if rc.URL != "" { + _, _, path, err = ParseReplicaURL(rc.URL) + if err != nil { + return nil, err + } + } + + if path == "" { return nil, fmt.Errorf("%s: file replica path required", db.Path()) } - r := litestream.NewFileReplica(db, rc.Name, rc.Path) + if path, err = expand(path); err != nil { + return nil, err + } + + r := litestream.NewFileReplica(db, rc.Name, path) if v := rc.Retention; v > 0 { r.Retention = v } @@ -290,7 +320,20 @@ func newFileReplicaFromConfig(db *litestream.DB, c *Config, dbc *DBConfig, rc *R } // newS3ReplicaFromConfig returns a new instance of S3Replica build from config. -func newS3ReplicaFromConfig(db *litestream.DB, c *Config, dbc *DBConfig, rc *ReplicaConfig) (*s3.Replica, error) { +func newS3ReplicaFromConfig(db *litestream.DB, c *Config, dbc *DBConfig, rc *ReplicaConfig) (_ *s3.Replica, err error) { + bucket := c.Bucket + if v := rc.Bucket; v != "" { + bucket = v + } + + path := rc.Path + if rc.URL != "" { + _, bucket, path, err = ParseReplicaURL(rc.URL) + if err != nil { + return nil, err + } + } + // Use global or replica-specific S3 settings. accessKeyID := c.AccessKeyID if v := rc.AccessKeyID; v != "" { @@ -300,10 +343,6 @@ func newS3ReplicaFromConfig(db *litestream.DB, c *Config, dbc *DBConfig, rc *Rep if v := rc.SecretAccessKey; v != "" { secretAccessKey = v } - bucket := c.Bucket - if v := rc.Bucket; v != "" { - bucket = v - } region := c.Region if v := rc.Region; v != "" { region = v @@ -320,7 +359,7 @@ func newS3ReplicaFromConfig(db *litestream.DB, c *Config, dbc *DBConfig, rc *Rep r.SecretAccessKey = secretAccessKey r.Region = region r.Bucket = bucket - r.Path = rc.Path + r.Path = path if v := rc.Retention; v > 0 { r.Retention = v diff --git a/cmd/litestream/replicate.go b/cmd/litestream/replicate.go index 6f8e04f..1cc9da7 100644 --- a/cmd/litestream/replicate.go +++ b/cmd/litestream/replicate.go @@ -43,7 +43,7 @@ func (c *ReplicateCommand) Run(ctx context.Context, args []string) (err error) { } else if fs.NArg() > 1 { dbConfig := &DBConfig{Path: fs.Arg(0)} for _, u := range fs.Args()[1:] { - dbConfig.Replicas = append(dbConfig.Replicas, &ReplicaConfig{Path: u}) + dbConfig.Replicas = append(dbConfig.Replicas, &ReplicaConfig{URL: u}) } config.DBs = []*DBConfig{dbConfig} } else if c.ConfigPath != "" { @@ -55,11 +55,6 @@ func (c *ReplicateCommand) Run(ctx context.Context, args []string) (err error) { return errors.New("-config flag or database/replica arguments required") } - // Normalize configuration paths. - if err := config.Normalize(); err != nil { - return err - } - // Enable trace logging. if *verbose { litestream.Tracef = log.Printf diff --git a/cmd/litestream/restore.go b/cmd/litestream/restore.go index 68cc6a5..8568471 100644 --- a/cmd/litestream/restore.go +++ b/cmd/litestream/restore.go @@ -7,7 +7,6 @@ import ( "fmt" "log" "os" - "path/filepath" "time" "github.com/benbjohnson/litestream" @@ -38,15 +37,6 @@ func (c *RestoreCommand) Run(ctx context.Context, args []string) (err error) { return fmt.Errorf("too many arguments") } - // Load configuration. - if configPath == "" { - return errors.New("-config required") - } - config, err := ReadConfigFile(configPath) - if err != nil { - return err - } - // Parse timestamp, if specified. if *timestampStr != "" { if opt.Timestamp, err = time.Parse(time.RFC3339, *timestampStr); err != nil { @@ -64,23 +54,72 @@ func (c *RestoreCommand) Run(ctx context.Context, args []string) (err error) { opt.Logger = log.New(os.Stderr, "", log.LstdFlags) } - // Determine absolute path for database. - dbPath, err := filepath.Abs(fs.Arg(0)) - if err != nil { - return err + // Determine replica & generation to restore from. + var r litestream.Replica + if isURL(fs.Arg(0)) { + if r, err = c.loadFromURL(ctx, fs.Arg(0), &opt); err != nil { + return err + } + } else if configPath != "" { + if r, err = c.loadFromConfig(ctx, fs.Arg(0), configPath, &opt); err != nil { + return err + } + } else { + return errors.New("config path or replica URL required") } - // Instantiate DB. + // Return an error if no matching targets found. + if opt.Generation == "" { + return fmt.Errorf("no matching backups found") + } + + return litestream.RestoreReplica(ctx, r, opt) +} + +// loadFromURL creates a replica & updates the restore options from a replica URL. +func (c *RestoreCommand) loadFromURL(ctx context.Context, replicaURL string, opt *litestream.RestoreOptions) (litestream.Replica, error) { + r, err := NewReplicaFromURL(replicaURL) + if err != nil { + return nil, err + } + opt.Generation, _, err = litestream.CalcReplicaRestoreTarget(ctx, r, *opt) + return r, err +} + +// loadFromConfig returns a replica & updates the restore options from a DB reference. +func (c *RestoreCommand) loadFromConfig(ctx context.Context, dbPath, configPath string, opt *litestream.RestoreOptions) (litestream.Replica, error) { + // Load configuration. + config, err := ReadConfigFile(configPath) + if err != nil { + return nil, err + } + + // Lookup database from configuration file by path. + if dbPath, err = expand(dbPath); err != nil { + return nil, err + } dbConfig := config.DBConfig(dbPath) if dbConfig == nil { - return fmt.Errorf("database not found in config: %s", dbPath) + return nil, fmt.Errorf("database not found in config: %s", dbPath) } db, err := newDBFromConfig(&config, dbConfig) if err != nil { - return err + return nil, err } - return db.Restore(ctx, opt) + // Restore into original database path if not specified. + if opt.OutputPath == "" { + opt.OutputPath = dbPath + } + + // Determine the appropriate replica & generation to restore from, + r, generation, err := db.CalcRestoreTarget(ctx, *opt) + if err != nil { + return nil, err + } + opt.Generation = generation + + return r, nil } // Usage prints the help screen to STDOUT. diff --git a/cmd/litestream/snapshots.go b/cmd/litestream/snapshots.go index 69700e0..55a0e41 100644 --- a/cmd/litestream/snapshots.go +++ b/cmd/litestream/snapshots.go @@ -6,7 +6,6 @@ import ( "flag" "fmt" "os" - "path/filepath" "text/tabwriter" "time" @@ -31,37 +30,42 @@ func (c *SnapshotsCommand) Run(ctx context.Context, args []string) (err error) { return fmt.Errorf("too many arguments") } - // Load configuration. - if configPath == "" { - return errors.New("-config required") - } - config, err := ReadConfigFile(configPath) - if err != nil { - return err - } + var db *litestream.DB + var r litestream.Replica + if isURL(fs.Arg(0)) { + if r, err = NewReplicaFromURL(fs.Arg(0)); err != nil { + return err + } + } else if configPath != "" { + // Load configuration. + config, err := ReadConfigFile(configPath) + if err != nil { + return err + } - // Determine absolute path for database. - dbPath, err := filepath.Abs(fs.Arg(0)) - if err != nil { - return err - } + // Lookup database from configuration file by path. + if path, err := expand(fs.Arg(0)); err != nil { + return err + } else if dbc := config.DBConfig(path); dbc == nil { + return fmt.Errorf("database not found in config: %s", path) + } else if db, err = newDBFromConfig(&config, dbc); err != nil { + return err + } - // Instantiate DB. - dbConfig := config.DBConfig(dbPath) - if dbConfig == nil { - return fmt.Errorf("database not found in config: %s", dbPath) - } - db, err := newDBFromConfig(&config, dbConfig) - if err != nil { - return err + // Filter by replica, if specified. + if *replicaName != "" { + if r = db.Replica(*replicaName); r == nil { + return fmt.Errorf("replica %q not found for database %q", *replicaName, db.Path()) + } + } + } else { + return errors.New("config path or replica URL required") } // Find snapshots by db or replica. var infos []*litestream.SnapshotInfo - if *replicaName != "" { - if r := db.Replica(*replicaName); r == nil { - return fmt.Errorf("replica %q not found for database %q", *replicaName, dbPath) - } else if infos, err = r.Snapshots(ctx); err != nil { + if r != nil { + if infos, err = r.Snapshots(ctx); err != nil { return err } } else { @@ -90,11 +94,13 @@ func (c *SnapshotsCommand) Run(ctx context.Context, args []string) (err error) { // Usage prints the help screen to STDOUT. func (c *SnapshotsCommand) Usage() { fmt.Printf(` -The snapshots command lists all snapshots available for a database. +The snapshots command lists all snapshots available for a database or replica. Usage: - litestream snapshots [arguments] DB + litestream snapshots [arguments] DB_PATH + + litestream snapshots [arguments] REPLICA_URL Arguments: @@ -105,7 +111,6 @@ Arguments: -replica NAME Optional, filter by a specific replica. - Examples: # List all snapshots for a database. @@ -114,6 +119,9 @@ Examples: # List all snapshots on S3. $ litestream snapshots -replica s3 /path/to/db + # List all snapshots by replica URL. + $ litestream snapshots s3://mybkt/db + `[1:], DefaultConfigPath(), ) diff --git a/cmd/litestream/wal.go b/cmd/litestream/wal.go index d2182a7..a71a574 100644 --- a/cmd/litestream/wal.go +++ b/cmd/litestream/wal.go @@ -6,7 +6,6 @@ import ( "flag" "fmt" "os" - "path/filepath" "text/tabwriter" "time" @@ -32,37 +31,42 @@ func (c *WALCommand) Run(ctx context.Context, args []string) (err error) { return fmt.Errorf("too many arguments") } - // Load configuration. - if configPath == "" { - return errors.New("-config required") - } - config, err := ReadConfigFile(configPath) - if err != nil { - return err + var db *litestream.DB + var r litestream.Replica + if isURL(fs.Arg(0)) { + if r, err = NewReplicaFromURL(fs.Arg(0)); err != nil { + return err + } + } else if configPath != "" { + // Load configuration. + config, err := ReadConfigFile(configPath) + if err != nil { + return err + } + + // Lookup database from configuration file by path. + if path, err := expand(fs.Arg(0)); err != nil { + return err + } else if dbc := config.DBConfig(path); dbc == nil { + return fmt.Errorf("database not found in config: %s", path) + } else if db, err = newDBFromConfig(&config, dbc); err != nil { + return err + } + + // Filter by replica, if specified. + if *replicaName != "" { + if r = db.Replica(*replicaName); r == nil { + return fmt.Errorf("replica %q not found for database %q", *replicaName, db.Path()) + } + } + } else { + return errors.New("config path or replica URL required") } - // Determine absolute path for database. - dbPath, err := filepath.Abs(fs.Arg(0)) - if err != nil { - return err - } - - // Instantiate DB. - dbConfig := config.DBConfig(dbPath) - if dbConfig == nil { - return fmt.Errorf("database not found in config: %s", dbPath) - } - db, err := newDBFromConfig(&config, dbConfig) - if err != nil { - return err - } - - // Find snapshots by db or replica. + // Find WAL files by db or replica. var infos []*litestream.WALInfo - if *replicaName != "" { - if r := db.Replica(*replicaName); r == nil { - return fmt.Errorf("replica %q not found for database %q", *replicaName, dbPath) - } else if infos, err = r.WALs(ctx); err != nil { + if r != nil { + if infos, err = r.WALs(ctx); err != nil { return err } } else { @@ -100,7 +104,9 @@ The wal command lists all wal files available for a database. Usage: - litestream wal [arguments] DB + litestream wal [arguments] DB_PATH + + litestream wal [arguments] REPLICA_URL Arguments: @@ -114,14 +120,16 @@ Arguments: -generation NAME Optional, filter by a specific generation. - Examples: # List all WAL files for a database. $ litestream wal /path/to/db # List all WAL files on S3 for a specific generation. - $ litestream snapshots -replica s3 -generation xxxxxxxx /path/to/db + $ litestream wal -replica s3 -generation xxxxxxxx /path/to/db + + # List all WAL files for replica URL. + $ litestream wal s3://mybkt/db `[1:], DefaultConfigPath(), diff --git a/db.go b/db.go index c71d380..9040286 100644 --- a/db.go +++ b/db.go @@ -622,7 +622,7 @@ func (db *DB) CurrentGeneration() (string, error) { return "", err } - // TODO: Verify if generation directory exists. If not, delete. + // TODO: Verify if generation directory exists. If not, delete name file. generation := strings.TrimSpace(string(buf)) if len(generation) != GenerationNameLen { @@ -1343,15 +1343,17 @@ func (db *DB) monitor() { } } -// Restore restores the database from a replica based on the options given. +// RestoreReplica restores the database from a replica based on the options given. // This method will restore into opt.OutputPath, if specified, or into the // DB's original database path. It can optionally restore from a specific // replica or generation or it will automatically choose the best one. Finally, // a timestamp can be specified to restore the database to a specific // point-in-time. -func (db *DB) Restore(ctx context.Context, opt RestoreOptions) error { +func RestoreReplica(ctx context.Context, r Replica, opt RestoreOptions) error { // Validate options. - if opt.Generation == "" && opt.Index != math.MaxInt64 { + if opt.OutputPath == "" { + return fmt.Errorf("output path required") + } else if opt.Generation == "" && opt.Index != math.MaxInt64 { return fmt.Errorf("must specify generation when restoring to index") } else if opt.Index != math.MaxInt64 && !opt.Timestamp.IsZero() { return fmt.Errorf("cannot specify index & timestamp to restore") @@ -1363,69 +1365,62 @@ func (db *DB) Restore(ctx context.Context, opt RestoreOptions) error { logger = log.New(ioutil.Discard, "", 0) } - // Determine the correct output path. - outputPath := opt.OutputPath - if outputPath == "" { - outputPath = db.Path() + logPrefix := r.Name() + if db := r.DB(); db != nil { + logPrefix = fmt.Sprintf("%s(%s)", db.Path(), r.Name()) } // Ensure output path does not already exist (unless this is a dry run). if !opt.DryRun { - if _, err := os.Stat(outputPath); err == nil { - return fmt.Errorf("cannot restore, output path already exists: %s", outputPath) + if _, err := os.Stat(opt.OutputPath); err == nil { + return fmt.Errorf("cannot restore, output path already exists: %s", opt.OutputPath) } else if err != nil && !os.IsNotExist(err) { return err } } - // Determine target replica & generation to restore from. - r, generation, err := db.restoreTarget(ctx, opt, logger) - if err != nil { - return err - } - // Find lastest snapshot that occurs before timestamp. - minWALIndex, err := SnapshotIndexAt(ctx, r, generation, opt.Timestamp) + minWALIndex, err := SnapshotIndexAt(ctx, r, opt.Generation, opt.Timestamp) if err != nil { return fmt.Errorf("cannot find snapshot index for restore: %w", err) } // Find the maximum WAL index that occurs before timestamp. - maxWALIndex, err := WALIndexAt(ctx, r, generation, opt.Index, opt.Timestamp) + maxWALIndex, err := WALIndexAt(ctx, r, opt.Generation, opt.Index, opt.Timestamp) if err != nil { return fmt.Errorf("cannot find max wal index for restore: %w", err) } - log.Printf("%s(%s): starting restore: generation %08x, index %08x-%08x", db.path, r.Name(), generation, minWALIndex, maxWALIndex) + logger.Printf("%s: starting restore: generation %08x, index %08x-%08x", logPrefix, opt.Generation, minWALIndex, maxWALIndex) // Initialize starting position. - pos := Pos{Generation: generation, Index: minWALIndex} - tmpPath := outputPath + ".tmp" + pos := Pos{Generation: opt.Generation, Index: minWALIndex} + tmpPath := opt.OutputPath + ".tmp" // Copy snapshot to output path. if !opt.DryRun { - if err := db.restoreSnapshot(ctx, r, pos.Generation, pos.Index, tmpPath); err != nil { + if err := restoreSnapshot(ctx, r, pos.Generation, pos.Index, tmpPath); err != nil { return fmt.Errorf("cannot restore snapshot: %w", err) } } - log.Printf("%s(%s): restoring snapshot %s/%08x to %s", db.path, r.Name(), generation, minWALIndex, tmpPath) + logger.Printf("%s: restoring snapshot %s/%08x to %s", logPrefix, opt.Generation, minWALIndex, tmpPath) // Restore each WAL file until we reach our maximum index. for index := minWALIndex; index <= maxWALIndex; index++ { if !opt.DryRun { - if err = db.restoreWAL(ctx, r, generation, index, tmpPath); os.IsNotExist(err) && index == minWALIndex && index == maxWALIndex { - log.Printf("%s(%s): no wal available, snapshot only", db.path, r.Name()) + if err = restoreWAL(ctx, r, opt.Generation, index, tmpPath); os.IsNotExist(err) && index == minWALIndex && index == maxWALIndex { + logger.Printf("%s: no wal available, snapshot only", logPrefix) break // snapshot file only, ignore error } else if err != nil { return fmt.Errorf("cannot restore wal: %w", err) } } - log.Printf("%s(%s): restored wal %s/%08x", db.path, r.Name(), generation, index) + logger.Printf("%s: restored wal %s/%08x", logPrefix, opt.Generation, index) } // Copy file to final location. - log.Printf("%s(%s): renaming database from temporary location", db.path, r.Name()) + logger.Printf("%s: renaming database from temporary location", logPrefix) if !opt.DryRun { - if err := os.Rename(tmpPath, outputPath); err != nil { + if err := os.Rename(tmpPath, opt.OutputPath); err != nil { return err } } @@ -1447,7 +1442,8 @@ func checksumFile(filename string) (uint64, error) { return h.Sum64(), nil } -func (db *DB) restoreTarget(ctx context.Context, opt RestoreOptions, logger *log.Logger) (Replica, string, error) { +// CalcRestoreTarget returns a replica & generation to restore from based on opt criteria. +func (db *DB) CalcRestoreTarget(ctx context.Context, opt RestoreOptions) (Replica, string, error) { var target struct { replica Replica generation string @@ -1460,57 +1456,80 @@ func (db *DB) restoreTarget(ctx context.Context, opt RestoreOptions, logger *log continue } - generations, err := r.Generations(ctx) + generation, stats, err := CalcReplicaRestoreTarget(ctx, r, opt) if err != nil { - return nil, "", fmt.Errorf("cannot fetch generations: %w", err) + return nil, "", err } - // Search generations for one that contains the requested timestamp. - for _, generation := range generations { - // Skip generation if it does not match filter. - if opt.Generation != "" && generation != opt.Generation { - continue - } - - // Fetch stats for generation. - stats, err := r.GenerationStats(ctx, generation) - if err != nil { - return nil, "", fmt.Errorf("cannot determine stats for generation (%s/%s): %s", r.Name(), generation, err) - } - - // Skip if it does not contain timestamp. - if !opt.Timestamp.IsZero() { - if opt.Timestamp.Before(stats.CreatedAt) || opt.Timestamp.After(stats.UpdatedAt) { - continue - } - } - - // Use the latest replica if we have multiple candidates. - if !stats.UpdatedAt.After(target.stats.UpdatedAt) { - continue - } - - target.replica = r - target.generation = generation - target.stats = stats + // Use the latest replica if we have multiple candidates. + if !stats.UpdatedAt.After(target.stats.UpdatedAt) { + continue } - } - // Return an error if no matching targets found. - if target.generation == "" { - return nil, "", fmt.Errorf("no matching backups found") + target.replica, target.generation, target.stats = r, generation, stats } - return target.replica, target.generation, nil } +// CalcReplicaRestoreTarget returns a generation to restore from. +func CalcReplicaRestoreTarget(ctx context.Context, r Replica, opt RestoreOptions) (generation string, stats GenerationStats, err error) { + var target struct { + generation string + stats GenerationStats + } + + generations, err := r.Generations(ctx) + if err != nil { + return "", stats, fmt.Errorf("cannot fetch generations: %w", err) + } + + // Search generations for one that contains the requested timestamp. + for _, generation := range generations { + // Skip generation if it does not match filter. + if opt.Generation != "" && generation != opt.Generation { + continue + } + + // Fetch stats for generation. + stats, err := r.GenerationStats(ctx, generation) + if err != nil { + return "", stats, fmt.Errorf("cannot determine stats for generation (%s/%s): %s", r.Name(), generation, err) + } + + // Skip if it does not contain timestamp. + if !opt.Timestamp.IsZero() { + if opt.Timestamp.Before(stats.CreatedAt) || opt.Timestamp.After(stats.UpdatedAt) { + continue + } + } + + // Use the latest replica if we have multiple candidates. + if !stats.UpdatedAt.After(target.stats.UpdatedAt) { + continue + } + + target.generation = generation + target.stats = stats + } + + return target.generation, target.stats, nil +} + // restoreSnapshot copies a snapshot from the replica to a file. -func (db *DB) restoreSnapshot(ctx context.Context, r Replica, generation string, index int, filename string) error { - if err := mkdirAll(filepath.Dir(filename), db.dirmode, db.diruid, db.dirgid); err != nil { +func restoreSnapshot(ctx context.Context, r Replica, generation string, index int, filename string) error { + // Determine the user/group & mode based on the DB, if available. + uid, gid, mode := -1, -1, os.FileMode(0600) + diruid, dirgid, dirmode := -1, -1, os.FileMode(0700) + if db := r.DB(); db != nil { + uid, gid, mode = db.uid, db.gid, db.mode + diruid, dirgid, dirmode = db.diruid, db.dirgid, db.dirmode + } + + if err := mkdirAll(filepath.Dir(filename), dirmode, diruid, dirgid); err != nil { return err } - f, err := createFile(filename, db.uid, db.gid) + f, err := createFile(filename, mode, uid, gid) if err != nil { return err } @@ -1533,7 +1552,13 @@ func (db *DB) restoreSnapshot(ctx context.Context, r Replica, generation string, } // restoreWAL copies a WAL file from the replica to the local WAL and forces checkpoint. -func (db *DB) restoreWAL(ctx context.Context, r Replica, generation string, index int, dbPath string) error { +func restoreWAL(ctx context.Context, r Replica, generation string, index int, dbPath string) error { + // Determine the user/group & mode based on the DB, if available. + uid, gid, mode := -1, -1, os.FileMode(0600) + if db := r.DB(); db != nil { + uid, gid, mode = db.uid, db.gid, db.mode + } + // Open WAL file from replica. rd, err := r.WALReader(ctx, generation, index) if err != nil { @@ -1542,7 +1567,7 @@ func (db *DB) restoreWAL(ctx context.Context, r Replica, generation string, inde defer rd.Close() // Open handle to destination WAL path. - f, err := createFile(dbPath+"-wal", db.uid, db.gid) + f, err := createFile(dbPath+"-wal", mode, uid, gid) if err != nil { return err } diff --git a/litestream.go b/litestream.go index cf1b094..f887372 100644 --- a/litestream.go +++ b/litestream.go @@ -257,8 +257,8 @@ func isHexChar(ch rune) bool { } // createFile creates the file and attempts to set the UID/GID. -func createFile(filename string, uid, gid int) (*os.File, error) { - f, err := os.Create(filename) +func createFile(filename string, perm os.FileMode, uid, gid int) (*os.File, error) { + f, err := os.OpenFile(filename, os.O_RDWR|os.O_CREATE|os.O_TRUNC, perm) if err != nil { return nil, err } diff --git a/replica.go b/replica.go index b60a49e..44b3033 100644 --- a/replica.go +++ b/replica.go @@ -125,10 +125,14 @@ func NewFileReplica(db *DB, name, dst string) *FileReplica { MonitorEnabled: true, } - r.snapshotTotalGauge = internal.ReplicaSnapshotTotalGaugeVec.WithLabelValues(db.path, r.Name()) - r.walBytesCounter = internal.ReplicaWALBytesCounterVec.WithLabelValues(db.path, r.Name()) - r.walIndexGauge = internal.ReplicaWALIndexGaugeVec.WithLabelValues(db.path, r.Name()) - r.walOffsetGauge = internal.ReplicaWALOffsetGaugeVec.WithLabelValues(db.path, r.Name()) + var dbPath string + if db != nil { + dbPath = db.Path() + } + r.snapshotTotalGauge = internal.ReplicaSnapshotTotalGaugeVec.WithLabelValues(dbPath, r.Name()) + r.walBytesCounter = internal.ReplicaWALBytesCounterVec.WithLabelValues(dbPath, r.Name()) + r.walIndexGauge = internal.ReplicaWALIndexGaugeVec.WithLabelValues(dbPath, r.Name()) + r.walOffsetGauge = internal.ReplicaWALOffsetGaugeVec.WithLabelValues(dbPath, r.Name()) return r } @@ -923,6 +927,10 @@ func WALIndexAt(ctx context.Context, r Replica, generation string, maxIndex int, var index int for _, wal := range wals { + if wal.Generation != generation { + continue + } + if !timestamp.IsZero() && wal.CreatedAt.After(timestamp) { continue // after timestamp, skip } else if wal.Index > maxIndex { @@ -949,7 +957,12 @@ func compressFile(src, dst string, uid, gid int) error { } defer r.Close() - w, err := createFile(dst+".tmp", uid, gid) + fi, err := r.Stat() + if err != nil { + return err + } + + w, err := createFile(dst+".tmp", fi.Mode(), uid, gid) if err != nil { return err } @@ -1000,7 +1013,7 @@ func ValidateReplica(ctx context.Context, r Replica) error { defer os.RemoveAll(tmpdir) restorePath := filepath.Join(tmpdir, "db") - if err := db.Restore(ctx, RestoreOptions{ + if err := RestoreReplica(ctx, r, RestoreOptions{ OutputPath: restorePath, ReplicaName: r.Name(), Generation: pos.Generation, diff --git a/s3/s3.go b/s3/s3.go index c373f02..58ba280 100644 --- a/s3/s3.go +++ b/s3/s3.go @@ -105,16 +105,20 @@ func NewReplica(db *litestream.DB, name string) *Replica { MonitorEnabled: true, } - r.snapshotTotalGauge = internal.ReplicaSnapshotTotalGaugeVec.WithLabelValues(db.Path(), r.Name()) - r.walBytesCounter = internal.ReplicaWALBytesCounterVec.WithLabelValues(db.Path(), r.Name()) - r.walIndexGauge = internal.ReplicaWALIndexGaugeVec.WithLabelValues(db.Path(), r.Name()) - r.walOffsetGauge = internal.ReplicaWALOffsetGaugeVec.WithLabelValues(db.Path(), r.Name()) - r.putOperationTotalCounter = operationTotalCounterVec.WithLabelValues(db.Path(), r.Name(), "PUT") - r.putOperationBytesCounter = operationBytesCounterVec.WithLabelValues(db.Path(), r.Name(), "PUT") - r.getOperationTotalCounter = operationTotalCounterVec.WithLabelValues(db.Path(), r.Name(), "GET") - r.getOperationBytesCounter = operationBytesCounterVec.WithLabelValues(db.Path(), r.Name(), "GET") - r.listOperationTotalCounter = operationTotalCounterVec.WithLabelValues(db.Path(), r.Name(), "LIST") - r.deleteOperationTotalCounter = operationTotalCounterVec.WithLabelValues(db.Path(), r.Name(), "DELETE") + var dbPath string + if db != nil { + dbPath = db.Path() + } + r.snapshotTotalGauge = internal.ReplicaSnapshotTotalGaugeVec.WithLabelValues(dbPath, r.Name()) + r.walBytesCounter = internal.ReplicaWALBytesCounterVec.WithLabelValues(dbPath, r.Name()) + r.walIndexGauge = internal.ReplicaWALIndexGaugeVec.WithLabelValues(dbPath, r.Name()) + r.walOffsetGauge = internal.ReplicaWALOffsetGaugeVec.WithLabelValues(dbPath, r.Name()) + r.putOperationTotalCounter = operationTotalCounterVec.WithLabelValues(dbPath, r.Name(), "PUT") + r.putOperationBytesCounter = operationBytesCounterVec.WithLabelValues(dbPath, r.Name(), "PUT") + r.getOperationTotalCounter = operationTotalCounterVec.WithLabelValues(dbPath, r.Name(), "GET") + r.getOperationBytesCounter = operationBytesCounterVec.WithLabelValues(dbPath, r.Name(), "GET") + r.listOperationTotalCounter = operationTotalCounterVec.WithLabelValues(dbPath, r.Name(), "LIST") + r.deleteOperationTotalCounter = operationTotalCounterVec.WithLabelValues(dbPath, r.Name(), "DELETE") return r }