Allow replica URL to be used for commands

This commit refactors the commands to allow a replica URL when
restoring a database. If the first CLI arg is a URL with a scheme,
the it is treated as a replica URL.
This commit is contained in:
Ben Johnson
2021-01-25 21:25:59 -07:00
parent f7213ed35c
commit 67eeb49101
11 changed files with 419 additions and 244 deletions

View File

@@ -50,12 +50,12 @@ $ sudo systemctl start litestream
### Release binaries ### Release binaries
You can also download the release binary for your system from the You can also download the release binary for your system from the
[Releases page][releases] and run it as a standalone application. [releases page][releases] and run it as a standalone application.
### Building from source ### Building from source
First, download and install the [Go toolchain](https://golang.org/). Then run: Download and install the [Go toolchain](https://golang.org/) and then run:
```sh ```sh
$ go install ./cmd/litestream $ go install ./cmd/litestream
@@ -64,11 +64,48 @@ $ go install ./cmd/litestream
The `litestream` binary should be in your `$GOPATH/bin` folder. The `litestream` binary should be in your `$GOPATH/bin` folder.
## Quick Start
Litestream provides a configuration file that can be used for production
deployments but you can also specify a single database and replica on the
command line for testing.
First, you'll need to create an S3 bucket that we'll call `"mybkt"` in this
example. You'll also need to set your AWS credentials:
```sh
$ export AWS_ACCESS_KEY_ID=AKIAxxxxxxxxxxxxxxxx
$ export AWS_SECRET_ACCESS_KEY=xxxxxxxxxxxxxxxxxxxxxxxxxxxxxx/xxxxxxxxx
```
Next you can run the `litestream replicate` command with the path to the
database you want to backup and the URL of your replica destination:
```sh
$ litestream replicate /path/to/db s3://mybkt/db
```
If you make changes to your local database, those changes will be replicated
to S3 every 10 seconds. From another terminal window, you can restore your
database from your S3 replica:
```
$ litestream restore -v -o /path/to/restored/db s3://mybkt/db
```
Voila! 🎉
Your database should be restored to the last replicated state that
was sent to S3. You can adjust your replication frequency and other options by
using a configuration-based approach specified below.
## Configuration ## Configuration
Once installed locally, you'll need to create a config file. By default, the A configuration-based install gives you more replication options. By default,
config file lives at `/etc/litestream.yml` but you can pass in a different the config file lives at `/etc/litestream.yml` but you can pass in a different
path to any `litestream` command using the `-config PATH` flag. path to any `litestream` command using the `-config PATH` flag. You can also
set the `LITESTREAM_CONFIG` environment variable to specify a new path.
The configuration specifies one or more `dbs` and a list of one or more replica The configuration specifies one or more `dbs` and a list of one or more replica
locations for each db. Below are some common configurations: locations for each db. Below are some common configurations:
@@ -85,7 +122,7 @@ secret-access-key: xxxxxxxxxxxxxxxxxxxxxxxxxxxxxx/xxxxxxxxx
dbs: dbs:
- path: /path/to/db - path: /path/to/db
replicas: replicas:
- path: s3://mybkt/db - url: s3://mybkt/db
``` ```
### Replicate to another file path ### Replicate to another file path
@@ -116,7 +153,7 @@ years are not.
db: db:
- path: /path/to/db - path: /path/to/db
replicas: replicas:
- path: s3://mybkt/db - url: s3://mybkt/db
retention: 1h # 1 hour retention retention: 1h # 1 hour retention
``` ```
@@ -139,7 +176,8 @@ These are some additional configuration options available on replicas:
- `type`—Specify the type of replica (`"file"` or `"s3"`). Derived from `"path"`. - `type`—Specify the type of replica (`"file"` or `"s3"`). Derived from `"path"`.
- `name`—Specify an optional name for the replica if you are using multiple replicas. - `name`—Specify an optional name for the replica if you are using multiple replicas.
- `path`—File path or URL to the replica location. - `path`—File path to the replica location.
- `url`—URL to the replica location.
- `retention-check-interval`—Time between retention enforcement checks. Defaults to `1h`. - `retention-check-interval`—Time between retention enforcement checks. Defaults to `1h`.
- `validation-interval`—Interval between periodic checks to ensure restored backup matches current database. Disabled by default. - `validation-interval`—Interval between periodic checks to ensure restored backup matches current database. Disabled by default.
@@ -193,7 +231,10 @@ to worry about accidentally overwriting your current database.
$ litestream restore /path/to/db $ litestream restore /path/to/db
# Restore database to a new location. # Restore database to a new location.
$ litestream restore -o /tmp/mynewdb /path/to/db $ litestream restore -o /path/to/restored/db /path/to/db
# Restore from a replica URL.
$ litestream restore -o /path/to/restored/db s3://mybkt/db
# Restore database to a specific point-in-time. # Restore database to a specific point-in-time.
$ litestream restore -timestamp 2020-01-01T00:00:00Z /path/to/db $ litestream restore -timestamp 2020-01-01T00:00:00Z /path/to/db

View File

@@ -101,12 +101,15 @@ func (c *GenerationsCommand) Run(ctx context.Context, args []string) (err error)
// Usage prints the help message to STDOUT. // Usage prints the help message to STDOUT.
func (c *GenerationsCommand) Usage() { func (c *GenerationsCommand) Usage() {
fmt.Printf(` fmt.Printf(`
The generations command lists all generations for a database. It also lists The generations command lists all generations for a database or replica. It also
stats about their lag behind the primary database and the time range they cover. lists stats about their lag behind the primary database and the time range they
cover.
Usage: Usage:
litestream generations [arguments] DB litestream generations [arguments] DB_PATH
litestream generations [arguments] REPLICA_URL
Arguments: Arguments:

View File

@@ -87,12 +87,12 @@ Usage:
The commands are: The commands are:
databases list databases specified in config file
generations list available generations for a database generations list available generations for a database
replicate runs a server to replicate databases replicate runs a server to replicate databases
restore recovers database backup from a replica restore recovers database backup from a replica
snapshots list available snapshots for a database snapshots list available snapshots for a database
validate checks replica to ensure a consistent state with primary version prints the binary version
version prints the version
wal list available WAL files for a database wal list available WAL files for a database
`[1:]) `[1:])
} }
@@ -112,16 +112,6 @@ type Config struct {
Bucket string `yaml:"bucket"` Bucket string `yaml:"bucket"`
} }
// Normalize expands paths and parses URL-specified replicas.
func (c *Config) Normalize() error {
for i := range c.DBs {
if err := c.DBs[i].Normalize(); err != nil {
return err
}
}
return nil
}
// DefaultConfig returns a new instance of Config with defaults set. // DefaultConfig returns a new instance of Config with defaults set.
func DefaultConfig() Config { func DefaultConfig() Config {
return Config{} return Config{}
@@ -156,9 +146,13 @@ func ReadConfigFile(filename string) (_ Config, err error) {
return config, err return config, err
} }
if err := config.Normalize(); err != nil { // Normalize paths.
for _, dbConfig := range config.DBs {
if dbConfig.Path, err = expand(dbConfig.Path); err != nil {
return config, err return config, err
} }
}
return config, nil return config, nil
} }
@@ -168,26 +162,12 @@ type DBConfig struct {
Replicas []*ReplicaConfig `yaml:"replicas"` Replicas []*ReplicaConfig `yaml:"replicas"`
} }
// Normalize expands paths and parses URL-specified replicas.
func (c *DBConfig) Normalize() (err error) {
c.Path, err = expand(c.Path)
if err != nil {
return err
}
for i := range c.Replicas {
if err := c.Replicas[i].Normalize(); err != nil {
return err
}
}
return nil
}
// ReplicaConfig represents the configuration for a single replica in a database. // ReplicaConfig represents the configuration for a single replica in a database.
type ReplicaConfig struct { type ReplicaConfig struct {
Type string `yaml:"type"` // "file", "s3" Type string `yaml:"type"` // "file", "s3"
Name string `yaml:"name"` // name of replica, optional. Name string `yaml:"name"` // name of replica, optional.
Path string `yaml:"path"` Path string `yaml:"path"`
URL string `yaml:"url"`
Retention time.Duration `yaml:"retention"` Retention time.Duration `yaml:"retention"`
RetentionCheckInterval time.Duration `yaml:"retention-check-interval"` RetentionCheckInterval time.Duration `yaml:"retention-check-interval"`
SyncInterval time.Duration `yaml:"sync-interval"` // s3 only SyncInterval time.Duration `yaml:"sync-interval"` // s3 only
@@ -200,35 +180,63 @@ type ReplicaConfig struct {
Bucket string `yaml:"bucket"` Bucket string `yaml:"bucket"`
} }
// Normalize expands paths and parses URL-specified replicas. // NewReplicaFromURL returns a new Replica instance configured from a URL.
func (c *ReplicaConfig) Normalize() error { // The replica's database is not set.
// Attempt to parse as URL. Ignore if it is not a URL or if there is no scheme. func NewReplicaFromURL(s string) (litestream.Replica, error) {
u, err := url.Parse(c.Path) scheme, host, path, err := ParseReplicaURL(s)
if err != nil || u.Scheme == "" { if err != nil {
if c.Type == "" || c.Type == "file" { return nil, err
c.Path, err = expand(c.Path)
return err
} }
return nil
switch scheme {
case "file":
return litestream.NewFileReplica(nil, "", path), nil
case "s3":
r := s3.NewReplica(nil, "")
r.Bucket, r.Path = host, path
return r, nil
default:
return nil, fmt.Errorf("invalid replica url type: %s", s)
}
}
// ParseReplicaURL parses a replica URL.
func ParseReplicaURL(s string) (scheme, host, urlpath string, err error) {
u, err := url.Parse(s)
if err != nil {
return "", "", "", err
} }
switch u.Scheme { switch u.Scheme {
case "file": case "file":
c.Type, u.Scheme = u.Scheme, "" scheme, u.Scheme = u.Scheme, ""
c.Path = path.Clean(u.String()) return scheme, "", path.Clean(u.String()), nil
return nil
case "s3": case "":
c.Type = u.Scheme return u.Scheme, u.Host, u.Path, fmt.Errorf("replica url scheme required: %s", s)
c.Path = strings.TrimPrefix(path.Clean(u.Path), "/")
c.Bucket = u.Host
return nil
default: default:
return fmt.Errorf("unrecognized replica type in path scheme: %s", c.Path) return u.Scheme, u.Host, strings.TrimPrefix(path.Clean(u.Path), "/"), nil
} }
} }
// isURL returns true if s can be parsed and has a scheme.
func isURL(s string) bool {
u, err := url.Parse(s)
return err == nil && u.Scheme != ""
}
// ReplicaType returns the type based on the type field or extracted from the URL.
func (c *ReplicaConfig) ReplicaType() string {
typ, _, _, _ := ParseReplicaURL(c.URL)
if typ != "" {
return typ
} else if c.Type != "" {
return c.Type
}
return "file"
}
// DefaultConfigPath returns the default config path. // DefaultConfigPath returns the default config path.
func DefaultConfigPath() string { func DefaultConfigPath() string {
if v := os.Getenv("LITESTREAM_CONFIG"); v != "" { if v := os.Getenv("LITESTREAM_CONFIG"); v != "" {
@@ -243,8 +251,13 @@ func registerConfigFlag(fs *flag.FlagSet, p *string) {
// newDBFromConfig instantiates a DB based on a configuration. // newDBFromConfig instantiates a DB based on a configuration.
func newDBFromConfig(c *Config, dbc *DBConfig) (*litestream.DB, error) { func newDBFromConfig(c *Config, dbc *DBConfig) (*litestream.DB, error) {
path, err := expand(dbc.Path)
if err != nil {
return nil, err
}
// Initialize database with given path. // Initialize database with given path.
db := litestream.NewDB(dbc.Path) db := litestream.NewDB(path)
// Instantiate and attach replicas. // Instantiate and attach replicas.
for _, rc := range dbc.Replicas { for _, rc := range dbc.Replicas {
@@ -260,8 +273,13 @@ func newDBFromConfig(c *Config, dbc *DBConfig) (*litestream.DB, error) {
// newReplicaFromConfig instantiates a replica for a DB based on a config. // newReplicaFromConfig instantiates a replica for a DB based on a config.
func newReplicaFromConfig(db *litestream.DB, c *Config, dbc *DBConfig, rc *ReplicaConfig) (litestream.Replica, error) { func newReplicaFromConfig(db *litestream.DB, c *Config, dbc *DBConfig, rc *ReplicaConfig) (litestream.Replica, error) {
switch rc.Type { // Ensure user did not specify URL in path.
case "", "file": if isURL(rc.Path) {
return nil, fmt.Errorf("replica path cannot be a url, please use the 'url' field instead: %s", rc.Path)
}
switch rc.ReplicaType() {
case "file":
return newFileReplicaFromConfig(db, c, dbc, rc) return newFileReplicaFromConfig(db, c, dbc, rc)
case "s3": case "s3":
return newS3ReplicaFromConfig(db, c, dbc, rc) return newS3ReplicaFromConfig(db, c, dbc, rc)
@@ -271,12 +289,24 @@ func newReplicaFromConfig(db *litestream.DB, c *Config, dbc *DBConfig, rc *Repli
} }
// newFileReplicaFromConfig returns a new instance of FileReplica build from config. // newFileReplicaFromConfig returns a new instance of FileReplica build from config.
func newFileReplicaFromConfig(db *litestream.DB, c *Config, dbc *DBConfig, rc *ReplicaConfig) (*litestream.FileReplica, error) { func newFileReplicaFromConfig(db *litestream.DB, c *Config, dbc *DBConfig, rc *ReplicaConfig) (_ *litestream.FileReplica, err error) {
if rc.Path == "" { path := rc.Path
if rc.URL != "" {
_, _, path, err = ParseReplicaURL(rc.URL)
if err != nil {
return nil, err
}
}
if path == "" {
return nil, fmt.Errorf("%s: file replica path required", db.Path()) return nil, fmt.Errorf("%s: file replica path required", db.Path())
} }
r := litestream.NewFileReplica(db, rc.Name, rc.Path) if path, err = expand(path); err != nil {
return nil, err
}
r := litestream.NewFileReplica(db, rc.Name, path)
if v := rc.Retention; v > 0 { if v := rc.Retention; v > 0 {
r.Retention = v r.Retention = v
} }
@@ -290,7 +320,20 @@ func newFileReplicaFromConfig(db *litestream.DB, c *Config, dbc *DBConfig, rc *R
} }
// newS3ReplicaFromConfig returns a new instance of S3Replica build from config. // newS3ReplicaFromConfig returns a new instance of S3Replica build from config.
func newS3ReplicaFromConfig(db *litestream.DB, c *Config, dbc *DBConfig, rc *ReplicaConfig) (*s3.Replica, error) { func newS3ReplicaFromConfig(db *litestream.DB, c *Config, dbc *DBConfig, rc *ReplicaConfig) (_ *s3.Replica, err error) {
bucket := c.Bucket
if v := rc.Bucket; v != "" {
bucket = v
}
path := rc.Path
if rc.URL != "" {
_, bucket, path, err = ParseReplicaURL(rc.URL)
if err != nil {
return nil, err
}
}
// Use global or replica-specific S3 settings. // Use global or replica-specific S3 settings.
accessKeyID := c.AccessKeyID accessKeyID := c.AccessKeyID
if v := rc.AccessKeyID; v != "" { if v := rc.AccessKeyID; v != "" {
@@ -300,10 +343,6 @@ func newS3ReplicaFromConfig(db *litestream.DB, c *Config, dbc *DBConfig, rc *Rep
if v := rc.SecretAccessKey; v != "" { if v := rc.SecretAccessKey; v != "" {
secretAccessKey = v secretAccessKey = v
} }
bucket := c.Bucket
if v := rc.Bucket; v != "" {
bucket = v
}
region := c.Region region := c.Region
if v := rc.Region; v != "" { if v := rc.Region; v != "" {
region = v region = v
@@ -320,7 +359,7 @@ func newS3ReplicaFromConfig(db *litestream.DB, c *Config, dbc *DBConfig, rc *Rep
r.SecretAccessKey = secretAccessKey r.SecretAccessKey = secretAccessKey
r.Region = region r.Region = region
r.Bucket = bucket r.Bucket = bucket
r.Path = rc.Path r.Path = path
if v := rc.Retention; v > 0 { if v := rc.Retention; v > 0 {
r.Retention = v r.Retention = v

View File

@@ -43,7 +43,7 @@ func (c *ReplicateCommand) Run(ctx context.Context, args []string) (err error) {
} else if fs.NArg() > 1 { } else if fs.NArg() > 1 {
dbConfig := &DBConfig{Path: fs.Arg(0)} dbConfig := &DBConfig{Path: fs.Arg(0)}
for _, u := range fs.Args()[1:] { for _, u := range fs.Args()[1:] {
dbConfig.Replicas = append(dbConfig.Replicas, &ReplicaConfig{Path: u}) dbConfig.Replicas = append(dbConfig.Replicas, &ReplicaConfig{URL: u})
} }
config.DBs = []*DBConfig{dbConfig} config.DBs = []*DBConfig{dbConfig}
} else if c.ConfigPath != "" { } else if c.ConfigPath != "" {
@@ -55,11 +55,6 @@ func (c *ReplicateCommand) Run(ctx context.Context, args []string) (err error) {
return errors.New("-config flag or database/replica arguments required") return errors.New("-config flag or database/replica arguments required")
} }
// Normalize configuration paths.
if err := config.Normalize(); err != nil {
return err
}
// Enable trace logging. // Enable trace logging.
if *verbose { if *verbose {
litestream.Tracef = log.Printf litestream.Tracef = log.Printf

View File

@@ -7,7 +7,6 @@ import (
"fmt" "fmt"
"log" "log"
"os" "os"
"path/filepath"
"time" "time"
"github.com/benbjohnson/litestream" "github.com/benbjohnson/litestream"
@@ -38,15 +37,6 @@ func (c *RestoreCommand) Run(ctx context.Context, args []string) (err error) {
return fmt.Errorf("too many arguments") return fmt.Errorf("too many arguments")
} }
// Load configuration.
if configPath == "" {
return errors.New("-config required")
}
config, err := ReadConfigFile(configPath)
if err != nil {
return err
}
// Parse timestamp, if specified. // Parse timestamp, if specified.
if *timestampStr != "" { if *timestampStr != "" {
if opt.Timestamp, err = time.Parse(time.RFC3339, *timestampStr); err != nil { if opt.Timestamp, err = time.Parse(time.RFC3339, *timestampStr); err != nil {
@@ -64,23 +54,72 @@ func (c *RestoreCommand) Run(ctx context.Context, args []string) (err error) {
opt.Logger = log.New(os.Stderr, "", log.LstdFlags) opt.Logger = log.New(os.Stderr, "", log.LstdFlags)
} }
// Determine absolute path for database. // Determine replica & generation to restore from.
dbPath, err := filepath.Abs(fs.Arg(0)) var r litestream.Replica
if err != nil { if isURL(fs.Arg(0)) {
if r, err = c.loadFromURL(ctx, fs.Arg(0), &opt); err != nil {
return err return err
} }
} else if configPath != "" {
if r, err = c.loadFromConfig(ctx, fs.Arg(0), configPath, &opt); err != nil {
return err
}
} else {
return errors.New("config path or replica URL required")
}
// Instantiate DB. // Return an error if no matching targets found.
if opt.Generation == "" {
return fmt.Errorf("no matching backups found")
}
return litestream.RestoreReplica(ctx, r, opt)
}
// loadFromURL creates a replica & updates the restore options from a replica URL.
func (c *RestoreCommand) loadFromURL(ctx context.Context, replicaURL string, opt *litestream.RestoreOptions) (litestream.Replica, error) {
r, err := NewReplicaFromURL(replicaURL)
if err != nil {
return nil, err
}
opt.Generation, _, err = litestream.CalcReplicaRestoreTarget(ctx, r, *opt)
return r, err
}
// loadFromConfig returns a replica & updates the restore options from a DB reference.
func (c *RestoreCommand) loadFromConfig(ctx context.Context, dbPath, configPath string, opt *litestream.RestoreOptions) (litestream.Replica, error) {
// Load configuration.
config, err := ReadConfigFile(configPath)
if err != nil {
return nil, err
}
// Lookup database from configuration file by path.
if dbPath, err = expand(dbPath); err != nil {
return nil, err
}
dbConfig := config.DBConfig(dbPath) dbConfig := config.DBConfig(dbPath)
if dbConfig == nil { if dbConfig == nil {
return fmt.Errorf("database not found in config: %s", dbPath) return nil, fmt.Errorf("database not found in config: %s", dbPath)
} }
db, err := newDBFromConfig(&config, dbConfig) db, err := newDBFromConfig(&config, dbConfig)
if err != nil { if err != nil {
return err return nil, err
} }
return db.Restore(ctx, opt) // Restore into original database path if not specified.
if opt.OutputPath == "" {
opt.OutputPath = dbPath
}
// Determine the appropriate replica & generation to restore from,
r, generation, err := db.CalcRestoreTarget(ctx, *opt)
if err != nil {
return nil, err
}
opt.Generation = generation
return r, nil
} }
// Usage prints the help screen to STDOUT. // Usage prints the help screen to STDOUT.

View File

@@ -6,7 +6,6 @@ import (
"flag" "flag"
"fmt" "fmt"
"os" "os"
"path/filepath"
"text/tabwriter" "text/tabwriter"
"time" "time"
@@ -31,37 +30,42 @@ func (c *SnapshotsCommand) Run(ctx context.Context, args []string) (err error) {
return fmt.Errorf("too many arguments") return fmt.Errorf("too many arguments")
} }
// Load configuration. var db *litestream.DB
if configPath == "" { var r litestream.Replica
return errors.New("-config required") if isURL(fs.Arg(0)) {
if r, err = NewReplicaFromURL(fs.Arg(0)); err != nil {
return err
} }
} else if configPath != "" {
// Load configuration.
config, err := ReadConfigFile(configPath) config, err := ReadConfigFile(configPath)
if err != nil { if err != nil {
return err return err
} }
// Determine absolute path for database. // Lookup database from configuration file by path.
dbPath, err := filepath.Abs(fs.Arg(0)) if path, err := expand(fs.Arg(0)); err != nil {
if err != nil { return err
} else if dbc := config.DBConfig(path); dbc == nil {
return fmt.Errorf("database not found in config: %s", path)
} else if db, err = newDBFromConfig(&config, dbc); err != nil {
return err return err
} }
// Instantiate DB. // Filter by replica, if specified.
dbConfig := config.DBConfig(dbPath) if *replicaName != "" {
if dbConfig == nil { if r = db.Replica(*replicaName); r == nil {
return fmt.Errorf("database not found in config: %s", dbPath) return fmt.Errorf("replica %q not found for database %q", *replicaName, db.Path())
} }
db, err := newDBFromConfig(&config, dbConfig) }
if err != nil { } else {
return err return errors.New("config path or replica URL required")
} }
// Find snapshots by db or replica. // Find snapshots by db or replica.
var infos []*litestream.SnapshotInfo var infos []*litestream.SnapshotInfo
if *replicaName != "" { if r != nil {
if r := db.Replica(*replicaName); r == nil { if infos, err = r.Snapshots(ctx); err != nil {
return fmt.Errorf("replica %q not found for database %q", *replicaName, dbPath)
} else if infos, err = r.Snapshots(ctx); err != nil {
return err return err
} }
} else { } else {
@@ -90,11 +94,13 @@ func (c *SnapshotsCommand) Run(ctx context.Context, args []string) (err error) {
// Usage prints the help screen to STDOUT. // Usage prints the help screen to STDOUT.
func (c *SnapshotsCommand) Usage() { func (c *SnapshotsCommand) Usage() {
fmt.Printf(` fmt.Printf(`
The snapshots command lists all snapshots available for a database. The snapshots command lists all snapshots available for a database or replica.
Usage: Usage:
litestream snapshots [arguments] DB litestream snapshots [arguments] DB_PATH
litestream snapshots [arguments] REPLICA_URL
Arguments: Arguments:
@@ -105,7 +111,6 @@ Arguments:
-replica NAME -replica NAME
Optional, filter by a specific replica. Optional, filter by a specific replica.
Examples: Examples:
# List all snapshots for a database. # List all snapshots for a database.
@@ -114,6 +119,9 @@ Examples:
# List all snapshots on S3. # List all snapshots on S3.
$ litestream snapshots -replica s3 /path/to/db $ litestream snapshots -replica s3 /path/to/db
# List all snapshots by replica URL.
$ litestream snapshots s3://mybkt/db
`[1:], `[1:],
DefaultConfigPath(), DefaultConfigPath(),
) )

View File

@@ -6,7 +6,6 @@ import (
"flag" "flag"
"fmt" "fmt"
"os" "os"
"path/filepath"
"text/tabwriter" "text/tabwriter"
"time" "time"
@@ -32,37 +31,42 @@ func (c *WALCommand) Run(ctx context.Context, args []string) (err error) {
return fmt.Errorf("too many arguments") return fmt.Errorf("too many arguments")
} }
// Load configuration. var db *litestream.DB
if configPath == "" { var r litestream.Replica
return errors.New("-config required") if isURL(fs.Arg(0)) {
if r, err = NewReplicaFromURL(fs.Arg(0)); err != nil {
return err
} }
} else if configPath != "" {
// Load configuration.
config, err := ReadConfigFile(configPath) config, err := ReadConfigFile(configPath)
if err != nil { if err != nil {
return err return err
} }
// Determine absolute path for database. // Lookup database from configuration file by path.
dbPath, err := filepath.Abs(fs.Arg(0)) if path, err := expand(fs.Arg(0)); err != nil {
if err != nil { return err
} else if dbc := config.DBConfig(path); dbc == nil {
return fmt.Errorf("database not found in config: %s", path)
} else if db, err = newDBFromConfig(&config, dbc); err != nil {
return err return err
} }
// Instantiate DB. // Filter by replica, if specified.
dbConfig := config.DBConfig(dbPath)
if dbConfig == nil {
return fmt.Errorf("database not found in config: %s", dbPath)
}
db, err := newDBFromConfig(&config, dbConfig)
if err != nil {
return err
}
// Find snapshots by db or replica.
var infos []*litestream.WALInfo
if *replicaName != "" { if *replicaName != "" {
if r := db.Replica(*replicaName); r == nil { if r = db.Replica(*replicaName); r == nil {
return fmt.Errorf("replica %q not found for database %q", *replicaName, dbPath) return fmt.Errorf("replica %q not found for database %q", *replicaName, db.Path())
} else if infos, err = r.WALs(ctx); err != nil { }
}
} else {
return errors.New("config path or replica URL required")
}
// Find WAL files by db or replica.
var infos []*litestream.WALInfo
if r != nil {
if infos, err = r.WALs(ctx); err != nil {
return err return err
} }
} else { } else {
@@ -100,7 +104,9 @@ The wal command lists all wal files available for a database.
Usage: Usage:
litestream wal [arguments] DB litestream wal [arguments] DB_PATH
litestream wal [arguments] REPLICA_URL
Arguments: Arguments:
@@ -114,14 +120,16 @@ Arguments:
-generation NAME -generation NAME
Optional, filter by a specific generation. Optional, filter by a specific generation.
Examples: Examples:
# List all WAL files for a database. # List all WAL files for a database.
$ litestream wal /path/to/db $ litestream wal /path/to/db
# List all WAL files on S3 for a specific generation. # List all WAL files on S3 for a specific generation.
$ litestream snapshots -replica s3 -generation xxxxxxxx /path/to/db $ litestream wal -replica s3 -generation xxxxxxxx /path/to/db
# List all WAL files for replica URL.
$ litestream wal s3://mybkt/db
`[1:], `[1:],
DefaultConfigPath(), DefaultConfigPath(),

113
db.go
View File

@@ -622,7 +622,7 @@ func (db *DB) CurrentGeneration() (string, error) {
return "", err return "", err
} }
// TODO: Verify if generation directory exists. If not, delete. // TODO: Verify if generation directory exists. If not, delete name file.
generation := strings.TrimSpace(string(buf)) generation := strings.TrimSpace(string(buf))
if len(generation) != GenerationNameLen { if len(generation) != GenerationNameLen {
@@ -1343,15 +1343,17 @@ func (db *DB) monitor() {
} }
} }
// Restore restores the database from a replica based on the options given. // RestoreReplica restores the database from a replica based on the options given.
// This method will restore into opt.OutputPath, if specified, or into the // This method will restore into opt.OutputPath, if specified, or into the
// DB's original database path. It can optionally restore from a specific // DB's original database path. It can optionally restore from a specific
// replica or generation or it will automatically choose the best one. Finally, // replica or generation or it will automatically choose the best one. Finally,
// a timestamp can be specified to restore the database to a specific // a timestamp can be specified to restore the database to a specific
// point-in-time. // point-in-time.
func (db *DB) Restore(ctx context.Context, opt RestoreOptions) error { func RestoreReplica(ctx context.Context, r Replica, opt RestoreOptions) error {
// Validate options. // Validate options.
if opt.Generation == "" && opt.Index != math.MaxInt64 { if opt.OutputPath == "" {
return fmt.Errorf("output path required")
} else if opt.Generation == "" && opt.Index != math.MaxInt64 {
return fmt.Errorf("must specify generation when restoring to index") return fmt.Errorf("must specify generation when restoring to index")
} else if opt.Index != math.MaxInt64 && !opt.Timestamp.IsZero() { } else if opt.Index != math.MaxInt64 && !opt.Timestamp.IsZero() {
return fmt.Errorf("cannot specify index & timestamp to restore") return fmt.Errorf("cannot specify index & timestamp to restore")
@@ -1363,69 +1365,62 @@ func (db *DB) Restore(ctx context.Context, opt RestoreOptions) error {
logger = log.New(ioutil.Discard, "", 0) logger = log.New(ioutil.Discard, "", 0)
} }
// Determine the correct output path. logPrefix := r.Name()
outputPath := opt.OutputPath if db := r.DB(); db != nil {
if outputPath == "" { logPrefix = fmt.Sprintf("%s(%s)", db.Path(), r.Name())
outputPath = db.Path()
} }
// Ensure output path does not already exist (unless this is a dry run). // Ensure output path does not already exist (unless this is a dry run).
if !opt.DryRun { if !opt.DryRun {
if _, err := os.Stat(outputPath); err == nil { if _, err := os.Stat(opt.OutputPath); err == nil {
return fmt.Errorf("cannot restore, output path already exists: %s", outputPath) return fmt.Errorf("cannot restore, output path already exists: %s", opt.OutputPath)
} else if err != nil && !os.IsNotExist(err) { } else if err != nil && !os.IsNotExist(err) {
return err return err
} }
} }
// Determine target replica & generation to restore from.
r, generation, err := db.restoreTarget(ctx, opt, logger)
if err != nil {
return err
}
// Find lastest snapshot that occurs before timestamp. // Find lastest snapshot that occurs before timestamp.
minWALIndex, err := SnapshotIndexAt(ctx, r, generation, opt.Timestamp) minWALIndex, err := SnapshotIndexAt(ctx, r, opt.Generation, opt.Timestamp)
if err != nil { if err != nil {
return fmt.Errorf("cannot find snapshot index for restore: %w", err) return fmt.Errorf("cannot find snapshot index for restore: %w", err)
} }
// Find the maximum WAL index that occurs before timestamp. // Find the maximum WAL index that occurs before timestamp.
maxWALIndex, err := WALIndexAt(ctx, r, generation, opt.Index, opt.Timestamp) maxWALIndex, err := WALIndexAt(ctx, r, opt.Generation, opt.Index, opt.Timestamp)
if err != nil { if err != nil {
return fmt.Errorf("cannot find max wal index for restore: %w", err) return fmt.Errorf("cannot find max wal index for restore: %w", err)
} }
log.Printf("%s(%s): starting restore: generation %08x, index %08x-%08x", db.path, r.Name(), generation, minWALIndex, maxWALIndex) logger.Printf("%s: starting restore: generation %08x, index %08x-%08x", logPrefix, opt.Generation, minWALIndex, maxWALIndex)
// Initialize starting position. // Initialize starting position.
pos := Pos{Generation: generation, Index: minWALIndex} pos := Pos{Generation: opt.Generation, Index: minWALIndex}
tmpPath := outputPath + ".tmp" tmpPath := opt.OutputPath + ".tmp"
// Copy snapshot to output path. // Copy snapshot to output path.
if !opt.DryRun { if !opt.DryRun {
if err := db.restoreSnapshot(ctx, r, pos.Generation, pos.Index, tmpPath); err != nil { if err := restoreSnapshot(ctx, r, pos.Generation, pos.Index, tmpPath); err != nil {
return fmt.Errorf("cannot restore snapshot: %w", err) return fmt.Errorf("cannot restore snapshot: %w", err)
} }
} }
log.Printf("%s(%s): restoring snapshot %s/%08x to %s", db.path, r.Name(), generation, minWALIndex, tmpPath) logger.Printf("%s: restoring snapshot %s/%08x to %s", logPrefix, opt.Generation, minWALIndex, tmpPath)
// Restore each WAL file until we reach our maximum index. // Restore each WAL file until we reach our maximum index.
for index := minWALIndex; index <= maxWALIndex; index++ { for index := minWALIndex; index <= maxWALIndex; index++ {
if !opt.DryRun { if !opt.DryRun {
if err = db.restoreWAL(ctx, r, generation, index, tmpPath); os.IsNotExist(err) && index == minWALIndex && index == maxWALIndex { if err = restoreWAL(ctx, r, opt.Generation, index, tmpPath); os.IsNotExist(err) && index == minWALIndex && index == maxWALIndex {
log.Printf("%s(%s): no wal available, snapshot only", db.path, r.Name()) logger.Printf("%s: no wal available, snapshot only", logPrefix)
break // snapshot file only, ignore error break // snapshot file only, ignore error
} else if err != nil { } else if err != nil {
return fmt.Errorf("cannot restore wal: %w", err) return fmt.Errorf("cannot restore wal: %w", err)
} }
} }
log.Printf("%s(%s): restored wal %s/%08x", db.path, r.Name(), generation, index) logger.Printf("%s: restored wal %s/%08x", logPrefix, opt.Generation, index)
} }
// Copy file to final location. // Copy file to final location.
log.Printf("%s(%s): renaming database from temporary location", db.path, r.Name()) logger.Printf("%s: renaming database from temporary location", logPrefix)
if !opt.DryRun { if !opt.DryRun {
if err := os.Rename(tmpPath, outputPath); err != nil { if err := os.Rename(tmpPath, opt.OutputPath); err != nil {
return err return err
} }
} }
@@ -1447,7 +1442,8 @@ func checksumFile(filename string) (uint64, error) {
return h.Sum64(), nil return h.Sum64(), nil
} }
func (db *DB) restoreTarget(ctx context.Context, opt RestoreOptions, logger *log.Logger) (Replica, string, error) { // CalcRestoreTarget returns a replica & generation to restore from based on opt criteria.
func (db *DB) CalcRestoreTarget(ctx context.Context, opt RestoreOptions) (Replica, string, error) {
var target struct { var target struct {
replica Replica replica Replica
generation string generation string
@@ -1460,9 +1456,31 @@ func (db *DB) restoreTarget(ctx context.Context, opt RestoreOptions, logger *log
continue continue
} }
generation, stats, err := CalcReplicaRestoreTarget(ctx, r, opt)
if err != nil {
return nil, "", err
}
// Use the latest replica if we have multiple candidates.
if !stats.UpdatedAt.After(target.stats.UpdatedAt) {
continue
}
target.replica, target.generation, target.stats = r, generation, stats
}
return target.replica, target.generation, nil
}
// CalcReplicaRestoreTarget returns a generation to restore from.
func CalcReplicaRestoreTarget(ctx context.Context, r Replica, opt RestoreOptions) (generation string, stats GenerationStats, err error) {
var target struct {
generation string
stats GenerationStats
}
generations, err := r.Generations(ctx) generations, err := r.Generations(ctx)
if err != nil { if err != nil {
return nil, "", fmt.Errorf("cannot fetch generations: %w", err) return "", stats, fmt.Errorf("cannot fetch generations: %w", err)
} }
// Search generations for one that contains the requested timestamp. // Search generations for one that contains the requested timestamp.
@@ -1475,7 +1493,7 @@ func (db *DB) restoreTarget(ctx context.Context, opt RestoreOptions, logger *log
// Fetch stats for generation. // Fetch stats for generation.
stats, err := r.GenerationStats(ctx, generation) stats, err := r.GenerationStats(ctx, generation)
if err != nil { if err != nil {
return nil, "", fmt.Errorf("cannot determine stats for generation (%s/%s): %s", r.Name(), generation, err) return "", stats, fmt.Errorf("cannot determine stats for generation (%s/%s): %s", r.Name(), generation, err)
} }
// Skip if it does not contain timestamp. // Skip if it does not contain timestamp.
@@ -1490,27 +1508,28 @@ func (db *DB) restoreTarget(ctx context.Context, opt RestoreOptions, logger *log
continue continue
} }
target.replica = r
target.generation = generation target.generation = generation
target.stats = stats target.stats = stats
} }
}
// Return an error if no matching targets found. return target.generation, target.stats, nil
if target.generation == "" {
return nil, "", fmt.Errorf("no matching backups found")
}
return target.replica, target.generation, nil
} }
// restoreSnapshot copies a snapshot from the replica to a file. // restoreSnapshot copies a snapshot from the replica to a file.
func (db *DB) restoreSnapshot(ctx context.Context, r Replica, generation string, index int, filename string) error { func restoreSnapshot(ctx context.Context, r Replica, generation string, index int, filename string) error {
if err := mkdirAll(filepath.Dir(filename), db.dirmode, db.diruid, db.dirgid); err != nil { // Determine the user/group & mode based on the DB, if available.
uid, gid, mode := -1, -1, os.FileMode(0600)
diruid, dirgid, dirmode := -1, -1, os.FileMode(0700)
if db := r.DB(); db != nil {
uid, gid, mode = db.uid, db.gid, db.mode
diruid, dirgid, dirmode = db.diruid, db.dirgid, db.dirmode
}
if err := mkdirAll(filepath.Dir(filename), dirmode, diruid, dirgid); err != nil {
return err return err
} }
f, err := createFile(filename, db.uid, db.gid) f, err := createFile(filename, mode, uid, gid)
if err != nil { if err != nil {
return err return err
} }
@@ -1533,7 +1552,13 @@ func (db *DB) restoreSnapshot(ctx context.Context, r Replica, generation string,
} }
// restoreWAL copies a WAL file from the replica to the local WAL and forces checkpoint. // restoreWAL copies a WAL file from the replica to the local WAL and forces checkpoint.
func (db *DB) restoreWAL(ctx context.Context, r Replica, generation string, index int, dbPath string) error { func restoreWAL(ctx context.Context, r Replica, generation string, index int, dbPath string) error {
// Determine the user/group & mode based on the DB, if available.
uid, gid, mode := -1, -1, os.FileMode(0600)
if db := r.DB(); db != nil {
uid, gid, mode = db.uid, db.gid, db.mode
}
// Open WAL file from replica. // Open WAL file from replica.
rd, err := r.WALReader(ctx, generation, index) rd, err := r.WALReader(ctx, generation, index)
if err != nil { if err != nil {
@@ -1542,7 +1567,7 @@ func (db *DB) restoreWAL(ctx context.Context, r Replica, generation string, inde
defer rd.Close() defer rd.Close()
// Open handle to destination WAL path. // Open handle to destination WAL path.
f, err := createFile(dbPath+"-wal", db.uid, db.gid) f, err := createFile(dbPath+"-wal", mode, uid, gid)
if err != nil { if err != nil {
return err return err
} }

View File

@@ -257,8 +257,8 @@ func isHexChar(ch rune) bool {
} }
// createFile creates the file and attempts to set the UID/GID. // createFile creates the file and attempts to set the UID/GID.
func createFile(filename string, uid, gid int) (*os.File, error) { func createFile(filename string, perm os.FileMode, uid, gid int) (*os.File, error) {
f, err := os.Create(filename) f, err := os.OpenFile(filename, os.O_RDWR|os.O_CREATE|os.O_TRUNC, perm)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@@ -125,10 +125,14 @@ func NewFileReplica(db *DB, name, dst string) *FileReplica {
MonitorEnabled: true, MonitorEnabled: true,
} }
r.snapshotTotalGauge = internal.ReplicaSnapshotTotalGaugeVec.WithLabelValues(db.path, r.Name()) var dbPath string
r.walBytesCounter = internal.ReplicaWALBytesCounterVec.WithLabelValues(db.path, r.Name()) if db != nil {
r.walIndexGauge = internal.ReplicaWALIndexGaugeVec.WithLabelValues(db.path, r.Name()) dbPath = db.Path()
r.walOffsetGauge = internal.ReplicaWALOffsetGaugeVec.WithLabelValues(db.path, r.Name()) }
r.snapshotTotalGauge = internal.ReplicaSnapshotTotalGaugeVec.WithLabelValues(dbPath, r.Name())
r.walBytesCounter = internal.ReplicaWALBytesCounterVec.WithLabelValues(dbPath, r.Name())
r.walIndexGauge = internal.ReplicaWALIndexGaugeVec.WithLabelValues(dbPath, r.Name())
r.walOffsetGauge = internal.ReplicaWALOffsetGaugeVec.WithLabelValues(dbPath, r.Name())
return r return r
} }
@@ -923,6 +927,10 @@ func WALIndexAt(ctx context.Context, r Replica, generation string, maxIndex int,
var index int var index int
for _, wal := range wals { for _, wal := range wals {
if wal.Generation != generation {
continue
}
if !timestamp.IsZero() && wal.CreatedAt.After(timestamp) { if !timestamp.IsZero() && wal.CreatedAt.After(timestamp) {
continue // after timestamp, skip continue // after timestamp, skip
} else if wal.Index > maxIndex { } else if wal.Index > maxIndex {
@@ -949,7 +957,12 @@ func compressFile(src, dst string, uid, gid int) error {
} }
defer r.Close() defer r.Close()
w, err := createFile(dst+".tmp", uid, gid) fi, err := r.Stat()
if err != nil {
return err
}
w, err := createFile(dst+".tmp", fi.Mode(), uid, gid)
if err != nil { if err != nil {
return err return err
} }
@@ -1000,7 +1013,7 @@ func ValidateReplica(ctx context.Context, r Replica) error {
defer os.RemoveAll(tmpdir) defer os.RemoveAll(tmpdir)
restorePath := filepath.Join(tmpdir, "db") restorePath := filepath.Join(tmpdir, "db")
if err := db.Restore(ctx, RestoreOptions{ if err := RestoreReplica(ctx, r, RestoreOptions{
OutputPath: restorePath, OutputPath: restorePath,
ReplicaName: r.Name(), ReplicaName: r.Name(),
Generation: pos.Generation, Generation: pos.Generation,

View File

@@ -105,16 +105,20 @@ func NewReplica(db *litestream.DB, name string) *Replica {
MonitorEnabled: true, MonitorEnabled: true,
} }
r.snapshotTotalGauge = internal.ReplicaSnapshotTotalGaugeVec.WithLabelValues(db.Path(), r.Name()) var dbPath string
r.walBytesCounter = internal.ReplicaWALBytesCounterVec.WithLabelValues(db.Path(), r.Name()) if db != nil {
r.walIndexGauge = internal.ReplicaWALIndexGaugeVec.WithLabelValues(db.Path(), r.Name()) dbPath = db.Path()
r.walOffsetGauge = internal.ReplicaWALOffsetGaugeVec.WithLabelValues(db.Path(), r.Name()) }
r.putOperationTotalCounter = operationTotalCounterVec.WithLabelValues(db.Path(), r.Name(), "PUT") r.snapshotTotalGauge = internal.ReplicaSnapshotTotalGaugeVec.WithLabelValues(dbPath, r.Name())
r.putOperationBytesCounter = operationBytesCounterVec.WithLabelValues(db.Path(), r.Name(), "PUT") r.walBytesCounter = internal.ReplicaWALBytesCounterVec.WithLabelValues(dbPath, r.Name())
r.getOperationTotalCounter = operationTotalCounterVec.WithLabelValues(db.Path(), r.Name(), "GET") r.walIndexGauge = internal.ReplicaWALIndexGaugeVec.WithLabelValues(dbPath, r.Name())
r.getOperationBytesCounter = operationBytesCounterVec.WithLabelValues(db.Path(), r.Name(), "GET") r.walOffsetGauge = internal.ReplicaWALOffsetGaugeVec.WithLabelValues(dbPath, r.Name())
r.listOperationTotalCounter = operationTotalCounterVec.WithLabelValues(db.Path(), r.Name(), "LIST") r.putOperationTotalCounter = operationTotalCounterVec.WithLabelValues(dbPath, r.Name(), "PUT")
r.deleteOperationTotalCounter = operationTotalCounterVec.WithLabelValues(db.Path(), r.Name(), "DELETE") r.putOperationBytesCounter = operationBytesCounterVec.WithLabelValues(dbPath, r.Name(), "PUT")
r.getOperationTotalCounter = operationTotalCounterVec.WithLabelValues(dbPath, r.Name(), "GET")
r.getOperationBytesCounter = operationBytesCounterVec.WithLabelValues(dbPath, r.Name(), "GET")
r.listOperationTotalCounter = operationTotalCounterVec.WithLabelValues(dbPath, r.Name(), "LIST")
r.deleteOperationTotalCounter = operationTotalCounterVec.WithLabelValues(dbPath, r.Name(), "DELETE")
return r return r
} }