Compare commits

..

38 Commits

Author SHA1 Message Date
Ben Johnson
37442babfb Revert validation mismatch temp file persistence
This commit reverts 4e469f8 which was used for debugging the validation
stall corruption issue. It can cause the disk to fill with temporary
files though so it is being reverted.
2021-02-09 06:44:42 -07:00
Ben Johnson
962a2a894b Fix tabwriter 2021-02-08 15:55:15 -07:00
Ben Johnson
0c61c9f7fe Merge pull request #39 from benbjohnson/replicate-s3-sync-interval
Reduce s3 sync interval when using replica URL
2021-02-08 14:14:20 -07:00
Ben Johnson
267b140fab Reduce s3 sync interval when using replica URL
This commit changes the default sync interval from 10s to 1s
when replicating using the inline replica URL. This approach is
used when users are first testing the software so a faster
replication interval makes it easier to see results.
2021-02-08 14:09:01 -07:00
Ben Johnson
1b194535e6 Merge pull request #38 from benbjohnson/trace-flag
Add trace file to replicate command
2021-02-06 07:34:02 -07:00
Ben Johnson
58a6c765fe Add trace file to replicate command
This commit removes the verbose flag (`-v`) and replaces it with
the trace flag (`-trace PATH`). This moves tracing to a separate
file instead of writing to STDOUT.
2021-02-06 07:31:19 -07:00
Ben Johnson
2604052a9f Merge pull request #37 from benbjohnson/fix-shadow-write
Fix shadow wal corruption on stalled validation
2021-02-06 07:31:05 -07:00
Ben Johnson
7f81890bae Fix shadow wal corruption on stalled validation
This commit fixes a timing bug that occurs in a specific scenario
where the shadow wal sync stalls because of an s3 validation and
the catch up write to the shadow wal is large enough to allow a
window between WAL reads and the final copy.

The file copy has been replaced by direct writes of the frame
buffer to the shadow to ensure that every validated byte is exactly
what is being written to the shadow wal. The one downside to this
change is that the frame buffer will grow with the transaction
size so it will use additional heap. This can be replaced by a
spill-to-disk implementation but this should work well in the
short term.
2021-02-06 07:28:15 -07:00
Ben Johnson
2ff073c735 Merge pull request #36 from benbjohnson/max-index
Enforce max WAL index
2021-02-02 15:16:42 -07:00
Ben Johnson
6fd11ccab5 Enforce max WAL index.
This commit sets a hard upper limit for the WAL index to (1<<31)-1.
The index is hex-encoded in file names as a 4-byte unsigned integer
so this limit ensures all index values are below the upper limit and are
unaffected by any signed int limit.

A WAL file is typically at least 4MB so you would need to write
8 petabytes to reach this upper limit.
2021-02-02 15:11:50 -07:00
Ben Johnson
6c49fba592 Check checkpoint result during restore 2021-02-02 15:04:20 -07:00
Ben Johnson
922fa0798e Merge pull request #34 from benbjohnson/windows 2021-01-31 10:09:51 -07:00
Ben Johnson
976df182c0 Fix Windows build 2021-01-31 10:08:28 -07:00
Ben Johnson
0e28a650e6 Merge pull request #33 from benbjohnson/log-wal-checksum-mismatch
Log WAL frame checksum mismatch
2021-01-31 08:57:51 -07:00
Ben Johnson
f17768e830 Log WAL frame checksum mismatch
Currently, the WAL copy function can encounter a checksum mismatch in a
WAL frame and it will return an error. This can occur for partial writes
and is recovered from moments later. This commit changes the error to a
log write instead.
2021-01-31 08:52:12 -07:00
Ben Johnson
2c142d3a0c Merge pull request #32 from benbjohnson/persist-mismatch-validation-data
Persist primary/replica copies after validation mismatch
2021-01-31 08:50:15 -07:00
Ben Johnson
4e469f8b02 Persist primary/replica copies after validation mismatch
This commit changes `ValidateReplica()` to persist copies of the
primary & replica databases for inspection if a validation mismatch
occurs.
2021-01-31 08:47:06 -07:00
Ben Johnson
3f268b70f8 Merge pull request #31 from benbjohnson/adjust-logging
Reduce logging output
2021-01-31 08:14:57 -07:00
Ben Johnson
ad7bf7f974 Reduce logging output
Previously, there were excessive log messages for checkpoints and
retention. These have been removed or combined into a single log
message where appropriate.
2021-01-31 08:12:18 -07:00
Ben Johnson
778451f09f CONTRIBUTING 2021-01-28 13:31:28 -07:00
Ben Johnson
8e9a15933b README 2021-01-27 08:01:48 -07:00
Ben Johnson
da1d7c3183 README 2021-01-27 07:59:12 -07:00
Ben Johnson
a178ef4714 Merge pull request #27 from benbjohnson/generations-replica-url
Allow replica URLs for generations command
2021-01-27 07:50:29 -07:00
Ben Johnson
7ca2e193b9 Allow replica URLs for generations command 2021-01-27 07:48:56 -07:00
Ben Johnson
39a6fabb9f Fix restore logging. 2021-01-26 17:01:00 -07:00
Ben Johnson
0249b4e4f5 Merge pull request #25 from benbjohnson/replica-url 2021-01-26 16:40:17 -07:00
Ben Johnson
67eeb49101 Allow replica URL to be used for commands
This commit refactors the commands to allow a replica URL when
restoring a database. If the first CLI arg is a URL with a scheme,
then it is treated as a replica URL.
2021-01-26 16:33:16 -07:00
Ben Johnson
f7213ed35c Allow replication without config file.
This commit changes `litestream replicate` to accept a database
path and a replica URL instead of using the config file. This allows
people to quickly try out the tool instead of learning the config
file syntax.
2021-01-25 10:33:50 -07:00
Ben Johnson
a532a0198e README 2021-01-24 10:09:54 -07:00
Ben Johnson
16f79e5814 Merge pull request #24 from benbjohnson/document-retention-period
Document retention period configuration
2021-01-24 09:32:31 -07:00
Ben Johnson
39aefc2c02 Document retention period configuration 2021-01-24 09:28:57 -07:00
Ben Johnson
0b08669bca Merge pull request #23 from benbjohnson/disable-metrics-by-default
Disable prometheus metrics by default
2021-01-24 09:18:37 -07:00
Ben Johnson
8f5761ee13 Disable prometheus metrics by default
The HTTP server should only be enabled if a user explicitly sets a
port for it.
2021-01-24 09:16:23 -07:00
Ben Johnson
d2eb4fa5ba Remove PR action 2021-01-24 08:54:29 -07:00
Ben Johnson
ca489c5e73 Merge pull request #22 from benbjohnson/notorize
Add signed homebrew install
2021-01-24 08:50:01 -07:00
Ben Johnson
f0ae48af4c Add signed homebrew install 2021-01-24 08:47:16 -07:00
Ben Johnson
9eae39e2fa README 2021-01-21 15:01:30 -07:00
Ben Johnson
42ab293ffb README 2021-01-21 14:53:21 -07:00
17 changed files with 869 additions and 429 deletions

17
.github/CONTRIBUTING.md vendored Normal file
View File

@@ -0,0 +1,17 @@
## Open-source, not open-contribution
[Similar to SQLite](https://www.sqlite.org/copyright.html), Litestream is open
source but closed to contributions. This keeps the code base free of proprietary
or licensed code but it also helps me continue to maintain and build Litestream.
As the author of [BoltDB](https://github.com/boltdb/bolt), I found that
accepting and maintaining third party patches contributed to my burn out and
I eventually archived the project. Writing databases & low-level replication
tools involves nuance and simple one line changes can have profound and
unexpected changes in correctness and performance. Small contributions
typically required hours of my time to properly test and validate them.
I am grateful for community involvement, bug reports, & feature requests. I do
not wish to come off as anything but welcoming; however, I've
made the decision to keep this project closed to contributions for my own
mental health and long term viability of the project.

View File

@@ -1,4 +1,4 @@
on: [push, pull_request] on: push
name: test name: test
jobs: jobs:
test: test:

View File

@@ -1,19 +1,22 @@
default: default:
dist: dist-linux:
mkdir -p dist mkdir -p dist
cp etc/litestream.yml dist/litestream.yml cp etc/litestream.yml dist/litestream.yml
docker run --rm -v "${PWD}":/usr/src/litestream -w /usr/src/litestream -e GOOS=linux -e GOARCH=amd64 golang:1.15 go build -v -o dist/litestream ./cmd/litestream docker run --rm -v "${PWD}":/usr/src/litestream -w /usr/src/litestream -e GOOS=linux -e GOARCH=amd64 golang:1.15 go build -v -o dist/litestream ./cmd/litestream
tar -cz -f dist/litestream-linux-amd64.tar.gz -C dist litestream tar -cz -f dist/litestream-linux-amd64.tar.gz -C dist litestream
deb: dist dist-macos:
ifndef LITESTREAM_VERSION ifndef LITESTREAM_VERSION
$(error LITESTREAM_VERSION is undefined) $(error LITESTREAM_VERSION is undefined)
endif endif
cat etc/nfpm.yml | envsubst > dist/nfpm.yml mkdir -p dist
nfpm pkg --config dist/nfpm.yml --packager deb --target dist/litestream.deb go build -v -ldflags "-X 'main.Version=${LITESTREAM_VERSION}'" -o dist/litestream ./cmd/litestream
gon etc/gon.hcl
mv dist/litestream.zip dist/litestream-${LITESTREAM_VERSION}-darwin-amd64.zip
openssl dgst -sha256 dist/litestream-${LITESTREAM_VERSION}-darwin-amd64.zip
clean: clean:
rm -rf dist rm -rf dist
.PHONY: deb dist clean .PHONY: default dist-linux dist-macos clean

124
README.md
View File

@@ -1,4 +1,8 @@
Litestream ![test](https://github.com/benbjohnson/litestream/workflows/test/badge.svg) Litestream
![GitHub release (latest by date)](https://img.shields.io/github/v/release/benbjohnson/litestream)
![Status](https://img.shields.io/badge/status-beta-blue)
![GitHub](https://img.shields.io/github/license/benbjohnson/litestream)
![test](https://github.com/benbjohnson/litestream/workflows/test/badge.svg)
========== ==========
Litestream is a standalone streaming replication tool for SQLite. It runs as a Litestream is a standalone streaming replication tool for SQLite. It runs as a
@@ -6,15 +10,23 @@ background process and safely replicates changes incrementally to another file
or S3. Litestream only communicates with SQLite through the SQLite API so it or S3. Litestream only communicates with SQLite through the SQLite API so it
will not corrupt your database. will not corrupt your database.
If you need support or have ideas for improving Litestream, please visit the
[GitHub Discussions](https://github.com/benbjohnson/litestream/discussions) to
chat.
If you find this project interesting, please consider starring the project on If you find this project interesting, please consider starring the project on
GitHub. GitHub.
## Installation ## Installation
### Homebrew ### Mac OS (Homebrew)
TODO To install from homebrew, run the following command:
```sh
$ brew install benbjohnson/litestream/litestream
```
### Linux (Debian) ### Linux (Debian)
@@ -37,14 +49,62 @@ $ sudo systemctl start litestream
### Release binaries ### Release binaries
You can also download the release binary for your system from the You can also download the release binary for your system from the
[Releases page][releases] and run it as a standalone application. [releases page][releases] and run it as a standalone application.
### Building from source
Download and install the [Go toolchain](https://golang.org/) and then run:
```sh
$ go install ./cmd/litestream
```
The `litestream` binary should be in your `$GOPATH/bin` folder.
## Quick Start
Litestream provides a configuration file that can be used for production
deployments but you can also specify a single database and replica on the
command line when trying it out.
First, you'll need to create an S3 bucket that we'll call `"mybkt"` in this
example. You'll also need to set your AWS credentials:
```sh
$ export AWS_ACCESS_KEY_ID=AKIAxxxxxxxxxxxxxxxx
$ export AWS_SECRET_ACCESS_KEY=xxxxxxxxxxxxxxxxxxxxxxxxxxxxxx/xxxxxxxxx
```
Next, you can run the `litestream replicate` command with the path to the
database you want to backup and the URL of your replica destination:
```sh
$ litestream replicate /path/to/db s3://mybkt/db
```
If you make changes to your local database, those changes will be replicated
to S3 every 10 seconds. From another terminal window, you can restore your
database from your S3 replica:
```
$ litestream restore -o /path/to/restored/db s3://mybkt/db
```
Voila! 🎉
Your database should be restored to the last replicated state that
was sent to S3. You can adjust your replication frequency and other options by
using a configuration-based approach specified below.
## Configuration ## Configuration
Once installed locally, you'll need to create a config file. By default, the A configuration-based install gives you more replication options. By default,
config file lives at `/etc/litestream.yml` but you can pass in a different the config file lives at `/etc/litestream.yml` but you can pass in a different
path to any `litestream` command using the `-config PATH` flag. path to any `litestream` command using the `-config PATH` flag. You can also
set the `LITESTREAM_CONFIG` environment variable to specify a new path.
The configuration specifies one or more `dbs` and a list of one or more replica The configuration specifies one or more `dbs` and a list of one or more replica
locations for each db. Below are some common configurations: locations for each db. Below are some common configurations:
@@ -61,7 +121,7 @@ secret-access-key: xxxxxxxxxxxxxxxxxxxxxxxxxxxxxx/xxxxxxxxx
dbs: dbs:
- path: /path/to/db - path: /path/to/db
replicas: replicas:
- path: s3://mybkt/db - url: s3://mybkt/db
``` ```
### Replicate to another file path ### Replicate to another file path
@@ -76,6 +136,38 @@ dbs:
- path: /path/to/replica - path: /path/to/replica
``` ```
### Retention period
By default, replicas will retain a snapshot & subsequent WAL changes for 24
hours. When the snapshot age exceeds the retention threshold, a new snapshot
is taken and uploaded and the previous snapshot and WAL files are removed.
You can configure this setting per-replica. Times are parsed using [Go's
duration](https://golang.org/pkg/time/#ParseDuration) so time units of hours
(`h`), minutes (`m`), and seconds (`s`) are allowed but days, weeks, months, and
years are not.
```yaml
db:
- path: /path/to/db
replicas:
- url: s3://mybkt/db
retention: 1h # 1 hour retention
```
### Monitoring replication
You can also enable a Prometheus metrics endpoint to monitor replication by
specifying a bind address with the `addr` field:
```yml
addr: ":9090"
```
This will make metrics available at: http://localhost:9090/metrics
### Other configuration options ### Other configuration options
@@ -83,8 +175,8 @@ These are some additional configuration options available on replicas:
- `type`—Specify the type of replica (`"file"` or `"s3"`). Derived from `"path"`. - `type`—Specify the type of replica (`"file"` or `"s3"`). Derived from `"path"`.
- `name`—Specify an optional name for the replica if you are using multiple replicas. - `name`—Specify an optional name for the replica if you are using multiple replicas.
- `path`—File path or URL to the replica location. - `path`—File path to the replica location.
- `retention`—Length of time to keep replicated WAL files. Defaults to `24h`. - `url`—URL to the replica location.
- `retention-check-interval`—Time between retention enforcement checks. Defaults to `1h`. - `retention-check-interval`—Time between retention enforcement checks. Defaults to `1h`.
- `validation-interval`—Interval between periodic checks to ensure restored backup matches current database. Disabled by default. - `validation-interval`—Interval between periodic checks to ensure restored backup matches current database. Disabled by default.
@@ -95,6 +187,7 @@ These replica options are only available for S3 replicas:
- `sync-interval`—Replication sync frequency. - `sync-interval`—Replication sync frequency.
## Usage ## Usage
### Replication ### Replication
@@ -137,14 +230,17 @@ to worry about accidentally overwriting your current database.
$ litestream restore /path/to/db $ litestream restore /path/to/db
# Restore database to a new location. # Restore database to a new location.
$ litestream restore -o /tmp/mynewdb /path/to/db $ litestream restore -o /path/to/restored/db /path/to/db
# Restore from a replica URL.
$ litestream restore -o /path/to/restored/db s3://mybkt/db
# Restore database to a specific point-in-time. # Restore database to a specific point-in-time.
$ litestream restore -timestamp 2020-01-01T00:00:00Z /path/to/db $ litestream restore -timestamp 2020-01-01T00:00:00Z /path/to/db
``` ```
Point-in-time restores only have the resolution of the timestamp of the WAL file Point-in-time restores only have the resolution of the timestamp of the WAL file
itself. By default, litestream will start a new WAL file every minute so itself. By default, Litestream will start a new WAL file every minute so
point-in-time restores are only accurate to the minute. point-in-time restores are only accurate to the minute.
@@ -180,9 +276,9 @@ another process is allowed to checkpoint the WAL.
## Open-source, not open-contribution ## Open-source, not open-contribution
[Similar to SQLite](https://www.sqlite.org/copyright.html), litestream is open [Similar to SQLite](https://www.sqlite.org/copyright.html), Litestream is open
source but closed to contributions. This keeps the code base free of proprietary source but closed to contributions. This keeps the code base free of proprietary
or licensed code but it also helps me continue to maintain and build litestream. or licensed code but it also helps me continue to maintain and build Litestream.
As the author of [BoltDB](https://github.com/boltdb/bolt), I found that As the author of [BoltDB](https://github.com/boltdb/bolt), I found that
accepting and maintaining third party patches contributed to my burn out and accepting and maintaining third party patches contributed to my burn out and

View File

@@ -35,7 +35,9 @@ func (c *DatabasesCommand) Run(ctx context.Context, args []string) (err error) {
} }
// List all databases. // List all databases.
w := tabwriter.NewWriter(os.Stdout, 0, 8, 1, '\t', 0) w := tabwriter.NewWriter(os.Stdout, 0, 8, 2, ' ', 0)
defer w.Flush()
fmt.Fprintln(w, "path\treplicas") fmt.Fprintln(w, "path\treplicas")
for _, dbConfig := range config.DBs { for _, dbConfig := range config.DBs {
db, err := newDBFromConfig(&config, dbConfig) db, err := newDBFromConfig(&config, dbConfig)
@@ -53,7 +55,6 @@ func (c *DatabasesCommand) Run(ctx context.Context, args []string) (err error) {
strings.Join(replicaNames, ","), strings.Join(replicaNames, ","),
) )
} }
w.Flush()
return nil return nil
} }

View File

@@ -7,9 +7,10 @@ import (
"fmt" "fmt"
"log" "log"
"os" "os"
"path/filepath"
"text/tabwriter" "text/tabwriter"
"time" "time"
"github.com/benbjohnson/litestream"
) )
// GenerationsCommand represents a command to list all generations for a database. // GenerationsCommand represents a command to list all generations for a database.
@@ -25,50 +26,62 @@ func (c *GenerationsCommand) Run(ctx context.Context, args []string) (err error)
if err := fs.Parse(args); err != nil { if err := fs.Parse(args); err != nil {
return err return err
} else if fs.NArg() == 0 || fs.Arg(0) == "" { } else if fs.NArg() == 0 || fs.Arg(0) == "" {
return fmt.Errorf("database path required") return fmt.Errorf("database path or replica URL required")
} else if fs.NArg() > 1 { } else if fs.NArg() > 1 {
return fmt.Errorf("too many arguments") return fmt.Errorf("too many arguments")
} }
// Load configuration. var db *litestream.DB
if configPath == "" { var r litestream.Replica
return errors.New("-config required") updatedAt := time.Now()
if isURL(fs.Arg(0)) {
if r, err = NewReplicaFromURL(fs.Arg(0)); err != nil {
return err
} }
} else if configPath != "" {
// Load configuration.
config, err := ReadConfigFile(configPath) config, err := ReadConfigFile(configPath)
if err != nil { if err != nil {
return err return err
} }
// Determine absolute path for database. // Lookup database from configuration file by path.
dbPath, err := filepath.Abs(fs.Arg(0)) if path, err := expand(fs.Arg(0)); err != nil {
if err != nil { return err
} else if dbc := config.DBConfig(path); dbc == nil {
return fmt.Errorf("database not found in config: %s", path)
} else if db, err = newDBFromConfig(&config, dbc); err != nil {
return err return err
} }
// Instantiate DB from from configuration. // Filter by replica, if specified.
dbConfig := config.DBConfig(dbPath) if *replicaName != "" {
if dbConfig == nil { if r = db.Replica(*replicaName); r == nil {
return fmt.Errorf("database not found in config: %s", dbPath) return fmt.Errorf("replica %q not found for database %q", *replicaName, db.Path())
} }
db, err := newDBFromConfig(&config, dbConfig)
if err != nil {
return err
} }
// Determine last time database or WAL was updated. // Determine last time database or WAL was updated.
updatedAt, err := db.UpdatedAt() if updatedAt, err = db.UpdatedAt(); err != nil {
if err != nil {
return err return err
} }
} else {
return errors.New("config path or replica URL required")
}
var replicas []litestream.Replica
if r != nil {
replicas = []litestream.Replica{r}
} else {
replicas = db.Replicas
}
// List each generation. // List each generation.
w := tabwriter.NewWriter(os.Stdout, 0, 8, 1, '\t', 0) w := tabwriter.NewWriter(os.Stdout, 0, 8, 2, ' ', 0)
fmt.Fprintln(w, "name\tgeneration\tlag\tstart\tend") defer w.Flush()
for _, r := range db.Replicas {
if *replicaName != "" && r.Name() != *replicaName {
continue
}
fmt.Fprintln(w, "name\tgeneration\tlag\tstart\tend")
for _, r := range replicas {
generations, err := r.Generations(ctx) generations, err := r.Generations(ctx)
if err != nil { if err != nil {
log.Printf("%s: cannot list generations: %s", r.Name(), err) log.Printf("%s: cannot list generations: %s", r.Name(), err)
@@ -90,10 +103,8 @@ func (c *GenerationsCommand) Run(ctx context.Context, args []string) (err error)
stats.CreatedAt.Format(time.RFC3339), stats.CreatedAt.Format(time.RFC3339),
stats.UpdatedAt.Format(time.RFC3339), stats.UpdatedAt.Format(time.RFC3339),
) )
w.Flush()
} }
} }
w.Flush()
return nil return nil
} }
@@ -101,17 +112,21 @@ func (c *GenerationsCommand) Run(ctx context.Context, args []string) (err error)
// Usage prints the help message to STDOUT. // Usage prints the help message to STDOUT.
func (c *GenerationsCommand) Usage() { func (c *GenerationsCommand) Usage() {
fmt.Printf(` fmt.Printf(`
The generations command lists all generations for a database. It also lists The generations command lists all generations for a database or replica. It also
stats about their lag behind the primary database and the time range they cover. lists stats about their lag behind the primary database and the time range they
cover.
Usage: Usage:
litestream generations [arguments] DB litestream generations [arguments] DB_PATH
litestream generations [arguments] REPLICA_URL
Arguments: Arguments:
-config PATH -config PATH
Specifies the configuration file. Defaults to %s Specifies the configuration file.
Defaults to %s
-replica NAME -replica NAME
Optional, filters by replica. Optional, filters by replica.

View File

@@ -87,21 +87,16 @@ Usage:
The commands are: The commands are:
databases list databases specified in config file
generations list available generations for a database generations list available generations for a database
replicate runs a server to replicate databases replicate runs a server to replicate databases
restore recovers database backup from a replica restore recovers database backup from a replica
snapshots list available snapshots for a database snapshots list available snapshots for a database
validate checks replica to ensure a consistent state with primary version prints the binary version
version prints the version
wal list available WAL files for a database wal list available WAL files for a database
`[1:]) `[1:])
} }
// Default configuration settings.
const (
DefaultAddr = ":9090"
)
// Config represents a configuration file for the litestream daemon. // Config represents a configuration file for the litestream daemon.
type Config struct { type Config struct {
// Bind address for serving metrics. // Bind address for serving metrics.
@@ -117,21 +112,9 @@ type Config struct {
Bucket string `yaml:"bucket"` Bucket string `yaml:"bucket"`
} }
// Normalize expands paths and parses URL-specified replicas.
func (c *Config) Normalize() error {
for i := range c.DBs {
if err := c.DBs[i].Normalize(); err != nil {
return err
}
}
return nil
}
// DefaultConfig returns a new instance of Config with defaults set. // DefaultConfig returns a new instance of Config with defaults set.
func DefaultConfig() Config { func DefaultConfig() Config {
return Config{ return Config{}
Addr: DefaultAddr,
}
} }
// DBConfig returns database configuration by path. // DBConfig returns database configuration by path.
@@ -145,18 +128,13 @@ func (c *Config) DBConfig(path string) *DBConfig {
} }
// ReadConfigFile unmarshals config from filename. Expands path if needed. // ReadConfigFile unmarshals config from filename. Expands path if needed.
func ReadConfigFile(filename string) (Config, error) { func ReadConfigFile(filename string) (_ Config, err error) {
config := DefaultConfig() config := DefaultConfig()
// Expand filename, if necessary. // Expand filename, if necessary.
if prefix := "~" + string(os.PathSeparator); strings.HasPrefix(filename, prefix) { filename, err = expand(filename)
u, err := user.Current()
if err != nil { if err != nil {
return config, err return config, err
} else if u.HomeDir == "" {
return config, fmt.Errorf("home directory unset")
}
filename = filepath.Join(u.HomeDir, strings.TrimPrefix(filename, prefix))
} }
// Read & deserialize configuration. // Read & deserialize configuration.
@@ -168,9 +146,13 @@ func ReadConfigFile(filename string) (Config, error) {
return config, err return config, err
} }
if err := config.Normalize(); err != nil { // Normalize paths.
for _, dbConfig := range config.DBs {
if dbConfig.Path, err = expand(dbConfig.Path); err != nil {
return config, err return config, err
} }
}
return config, nil return config, nil
} }
@@ -180,21 +162,12 @@ type DBConfig struct {
Replicas []*ReplicaConfig `yaml:"replicas"` Replicas []*ReplicaConfig `yaml:"replicas"`
} }
// Normalize expands paths and parses URL-specified replicas.
func (c *DBConfig) Normalize() error {
for i := range c.Replicas {
if err := c.Replicas[i].Normalize(); err != nil {
return err
}
}
return nil
}
// ReplicaConfig represents the configuration for a single replica in a database. // ReplicaConfig represents the configuration for a single replica in a database.
type ReplicaConfig struct { type ReplicaConfig struct {
Type string `yaml:"type"` // "file", "s3" Type string `yaml:"type"` // "file", "s3"
Name string `yaml:"name"` // name of replica, optional. Name string `yaml:"name"` // name of replica, optional.
Path string `yaml:"path"` Path string `yaml:"path"`
URL string `yaml:"url"`
Retention time.Duration `yaml:"retention"` Retention time.Duration `yaml:"retention"`
RetentionCheckInterval time.Duration `yaml:"retention-check-interval"` RetentionCheckInterval time.Duration `yaml:"retention-check-interval"`
SyncInterval time.Duration `yaml:"sync-interval"` // s3 only SyncInterval time.Duration `yaml:"sync-interval"` // s3 only
@@ -207,47 +180,63 @@ type ReplicaConfig struct {
Bucket string `yaml:"bucket"` Bucket string `yaml:"bucket"`
} }
// Normalize expands paths and parses URL-specified replicas. // NewReplicaFromURL returns a new Replica instance configured from a URL.
func (c *ReplicaConfig) Normalize() error { // The replica's database is not set.
// Expand path filename, if necessary. func NewReplicaFromURL(s string) (litestream.Replica, error) {
if prefix := "~" + string(os.PathSeparator); strings.HasPrefix(c.Path, prefix) { scheme, host, path, err := ParseReplicaURL(s)
u, err := user.Current()
if err != nil { if err != nil {
return err return nil, err
} else if u.HomeDir == "" {
return fmt.Errorf("cannot expand replica path, no home directory available")
}
c.Path = filepath.Join(u.HomeDir, strings.TrimPrefix(c.Path, prefix))
} }
// Attempt to parse as URL. Ignore if it is not a URL or if there is no scheme. switch scheme {
u, err := url.Parse(c.Path) case "file":
if err != nil || u.Scheme == "" { return litestream.NewFileReplica(nil, "", path), nil
return nil case "s3":
r := s3.NewReplica(nil, "")
r.Bucket, r.Path = host, path
return r, nil
default:
return nil, fmt.Errorf("invalid replica url type: %s", s)
}
}
// ParseReplicaURL parses a replica URL.
func ParseReplicaURL(s string) (scheme, host, urlpath string, err error) {
u, err := url.Parse(s)
if err != nil {
return "", "", "", err
} }
switch u.Scheme { switch u.Scheme {
case "file": case "file":
u.Scheme = "" scheme, u.Scheme = u.Scheme, ""
c.Type = u.Scheme return scheme, "", path.Clean(u.String()), nil
c.Path = path.Clean(u.String())
return nil
case "s3": case "":
c.Type = u.Scheme return u.Scheme, u.Host, u.Path, fmt.Errorf("replica url scheme required: %s", s)
c.Path = strings.TrimPrefix(path.Clean(u.Path), "/")
c.Bucket = u.Host
if u := u.User; u != nil {
c.AccessKeyID = u.Username()
c.SecretAccessKey, _ = u.Password()
}
return nil
default: default:
return fmt.Errorf("unrecognized replica type in path scheme: %s", c.Path) return u.Scheme, u.Host, strings.TrimPrefix(path.Clean(u.Path), "/"), nil
} }
} }
// isURL returns true if s can be parsed and has a scheme.
func isURL(s string) bool {
u, err := url.Parse(s)
return err == nil && u.Scheme != ""
}
// ReplicaType returns the type based on the type field or extracted from the URL.
func (c *ReplicaConfig) ReplicaType() string {
typ, _, _, _ := ParseReplicaURL(c.URL)
if typ != "" {
return typ
} else if c.Type != "" {
return c.Type
}
return "file"
}
// DefaultConfigPath returns the default config path. // DefaultConfigPath returns the default config path.
func DefaultConfigPath() string { func DefaultConfigPath() string {
if v := os.Getenv("LITESTREAM_CONFIG"); v != "" { if v := os.Getenv("LITESTREAM_CONFIG"); v != "" {
@@ -262,8 +251,13 @@ func registerConfigFlag(fs *flag.FlagSet, p *string) {
// newDBFromConfig instantiates a DB based on a configuration. // newDBFromConfig instantiates a DB based on a configuration.
func newDBFromConfig(c *Config, dbc *DBConfig) (*litestream.DB, error) { func newDBFromConfig(c *Config, dbc *DBConfig) (*litestream.DB, error) {
path, err := expand(dbc.Path)
if err != nil {
return nil, err
}
// Initialize database with given path. // Initialize database with given path.
db := litestream.NewDB(dbc.Path) db := litestream.NewDB(path)
// Instantiate and attach replicas. // Instantiate and attach replicas.
for _, rc := range dbc.Replicas { for _, rc := range dbc.Replicas {
@@ -279,8 +273,13 @@ func newDBFromConfig(c *Config, dbc *DBConfig) (*litestream.DB, error) {
// newReplicaFromConfig instantiates a replica for a DB based on a config. // newReplicaFromConfig instantiates a replica for a DB based on a config.
func newReplicaFromConfig(db *litestream.DB, c *Config, dbc *DBConfig, rc *ReplicaConfig) (litestream.Replica, error) { func newReplicaFromConfig(db *litestream.DB, c *Config, dbc *DBConfig, rc *ReplicaConfig) (litestream.Replica, error) {
switch rc.Type { // Ensure user did not specify URL in path.
case "", "file": if isURL(rc.Path) {
return nil, fmt.Errorf("replica path cannot be a url, please use the 'url' field instead: %s", rc.Path)
}
switch rc.ReplicaType() {
case "file":
return newFileReplicaFromConfig(db, c, dbc, rc) return newFileReplicaFromConfig(db, c, dbc, rc)
case "s3": case "s3":
return newS3ReplicaFromConfig(db, c, dbc, rc) return newS3ReplicaFromConfig(db, c, dbc, rc)
@@ -290,12 +289,24 @@ func newReplicaFromConfig(db *litestream.DB, c *Config, dbc *DBConfig, rc *Repli
} }
// newFileReplicaFromConfig returns a new instance of FileReplica build from config. // newFileReplicaFromConfig returns a new instance of FileReplica build from config.
func newFileReplicaFromConfig(db *litestream.DB, c *Config, dbc *DBConfig, rc *ReplicaConfig) (*litestream.FileReplica, error) { func newFileReplicaFromConfig(db *litestream.DB, c *Config, dbc *DBConfig, rc *ReplicaConfig) (_ *litestream.FileReplica, err error) {
if rc.Path == "" { path := rc.Path
if rc.URL != "" {
_, _, path, err = ParseReplicaURL(rc.URL)
if err != nil {
return nil, err
}
}
if path == "" {
return nil, fmt.Errorf("%s: file replica path required", db.Path()) return nil, fmt.Errorf("%s: file replica path required", db.Path())
} }
r := litestream.NewFileReplica(db, rc.Name, rc.Path) if path, err = expand(path); err != nil {
return nil, err
}
r := litestream.NewFileReplica(db, rc.Name, path)
if v := rc.Retention; v > 0 { if v := rc.Retention; v > 0 {
r.Retention = v r.Retention = v
} }
@@ -309,7 +320,20 @@ func newFileReplicaFromConfig(db *litestream.DB, c *Config, dbc *DBConfig, rc *R
} }
// newS3ReplicaFromConfig returns a new instance of S3Replica build from config. // newS3ReplicaFromConfig returns a new instance of S3Replica build from config.
func newS3ReplicaFromConfig(db *litestream.DB, c *Config, dbc *DBConfig, rc *ReplicaConfig) (*s3.Replica, error) { func newS3ReplicaFromConfig(db *litestream.DB, c *Config, dbc *DBConfig, rc *ReplicaConfig) (_ *s3.Replica, err error) {
bucket := c.Bucket
if v := rc.Bucket; v != "" {
bucket = v
}
path := rc.Path
if rc.URL != "" {
_, bucket, path, err = ParseReplicaURL(rc.URL)
if err != nil {
return nil, err
}
}
// Use global or replica-specific S3 settings. // Use global or replica-specific S3 settings.
accessKeyID := c.AccessKeyID accessKeyID := c.AccessKeyID
if v := rc.AccessKeyID; v != "" { if v := rc.AccessKeyID; v != "" {
@@ -319,21 +343,13 @@ func newS3ReplicaFromConfig(db *litestream.DB, c *Config, dbc *DBConfig, rc *Rep
if v := rc.SecretAccessKey; v != "" { if v := rc.SecretAccessKey; v != "" {
secretAccessKey = v secretAccessKey = v
} }
bucket := c.Bucket
if v := rc.Bucket; v != "" {
bucket = v
}
region := c.Region region := c.Region
if v := rc.Region; v != "" { if v := rc.Region; v != "" {
region = v region = v
} }
// Ensure required settings are set. // Ensure required settings are set.
if accessKeyID == "" { if bucket == "" {
return nil, fmt.Errorf("%s: s3 access key id required", db.Path())
} else if secretAccessKey == "" {
return nil, fmt.Errorf("%s: s3 secret access key required", db.Path())
} else if bucket == "" {
return nil, fmt.Errorf("%s: s3 bucket required", db.Path()) return nil, fmt.Errorf("%s: s3 bucket required", db.Path())
} }
@@ -343,7 +359,7 @@ func newS3ReplicaFromConfig(db *litestream.DB, c *Config, dbc *DBConfig, rc *Rep
r.SecretAccessKey = secretAccessKey r.SecretAccessKey = secretAccessKey
r.Region = region r.Region = region
r.Bucket = bucket r.Bucket = bucket
r.Path = rc.Path r.Path = path
if v := rc.Retention; v > 0 { if v := rc.Retention; v > 0 {
r.Retention = v r.Retention = v
@@ -359,3 +375,26 @@ func newS3ReplicaFromConfig(db *litestream.DB, c *Config, dbc *DBConfig, rc *Rep
} }
return r, nil return r, nil
} }
// expand returns an absolute path for s.
func expand(s string) (string, error) {
// Just expand to absolute path if there is no home directory prefix.
prefix := "~" + string(os.PathSeparator)
if s != "~" && !strings.HasPrefix(s, prefix) {
return filepath.Abs(s)
}
// Look up home directory.
u, err := user.Current()
if err != nil {
return "", err
} else if u.HomeDir == "" {
return "", fmt.Errorf("cannot expand path %s, no home directory available", s)
}
// Return path with tilde replaced by the home directory.
if s == "~" {
return u.HomeDir, nil
}
return filepath.Join(u.HomeDir, strings.TrimPrefix(s, prefix)), nil
}

View File

@@ -11,6 +11,7 @@ import (
_ "net/http/pprof" _ "net/http/pprof"
"os" "os"
"os/signal" "os/signal"
"time"
"github.com/benbjohnson/litestream" "github.com/benbjohnson/litestream"
"github.com/benbjohnson/litestream/s3" "github.com/benbjohnson/litestream/s3"
@@ -29,25 +30,43 @@ type ReplicateCommand struct {
// Run loads all databases specified in the configuration. // Run loads all databases specified in the configuration.
func (c *ReplicateCommand) Run(ctx context.Context, args []string) (err error) { func (c *ReplicateCommand) Run(ctx context.Context, args []string) (err error) {
fs := flag.NewFlagSet("litestream-replicate", flag.ContinueOnError) fs := flag.NewFlagSet("litestream-replicate", flag.ContinueOnError)
verbose := fs.Bool("v", false, "verbose logging") tracePath := fs.String("trace", "", "trace path")
registerConfigFlag(fs, &c.ConfigPath) registerConfigFlag(fs, &c.ConfigPath)
fs.Usage = c.Usage fs.Usage = c.Usage
if err := fs.Parse(args); err != nil { if err := fs.Parse(args); err != nil {
return err return err
} }
// Load configuration. // Load configuration or use CLI args to build db/replica.
if c.ConfigPath == "" { var config Config
return errors.New("-config required") if fs.NArg() == 1 {
return fmt.Errorf("must specify at least one replica URL for %s", fs.Arg(0))
} else if fs.NArg() > 1 {
dbConfig := &DBConfig{Path: fs.Arg(0)}
for _, u := range fs.Args()[1:] {
dbConfig.Replicas = append(dbConfig.Replicas, &ReplicaConfig{
URL: u,
SyncInterval: 1 * time.Second,
})
} }
config, err := ReadConfigFile(c.ConfigPath) config.DBs = []*DBConfig{dbConfig}
} else if c.ConfigPath != "" {
config, err = ReadConfigFile(c.ConfigPath)
if err != nil { if err != nil {
return err return err
} }
} else {
return errors.New("-config flag or database/replica arguments required")
}
// Enable trace logging. // Enable trace logging.
if *verbose { if *tracePath != "" {
litestream.Tracef = log.Printf f, err := os.Create(*tracePath)
if err != nil {
return err
}
defer f.Close()
litestream.Tracef = log.New(f, "", log.LstdFlags|log.LUTC|log.Lshortfile).Printf
} }
// Setup signal handler. // Setup signal handler.
@@ -132,20 +151,25 @@ func (c *ReplicateCommand) Close() (err error) {
// Usage prints the help screen to STDOUT. // Usage prints the help screen to STDOUT.
func (c *ReplicateCommand) Usage() { func (c *ReplicateCommand) Usage() {
fmt.Printf(` fmt.Printf(`
The replicate command starts a server to monitor & replicate databases The replicate command starts a server to monitor & replicate databases.
specified in your configuration file. You can specify your database & replicas in a configuration file or you can
replicate a single database file by specifying its path and its replicas in the
command line arguments.
Usage: Usage:
litestream replicate [arguments] litestream replicate [arguments]
litestream replicate [arguments] DB_PATH REPLICA_URL [REPLICA_URL...]
Arguments: Arguments:
-config PATH -config PATH
Specifies the configuration file. Defaults to %s Specifies the configuration file.
Defaults to %s
-v -trace PATH
Enable verbose logging output. Write verbose trace logging to PATH.
`[1:], DefaultConfigPath()) `[1:], DefaultConfigPath())
} }

View File

@@ -7,7 +7,6 @@ import (
"fmt" "fmt"
"log" "log"
"os" "os"
"path/filepath"
"time" "time"
"github.com/benbjohnson/litestream" "github.com/benbjohnson/litestream"
@@ -20,6 +19,8 @@ type RestoreCommand struct{}
func (c *RestoreCommand) Run(ctx context.Context, args []string) (err error) { func (c *RestoreCommand) Run(ctx context.Context, args []string) (err error) {
var configPath string var configPath string
opt := litestream.NewRestoreOptions() opt := litestream.NewRestoreOptions()
opt.Verbose = true
fs := flag.NewFlagSet("litestream-restore", flag.ContinueOnError) fs := flag.NewFlagSet("litestream-restore", flag.ContinueOnError)
registerConfigFlag(fs, &configPath) registerConfigFlag(fs, &configPath)
fs.StringVar(&opt.OutputPath, "o", "", "output path") fs.StringVar(&opt.OutputPath, "o", "", "output path")
@@ -33,20 +34,11 @@ func (c *RestoreCommand) Run(ctx context.Context, args []string) (err error) {
if err := fs.Parse(args); err != nil { if err := fs.Parse(args); err != nil {
return err return err
} else if fs.NArg() == 0 || fs.Arg(0) == "" { } else if fs.NArg() == 0 || fs.Arg(0) == "" {
return fmt.Errorf("database path required") return fmt.Errorf("database path or replica URL required")
} else if fs.NArg() > 1 { } else if fs.NArg() > 1 {
return fmt.Errorf("too many arguments") return fmt.Errorf("too many arguments")
} }
// Load configuration.
if configPath == "" {
return errors.New("-config required")
}
config, err := ReadConfigFile(configPath)
if err != nil {
return err
}
// Parse timestamp, if specified. // Parse timestamp, if specified.
if *timestampStr != "" { if *timestampStr != "" {
if opt.Timestamp, err = time.Parse(time.RFC3339, *timestampStr); err != nil { if opt.Timestamp, err = time.Parse(time.RFC3339, *timestampStr); err != nil {
@@ -64,23 +56,72 @@ func (c *RestoreCommand) Run(ctx context.Context, args []string) (err error) {
opt.Logger = log.New(os.Stderr, "", log.LstdFlags) opt.Logger = log.New(os.Stderr, "", log.LstdFlags)
} }
// Determine absolute path for database. // Determine replica & generation to restore from.
dbPath, err := filepath.Abs(fs.Arg(0)) var r litestream.Replica
if err != nil { if isURL(fs.Arg(0)) {
if r, err = c.loadFromURL(ctx, fs.Arg(0), &opt); err != nil {
return err return err
} }
} else if configPath != "" {
if r, err = c.loadFromConfig(ctx, fs.Arg(0), configPath, &opt); err != nil {
return err
}
} else {
return errors.New("config path or replica URL required")
}
// Instantiate DB. // Return an error if no matching targets found.
if opt.Generation == "" {
return fmt.Errorf("no matching backups found")
}
return litestream.RestoreReplica(ctx, r, opt)
}
// loadFromURL creates a replica & updates the restore options from a replica URL.
func (c *RestoreCommand) loadFromURL(ctx context.Context, replicaURL string, opt *litestream.RestoreOptions) (litestream.Replica, error) {
r, err := NewReplicaFromURL(replicaURL)
if err != nil {
return nil, err
}
opt.Generation, _, err = litestream.CalcReplicaRestoreTarget(ctx, r, *opt)
return r, err
}
// loadFromConfig returns a replica & updates the restore options from a DB reference.
func (c *RestoreCommand) loadFromConfig(ctx context.Context, dbPath, configPath string, opt *litestream.RestoreOptions) (litestream.Replica, error) {
// Load configuration.
config, err := ReadConfigFile(configPath)
if err != nil {
return nil, err
}
// Lookup database from configuration file by path.
if dbPath, err = expand(dbPath); err != nil {
return nil, err
}
dbConfig := config.DBConfig(dbPath) dbConfig := config.DBConfig(dbPath)
if dbConfig == nil { if dbConfig == nil {
return fmt.Errorf("database not found in config: %s", dbPath) return nil, fmt.Errorf("database not found in config: %s", dbPath)
} }
db, err := newDBFromConfig(&config, dbConfig) db, err := newDBFromConfig(&config, dbConfig)
if err != nil { if err != nil {
return err return nil, err
} }
return db.Restore(ctx, opt) // Restore into original database path if not specified.
if opt.OutputPath == "" {
opt.OutputPath = dbPath
}
// Determine the appropriate replica & generation to restore from,
r, generation, err := db.CalcRestoreTarget(ctx, *opt)
if err != nil {
return nil, err
}
opt.Generation = generation
return r, nil
} }
// Usage prints the help screen to STDOUT. // Usage prints the help screen to STDOUT.
@@ -90,7 +131,9 @@ The restore command recovers a database from a previous snapshot and WAL.
Usage: Usage:
litestream restore [arguments] DB litestream restore [arguments] DB_PATH
litestream restore [arguments] REPLICA_URL
Arguments: Arguments:

View File

@@ -6,7 +6,6 @@ import (
"flag" "flag"
"fmt" "fmt"
"os" "os"
"path/filepath"
"text/tabwriter" "text/tabwriter"
"time" "time"
@@ -31,37 +30,42 @@ func (c *SnapshotsCommand) Run(ctx context.Context, args []string) (err error) {
return fmt.Errorf("too many arguments") return fmt.Errorf("too many arguments")
} }
// Load configuration. var db *litestream.DB
if configPath == "" { var r litestream.Replica
return errors.New("-config required") if isURL(fs.Arg(0)) {
if r, err = NewReplicaFromURL(fs.Arg(0)); err != nil {
return err
} }
} else if configPath != "" {
// Load configuration.
config, err := ReadConfigFile(configPath) config, err := ReadConfigFile(configPath)
if err != nil { if err != nil {
return err return err
} }
// Determine absolute path for database. // Lookup database from configuration file by path.
dbPath, err := filepath.Abs(fs.Arg(0)) if path, err := expand(fs.Arg(0)); err != nil {
if err != nil { return err
} else if dbc := config.DBConfig(path); dbc == nil {
return fmt.Errorf("database not found in config: %s", path)
} else if db, err = newDBFromConfig(&config, dbc); err != nil {
return err return err
} }
// Instantiate DB. // Filter by replica, if specified.
dbConfig := config.DBConfig(dbPath) if *replicaName != "" {
if dbConfig == nil { if r = db.Replica(*replicaName); r == nil {
return fmt.Errorf("database not found in config: %s", dbPath) return fmt.Errorf("replica %q not found for database %q", *replicaName, db.Path())
} }
db, err := newDBFromConfig(&config, dbConfig) }
if err != nil { } else {
return err return errors.New("config path or replica URL required")
} }
// Find snapshots by db or replica. // Find snapshots by db or replica.
var infos []*litestream.SnapshotInfo var infos []*litestream.SnapshotInfo
if *replicaName != "" { if r != nil {
if r := db.Replica(*replicaName); r == nil { if infos, err = r.Snapshots(ctx); err != nil {
return fmt.Errorf("replica %q not found for database %q", *replicaName, dbPath)
} else if infos, err = r.Snapshots(ctx); err != nil {
return err return err
} }
} else { } else {
@@ -71,7 +75,9 @@ func (c *SnapshotsCommand) Run(ctx context.Context, args []string) (err error) {
} }
// List all snapshots. // List all snapshots.
w := tabwriter.NewWriter(os.Stdout, 0, 8, 1, '\t', 0) w := tabwriter.NewWriter(os.Stdout, 0, 8, 2, ' ', 0)
defer w.Flush()
fmt.Fprintln(w, "replica\tgeneration\tindex\tsize\tcreated") fmt.Fprintln(w, "replica\tgeneration\tindex\tsize\tcreated")
for _, info := range infos { for _, info := range infos {
fmt.Fprintf(w, "%s\t%s\t%d\t%d\t%s\n", fmt.Fprintf(w, "%s\t%s\t%d\t%d\t%s\n",
@@ -82,7 +88,6 @@ func (c *SnapshotsCommand) Run(ctx context.Context, args []string) (err error) {
info.CreatedAt.Format(time.RFC3339), info.CreatedAt.Format(time.RFC3339),
) )
} }
w.Flush()
return nil return nil
} }
@@ -90,11 +95,13 @@ func (c *SnapshotsCommand) Run(ctx context.Context, args []string) (err error) {
// Usage prints the help screen to STDOUT. // Usage prints the help screen to STDOUT.
func (c *SnapshotsCommand) Usage() { func (c *SnapshotsCommand) Usage() {
fmt.Printf(` fmt.Printf(`
The snapshots command lists all snapshots available for a database. The snapshots command lists all snapshots available for a database or replica.
Usage: Usage:
litestream snapshots [arguments] DB litestream snapshots [arguments] DB_PATH
litestream snapshots [arguments] REPLICA_URL
Arguments: Arguments:
@@ -105,7 +112,6 @@ Arguments:
-replica NAME -replica NAME
Optional, filter by a specific replica. Optional, filter by a specific replica.
Examples: Examples:
# List all snapshots for a database. # List all snapshots for a database.
@@ -114,6 +120,9 @@ Examples:
# List all snapshots on S3. # List all snapshots on S3.
$ litestream snapshots -replica s3 /path/to/db $ litestream snapshots -replica s3 /path/to/db
# List all snapshots by replica URL.
$ litestream snapshots s3://mybkt/db
`[1:], `[1:],
DefaultConfigPath(), DefaultConfigPath(),
) )

View File

@@ -6,7 +6,6 @@ import (
"flag" "flag"
"fmt" "fmt"
"os" "os"
"path/filepath"
"text/tabwriter" "text/tabwriter"
"time" "time"
@@ -32,37 +31,42 @@ func (c *WALCommand) Run(ctx context.Context, args []string) (err error) {
return fmt.Errorf("too many arguments") return fmt.Errorf("too many arguments")
} }
// Load configuration. var db *litestream.DB
if configPath == "" { var r litestream.Replica
return errors.New("-config required") if isURL(fs.Arg(0)) {
if r, err = NewReplicaFromURL(fs.Arg(0)); err != nil {
return err
} }
} else if configPath != "" {
// Load configuration.
config, err := ReadConfigFile(configPath) config, err := ReadConfigFile(configPath)
if err != nil { if err != nil {
return err return err
} }
// Determine absolute path for database. // Lookup database from configuration file by path.
dbPath, err := filepath.Abs(fs.Arg(0)) if path, err := expand(fs.Arg(0)); err != nil {
if err != nil { return err
} else if dbc := config.DBConfig(path); dbc == nil {
return fmt.Errorf("database not found in config: %s", path)
} else if db, err = newDBFromConfig(&config, dbc); err != nil {
return err return err
} }
// Instantiate DB. // Filter by replica, if specified.
dbConfig := config.DBConfig(dbPath)
if dbConfig == nil {
return fmt.Errorf("database not found in config: %s", dbPath)
}
db, err := newDBFromConfig(&config, dbConfig)
if err != nil {
return err
}
// Find snapshots by db or replica.
var infos []*litestream.WALInfo
if *replicaName != "" { if *replicaName != "" {
if r := db.Replica(*replicaName); r == nil { if r = db.Replica(*replicaName); r == nil {
return fmt.Errorf("replica %q not found for database %q", *replicaName, dbPath) return fmt.Errorf("replica %q not found for database %q", *replicaName, db.Path())
} else if infos, err = r.WALs(ctx); err != nil { }
}
} else {
return errors.New("config path or replica URL required")
}
// Find WAL files by db or replica.
var infos []*litestream.WALInfo
if r != nil {
if infos, err = r.WALs(ctx); err != nil {
return err return err
} }
} else { } else {
@@ -72,7 +76,9 @@ func (c *WALCommand) Run(ctx context.Context, args []string) (err error) {
} }
// List all WAL files. // List all WAL files.
w := tabwriter.NewWriter(os.Stdout, 0, 8, 1, '\t', 0) w := tabwriter.NewWriter(os.Stdout, 0, 8, 2, ' ', 0)
defer w.Flush()
fmt.Fprintln(w, "replica\tgeneration\tindex\toffset\tsize\tcreated") fmt.Fprintln(w, "replica\tgeneration\tindex\toffset\tsize\tcreated")
for _, info := range infos { for _, info := range infos {
if *generation != "" && info.Generation != *generation { if *generation != "" && info.Generation != *generation {
@@ -88,7 +94,6 @@ func (c *WALCommand) Run(ctx context.Context, args []string) (err error) {
info.CreatedAt.Format(time.RFC3339), info.CreatedAt.Format(time.RFC3339),
) )
} }
w.Flush()
return nil return nil
} }
@@ -100,7 +105,9 @@ The wal command lists all wal files available for a database.
Usage: Usage:
litestream wal [arguments] DB litestream wal [arguments] DB_PATH
litestream wal [arguments] REPLICA_URL
Arguments: Arguments:
@@ -114,14 +121,16 @@ Arguments:
-generation NAME -generation NAME
Optional, filter by a specific generation. Optional, filter by a specific generation.
Examples: Examples:
# List all WAL files for a database. # List all WAL files for a database.
$ litestream wal /path/to/db $ litestream wal /path/to/db
# List all WAL files on S3 for a specific generation. # List all WAL files on S3 for a specific generation.
$ litestream snapshots -replica s3 -generation xxxxxxxx /path/to/db $ litestream wal -replica s3 -generation xxxxxxxx /path/to/db
# List all WAL files for replica URL.
$ litestream wal s3://mybkt/db
`[1:], `[1:],
DefaultConfigPath(), DefaultConfigPath(),

239
db.go
View File

@@ -33,6 +33,10 @@ const (
DefaultMaxCheckpointPageN = 10000 DefaultMaxCheckpointPageN = 10000
) )
// MaxIndex is the maximum possible WAL index.
// If this index is reached then a new generation will be started.
const MaxIndex = 0x7FFFFFFF
// BusyTimeout is the timeout to wait for EBUSY from SQLite. // BusyTimeout is the timeout to wait for EBUSY from SQLite.
const BusyTimeout = 1 * time.Second const BusyTimeout = 1 * time.Second
@@ -421,7 +425,7 @@ func (db *DB) init() (err error) {
// If we have an existing shadow WAL, ensure the headers match. // If we have an existing shadow WAL, ensure the headers match.
if err := db.verifyHeadersMatch(); err != nil { if err := db.verifyHeadersMatch(); err != nil {
log.Printf("%s: cannot determine last wal position, clearing generation (%s)", db.path, err) log.Printf("%s: init: cannot determine last wal position, clearing generation (%s)", db.path, err)
if err := os.Remove(db.GenerationNamePath()); err != nil && !os.IsNotExist(err) { if err := os.Remove(db.GenerationNamePath()); err != nil && !os.IsNotExist(err) {
return fmt.Errorf("remove generation name: %w", err) return fmt.Errorf("remove generation name: %w", err)
} }
@@ -622,7 +626,7 @@ func (db *DB) CurrentGeneration() (string, error) {
return "", err return "", err
} }
// TODO: Verify if generation directory exists. If not, delete. // TODO: Verify if generation directory exists. If not, delete name file.
generation := strings.TrimSpace(string(buf)) generation := strings.TrimSpace(string(buf))
if len(generation) != GenerationNameLen { if len(generation) != GenerationNameLen {
@@ -736,7 +740,7 @@ func (db *DB) Sync() (err error) {
if info.generation, err = db.createGeneration(); err != nil { if info.generation, err = db.createGeneration(); err != nil {
return fmt.Errorf("create generation: %w", err) return fmt.Errorf("create generation: %w", err)
} }
log.Printf("%s: new generation %q, %s", db.path, info.generation, info.reason) log.Printf("%s: sync: new generation %q, %s", db.path, info.generation, info.reason)
// Clear shadow wal info. // Clear shadow wal info.
info.shadowWALPath = db.ShadowWALPath(info.generation, 0) info.shadowWALPath = db.ShadowWALPath(info.generation, 0)
@@ -844,10 +848,14 @@ func (db *DB) verify() (info syncInfo, err error) {
db.walSizeGauge.Set(float64(fi.Size())) db.walSizeGauge.Set(float64(fi.Size()))
// Open shadow WAL to copy append to. // Open shadow WAL to copy append to.
info.shadowWALPath, err = db.CurrentShadowWALPath(info.generation) index, _, err := db.CurrentShadowWALIndex(info.generation)
if err != nil { if err != nil {
return info, fmt.Errorf("cannot determine shadow WAL: %w", err) return info, fmt.Errorf("cannot determine shadow WAL index: %w", err)
} else if index >= MaxIndex {
info.reason = "max index exceeded"
return info, nil
} }
info.shadowWALPath = db.ShadowWALPath(generation, index)
// Determine shadow WAL current size. // Determine shadow WAL current size.
fi, err = os.Stat(info.shadowWALPath) fi, err = os.Stat(info.shadowWALPath)
@@ -1017,65 +1025,65 @@ func (db *DB) copyToShadowWAL(filename string) (newSize int64, err error) {
return 0, fmt.Errorf("last checksum: %w", err) return 0, fmt.Errorf("last checksum: %w", err)
} }
// Seek to correct position on both files. // Seek to correct position on real wal.
if _, err := r.Seek(origSize, io.SeekStart); err != nil { if _, err := r.Seek(origSize, io.SeekStart); err != nil {
return 0, fmt.Errorf("wal seek: %w", err) return 0, fmt.Errorf("real wal seek: %w", err)
} else if _, err := w.Seek(origSize, io.SeekStart); err != nil {
return 0, fmt.Errorf("shadow wal seek: %w", err)
} }
// Read through WAL from last position to find the page of the last // Read through WAL from last position to find the page of the last
// committed transaction. // committed transaction.
tmpSz := origSize frame := make([]byte, db.pageSize+WALFrameHeaderSize)
var buf bytes.Buffer
offset := origSize
lastCommitSize := origSize lastCommitSize := origSize
buf := make([]byte, db.pageSize+WALFrameHeaderSize)
for { for {
Tracef("%s: copy-shadow: %s @ %d", db.path, filename, tmpSz)
// Read next page from WAL file. // Read next page from WAL file.
if _, err := io.ReadFull(r, buf); err == io.EOF || err == io.ErrUnexpectedEOF { if _, err := io.ReadFull(r, frame); err == io.EOF || err == io.ErrUnexpectedEOF {
Tracef("%s: copy-shadow: break %s", db.path, err) Tracef("%s: copy-shadow: break %s @ %d; err=%s", db.path, filename, offset, err)
break // end of file or partial page break // end of file or partial page
} else if err != nil { } else if err != nil {
return 0, fmt.Errorf("read wal: %w", err) return 0, fmt.Errorf("read wal: %w", err)
} }
// Read frame salt & compare to header salt. Stop reading on mismatch. // Read frame salt & compare to header salt. Stop reading on mismatch.
salt0 := binary.BigEndian.Uint32(buf[8:]) salt0 := binary.BigEndian.Uint32(frame[8:])
salt1 := binary.BigEndian.Uint32(buf[12:]) salt1 := binary.BigEndian.Uint32(frame[12:])
if salt0 != hsalt0 || salt1 != hsalt1 { if salt0 != hsalt0 || salt1 != hsalt1 {
Tracef("%s: copy-shadow: break: salt mismatch", db.path) Tracef("%s: copy-shadow: break: salt mismatch", db.path)
break break
} }
// Verify checksum of page is valid. // Verify checksum of page is valid.
fchksum0 := binary.BigEndian.Uint32(buf[16:]) fchksum0 := binary.BigEndian.Uint32(frame[16:])
fchksum1 := binary.BigEndian.Uint32(buf[20:]) fchksum1 := binary.BigEndian.Uint32(frame[20:])
chksum0, chksum1 = Checksum(bo, chksum0, chksum1, buf[:8]) // frame header chksum0, chksum1 = Checksum(bo, chksum0, chksum1, frame[:8]) // frame header
chksum0, chksum1 = Checksum(bo, chksum0, chksum1, buf[24:]) // frame data chksum0, chksum1 = Checksum(bo, chksum0, chksum1, frame[24:]) // frame data
if chksum0 != fchksum0 || chksum1 != fchksum1 { if chksum0 != fchksum0 || chksum1 != fchksum1 {
return 0, fmt.Errorf("checksum mismatch: offset=%d (%x,%x) != (%x,%x)", tmpSz, chksum0, chksum1, fchksum0, fchksum1) log.Printf("copy shadow: checksum mismatch, skipping: offset=%d (%x,%x) != (%x,%x)", offset, chksum0, chksum1, fchksum0, fchksum1)
break
} }
// Add page to the new size of the shadow WAL. // Add page to the new size of the shadow WAL.
tmpSz += int64(len(buf)) buf.Write(frame)
// Mark commit record. Tracef("%s: copy-shadow: ok %s offset=%d salt=%x %x", db.path, filename, offset, salt0, salt1)
newDBSize := binary.BigEndian.Uint32(buf[4:]) offset += int64(len(frame))
// Flush to shadow WAL if commit record.
newDBSize := binary.BigEndian.Uint32(frame[4:])
if newDBSize != 0 { if newDBSize != 0 {
lastCommitSize = tmpSz if _, err := buf.WriteTo(w); err != nil {
return 0, fmt.Errorf("write shadow wal: %w", err)
}
buf.Reset()
lastCommitSize = offset
} }
} }
// Seek to correct position on both files. // Sync & close.
if _, err := r.Seek(origSize, io.SeekStart); err != nil { if err := w.Sync(); err != nil {
return 0, fmt.Errorf("wal seek: %w", err)
} else if _, err := w.Seek(origSize, io.SeekStart); err != nil {
return 0, fmt.Errorf("shadow wal seek: %w", err)
}
// Copy bytes, sync & close.
if _, err := io.CopyN(w, r, lastCommitSize-origSize); err != nil {
return 0, err
} else if err := w.Sync(); err != nil {
return 0, err return 0, err
} else if err := w.Close(); err != nil { } else if err := w.Close(); err != nil {
return 0, err return 0, err
@@ -1098,6 +1106,8 @@ func (db *DB) ShadowWALReader(pos Pos) (r *ShadowWALReader, err error) {
return nil, err return nil, err
} else if r.N() > 0 { } else if r.N() > 0 {
return r, nil return r, nil
} else if err := r.Close(); err != nil { // no data, close, try next
return nil, err
} }
// Otherwise attempt to read the start of the next WAL file. // Otherwise attempt to read the start of the next WAL file.
@@ -1171,6 +1181,9 @@ type ShadowWALReader struct {
pos Pos pos Pos
} }
// Name returns the filename of the underlying file.
func (r *ShadowWALReader) Name() string { return r.f.Name() }
// Close closes the underlying WAL file handle. // Close closes the underlying WAL file handle.
func (r *ShadowWALReader) Close() error { return r.f.Close() } func (r *ShadowWALReader) Close() error { return r.f.Close() }
@@ -1264,7 +1277,7 @@ func (db *DB) checkpoint(mode string) (err error) {
if err := db.db.QueryRow(rawsql).Scan(&row[0], &row[1], &row[2]); err != nil { if err := db.db.QueryRow(rawsql).Scan(&row[0], &row[1], &row[2]); err != nil {
return err return err
} }
log.Printf("%s: checkpoint: mode=%v (%d,%d,%d)", db.path, mode, row[0], row[1], row[2]) Tracef("%s: checkpoint: mode=%v (%d,%d,%d)", db.path, mode, row[0], row[1], row[2])
// Reacquire the read lock immediately after the checkpoint. // Reacquire the read lock immediately after the checkpoint.
if err := db.acquireReadLock(); err != nil { if err := db.acquireReadLock(); err != nil {
@@ -1288,6 +1301,11 @@ func (db *DB) checkpointAndInit(generation, mode string) error {
return err return err
} }
// Copy shadow WAL before checkpoint to copy as much as possible.
if _, err := db.copyToShadowWAL(shadowWALPath); err != nil {
return fmt.Errorf("cannot copy to end of shadow wal before checkpoint: %w", err)
}
// Execute checkpoint and immediately issue a write to the WAL to ensure // Execute checkpoint and immediately issue a write to the WAL to ensure
// a new page is written. // a new page is written.
if err := db.checkpoint(mode); err != nil { if err := db.checkpoint(mode); err != nil {
@@ -1343,15 +1361,17 @@ func (db *DB) monitor() {
} }
} }
// Restore restores the database from a replica based on the options given. // RestoreReplica restores the database from a replica based on the options given.
// This method will restore into opt.OutputPath, if specified, or into the // This method will restore into opt.OutputPath, if specified, or into the
// DB's original database path. It can optionally restore from a specific // DB's original database path. It can optionally restore from a specific
// replica or generation or it will automatically choose the best one. Finally, // replica or generation or it will automatically choose the best one. Finally,
// a timestamp can be specified to restore the database to a specific // a timestamp can be specified to restore the database to a specific
// point-in-time. // point-in-time.
func (db *DB) Restore(ctx context.Context, opt RestoreOptions) error { func RestoreReplica(ctx context.Context, r Replica, opt RestoreOptions) error {
// Validate options. // Validate options.
if opt.Generation == "" && opt.Index != math.MaxInt64 { if opt.OutputPath == "" {
return fmt.Errorf("output path required")
} else if opt.Generation == "" && opt.Index != math.MaxInt64 {
return fmt.Errorf("must specify generation when restoring to index") return fmt.Errorf("must specify generation when restoring to index")
} else if opt.Index != math.MaxInt64 && !opt.Timestamp.IsZero() { } else if opt.Index != math.MaxInt64 && !opt.Timestamp.IsZero() {
return fmt.Errorf("cannot specify index & timestamp to restore") return fmt.Errorf("cannot specify index & timestamp to restore")
@@ -1363,69 +1383,65 @@ func (db *DB) Restore(ctx context.Context, opt RestoreOptions) error {
logger = log.New(ioutil.Discard, "", 0) logger = log.New(ioutil.Discard, "", 0)
} }
// Determine the correct output path. logPrefix := r.Name()
outputPath := opt.OutputPath if db := r.DB(); db != nil {
if outputPath == "" { logPrefix = fmt.Sprintf("%s(%s)", db.Path(), r.Name())
outputPath = db.Path()
} }
// Ensure output path does not already exist (unless this is a dry run). // Ensure output path does not already exist (unless this is a dry run).
if !opt.DryRun { if !opt.DryRun {
if _, err := os.Stat(outputPath); err == nil { if _, err := os.Stat(opt.OutputPath); err == nil {
return fmt.Errorf("cannot restore, output path already exists: %s", outputPath) return fmt.Errorf("cannot restore, output path already exists: %s", opt.OutputPath)
} else if err != nil && !os.IsNotExist(err) { } else if err != nil && !os.IsNotExist(err) {
return err return err
} }
} }
// Determine target replica & generation to restore from.
r, generation, err := db.restoreTarget(ctx, opt, logger)
if err != nil {
return err
}
// Find lastest snapshot that occurs before timestamp. // Find lastest snapshot that occurs before timestamp.
minWALIndex, err := SnapshotIndexAt(ctx, r, generation, opt.Timestamp) minWALIndex, err := SnapshotIndexAt(ctx, r, opt.Generation, opt.Timestamp)
if err != nil { if err != nil {
return fmt.Errorf("cannot find snapshot index for restore: %w", err) return fmt.Errorf("cannot find snapshot index for restore: %w", err)
} }
// Find the maximum WAL index that occurs before timestamp. // Find the maximum WAL index that occurs before timestamp.
maxWALIndex, err := WALIndexAt(ctx, r, generation, opt.Index, opt.Timestamp) maxWALIndex, err := WALIndexAt(ctx, r, opt.Generation, opt.Index, opt.Timestamp)
if err != nil { if err != nil {
return fmt.Errorf("cannot find max wal index for restore: %w", err) return fmt.Errorf("cannot find max wal index for restore: %w", err)
} }
log.Printf("%s(%s): starting restore: generation %08x, index %08x-%08x", db.path, r.Name(), generation, minWALIndex, maxWALIndex) logger.Printf("%s: starting restore: generation %s, index %08x-%08x", logPrefix, opt.Generation, minWALIndex, maxWALIndex)
// Initialize starting position. // Initialize starting position.
pos := Pos{Generation: generation, Index: minWALIndex} pos := Pos{Generation: opt.Generation, Index: minWALIndex}
tmpPath := outputPath + ".tmp" tmpPath := opt.OutputPath + ".tmp"
// Copy snapshot to output path. // Copy snapshot to output path.
logger.Printf("%s: restoring snapshot %s/%08x to %s", logPrefix, opt.Generation, minWALIndex, tmpPath)
if !opt.DryRun { if !opt.DryRun {
if err := db.restoreSnapshot(ctx, r, pos.Generation, pos.Index, tmpPath); err != nil { if err := restoreSnapshot(ctx, r, pos.Generation, pos.Index, tmpPath); err != nil {
return fmt.Errorf("cannot restore snapshot: %w", err) return fmt.Errorf("cannot restore snapshot: %w", err)
} }
} }
log.Printf("%s(%s): restoring snapshot %s/%08x to %s", db.path, r.Name(), generation, minWALIndex, tmpPath)
// Restore each WAL file until we reach our maximum index. // Restore each WAL file until we reach our maximum index.
for index := minWALIndex; index <= maxWALIndex; index++ { for index := minWALIndex; index <= maxWALIndex; index++ {
if !opt.DryRun { if !opt.DryRun {
if err = db.restoreWAL(ctx, r, generation, index, tmpPath); os.IsNotExist(err) && index == minWALIndex && index == maxWALIndex { if err = restoreWAL(ctx, r, opt.Generation, index, tmpPath); os.IsNotExist(err) && index == minWALIndex && index == maxWALIndex {
log.Printf("%s(%s): no wal available, snapshot only", db.path, r.Name()) logger.Printf("%s: no wal available, snapshot only", logPrefix)
break // snapshot file only, ignore error break // snapshot file only, ignore error
} else if err != nil { } else if err != nil {
return fmt.Errorf("cannot restore wal: %w", err) return fmt.Errorf("cannot restore wal: %w", err)
} }
} }
log.Printf("%s(%s): restored wal %s/%08x", db.path, r.Name(), generation, index)
if opt.Verbose {
logger.Printf("%s: restored wal %s/%08x", logPrefix, opt.Generation, index)
}
} }
// Copy file to final location. // Copy file to final location.
log.Printf("%s(%s): renaming database from temporary location", db.path, r.Name()) logger.Printf("%s: renaming database from temporary location", logPrefix)
if !opt.DryRun { if !opt.DryRun {
if err := os.Rename(tmpPath, outputPath); err != nil { if err := os.Rename(tmpPath, opt.OutputPath); err != nil {
return err return err
} }
} }
@@ -1447,7 +1463,8 @@ func checksumFile(filename string) (uint64, error) {
return h.Sum64(), nil return h.Sum64(), nil
} }
func (db *DB) restoreTarget(ctx context.Context, opt RestoreOptions, logger *log.Logger) (Replica, string, error) { // CalcRestoreTarget returns a replica & generation to restore from based on opt criteria.
func (db *DB) CalcRestoreTarget(ctx context.Context, opt RestoreOptions) (Replica, string, error) {
var target struct { var target struct {
replica Replica replica Replica
generation string generation string
@@ -1460,9 +1477,31 @@ func (db *DB) restoreTarget(ctx context.Context, opt RestoreOptions, logger *log
continue continue
} }
generation, stats, err := CalcReplicaRestoreTarget(ctx, r, opt)
if err != nil {
return nil, "", err
}
// Use the latest replica if we have multiple candidates.
if !stats.UpdatedAt.After(target.stats.UpdatedAt) {
continue
}
target.replica, target.generation, target.stats = r, generation, stats
}
return target.replica, target.generation, nil
}
// CalcReplicaRestoreTarget returns a generation to restore from.
func CalcReplicaRestoreTarget(ctx context.Context, r Replica, opt RestoreOptions) (generation string, stats GenerationStats, err error) {
var target struct {
generation string
stats GenerationStats
}
generations, err := r.Generations(ctx) generations, err := r.Generations(ctx)
if err != nil { if err != nil {
return nil, "", fmt.Errorf("cannot fetch generations: %w", err) return "", stats, fmt.Errorf("cannot fetch generations: %w", err)
} }
// Search generations for one that contains the requested timestamp. // Search generations for one that contains the requested timestamp.
@@ -1475,7 +1514,7 @@ func (db *DB) restoreTarget(ctx context.Context, opt RestoreOptions, logger *log
// Fetch stats for generation. // Fetch stats for generation.
stats, err := r.GenerationStats(ctx, generation) stats, err := r.GenerationStats(ctx, generation)
if err != nil { if err != nil {
return nil, "", fmt.Errorf("cannot determine stats for generation (%s/%s): %s", r.Name(), generation, err) return "", stats, fmt.Errorf("cannot determine stats for generation (%s/%s): %s", r.Name(), generation, err)
} }
// Skip if it does not contain timestamp. // Skip if it does not contain timestamp.
@@ -1490,27 +1529,28 @@ func (db *DB) restoreTarget(ctx context.Context, opt RestoreOptions, logger *log
continue continue
} }
target.replica = r
target.generation = generation target.generation = generation
target.stats = stats target.stats = stats
} }
}
// Return an error if no matching targets found. return target.generation, target.stats, nil
if target.generation == "" {
return nil, "", fmt.Errorf("no matching backups found")
}
return target.replica, target.generation, nil
} }
// restoreSnapshot copies a snapshot from the replica to a file. // restoreSnapshot copies a snapshot from the replica to a file.
func (db *DB) restoreSnapshot(ctx context.Context, r Replica, generation string, index int, filename string) error { func restoreSnapshot(ctx context.Context, r Replica, generation string, index int, filename string) error {
if err := mkdirAll(filepath.Dir(filename), db.dirmode, db.diruid, db.dirgid); err != nil { // Determine the user/group & mode based on the DB, if available.
uid, gid, mode := -1, -1, os.FileMode(0600)
diruid, dirgid, dirmode := -1, -1, os.FileMode(0700)
if db := r.DB(); db != nil {
uid, gid, mode = db.uid, db.gid, db.mode
diruid, dirgid, dirmode = db.diruid, db.dirgid, db.dirmode
}
if err := mkdirAll(filepath.Dir(filename), dirmode, diruid, dirgid); err != nil {
return err return err
} }
f, err := createFile(filename, db.uid, db.gid) f, err := createFile(filename, mode, uid, gid)
if err != nil { if err != nil {
return err return err
} }
@@ -1533,7 +1573,13 @@ func (db *DB) restoreSnapshot(ctx context.Context, r Replica, generation string,
} }
// restoreWAL copies a WAL file from the replica to the local WAL and forces checkpoint. // restoreWAL copies a WAL file from the replica to the local WAL and forces checkpoint.
func (db *DB) restoreWAL(ctx context.Context, r Replica, generation string, index int, dbPath string) error { func restoreWAL(ctx context.Context, r Replica, generation string, index int, dbPath string) error {
// Determine the user/group & mode based on the DB, if available.
uid, gid, mode := -1, -1, os.FileMode(0600)
if db := r.DB(); db != nil {
uid, gid, mode = db.uid, db.gid, db.mode
}
// Open WAL file from replica. // Open WAL file from replica.
rd, err := r.WALReader(ctx, generation, index) rd, err := r.WALReader(ctx, generation, index)
if err != nil { if err != nil {
@@ -1542,7 +1588,7 @@ func (db *DB) restoreWAL(ctx context.Context, r Replica, generation string, inde
defer rd.Close() defer rd.Close()
// Open handle to destination WAL path. // Open handle to destination WAL path.
f, err := createFile(dbPath+"-wal", db.uid, db.gid) f, err := createFile(dbPath+"-wal", mode, uid, gid)
if err != nil { if err != nil {
return err return err
} }
@@ -1562,13 +1608,14 @@ func (db *DB) restoreWAL(ctx context.Context, r Replica, generation string, inde
} }
defer d.Close() defer d.Close()
if _, err := d.Exec(`PRAGMA wal_checkpoint(TRUNCATE);`); err != nil { var row [3]int
return err if err := d.QueryRow(`PRAGMA wal_checkpoint(TRUNCATE);`).Scan(&row[0], &row[1], &row[2]); err != nil {
} else if err := d.Close(); err != nil {
return err return err
} else if row[0] != 0 {
return fmt.Errorf("truncation checkpoint failed during restore (%d,%d,%d)", row[0], row[1], row[2])
} }
return nil return d.Close()
} }
// CRC64 returns a CRC-64 ISO checksum of the database and its current position. // CRC64 returns a CRC-64 ISO checksum of the database and its current position.
@@ -1576,6 +1623,8 @@ func (db *DB) restoreWAL(ctx context.Context, r Replica, generation string, inde
// This function obtains a read lock so it prevents syncs from occurring until // This function obtains a read lock so it prevents syncs from occurring until
// the operation is complete. The database will still be usable but it will be // the operation is complete. The database will still be usable but it will be
// unable to checkpoint during this time. // unable to checkpoint during this time.
//
// If dst is set, the database file is copied to that location before checksum.
func (db *DB) CRC64() (uint64, Pos, error) { func (db *DB) CRC64() (uint64, Pos, error) {
db.mu.Lock() db.mu.Lock()
defer db.mu.Unlock() defer db.mu.Unlock()
@@ -1639,8 +1688,9 @@ type RestoreOptions struct {
// Only equivalent log output for a regular restore. // Only equivalent log output for a regular restore.
DryRun bool DryRun bool
// Logger used to print status to. // Logging settings.
Logger *log.Logger Logger *log.Logger
Verbose bool
} }
// NewRestoreOptions returns a new instance of RestoreOptions with defaults. // NewRestoreOptions returns a new instance of RestoreOptions with defaults.
@@ -1741,3 +1791,24 @@ func headerByteOrder(hdr []byte) (binary.ByteOrder, error) {
return nil, fmt.Errorf("invalid wal header magic: %x", magic) return nil, fmt.Errorf("invalid wal header magic: %x", magic)
} }
} }
func copyFile(dst, src string) error {
r, err := os.Open(src)
if err != nil {
return err
}
defer r.Close()
w, err := os.Create(dst)
if err != nil {
return err
}
defer w.Close()
if _, err := io.Copy(w, r); err != nil {
return err
} else if err := w.Sync(); err != nil {
return err
}
return nil
}

15
etc/gon.hcl Normal file
View File

@@ -0,0 +1,15 @@
source = ["./dist/litestream"]
bundle_id = "com.middlemost.litestream"
apple_id {
username = "benbjohnson@yahoo.com"
password = "@env:AC_PASSWORD"
}
sign {
application_identity = "Developer ID Application: Middlemost Systems, LLC"
}
zip {
output_path = "dist/litestream.zip"
}

View File

@@ -95,9 +95,9 @@ type Pos struct {
// String returns a string representation. // String returns a string representation.
func (p Pos) String() string { func (p Pos) String() string {
if p.IsZero() { if p.IsZero() {
return "<>" return ""
} }
return fmt.Sprintf("<%s,%08x,%d>", p.Generation, p.Index, p.Offset) return fmt.Sprintf("%s/%08x:%d", p.Generation, p.Index, p.Offset)
} }
// IsZero returns true if p is the zero value. // IsZero returns true if p is the zero value.
@@ -257,8 +257,8 @@ func isHexChar(ch rune) bool {
} }
// createFile creates the file and attempts to set the UID/GID. // createFile creates the file and attempts to set the UID/GID.
func createFile(filename string, uid, gid int) (*os.File, error) { func createFile(filename string, perm os.FileMode, uid, gid int) (*os.File, error) {
f, err := os.Create(filename) f, err := os.OpenFile(filename, os.O_RDWR|os.O_CREATE|os.O_TRUNC, perm)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@@ -4,7 +4,6 @@ package litestream
import ( import (
"os" "os"
"syscall"
) )
// fileinfo returns syscall fields from a FileInfo object. // fileinfo returns syscall fields from a FileInfo object.
@@ -15,7 +14,7 @@ func fileinfo(fi os.FileInfo) (uid, gid int) {
// fixRootDirectory is copied from the standard library for use with mkdirAll() // fixRootDirectory is copied from the standard library for use with mkdirAll()
func fixRootDirectory(p string) string { func fixRootDirectory(p string) string {
if len(p) == len(`\\?\c:`) { if len(p) == len(`\\?\c:`) {
if IsPathSeparator(p[0]) && IsPathSeparator(p[1]) && p[2] == '?' && IsPathSeparator(p[3]) && p[5] == ':' { if os.IsPathSeparator(p[0]) && os.IsPathSeparator(p[1]) && p[2] == '?' && os.IsPathSeparator(p[3]) && p[5] == ':' {
return p + `\` return p + `\`
} }
} }

View File

@@ -2,6 +2,7 @@ package litestream
import ( import (
"context" "context"
"encoding/binary"
"fmt" "fmt"
"io" "io"
"io/ioutil" "io/ioutil"
@@ -125,10 +126,14 @@ func NewFileReplica(db *DB, name, dst string) *FileReplica {
MonitorEnabled: true, MonitorEnabled: true,
} }
r.snapshotTotalGauge = internal.ReplicaSnapshotTotalGaugeVec.WithLabelValues(db.path, r.Name()) var dbPath string
r.walBytesCounter = internal.ReplicaWALBytesCounterVec.WithLabelValues(db.path, r.Name()) if db != nil {
r.walIndexGauge = internal.ReplicaWALIndexGaugeVec.WithLabelValues(db.path, r.Name()) dbPath = db.Path()
r.walOffsetGauge = internal.ReplicaWALOffsetGaugeVec.WithLabelValues(db.path, r.Name()) }
r.snapshotTotalGauge = internal.ReplicaSnapshotTotalGaugeVec.WithLabelValues(dbPath, r.Name())
r.walBytesCounter = internal.ReplicaWALBytesCounterVec.WithLabelValues(dbPath, r.Name())
r.walIndexGauge = internal.ReplicaWALIndexGaugeVec.WithLabelValues(dbPath, r.Name())
r.walOffsetGauge = internal.ReplicaWALOffsetGaugeVec.WithLabelValues(dbPath, r.Name())
return r return r
} }
@@ -413,7 +418,7 @@ func (r *FileReplica) Stop() {
func (r *FileReplica) monitor(ctx context.Context) { func (r *FileReplica) monitor(ctx context.Context) {
// Clear old temporary files that my have been left from a crash. // Clear old temporary files that my have been left from a crash.
if err := removeTmpFiles(r.dst); err != nil { if err := removeTmpFiles(r.dst); err != nil {
log.Printf("%s(%s): cannot remove tmp files: %s", r.db.Path(), r.Name(), err) log.Printf("%s(%s): monitor: cannot remove tmp files: %s", r.db.Path(), r.Name(), err)
} }
// Continuously check for new data to replicate. // Continuously check for new data to replicate.
@@ -433,7 +438,7 @@ func (r *FileReplica) monitor(ctx context.Context) {
// Synchronize the shadow wal into the replication directory. // Synchronize the shadow wal into the replication directory.
if err := r.Sync(ctx); err != nil { if err := r.Sync(ctx); err != nil {
log.Printf("%s(%s): sync error: %s", r.db.Path(), r.Name(), err) log.Printf("%s(%s): monitor error: %s", r.db.Path(), r.Name(), err)
continue continue
} }
} }
@@ -450,7 +455,7 @@ func (r *FileReplica) retainer(ctx context.Context) {
return return
case <-ticker.C: case <-ticker.C:
if err := r.EnforceRetention(ctx); err != nil { if err := r.EnforceRetention(ctx); err != nil {
log.Printf("%s(%s): retain error: %s", r.db.Path(), r.Name(), err) log.Printf("%s(%s): retainer error: %s", r.db.Path(), r.Name(), err)
continue continue
} }
} }
@@ -544,11 +549,16 @@ func (r *FileReplica) snapshot(ctx context.Context, generation string, index int
return nil return nil
} }
startTime := time.Now()
if err := mkdirAll(filepath.Dir(snapshotPath), r.db.dirmode, r.db.diruid, r.db.dirgid); err != nil { if err := mkdirAll(filepath.Dir(snapshotPath), r.db.dirmode, r.db.diruid, r.db.dirgid); err != nil {
return err return err
} else if err := compressFile(r.db.Path(), snapshotPath, r.db.uid, r.db.gid); err != nil {
return err
} }
return compressFile(r.db.Path(), snapshotPath, r.db.uid, r.db.gid) log.Printf("%s(%s): snapshot: creating %s/%08x t=%s", r.db.Path(), r.Name(), generation, index, time.Since(startTime))
return nil
} }
// snapshotN returns the number of snapshots for a generation. // snapshotN returns the number of snapshots for a generation.
@@ -589,6 +599,8 @@ func (r *FileReplica) Sync(ctx context.Context) (err error) {
} }
generation := dpos.Generation generation := dpos.Generation
Tracef("%s(%s): replica sync: db.pos=%s", r.db.Path(), r.Name(), dpos)
// Create snapshot if no snapshots exist for generation. // Create snapshot if no snapshots exist for generation.
if n, err := r.snapshotN(generation); err != nil { if n, err := r.snapshotN(generation); err != nil {
return err return err
@@ -608,6 +620,7 @@ func (r *FileReplica) Sync(ctx context.Context) (err error) {
return fmt.Errorf("cannot determine replica position: %s", err) return fmt.Errorf("cannot determine replica position: %s", err)
} }
Tracef("%s(%s): replica sync: calc new pos: %s", r.db.Path(), r.Name(), pos)
r.mu.Lock() r.mu.Lock()
r.pos = pos r.pos = pos
r.mu.Unlock() r.mu.Unlock()
@@ -660,11 +673,48 @@ func (r *FileReplica) syncWAL(ctx context.Context) (err error) {
return err return err
} }
n, err := io.Copy(w, rd) // Copy header if at offset zero.
r.walBytesCounter.Add(float64(n)) var psalt uint64 // previous salt value
if pos := rd.Pos(); pos.Offset == 0 {
buf := make([]byte, WALHeaderSize)
if _, err := io.ReadFull(rd, buf); err != nil {
return err
}
psalt = binary.BigEndian.Uint64(buf[16:24])
n, err := w.Write(buf)
if err != nil { if err != nil {
return err return err
} }
r.walBytesCounter.Add(float64(n))
}
// Copy frames.
for {
pos := rd.Pos()
assert(pos.Offset == frameAlign(pos.Offset, r.db.pageSize), "shadow wal reader not frame aligned")
buf := make([]byte, WALFrameHeaderSize+r.db.pageSize)
if _, err := io.ReadFull(rd, buf); err == io.EOF {
break
} else if err != nil {
return err
}
// Verify salt matches the previous frame/header read.
salt := binary.BigEndian.Uint64(buf[8:16])
if psalt != 0 && psalt != salt {
return fmt.Errorf("replica salt mismatch: %s", filepath.Base(filename))
}
psalt = salt
n, err := w.Write(buf)
if err != nil {
return err
}
r.walBytesCounter.Add(float64(n))
}
if err := w.Sync(); err != nil { if err := w.Sync(); err != nil {
return err return err
@@ -792,7 +842,6 @@ func (r *FileReplica) EnforceRetention(ctx context.Context) (err error) {
// If no retained snapshots exist, create a new snapshot. // If no retained snapshots exist, create a new snapshot.
if len(snapshots) == 0 { if len(snapshots) == 0 {
log.Printf("%s(%s): snapshots exceeds retention, creating new snapshot", r.db.Path(), r.Name())
if err := r.snapshot(ctx, pos.Generation, pos.Index); err != nil { if err := r.snapshot(ctx, pos.Generation, pos.Index); err != nil {
return fmt.Errorf("cannot snapshot: %w", err) return fmt.Errorf("cannot snapshot: %w", err)
} }
@@ -810,7 +859,7 @@ func (r *FileReplica) EnforceRetention(ctx context.Context) (err error) {
// Delete generations if it has no snapshots being retained. // Delete generations if it has no snapshots being retained.
if snapshot == nil { if snapshot == nil {
log.Printf("%s(%s): generation %q has no retained snapshots, deleting", r.db.Path(), r.Name(), generation) log.Printf("%s(%s): retainer: deleting generation %q has no retained snapshots, deleting", r.db.Path(), r.Name(), generation)
if err := os.RemoveAll(r.GenerationDir(generation)); err != nil { if err := os.RemoveAll(r.GenerationDir(generation)); err != nil {
return fmt.Errorf("cannot delete generation %q dir: %w", generation, err) return fmt.Errorf("cannot delete generation %q dir: %w", generation, err)
} }
@@ -839,6 +888,7 @@ func (r *FileReplica) deleteGenerationSnapshotsBefore(ctx context.Context, gener
return err return err
} }
var n int
for _, fi := range fis { for _, fi := range fis {
idx, _, err := ParseSnapshotPath(fi.Name()) idx, _, err := ParseSnapshotPath(fi.Name())
if err != nil { if err != nil {
@@ -847,10 +897,13 @@ func (r *FileReplica) deleteGenerationSnapshotsBefore(ctx context.Context, gener
continue continue
} }
log.Printf("%s(%s): retention exceeded, deleting from generation %q: %s", r.db.Path(), r.Name(), generation, fi.Name())
if err := os.Remove(filepath.Join(dir, fi.Name())); err != nil { if err := os.Remove(filepath.Join(dir, fi.Name())); err != nil {
return err return err
} }
n++
}
if n > 0 {
log.Printf("%s(%s): retainer: deleting snapshots before %s/%08x; n=%d", r.db.Path(), r.Name(), generation, index, n)
} }
return nil return nil
@@ -867,6 +920,7 @@ func (r *FileReplica) deleteGenerationWALBefore(ctx context.Context, generation
return err return err
} }
var n int
for _, fi := range fis { for _, fi := range fis {
idx, _, _, err := ParseWALPath(fi.Name()) idx, _, _, err := ParseWALPath(fi.Name())
if err != nil { if err != nil {
@@ -875,10 +929,13 @@ func (r *FileReplica) deleteGenerationWALBefore(ctx context.Context, generation
continue continue
} }
log.Printf("%s(%s): generation %q wal no longer retained, deleting %s", r.db.Path(), r.Name(), generation, fi.Name())
if err := os.Remove(filepath.Join(dir, fi.Name())); err != nil { if err := os.Remove(filepath.Join(dir, fi.Name())); err != nil {
return err return err
} }
n++
}
if n > 0 {
log.Printf("%s(%s): retainer: deleting wal files before %s/%08x n=%d", r.db.Path(), r.Name(), generation, index, n)
} }
return nil return nil
@@ -923,6 +980,10 @@ func WALIndexAt(ctx context.Context, r Replica, generation string, maxIndex int,
var index int var index int
for _, wal := range wals { for _, wal := range wals {
if wal.Generation != generation {
continue
}
if !timestamp.IsZero() && wal.CreatedAt.After(timestamp) { if !timestamp.IsZero() && wal.CreatedAt.After(timestamp) {
continue // after timestamp, skip continue // after timestamp, skip
} else if wal.Index > maxIndex { } else if wal.Index > maxIndex {
@@ -949,7 +1010,12 @@ func compressFile(src, dst string, uid, gid int) error {
} }
defer r.Close() defer r.Close()
w, err := createFile(dst+".tmp", uid, gid) fi, err := r.Stat()
if err != nil {
return err
}
w, err := createFile(dst+".tmp", fi.Mode(), uid, gid)
if err != nil { if err != nil {
return err return err
} }
@@ -978,20 +1044,6 @@ func compressFile(src, dst string, uid, gid int) error {
func ValidateReplica(ctx context.Context, r Replica) error { func ValidateReplica(ctx context.Context, r Replica) error {
db := r.DB() db := r.DB()
// Compute checksum of primary database under lock. This prevents a
// sync from occurring and the database will not be written.
chksum0, pos, err := db.CRC64()
if err != nil {
return fmt.Errorf("cannot compute checksum: %w", err)
}
log.Printf("%s(%s): primary checksum computed: %016x @ %s", db.Path(), r.Name(), chksum0, pos)
// Wait until replica catches up to position.
log.Printf("%s(%s): waiting for replica", db.Path(), r.Name())
if err := waitForReplica(ctx, r, pos); err != nil {
return fmt.Errorf("cannot wait for replica: %w", err)
}
// Restore replica to a temporary directory. // Restore replica to a temporary directory.
tmpdir, err := ioutil.TempDir("", "*-litestream") tmpdir, err := ioutil.TempDir("", "*-litestream")
if err != nil { if err != nil {
@@ -999,8 +1051,21 @@ func ValidateReplica(ctx context.Context, r Replica) error {
} }
defer os.RemoveAll(tmpdir) defer os.RemoveAll(tmpdir)
restorePath := filepath.Join(tmpdir, "db") // Compute checksum of primary database under lock. This prevents a
if err := db.Restore(ctx, RestoreOptions{ // sync from occurring and the database will not be written.
primaryPath := filepath.Join(tmpdir, "primary")
chksum0, pos, err := db.CRC64()
if err != nil {
return fmt.Errorf("cannot compute checksum: %w", err)
}
// Wait until replica catches up to position.
if err := waitForReplica(ctx, r, pos); err != nil {
return fmt.Errorf("cannot wait for replica: %w", err)
}
restorePath := filepath.Join(tmpdir, "replica")
if err := RestoreReplica(ctx, r, RestoreOptions{
OutputPath: restorePath, OutputPath: restorePath,
ReplicaName: r.Name(), ReplicaName: r.Name(),
Generation: pos.Generation, Generation: pos.Generation,
@@ -1016,17 +1081,33 @@ func ValidateReplica(ctx context.Context, r Replica) error {
return err return err
} }
log.Printf("%s(%s): restore complete, replica checksum=%016x", db.Path(), r.Name(), chksum1) status := "ok"
mismatch := chksum0 != chksum1
if mismatch {
status = "mismatch"
}
log.Printf("%s(%s): validator: status=%s db=%016x replica=%016x pos=%s", db.Path(), r.Name(), status, chksum0, chksum1, pos)
// Validate checksums match. // Validate checksums match.
if chksum0 != chksum1 { if mismatch {
internal.ReplicaValidationTotalCounterVec.WithLabelValues(db.Path(), r.Name(), "error").Inc() internal.ReplicaValidationTotalCounterVec.WithLabelValues(db.Path(), r.Name(), "error").Inc()
// Compress mismatched databases and report temporary path for investigation.
if err := compressFile(primaryPath, primaryPath+".lz4", db.uid, db.gid); err != nil {
return fmt.Errorf("cannot compress primary db: %w", err)
} else if err := compressFile(restorePath, restorePath+".lz4", db.uid, db.gid); err != nil {
return fmt.Errorf("cannot compress replica db: %w", err)
}
log.Printf("%s(%s): validator: mismatch files @ %s", db.Path(), r.Name(), tmpdir)
return ErrChecksumMismatch return ErrChecksumMismatch
} }
internal.ReplicaValidationTotalCounterVec.WithLabelValues(db.Path(), r.Name(), "ok").Inc() internal.ReplicaValidationTotalCounterVec.WithLabelValues(db.Path(), r.Name(), "ok").Inc()
log.Printf("%s(%s): replica ok", db.Path(), r.Name())
if err := os.RemoveAll(tmpdir); err != nil {
return fmt.Errorf("cannot remove temporary validation directory: %w", err)
}
return nil return nil
} }
@@ -1037,6 +1118,9 @@ func waitForReplica(ctx context.Context, r Replica, pos Pos) error {
ticker := time.NewTicker(500 * time.Millisecond) ticker := time.NewTicker(500 * time.Millisecond)
defer ticker.Stop() defer ticker.Stop()
timer := time.NewTicker(10 * time.Second)
defer ticker.Stop()
once := make(chan struct{}, 1) once := make(chan struct{}, 1)
once <- struct{}{} once <- struct{}{}
@@ -1044,6 +1128,8 @@ func waitForReplica(ctx context.Context, r Replica, pos Pos) error {
select { select {
case <-ctx.Done(): case <-ctx.Done():
return ctx.Err() return ctx.Err()
case <-timer.C:
return fmt.Errorf("replica wait exceeded timeout")
case <-ticker.C: case <-ticker.C:
case <-once: // immediate on first check case <-once: // immediate on first check
} }
@@ -1051,7 +1137,7 @@ func waitForReplica(ctx context.Context, r Replica, pos Pos) error {
// Obtain current position of replica, check if past target position. // Obtain current position of replica, check if past target position.
curr, err := r.CalcPos(ctx, pos.Generation) curr, err := r.CalcPos(ctx, pos.Generation)
if err != nil { if err != nil {
log.Printf("%s(%s): cannot obtain replica position: %s", db.Path(), r.Name(), err) log.Printf("%s(%s): validator: cannot obtain replica position: %s", db.Path(), r.Name(), err)
continue continue
} }
@@ -1070,7 +1156,6 @@ func waitForReplica(ctx context.Context, r Replica, pos Pos) error {
// If not ready, restart loop. // If not ready, restart loop.
if !ready { if !ready {
log.Printf("%s(%s): replica at %s, waiting for %s", db.Path(), r.Name(), curr, pos)
continue continue
} }

View File

@@ -14,6 +14,7 @@ import (
"github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/credentials" "github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/defaults"
"github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/s3" "github.com/aws/aws-sdk-go/service/s3"
"github.com/aws/aws-sdk-go/service/s3/s3manager" "github.com/aws/aws-sdk-go/service/s3/s3manager"
@@ -104,16 +105,20 @@ func NewReplica(db *litestream.DB, name string) *Replica {
MonitorEnabled: true, MonitorEnabled: true,
} }
r.snapshotTotalGauge = internal.ReplicaSnapshotTotalGaugeVec.WithLabelValues(db.Path(), r.Name()) var dbPath string
r.walBytesCounter = internal.ReplicaWALBytesCounterVec.WithLabelValues(db.Path(), r.Name()) if db != nil {
r.walIndexGauge = internal.ReplicaWALIndexGaugeVec.WithLabelValues(db.Path(), r.Name()) dbPath = db.Path()
r.walOffsetGauge = internal.ReplicaWALOffsetGaugeVec.WithLabelValues(db.Path(), r.Name()) }
r.putOperationTotalCounter = operationTotalCounterVec.WithLabelValues(db.Path(), r.Name(), "PUT") r.snapshotTotalGauge = internal.ReplicaSnapshotTotalGaugeVec.WithLabelValues(dbPath, r.Name())
r.putOperationBytesCounter = operationBytesCounterVec.WithLabelValues(db.Path(), r.Name(), "PUT") r.walBytesCounter = internal.ReplicaWALBytesCounterVec.WithLabelValues(dbPath, r.Name())
r.getOperationTotalCounter = operationTotalCounterVec.WithLabelValues(db.Path(), r.Name(), "GET") r.walIndexGauge = internal.ReplicaWALIndexGaugeVec.WithLabelValues(dbPath, r.Name())
r.getOperationBytesCounter = operationBytesCounterVec.WithLabelValues(db.Path(), r.Name(), "GET") r.walOffsetGauge = internal.ReplicaWALOffsetGaugeVec.WithLabelValues(dbPath, r.Name())
r.listOperationTotalCounter = operationTotalCounterVec.WithLabelValues(db.Path(), r.Name(), "LIST") r.putOperationTotalCounter = operationTotalCounterVec.WithLabelValues(dbPath, r.Name(), "PUT")
r.deleteOperationTotalCounter = operationTotalCounterVec.WithLabelValues(db.Path(), r.Name(), "DELETE") r.putOperationBytesCounter = operationBytesCounterVec.WithLabelValues(dbPath, r.Name(), "PUT")
r.getOperationTotalCounter = operationTotalCounterVec.WithLabelValues(dbPath, r.Name(), "GET")
r.getOperationBytesCounter = operationBytesCounterVec.WithLabelValues(dbPath, r.Name(), "GET")
r.listOperationTotalCounter = operationTotalCounterVec.WithLabelValues(dbPath, r.Name(), "LIST")
r.deleteOperationTotalCounter = operationTotalCounterVec.WithLabelValues(dbPath, r.Name(), "DELETE")
return r return r
} }
@@ -462,7 +467,7 @@ func (r *Replica) monitor(ctx context.Context) {
// Synchronize the shadow wal into the replication directory. // Synchronize the shadow wal into the replication directory.
if err := r.Sync(ctx); err != nil { if err := r.Sync(ctx); err != nil {
log.Printf("%s(%s): sync error: %s", r.db.Path(), r.Name(), err) log.Printf("%s(%s): monitor error: %s", r.db.Path(), r.Name(), err)
continue continue
} }
} }
@@ -598,6 +603,7 @@ func (r *Replica) snapshot(ctx context.Context, generation string, index int) er
}() }()
snapshotPath := r.SnapshotPath(generation, index) snapshotPath := r.SnapshotPath(generation, index)
startTime := time.Now()
if _, err := r.uploader.UploadWithContext(ctx, &s3manager.UploadInput{ if _, err := r.uploader.UploadWithContext(ctx, &s3manager.UploadInput{
Bucket: aws.String(r.Bucket), Bucket: aws.String(r.Bucket),
@@ -610,6 +616,8 @@ func (r *Replica) snapshot(ctx context.Context, generation string, index int) er
r.putOperationTotalCounter.Inc() r.putOperationTotalCounter.Inc()
r.putOperationBytesCounter.Add(float64(fi.Size())) r.putOperationBytesCounter.Add(float64(fi.Size()))
log.Printf("%s(%s): snapshot: creating %s/%08x t=%s", r.db.Path(), r.Name(), generation, index, time.Since(startTime))
return nil return nil
} }
@@ -647,10 +655,9 @@ func (r *Replica) Init(ctx context.Context) (err error) {
} }
// Create new AWS session. // Create new AWS session.
sess, err := session.NewSession(&aws.Config{ config := r.config()
Credentials: credentials.NewStaticCredentials(r.AccessKeyID, r.SecretAccessKey, ""), config.Region = aws.String(region)
Region: aws.String(region), sess, err := session.NewSession(config)
})
if err != nil { if err != nil {
return fmt.Errorf("cannot create aws session: %w", err) return fmt.Errorf("cannot create aws session: %w", err)
} }
@@ -659,12 +666,21 @@ func (r *Replica) Init(ctx context.Context) (err error) {
return nil return nil
} }
// config returns the AWS configuration. Uses the default credential chain
// unless a key/secret are explicitly set.
func (r *Replica) config() *aws.Config {
config := defaults.Get().Config
if r.AccessKeyID != "" || r.SecretAccessKey != "" {
config.Credentials = credentials.NewStaticCredentials(r.AccessKeyID, r.SecretAccessKey, "")
}
return config
}
func (r *Replica) findBucketRegion(ctx context.Context, bucket string) (string, error) { func (r *Replica) findBucketRegion(ctx context.Context, bucket string) (string, error) {
// Connect to US standard region to fetch info. // Connect to US standard region to fetch info.
sess, err := session.NewSession(&aws.Config{ config := r.config()
Credentials: credentials.NewStaticCredentials(r.AccessKeyID, r.SecretAccessKey, ""), config.Region = aws.String("us-east-1")
Region: aws.String("us-east-1"), sess, err := session.NewSession(config)
})
if err != nil { if err != nil {
return "", err return "", err
} }
@@ -922,7 +938,6 @@ func (r *Replica) EnforceRetention(ctx context.Context) (err error) {
// If no retained snapshots exist, create a new snapshot. // If no retained snapshots exist, create a new snapshot.
if len(snapshots) == 0 { if len(snapshots) == 0 {
log.Printf("%s(%s): snapshots exceeds retention, creating new snapshot", r.db.Path(), r.Name())
if err := r.snapshot(ctx, pos.Generation, pos.Index); err != nil { if err := r.snapshot(ctx, pos.Generation, pos.Index); err != nil {
return fmt.Errorf("cannot snapshot: %w", err) return fmt.Errorf("cannot snapshot: %w", err)
} }
@@ -945,7 +960,6 @@ func (r *Replica) EnforceRetention(ctx context.Context) (err error) {
// Delete generations if it has no snapshots being retained. // Delete generations if it has no snapshots being retained.
if snapshot == nil { if snapshot == nil {
log.Printf("%s(%s): generation %q has no retained snapshots, deleting", r.db.Path(), r.Name(), generation)
if err := r.deleteGenerationBefore(ctx, generation, -1); err != nil { if err := r.deleteGenerationBefore(ctx, generation, -1); err != nil {
return fmt.Errorf("cannot delete generation %q dir: %w", generation, err) return fmt.Errorf("cannot delete generation %q dir: %w", generation, err)
} }
@@ -988,16 +1002,13 @@ func (r *Replica) deleteGenerationBefore(ctx context.Context, generation string,
} }
// Delete all files in batches. // Delete all files in batches.
var n int
for i := 0; i < len(objIDs); i += MaxKeys { for i := 0; i < len(objIDs); i += MaxKeys {
j := i + MaxKeys j := i + MaxKeys
if j > len(objIDs) { if j > len(objIDs) {
j = len(objIDs) j = len(objIDs)
} }
for _, objID := range objIDs[i:j] {
log.Printf("%s(%s): retention exceeded, deleting from generation %q: %s", r.db.Path(), r.Name(), generation, path.Base(*objID.Key))
}
if _, err := r.s3.DeleteObjectsWithContext(ctx, &s3.DeleteObjectsInput{ if _, err := r.s3.DeleteObjectsWithContext(ctx, &s3.DeleteObjectsInput{
Bucket: aws.String(r.Bucket), Bucket: aws.String(r.Bucket),
Delete: &s3.Delete{ Delete: &s3.Delete{
@@ -1007,9 +1018,12 @@ func (r *Replica) deleteGenerationBefore(ctx context.Context, generation string,
}); err != nil { }); err != nil {
return err return err
} }
n += len(objIDs[i:j])
r.deleteOperationTotalCounter.Inc() r.deleteOperationTotalCounter.Inc()
} }
log.Printf("%s(%s): retainer: deleting wal files before %s/%08x n=%d", r.db.Path(), r.Name(), generation, index, n)
return nil return nil
} }