Compare commits

..

16 Commits

Author SHA1 Message Date
Ben Johnson
a178ef4714 Merge pull request #27 from benbjohnson/generations-replica-url
Allow replica URLs for generations command
2021-01-27 07:50:29 -07:00
Ben Johnson
7ca2e193b9 Allow replica URLs for generations command 2021-01-27 07:48:56 -07:00
Ben Johnson
39a6fabb9f Fix restore logging. 2021-01-26 17:01:00 -07:00
Ben Johnson
0249b4e4f5 Merge pull request #25 from benbjohnson/replica-url 2021-01-26 16:40:17 -07:00
Ben Johnson
67eeb49101 Allow replica URL to be used for commands
This commit refactors the commands to allow a replica URL when
restoring a database. If the first CLI arg is a URL with a scheme,
the it is treated as a replica URL.
2021-01-26 16:33:16 -07:00
Ben Johnson
f7213ed35c Allow replication without config file.
This commit changes `litestream replicate` to accept a database
path and a replica URL instead of using the config file. This allows
people to quickly try out the tool instead of learning the config
file syntax.
2021-01-25 10:33:50 -07:00
Ben Johnson
a532a0198e README 2021-01-24 10:09:54 -07:00
Ben Johnson
16f79e5814 Merge pull request #24 from benbjohnson/document-retention-period
Document retention period configuration
2021-01-24 09:32:31 -07:00
Ben Johnson
39aefc2c02 Document retention period configuration 2021-01-24 09:28:57 -07:00
Ben Johnson
0b08669bca Merge pull request #23 from benbjohnson/disable-metrics-by-default
Disable prometheus metrics by default
2021-01-24 09:18:37 -07:00
Ben Johnson
8f5761ee13 Disable prometheus metrics by default
The HTTP server should only be enabled if a user explicitly sets a
port for it.
2021-01-24 09:16:23 -07:00
Ben Johnson
d2eb4fa5ba Remove PR action 2021-01-24 08:54:29 -07:00
Ben Johnson
ca489c5e73 Merge pull request #22 from benbjohnson/notorize
Add signed homebrew install
2021-01-24 08:50:01 -07:00
Ben Johnson
f0ae48af4c Add signed homebrew install 2021-01-24 08:47:16 -07:00
Ben Johnson
9eae39e2fa README 2021-01-21 15:01:30 -07:00
Ben Johnson
42ab293ffb README 2021-01-21 14:53:21 -07:00
14 changed files with 619 additions and 328 deletions

View File

@@ -1,4 +1,4 @@
on: [push, pull_request]
on: push
name: test
jobs:
test:

View File

@@ -1,19 +1,22 @@
default:
dist:
dist-linux:
mkdir -p dist
cp etc/litestream.yml dist/litestream.yml
docker run --rm -v "${PWD}":/usr/src/litestream -w /usr/src/litestream -e GOOS=linux -e GOARCH=amd64 golang:1.15 go build -v -o dist/litestream ./cmd/litestream
tar -cz -f dist/litestream-linux-amd64.tar.gz -C dist litestream
deb: dist
dist-macos:
ifndef LITESTREAM_VERSION
$(error LITESTREAM_VERSION is undefined)
endif
cat etc/nfpm.yml | envsubst > dist/nfpm.yml
nfpm pkg --config dist/nfpm.yml --packager deb --target dist/litestream.deb
mkdir -p dist
go build -v -ldflags "-X 'main.Version=${LITESTREAM_VERSION}'" -o dist/litestream ./cmd/litestream
gon etc/gon.hcl
mv dist/litestream.zip dist/litestream-${LITESTREAM_VERSION}-darwin-amd64.zip
openssl dgst -sha256 dist/litestream-${LITESTREAM_VERSION}-darwin-amd64.zip
clean:
rm -rf dist
.PHONY: deb dist clean
.PHONY: default dist-linux dist-macos clean

118
README.md
View File

@@ -1,4 +1,8 @@
Litestream ![test](https://github.com/benbjohnson/litestream/workflows/test/badge.svg)
Litestream
![GitHub release (latest by date)](https://img.shields.io/github/v/release/benbjohnson/litestream)
![Status](https://img.shields.io/badge/status-beta-blue)
![GitHub](https://img.shields.io/github/license/benbjohnson/litestream)
![test](https://github.com/benbjohnson/litestream/workflows/test/badge.svg)
==========
Litestream is a standalone streaming replication tool for SQLite. It runs as a
@@ -6,15 +10,23 @@ background process and safely replicates changes incrementally to another file
or S3. Litestream only communicates with SQLite through the SQLite API so it
will not corrupt your database.
If you need support or have ideas for improving Litestream, please visit the
[GitHub Discussions](https://github.com/benbjohnson/litestream/discussions) to
chat.
If you find this project interesting, please consider starring the project on
GitHub.
## Installation
### Homebrew
### Mac OS (Homebrew)
TODO
To install from homebrew, first add the Litestream tap and then install:
```sh
$ brew install benbjohnson/litestream/litestream
```
### Linux (Debian)
@@ -37,14 +49,62 @@ $ sudo systemctl start litestream
### Release binaries
You can also download the release binary for your system from the
[Releases page][releases] and run it as a standalone application.
[releases page][releases] and run it as a standalone application.
### Building from source
Download and install the [Go toolchain](https://golang.org/) and then run:
```sh
$ go install ./cmd/litestream
```
The `litestream` binary should be in your `$GOPATH/bin` folder.
## Quick Start
Litestream provides a configuration file that can be used for production
deployments but you can also specify a single database and replica on the
command line for testing.
First, you'll need to create an S3 bucket that we'll call `"mybkt"` in this
example. You'll also need to set your AWS credentials:
```sh
$ export AWS_ACCESS_KEY_ID=AKIAxxxxxxxxxxxxxxxx
$ export AWS_SECRET_ACCESS_KEY=xxxxxxxxxxxxxxxxxxxxxxxxxxxxxx/xxxxxxxxx
```
Next you can run the `litestream replicate` command with the path to the
database you want to backup and the URL of your replica destination:
```sh
$ litestream replicate /path/to/db s3://mybkt/db
```
If you make changes to your local database, those changes will be replicated
to S3 every 10 seconds. From another terminal window, you can restore your
database from your S3 replica:
```
$ litestream restore -v -o /path/to/restored/db s3://mybkt/db
```
Voila! 🎉
Your database should be restored to the last replicated state that
was sent to S3. You can adjust your replication frequency and other options by
using a configuration-based approach specified below.
## Configuration
Once installed locally, you'll need to create a config file. By default, the
config file lives at `/etc/litestream.yml` but you can pass in a different
path to any `litestream` command using the `-config PATH` flag.
A configuration-based install gives you more replication options. By default,
the config file lives at `/etc/litestream.yml` but you can pass in a different
path to any `litestream` command using the `-config PATH` flag. You can also
set the `LITESTREAM_CONFIG` environment variable to specify a new path.
The configuration specifies one or more `dbs` and a list of one or more replica
locations for each db. Below are some common configurations:
@@ -61,7 +121,7 @@ secret-access-key: xxxxxxxxxxxxxxxxxxxxxxxxxxxxxx/xxxxxxxxx
dbs:
- path: /path/to/db
replicas:
- path: s3://mybkt/db
- url: s3://mybkt/db
```
### Replicate to another file path
@@ -76,6 +136,38 @@ dbs:
- path: /path/to/replica
```
### Retention period
By default, replicas will retain a snapshot & subsequent WAL changes for 24
hours. When the snapshot age exceeds the retention threshold, a new snapshot
is taken and uploaded and the previous snapshot and WAL files are removed.
You can configure this setting per-replica. Times are parsed using [Go's
duration](https://golang.org/pkg/time/#ParseDuration) so time units of hours
(`h`), minutes (`m`), and seconds (`s`) are allowed but days, weeks, months, and
years are not.
```yaml
db:
- path: /path/to/db
replicas:
- url: s3://mybkt/db
retention: 1h # 1 hour retention
```
### Monitoring replication
You can also enable a Prometheus metrics endpoint to monitor replication by
specifying a bind address with the `addr` field:
```yml
addr: ":9090"
```
This will make metrics available at: http://localhost:9090/metrics
### Other configuration options
@@ -83,8 +175,8 @@ These are some additional configuration options available on replicas:
- `type`—Specify the type of replica (`"file"` or `"s3"`). Derived from `"path"`.
- `name`—Specify an optional name for the replica if you are using multiple replicas.
- `path`—File path or URL to the replica location.
- `retention`—Length of time to keep replicated WAL files. Defaults to `24h`.
- `path`—File path to the replica location.
- `url`—URL to the replica location.
- `retention-check-interval`—Time between retention enforcement checks. Defaults to `1h`.
- `validation-interval`—Interval between periodic checks to ensure restored backup matches current database. Disabled by default.
@@ -95,6 +187,7 @@ These replica options are only available for S3 replicas:
- `sync-interval`—Replication sync frequency.
## Usage
### Replication
@@ -137,7 +230,10 @@ to worry about accidentally overwriting your current database.
$ litestream restore /path/to/db
# Restore database to a new location.
$ litestream restore -o /tmp/mynewdb /path/to/db
$ litestream restore -o /path/to/restored/db /path/to/db
# Restore from a replica URL.
$ litestream restore -o /path/to/restored/db s3://mybkt/db
# Restore database to a specific point-in-time.
$ litestream restore -timestamp 2020-01-01T00:00:00Z /path/to/db

View File

@@ -7,9 +7,10 @@ import (
"fmt"
"log"
"os"
"path/filepath"
"text/tabwriter"
"time"
"github.com/benbjohnson/litestream"
)
// GenerationsCommand represents a command to list all generations for a database.
@@ -25,50 +26,60 @@ func (c *GenerationsCommand) Run(ctx context.Context, args []string) (err error)
if err := fs.Parse(args); err != nil {
return err
} else if fs.NArg() == 0 || fs.Arg(0) == "" {
return fmt.Errorf("database path required")
return fmt.Errorf("database path or replica URL required")
} else if fs.NArg() > 1 {
return fmt.Errorf("too many arguments")
}
// Load configuration.
if configPath == "" {
return errors.New("-config required")
var db *litestream.DB
var r litestream.Replica
updatedAt := time.Now()
if isURL(fs.Arg(0)) {
if r, err = NewReplicaFromURL(fs.Arg(0)); err != nil {
return err
}
} else if configPath != "" {
// Load configuration.
config, err := ReadConfigFile(configPath)
if err != nil {
return err
}
// Determine absolute path for database.
dbPath, err := filepath.Abs(fs.Arg(0))
if err != nil {
// Lookup database from configuration file by path.
if path, err := expand(fs.Arg(0)); err != nil {
return err
} else if dbc := config.DBConfig(path); dbc == nil {
return fmt.Errorf("database not found in config: %s", path)
} else if db, err = newDBFromConfig(&config, dbc); err != nil {
return err
}
// Instantiate DB from from configuration.
dbConfig := config.DBConfig(dbPath)
if dbConfig == nil {
return fmt.Errorf("database not found in config: %s", dbPath)
// Filter by replica, if specified.
if *replicaName != "" {
if r = db.Replica(*replicaName); r == nil {
return fmt.Errorf("replica %q not found for database %q", *replicaName, db.Path())
}
db, err := newDBFromConfig(&config, dbConfig)
if err != nil {
return err
}
// Determine last time database or WAL was updated.
updatedAt, err := db.UpdatedAt()
if err != nil {
if updatedAt, err = db.UpdatedAt(); err != nil {
return err
}
} else {
return errors.New("config path or replica URL required")
}
var replicas []litestream.Replica
if r != nil {
replicas = []litestream.Replica{r}
} else {
replicas = db.Replicas
}
// List each generation.
w := tabwriter.NewWriter(os.Stdout, 0, 8, 1, '\t', 0)
fmt.Fprintln(w, "name\tgeneration\tlag\tstart\tend")
for _, r := range db.Replicas {
if *replicaName != "" && r.Name() != *replicaName {
continue
}
for _, r := range replicas {
generations, err := r.Generations(ctx)
if err != nil {
log.Printf("%s: cannot list generations: %s", r.Name(), err)
@@ -101,17 +112,21 @@ func (c *GenerationsCommand) Run(ctx context.Context, args []string) (err error)
// Usage prints the help message to STDOUT.
func (c *GenerationsCommand) Usage() {
fmt.Printf(`
The generations command lists all generations for a database. It also lists
stats about their lag behind the primary database and the time range they cover.
The generations command lists all generations for a database or replica. It also
lists stats about their lag behind the primary database and the time range they
cover.
Usage:
litestream generations [arguments] DB
litestream generations [arguments] DB_PATH
litestream generations [arguments] REPLICA_URL
Arguments:
-config PATH
Specifies the configuration file. Defaults to %s
Specifies the configuration file.
Defaults to %s
-replica NAME
Optional, filters by replica.

View File

@@ -87,21 +87,16 @@ Usage:
The commands are:
databases list databases specified in config file
generations list available generations for a database
replicate runs a server to replicate databases
restore recovers database backup from a replica
snapshots list available snapshots for a database
validate checks replica to ensure a consistent state with primary
version prints the version
version prints the binary version
wal list available WAL files for a database
`[1:])
}
// Default configuration settings.
const (
DefaultAddr = ":9090"
)
// Config represents a configuration file for the litestream daemon.
type Config struct {
// Bind address for serving metrics.
@@ -117,21 +112,9 @@ type Config struct {
Bucket string `yaml:"bucket"`
}
// Normalize expands paths and parses URL-specified replicas.
func (c *Config) Normalize() error {
for i := range c.DBs {
if err := c.DBs[i].Normalize(); err != nil {
return err
}
}
return nil
}
// DefaultConfig returns a new instance of Config with defaults set.
func DefaultConfig() Config {
return Config{
Addr: DefaultAddr,
}
return Config{}
}
// DBConfig returns database configuration by path.
@@ -145,18 +128,13 @@ func (c *Config) DBConfig(path string) *DBConfig {
}
// ReadConfigFile unmarshals config from filename. Expands path if needed.
func ReadConfigFile(filename string) (Config, error) {
func ReadConfigFile(filename string) (_ Config, err error) {
config := DefaultConfig()
// Expand filename, if necessary.
if prefix := "~" + string(os.PathSeparator); strings.HasPrefix(filename, prefix) {
u, err := user.Current()
filename, err = expand(filename)
if err != nil {
return config, err
} else if u.HomeDir == "" {
return config, fmt.Errorf("home directory unset")
}
filename = filepath.Join(u.HomeDir, strings.TrimPrefix(filename, prefix))
}
// Read & deserialize configuration.
@@ -168,9 +146,13 @@ func ReadConfigFile(filename string) (Config, error) {
return config, err
}
if err := config.Normalize(); err != nil {
// Normalize paths.
for _, dbConfig := range config.DBs {
if dbConfig.Path, err = expand(dbConfig.Path); err != nil {
return config, err
}
}
return config, nil
}
@@ -180,21 +162,12 @@ type DBConfig struct {
Replicas []*ReplicaConfig `yaml:"replicas"`
}
// Normalize expands paths and parses URL-specified replicas.
func (c *DBConfig) Normalize() error {
for i := range c.Replicas {
if err := c.Replicas[i].Normalize(); err != nil {
return err
}
}
return nil
}
// ReplicaConfig represents the configuration for a single replica in a database.
type ReplicaConfig struct {
Type string `yaml:"type"` // "file", "s3"
Name string `yaml:"name"` // name of replica, optional.
Path string `yaml:"path"`
URL string `yaml:"url"`
Retention time.Duration `yaml:"retention"`
RetentionCheckInterval time.Duration `yaml:"retention-check-interval"`
SyncInterval time.Duration `yaml:"sync-interval"` // s3 only
@@ -207,47 +180,63 @@ type ReplicaConfig struct {
Bucket string `yaml:"bucket"`
}
// Normalize expands paths and parses URL-specified replicas.
func (c *ReplicaConfig) Normalize() error {
// Expand path filename, if necessary.
if prefix := "~" + string(os.PathSeparator); strings.HasPrefix(c.Path, prefix) {
u, err := user.Current()
// NewReplicaFromURL returns a new Replica instance configured from a URL.
// The replica's database is not set.
func NewReplicaFromURL(s string) (litestream.Replica, error) {
scheme, host, path, err := ParseReplicaURL(s)
if err != nil {
return err
} else if u.HomeDir == "" {
return fmt.Errorf("cannot expand replica path, no home directory available")
}
c.Path = filepath.Join(u.HomeDir, strings.TrimPrefix(c.Path, prefix))
return nil, err
}
// Attempt to parse as URL. Ignore if it is not a URL or if there is no scheme.
u, err := url.Parse(c.Path)
if err != nil || u.Scheme == "" {
return nil
switch scheme {
case "file":
return litestream.NewFileReplica(nil, "", path), nil
case "s3":
r := s3.NewReplica(nil, "")
r.Bucket, r.Path = host, path
return r, nil
default:
return nil, fmt.Errorf("invalid replica url type: %s", s)
}
}
// ParseReplicaURL parses a replica URL.
func ParseReplicaURL(s string) (scheme, host, urlpath string, err error) {
u, err := url.Parse(s)
if err != nil {
return "", "", "", err
}
switch u.Scheme {
case "file":
u.Scheme = ""
c.Type = u.Scheme
c.Path = path.Clean(u.String())
return nil
scheme, u.Scheme = u.Scheme, ""
return scheme, "", path.Clean(u.String()), nil
case "s3":
c.Type = u.Scheme
c.Path = strings.TrimPrefix(path.Clean(u.Path), "/")
c.Bucket = u.Host
if u := u.User; u != nil {
c.AccessKeyID = u.Username()
c.SecretAccessKey, _ = u.Password()
}
return nil
case "":
return u.Scheme, u.Host, u.Path, fmt.Errorf("replica url scheme required: %s", s)
default:
return fmt.Errorf("unrecognized replica type in path scheme: %s", c.Path)
return u.Scheme, u.Host, strings.TrimPrefix(path.Clean(u.Path), "/"), nil
}
}
// isURL returns true if s can be parsed and has a scheme.
func isURL(s string) bool {
u, err := url.Parse(s)
return err == nil && u.Scheme != ""
}
// ReplicaType returns the type based on the type field or extracted from the URL.
func (c *ReplicaConfig) ReplicaType() string {
typ, _, _, _ := ParseReplicaURL(c.URL)
if typ != "" {
return typ
} else if c.Type != "" {
return c.Type
}
return "file"
}
// DefaultConfigPath returns the default config path.
func DefaultConfigPath() string {
if v := os.Getenv("LITESTREAM_CONFIG"); v != "" {
@@ -262,8 +251,13 @@ func registerConfigFlag(fs *flag.FlagSet, p *string) {
// newDBFromConfig instantiates a DB based on a configuration.
func newDBFromConfig(c *Config, dbc *DBConfig) (*litestream.DB, error) {
path, err := expand(dbc.Path)
if err != nil {
return nil, err
}
// Initialize database with given path.
db := litestream.NewDB(dbc.Path)
db := litestream.NewDB(path)
// Instantiate and attach replicas.
for _, rc := range dbc.Replicas {
@@ -279,8 +273,13 @@ func newDBFromConfig(c *Config, dbc *DBConfig) (*litestream.DB, error) {
// newReplicaFromConfig instantiates a replica for a DB based on a config.
func newReplicaFromConfig(db *litestream.DB, c *Config, dbc *DBConfig, rc *ReplicaConfig) (litestream.Replica, error) {
switch rc.Type {
case "", "file":
// Ensure user did not specify URL in path.
if isURL(rc.Path) {
return nil, fmt.Errorf("replica path cannot be a url, please use the 'url' field instead: %s", rc.Path)
}
switch rc.ReplicaType() {
case "file":
return newFileReplicaFromConfig(db, c, dbc, rc)
case "s3":
return newS3ReplicaFromConfig(db, c, dbc, rc)
@@ -290,12 +289,24 @@ func newReplicaFromConfig(db *litestream.DB, c *Config, dbc *DBConfig, rc *Repli
}
// newFileReplicaFromConfig returns a new instance of FileReplica build from config.
func newFileReplicaFromConfig(db *litestream.DB, c *Config, dbc *DBConfig, rc *ReplicaConfig) (*litestream.FileReplica, error) {
if rc.Path == "" {
func newFileReplicaFromConfig(db *litestream.DB, c *Config, dbc *DBConfig, rc *ReplicaConfig) (_ *litestream.FileReplica, err error) {
path := rc.Path
if rc.URL != "" {
_, _, path, err = ParseReplicaURL(rc.URL)
if err != nil {
return nil, err
}
}
if path == "" {
return nil, fmt.Errorf("%s: file replica path required", db.Path())
}
r := litestream.NewFileReplica(db, rc.Name, rc.Path)
if path, err = expand(path); err != nil {
return nil, err
}
r := litestream.NewFileReplica(db, rc.Name, path)
if v := rc.Retention; v > 0 {
r.Retention = v
}
@@ -309,7 +320,20 @@ func newFileReplicaFromConfig(db *litestream.DB, c *Config, dbc *DBConfig, rc *R
}
// newS3ReplicaFromConfig returns a new instance of S3Replica build from config.
func newS3ReplicaFromConfig(db *litestream.DB, c *Config, dbc *DBConfig, rc *ReplicaConfig) (*s3.Replica, error) {
func newS3ReplicaFromConfig(db *litestream.DB, c *Config, dbc *DBConfig, rc *ReplicaConfig) (_ *s3.Replica, err error) {
bucket := c.Bucket
if v := rc.Bucket; v != "" {
bucket = v
}
path := rc.Path
if rc.URL != "" {
_, bucket, path, err = ParseReplicaURL(rc.URL)
if err != nil {
return nil, err
}
}
// Use global or replica-specific S3 settings.
accessKeyID := c.AccessKeyID
if v := rc.AccessKeyID; v != "" {
@@ -319,21 +343,13 @@ func newS3ReplicaFromConfig(db *litestream.DB, c *Config, dbc *DBConfig, rc *Rep
if v := rc.SecretAccessKey; v != "" {
secretAccessKey = v
}
bucket := c.Bucket
if v := rc.Bucket; v != "" {
bucket = v
}
region := c.Region
if v := rc.Region; v != "" {
region = v
}
// Ensure required settings are set.
if accessKeyID == "" {
return nil, fmt.Errorf("%s: s3 access key id required", db.Path())
} else if secretAccessKey == "" {
return nil, fmt.Errorf("%s: s3 secret access key required", db.Path())
} else if bucket == "" {
if bucket == "" {
return nil, fmt.Errorf("%s: s3 bucket required", db.Path())
}
@@ -343,7 +359,7 @@ func newS3ReplicaFromConfig(db *litestream.DB, c *Config, dbc *DBConfig, rc *Rep
r.SecretAccessKey = secretAccessKey
r.Region = region
r.Bucket = bucket
r.Path = rc.Path
r.Path = path
if v := rc.Retention; v > 0 {
r.Retention = v
@@ -359,3 +375,26 @@ func newS3ReplicaFromConfig(db *litestream.DB, c *Config, dbc *DBConfig, rc *Rep
}
return r, nil
}
// expand returns an absolute path for s.
func expand(s string) (string, error) {
// Just expand to absolute path if there is no home directory prefix.
prefix := "~" + string(os.PathSeparator)
if s != "~" && !strings.HasPrefix(s, prefix) {
return filepath.Abs(s)
}
// Look up home directory.
u, err := user.Current()
if err != nil {
return "", err
} else if u.HomeDir == "" {
return "", fmt.Errorf("cannot expand path %s, no home directory available", s)
}
// Return path with tilde replaced by the home directory.
if s == "~" {
return u.HomeDir, nil
}
return filepath.Join(u.HomeDir, strings.TrimPrefix(s, prefix)), nil
}

View File

@@ -36,14 +36,24 @@ func (c *ReplicateCommand) Run(ctx context.Context, args []string) (err error) {
return err
}
// Load configuration.
if c.ConfigPath == "" {
return errors.New("-config required")
// Load configuration or use CLI args to build db/replica.
var config Config
if fs.NArg() == 1 {
return fmt.Errorf("must specify at least one replica URL for %s", fs.Arg(0))
} else if fs.NArg() > 1 {
dbConfig := &DBConfig{Path: fs.Arg(0)}
for _, u := range fs.Args()[1:] {
dbConfig.Replicas = append(dbConfig.Replicas, &ReplicaConfig{URL: u})
}
config, err := ReadConfigFile(c.ConfigPath)
config.DBs = []*DBConfig{dbConfig}
} else if c.ConfigPath != "" {
config, err = ReadConfigFile(c.ConfigPath)
if err != nil {
return err
}
} else {
return errors.New("-config flag or database/replica arguments required")
}
// Enable trace logging.
if *verbose {
@@ -132,17 +142,22 @@ func (c *ReplicateCommand) Close() (err error) {
// Usage prints the help screen to STDOUT.
func (c *ReplicateCommand) Usage() {
fmt.Printf(`
The replicate command starts a server to monitor & replicate databases
specified in your configuration file.
The replicate command starts a server to monitor & replicate databases.
You can specify your database & replicas in a configuration file or you can
replicate a single database file by specifying its path and its replicas in the
command line arguments.
Usage:
litestream replicate [arguments]
litestream replicate [arguments] DB_PATH REPLICA_URL [REPLICA_URL...]
Arguments:
-config PATH
Specifies the configuration file. Defaults to %s
Specifies the configuration file.
Defaults to %s
-v
Enable verbose logging output.

View File

@@ -7,7 +7,6 @@ import (
"fmt"
"log"
"os"
"path/filepath"
"time"
"github.com/benbjohnson/litestream"
@@ -33,20 +32,11 @@ func (c *RestoreCommand) Run(ctx context.Context, args []string) (err error) {
if err := fs.Parse(args); err != nil {
return err
} else if fs.NArg() == 0 || fs.Arg(0) == "" {
return fmt.Errorf("database path required")
return fmt.Errorf("database path or replica URL required")
} else if fs.NArg() > 1 {
return fmt.Errorf("too many arguments")
}
// Load configuration.
if configPath == "" {
return errors.New("-config required")
}
config, err := ReadConfigFile(configPath)
if err != nil {
return err
}
// Parse timestamp, if specified.
if *timestampStr != "" {
if opt.Timestamp, err = time.Parse(time.RFC3339, *timestampStr); err != nil {
@@ -64,23 +54,72 @@ func (c *RestoreCommand) Run(ctx context.Context, args []string) (err error) {
opt.Logger = log.New(os.Stderr, "", log.LstdFlags)
}
// Determine absolute path for database.
dbPath, err := filepath.Abs(fs.Arg(0))
if err != nil {
// Determine replica & generation to restore from.
var r litestream.Replica
if isURL(fs.Arg(0)) {
if r, err = c.loadFromURL(ctx, fs.Arg(0), &opt); err != nil {
return err
}
} else if configPath != "" {
if r, err = c.loadFromConfig(ctx, fs.Arg(0), configPath, &opt); err != nil {
return err
}
} else {
return errors.New("config path or replica URL required")
}
// Instantiate DB.
// Return an error if no matching targets found.
if opt.Generation == "" {
return fmt.Errorf("no matching backups found")
}
return litestream.RestoreReplica(ctx, r, opt)
}
// loadFromURL creates a replica & updates the restore options from a replica URL.
func (c *RestoreCommand) loadFromURL(ctx context.Context, replicaURL string, opt *litestream.RestoreOptions) (litestream.Replica, error) {
r, err := NewReplicaFromURL(replicaURL)
if err != nil {
return nil, err
}
opt.Generation, _, err = litestream.CalcReplicaRestoreTarget(ctx, r, *opt)
return r, err
}
// loadFromConfig returns a replica & updates the restore options from a DB reference.
func (c *RestoreCommand) loadFromConfig(ctx context.Context, dbPath, configPath string, opt *litestream.RestoreOptions) (litestream.Replica, error) {
// Load configuration.
config, err := ReadConfigFile(configPath)
if err != nil {
return nil, err
}
// Lookup database from configuration file by path.
if dbPath, err = expand(dbPath); err != nil {
return nil, err
}
dbConfig := config.DBConfig(dbPath)
if dbConfig == nil {
return fmt.Errorf("database not found in config: %s", dbPath)
return nil, fmt.Errorf("database not found in config: %s", dbPath)
}
db, err := newDBFromConfig(&config, dbConfig)
if err != nil {
return err
return nil, err
}
return db.Restore(ctx, opt)
// Restore into original database path if not specified.
if opt.OutputPath == "" {
opt.OutputPath = dbPath
}
// Determine the appropriate replica & generation to restore from,
r, generation, err := db.CalcRestoreTarget(ctx, *opt)
if err != nil {
return nil, err
}
opt.Generation = generation
return r, nil
}
// Usage prints the help screen to STDOUT.
@@ -90,7 +129,9 @@ The restore command recovers a database from a previous snapshot and WAL.
Usage:
litestream restore [arguments] DB
litestream restore [arguments] DB_PATH
litestream restore [arguments] REPLICA_URL
Arguments:

View File

@@ -6,7 +6,6 @@ import (
"flag"
"fmt"
"os"
"path/filepath"
"text/tabwriter"
"time"
@@ -31,37 +30,42 @@ func (c *SnapshotsCommand) Run(ctx context.Context, args []string) (err error) {
return fmt.Errorf("too many arguments")
}
// Load configuration.
if configPath == "" {
return errors.New("-config required")
var db *litestream.DB
var r litestream.Replica
if isURL(fs.Arg(0)) {
if r, err = NewReplicaFromURL(fs.Arg(0)); err != nil {
return err
}
} else if configPath != "" {
// Load configuration.
config, err := ReadConfigFile(configPath)
if err != nil {
return err
}
// Determine absolute path for database.
dbPath, err := filepath.Abs(fs.Arg(0))
if err != nil {
// Lookup database from configuration file by path.
if path, err := expand(fs.Arg(0)); err != nil {
return err
} else if dbc := config.DBConfig(path); dbc == nil {
return fmt.Errorf("database not found in config: %s", path)
} else if db, err = newDBFromConfig(&config, dbc); err != nil {
return err
}
// Instantiate DB.
dbConfig := config.DBConfig(dbPath)
if dbConfig == nil {
return fmt.Errorf("database not found in config: %s", dbPath)
// Filter by replica, if specified.
if *replicaName != "" {
if r = db.Replica(*replicaName); r == nil {
return fmt.Errorf("replica %q not found for database %q", *replicaName, db.Path())
}
db, err := newDBFromConfig(&config, dbConfig)
if err != nil {
return err
}
} else {
return errors.New("config path or replica URL required")
}
// Find snapshots by db or replica.
var infos []*litestream.SnapshotInfo
if *replicaName != "" {
if r := db.Replica(*replicaName); r == nil {
return fmt.Errorf("replica %q not found for database %q", *replicaName, dbPath)
} else if infos, err = r.Snapshots(ctx); err != nil {
if r != nil {
if infos, err = r.Snapshots(ctx); err != nil {
return err
}
} else {
@@ -90,11 +94,13 @@ func (c *SnapshotsCommand) Run(ctx context.Context, args []string) (err error) {
// Usage prints the help screen to STDOUT.
func (c *SnapshotsCommand) Usage() {
fmt.Printf(`
The snapshots command lists all snapshots available for a database.
The snapshots command lists all snapshots available for a database or replica.
Usage:
litestream snapshots [arguments] DB
litestream snapshots [arguments] DB_PATH
litestream snapshots [arguments] REPLICA_URL
Arguments:
@@ -105,7 +111,6 @@ Arguments:
-replica NAME
Optional, filter by a specific replica.
Examples:
# List all snapshots for a database.
@@ -114,6 +119,9 @@ Examples:
# List all snapshots on S3.
$ litestream snapshots -replica s3 /path/to/db
# List all snapshots by replica URL.
$ litestream snapshots s3://mybkt/db
`[1:],
DefaultConfigPath(),
)

View File

@@ -6,7 +6,6 @@ import (
"flag"
"fmt"
"os"
"path/filepath"
"text/tabwriter"
"time"
@@ -32,37 +31,42 @@ func (c *WALCommand) Run(ctx context.Context, args []string) (err error) {
return fmt.Errorf("too many arguments")
}
// Load configuration.
if configPath == "" {
return errors.New("-config required")
var db *litestream.DB
var r litestream.Replica
if isURL(fs.Arg(0)) {
if r, err = NewReplicaFromURL(fs.Arg(0)); err != nil {
return err
}
} else if configPath != "" {
// Load configuration.
config, err := ReadConfigFile(configPath)
if err != nil {
return err
}
// Determine absolute path for database.
dbPath, err := filepath.Abs(fs.Arg(0))
if err != nil {
// Lookup database from configuration file by path.
if path, err := expand(fs.Arg(0)); err != nil {
return err
} else if dbc := config.DBConfig(path); dbc == nil {
return fmt.Errorf("database not found in config: %s", path)
} else if db, err = newDBFromConfig(&config, dbc); err != nil {
return err
}
// Instantiate DB.
dbConfig := config.DBConfig(dbPath)
if dbConfig == nil {
return fmt.Errorf("database not found in config: %s", dbPath)
}
db, err := newDBFromConfig(&config, dbConfig)
if err != nil {
return err
}
// Find snapshots by db or replica.
var infos []*litestream.WALInfo
// Filter by replica, if specified.
if *replicaName != "" {
if r := db.Replica(*replicaName); r == nil {
return fmt.Errorf("replica %q not found for database %q", *replicaName, dbPath)
} else if infos, err = r.WALs(ctx); err != nil {
if r = db.Replica(*replicaName); r == nil {
return fmt.Errorf("replica %q not found for database %q", *replicaName, db.Path())
}
}
} else {
return errors.New("config path or replica URL required")
}
// Find WAL files by db or replica.
var infos []*litestream.WALInfo
if r != nil {
if infos, err = r.WALs(ctx); err != nil {
return err
}
} else {
@@ -100,7 +104,9 @@ The wal command lists all wal files available for a database.
Usage:
litestream wal [arguments] DB
litestream wal [arguments] DB_PATH
litestream wal [arguments] REPLICA_URL
Arguments:
@@ -114,14 +120,16 @@ Arguments:
-generation NAME
Optional, filter by a specific generation.
Examples:
# List all WAL files for a database.
$ litestream wal /path/to/db
# List all WAL files on S3 for a specific generation.
$ litestream snapshots -replica s3 -generation xxxxxxxx /path/to/db
$ litestream wal -replica s3 -generation xxxxxxxx /path/to/db
# List all WAL files for replica URL.
$ litestream wal s3://mybkt/db
`[1:],
DefaultConfigPath(),

113
db.go
View File

@@ -622,7 +622,7 @@ func (db *DB) CurrentGeneration() (string, error) {
return "", err
}
// TODO: Verify if generation directory exists. If not, delete.
// TODO: Verify if generation directory exists. If not, delete name file.
generation := strings.TrimSpace(string(buf))
if len(generation) != GenerationNameLen {
@@ -1343,15 +1343,17 @@ func (db *DB) monitor() {
}
}
// Restore restores the database from a replica based on the options given.
// RestoreReplica restores the database from a replica based on the options given.
// This method will restore into opt.OutputPath, if specified, or into the
// DB's original database path. It can optionally restore from a specific
// replica or generation or it will automatically choose the best one. Finally,
// a timestamp can be specified to restore the database to a specific
// point-in-time.
func (db *DB) Restore(ctx context.Context, opt RestoreOptions) error {
func RestoreReplica(ctx context.Context, r Replica, opt RestoreOptions) error {
// Validate options.
if opt.Generation == "" && opt.Index != math.MaxInt64 {
if opt.OutputPath == "" {
return fmt.Errorf("output path required")
} else if opt.Generation == "" && opt.Index != math.MaxInt64 {
return fmt.Errorf("must specify generation when restoring to index")
} else if opt.Index != math.MaxInt64 && !opt.Timestamp.IsZero() {
return fmt.Errorf("cannot specify index & timestamp to restore")
@@ -1363,69 +1365,62 @@ func (db *DB) Restore(ctx context.Context, opt RestoreOptions) error {
logger = log.New(ioutil.Discard, "", 0)
}
// Determine the correct output path.
outputPath := opt.OutputPath
if outputPath == "" {
outputPath = db.Path()
logPrefix := r.Name()
if db := r.DB(); db != nil {
logPrefix = fmt.Sprintf("%s(%s)", db.Path(), r.Name())
}
// Ensure output path does not already exist (unless this is a dry run).
if !opt.DryRun {
if _, err := os.Stat(outputPath); err == nil {
return fmt.Errorf("cannot restore, output path already exists: %s", outputPath)
if _, err := os.Stat(opt.OutputPath); err == nil {
return fmt.Errorf("cannot restore, output path already exists: %s", opt.OutputPath)
} else if err != nil && !os.IsNotExist(err) {
return err
}
}
// Determine target replica & generation to restore from.
r, generation, err := db.restoreTarget(ctx, opt, logger)
if err != nil {
return err
}
// Find lastest snapshot that occurs before timestamp.
minWALIndex, err := SnapshotIndexAt(ctx, r, generation, opt.Timestamp)
minWALIndex, err := SnapshotIndexAt(ctx, r, opt.Generation, opt.Timestamp)
if err != nil {
return fmt.Errorf("cannot find snapshot index for restore: %w", err)
}
// Find the maximum WAL index that occurs before timestamp.
maxWALIndex, err := WALIndexAt(ctx, r, generation, opt.Index, opt.Timestamp)
maxWALIndex, err := WALIndexAt(ctx, r, opt.Generation, opt.Index, opt.Timestamp)
if err != nil {
return fmt.Errorf("cannot find max wal index for restore: %w", err)
}
log.Printf("%s(%s): starting restore: generation %08x, index %08x-%08x", db.path, r.Name(), generation, minWALIndex, maxWALIndex)
logger.Printf("%s: starting restore: generation %s, index %08x-%08x", logPrefix, opt.Generation, minWALIndex, maxWALIndex)
// Initialize starting position.
pos := Pos{Generation: generation, Index: minWALIndex}
tmpPath := outputPath + ".tmp"
pos := Pos{Generation: opt.Generation, Index: minWALIndex}
tmpPath := opt.OutputPath + ".tmp"
// Copy snapshot to output path.
logger.Printf("%s: restoring snapshot %s/%08x to %s", logPrefix, opt.Generation, minWALIndex, tmpPath)
if !opt.DryRun {
if err := db.restoreSnapshot(ctx, r, pos.Generation, pos.Index, tmpPath); err != nil {
if err := restoreSnapshot(ctx, r, pos.Generation, pos.Index, tmpPath); err != nil {
return fmt.Errorf("cannot restore snapshot: %w", err)
}
}
log.Printf("%s(%s): restoring snapshot %s/%08x to %s", db.path, r.Name(), generation, minWALIndex, tmpPath)
// Restore each WAL file until we reach our maximum index.
for index := minWALIndex; index <= maxWALIndex; index++ {
if !opt.DryRun {
if err = db.restoreWAL(ctx, r, generation, index, tmpPath); os.IsNotExist(err) && index == minWALIndex && index == maxWALIndex {
log.Printf("%s(%s): no wal available, snapshot only", db.path, r.Name())
if err = restoreWAL(ctx, r, opt.Generation, index, tmpPath); os.IsNotExist(err) && index == minWALIndex && index == maxWALIndex {
logger.Printf("%s: no wal available, snapshot only", logPrefix)
break // snapshot file only, ignore error
} else if err != nil {
return fmt.Errorf("cannot restore wal: %w", err)
}
}
log.Printf("%s(%s): restored wal %s/%08x", db.path, r.Name(), generation, index)
logger.Printf("%s: restored wal %s/%08x", logPrefix, opt.Generation, index)
}
// Copy file to final location.
log.Printf("%s(%s): renaming database from temporary location", db.path, r.Name())
logger.Printf("%s: renaming database from temporary location", logPrefix)
if !opt.DryRun {
if err := os.Rename(tmpPath, outputPath); err != nil {
if err := os.Rename(tmpPath, opt.OutputPath); err != nil {
return err
}
}
@@ -1447,7 +1442,8 @@ func checksumFile(filename string) (uint64, error) {
return h.Sum64(), nil
}
func (db *DB) restoreTarget(ctx context.Context, opt RestoreOptions, logger *log.Logger) (Replica, string, error) {
// CalcRestoreTarget returns a replica & generation to restore from based on opt criteria.
func (db *DB) CalcRestoreTarget(ctx context.Context, opt RestoreOptions) (Replica, string, error) {
var target struct {
replica Replica
generation string
@@ -1460,9 +1456,31 @@ func (db *DB) restoreTarget(ctx context.Context, opt RestoreOptions, logger *log
continue
}
generation, stats, err := CalcReplicaRestoreTarget(ctx, r, opt)
if err != nil {
return nil, "", err
}
// Use the latest replica if we have multiple candidates.
if !stats.UpdatedAt.After(target.stats.UpdatedAt) {
continue
}
target.replica, target.generation, target.stats = r, generation, stats
}
return target.replica, target.generation, nil
}
// CalcReplicaRestoreTarget returns a generation to restore from.
func CalcReplicaRestoreTarget(ctx context.Context, r Replica, opt RestoreOptions) (generation string, stats GenerationStats, err error) {
var target struct {
generation string
stats GenerationStats
}
generations, err := r.Generations(ctx)
if err != nil {
return nil, "", fmt.Errorf("cannot fetch generations: %w", err)
return "", stats, fmt.Errorf("cannot fetch generations: %w", err)
}
// Search generations for one that contains the requested timestamp.
@@ -1475,7 +1493,7 @@ func (db *DB) restoreTarget(ctx context.Context, opt RestoreOptions, logger *log
// Fetch stats for generation.
stats, err := r.GenerationStats(ctx, generation)
if err != nil {
return nil, "", fmt.Errorf("cannot determine stats for generation (%s/%s): %s", r.Name(), generation, err)
return "", stats, fmt.Errorf("cannot determine stats for generation (%s/%s): %s", r.Name(), generation, err)
}
// Skip if it does not contain timestamp.
@@ -1490,27 +1508,28 @@ func (db *DB) restoreTarget(ctx context.Context, opt RestoreOptions, logger *log
continue
}
target.replica = r
target.generation = generation
target.stats = stats
}
}
// Return an error if no matching targets found.
if target.generation == "" {
return nil, "", fmt.Errorf("no matching backups found")
}
return target.replica, target.generation, nil
return target.generation, target.stats, nil
}
// restoreSnapshot copies a snapshot from the replica to a file.
func (db *DB) restoreSnapshot(ctx context.Context, r Replica, generation string, index int, filename string) error {
if err := mkdirAll(filepath.Dir(filename), db.dirmode, db.diruid, db.dirgid); err != nil {
func restoreSnapshot(ctx context.Context, r Replica, generation string, index int, filename string) error {
// Determine the user/group & mode based on the DB, if available.
uid, gid, mode := -1, -1, os.FileMode(0600)
diruid, dirgid, dirmode := -1, -1, os.FileMode(0700)
if db := r.DB(); db != nil {
uid, gid, mode = db.uid, db.gid, db.mode
diruid, dirgid, dirmode = db.diruid, db.dirgid, db.dirmode
}
if err := mkdirAll(filepath.Dir(filename), dirmode, diruid, dirgid); err != nil {
return err
}
f, err := createFile(filename, db.uid, db.gid)
f, err := createFile(filename, mode, uid, gid)
if err != nil {
return err
}
@@ -1533,7 +1552,13 @@ func (db *DB) restoreSnapshot(ctx context.Context, r Replica, generation string,
}
// restoreWAL copies a WAL file from the replica to the local WAL and forces checkpoint.
func (db *DB) restoreWAL(ctx context.Context, r Replica, generation string, index int, dbPath string) error {
func restoreWAL(ctx context.Context, r Replica, generation string, index int, dbPath string) error {
// Determine the user/group & mode based on the DB, if available.
uid, gid, mode := -1, -1, os.FileMode(0600)
if db := r.DB(); db != nil {
uid, gid, mode = db.uid, db.gid, db.mode
}
// Open WAL file from replica.
rd, err := r.WALReader(ctx, generation, index)
if err != nil {
@@ -1542,7 +1567,7 @@ func (db *DB) restoreWAL(ctx context.Context, r Replica, generation string, inde
defer rd.Close()
// Open handle to destination WAL path.
f, err := createFile(dbPath+"-wal", db.uid, db.gid)
f, err := createFile(dbPath+"-wal", mode, uid, gid)
if err != nil {
return err
}

15
etc/gon.hcl Normal file
View File

@@ -0,0 +1,15 @@
source = ["./dist/litestream"]
bundle_id = "com.middlemost.litestream"
apple_id {
username = "benbjohnson@yahoo.com"
password = "@env:AC_PASSWORD"
}
sign {
application_identity = "Developer ID Application: Middlemost Systems, LLC"
}
zip {
output_path = "dist/litestream.zip"
}

View File

@@ -257,8 +257,8 @@ func isHexChar(ch rune) bool {
}
// createFile creates the file and attempts to set the UID/GID.
func createFile(filename string, uid, gid int) (*os.File, error) {
f, err := os.Create(filename)
func createFile(filename string, perm os.FileMode, uid, gid int) (*os.File, error) {
f, err := os.OpenFile(filename, os.O_RDWR|os.O_CREATE|os.O_TRUNC, perm)
if err != nil {
return nil, err
}

View File

@@ -125,10 +125,14 @@ func NewFileReplica(db *DB, name, dst string) *FileReplica {
MonitorEnabled: true,
}
r.snapshotTotalGauge = internal.ReplicaSnapshotTotalGaugeVec.WithLabelValues(db.path, r.Name())
r.walBytesCounter = internal.ReplicaWALBytesCounterVec.WithLabelValues(db.path, r.Name())
r.walIndexGauge = internal.ReplicaWALIndexGaugeVec.WithLabelValues(db.path, r.Name())
r.walOffsetGauge = internal.ReplicaWALOffsetGaugeVec.WithLabelValues(db.path, r.Name())
var dbPath string
if db != nil {
dbPath = db.Path()
}
r.snapshotTotalGauge = internal.ReplicaSnapshotTotalGaugeVec.WithLabelValues(dbPath, r.Name())
r.walBytesCounter = internal.ReplicaWALBytesCounterVec.WithLabelValues(dbPath, r.Name())
r.walIndexGauge = internal.ReplicaWALIndexGaugeVec.WithLabelValues(dbPath, r.Name())
r.walOffsetGauge = internal.ReplicaWALOffsetGaugeVec.WithLabelValues(dbPath, r.Name())
return r
}
@@ -923,6 +927,10 @@ func WALIndexAt(ctx context.Context, r Replica, generation string, maxIndex int,
var index int
for _, wal := range wals {
if wal.Generation != generation {
continue
}
if !timestamp.IsZero() && wal.CreatedAt.After(timestamp) {
continue // after timestamp, skip
} else if wal.Index > maxIndex {
@@ -949,7 +957,12 @@ func compressFile(src, dst string, uid, gid int) error {
}
defer r.Close()
w, err := createFile(dst+".tmp", uid, gid)
fi, err := r.Stat()
if err != nil {
return err
}
w, err := createFile(dst+".tmp", fi.Mode(), uid, gid)
if err != nil {
return err
}
@@ -1000,7 +1013,7 @@ func ValidateReplica(ctx context.Context, r Replica) error {
defer os.RemoveAll(tmpdir)
restorePath := filepath.Join(tmpdir, "db")
if err := db.Restore(ctx, RestoreOptions{
if err := RestoreReplica(ctx, r, RestoreOptions{
OutputPath: restorePath,
ReplicaName: r.Name(),
Generation: pos.Generation,

View File

@@ -14,6 +14,7 @@ import (
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/defaults"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/aws/aws-sdk-go/service/s3/s3manager"
@@ -104,16 +105,20 @@ func NewReplica(db *litestream.DB, name string) *Replica {
MonitorEnabled: true,
}
r.snapshotTotalGauge = internal.ReplicaSnapshotTotalGaugeVec.WithLabelValues(db.Path(), r.Name())
r.walBytesCounter = internal.ReplicaWALBytesCounterVec.WithLabelValues(db.Path(), r.Name())
r.walIndexGauge = internal.ReplicaWALIndexGaugeVec.WithLabelValues(db.Path(), r.Name())
r.walOffsetGauge = internal.ReplicaWALOffsetGaugeVec.WithLabelValues(db.Path(), r.Name())
r.putOperationTotalCounter = operationTotalCounterVec.WithLabelValues(db.Path(), r.Name(), "PUT")
r.putOperationBytesCounter = operationBytesCounterVec.WithLabelValues(db.Path(), r.Name(), "PUT")
r.getOperationTotalCounter = operationTotalCounterVec.WithLabelValues(db.Path(), r.Name(), "GET")
r.getOperationBytesCounter = operationBytesCounterVec.WithLabelValues(db.Path(), r.Name(), "GET")
r.listOperationTotalCounter = operationTotalCounterVec.WithLabelValues(db.Path(), r.Name(), "LIST")
r.deleteOperationTotalCounter = operationTotalCounterVec.WithLabelValues(db.Path(), r.Name(), "DELETE")
var dbPath string
if db != nil {
dbPath = db.Path()
}
r.snapshotTotalGauge = internal.ReplicaSnapshotTotalGaugeVec.WithLabelValues(dbPath, r.Name())
r.walBytesCounter = internal.ReplicaWALBytesCounterVec.WithLabelValues(dbPath, r.Name())
r.walIndexGauge = internal.ReplicaWALIndexGaugeVec.WithLabelValues(dbPath, r.Name())
r.walOffsetGauge = internal.ReplicaWALOffsetGaugeVec.WithLabelValues(dbPath, r.Name())
r.putOperationTotalCounter = operationTotalCounterVec.WithLabelValues(dbPath, r.Name(), "PUT")
r.putOperationBytesCounter = operationBytesCounterVec.WithLabelValues(dbPath, r.Name(), "PUT")
r.getOperationTotalCounter = operationTotalCounterVec.WithLabelValues(dbPath, r.Name(), "GET")
r.getOperationBytesCounter = operationBytesCounterVec.WithLabelValues(dbPath, r.Name(), "GET")
r.listOperationTotalCounter = operationTotalCounterVec.WithLabelValues(dbPath, r.Name(), "LIST")
r.deleteOperationTotalCounter = operationTotalCounterVec.WithLabelValues(dbPath, r.Name(), "DELETE")
return r
}
@@ -647,10 +652,9 @@ func (r *Replica) Init(ctx context.Context) (err error) {
}
// Create new AWS session.
sess, err := session.NewSession(&aws.Config{
Credentials: credentials.NewStaticCredentials(r.AccessKeyID, r.SecretAccessKey, ""),
Region: aws.String(region),
})
config := r.config()
config.Region = aws.String(region)
sess, err := session.NewSession(config)
if err != nil {
return fmt.Errorf("cannot create aws session: %w", err)
}
@@ -659,12 +663,21 @@ func (r *Replica) Init(ctx context.Context) (err error) {
return nil
}
// config returns the AWS configuration. Uses the default credential chain
// unless a key/secret are explicitly set.
func (r *Replica) config() *aws.Config {
config := defaults.Get().Config
if r.AccessKeyID != "" || r.SecretAccessKey != "" {
config.Credentials = credentials.NewStaticCredentials(r.AccessKeyID, r.SecretAccessKey, "")
}
return config
}
func (r *Replica) findBucketRegion(ctx context.Context, bucket string) (string, error) {
// Connect to US standard region to fetch info.
sess, err := session.NewSession(&aws.Config{
Credentials: credentials.NewStaticCredentials(r.AccessKeyID, r.SecretAccessKey, ""),
Region: aws.String("us-east-1"),
})
config := r.config()
config.Region = aws.String("us-east-1")
sess, err := session.NewSession(config)
if err != nil {
return "", err
}