Compare commits

..

86 Commits

Author SHA1 Message Date
Ben Johnson
a178ef4714 Merge pull request #27 from benbjohnson/generations-replica-url
Allow replica URLs for generations command
2021-01-27 07:50:29 -07:00
Ben Johnson
7ca2e193b9 Allow replica URLs for generations command 2021-01-27 07:48:56 -07:00
Ben Johnson
39a6fabb9f Fix restore logging. 2021-01-26 17:01:00 -07:00
Ben Johnson
0249b4e4f5 Merge pull request #25 from benbjohnson/replica-url 2021-01-26 16:40:17 -07:00
Ben Johnson
67eeb49101 Allow replica URL to be used for commands
This commit refactors the commands to allow a replica URL when
restoring a database. If the first CLI arg is a URL with a scheme,
the it is treated as a replica URL.
2021-01-26 16:33:16 -07:00
Ben Johnson
f7213ed35c Allow replication without config file.
This commit changes `litestream replicate` to accept a database
path and a replica URL instead of using the config file. This allows
people to quickly try out the tool instead of learning the config
file syntax.
2021-01-25 10:33:50 -07:00
Ben Johnson
a532a0198e README 2021-01-24 10:09:54 -07:00
Ben Johnson
16f79e5814 Merge pull request #24 from benbjohnson/document-retention-period
Document retention period configuration
2021-01-24 09:32:31 -07:00
Ben Johnson
39aefc2c02 Document retention period configuration 2021-01-24 09:28:57 -07:00
Ben Johnson
0b08669bca Merge pull request #23 from benbjohnson/disable-metrics-by-default
Disable prometheus metrics by default
2021-01-24 09:18:37 -07:00
Ben Johnson
8f5761ee13 Disable prometheus metrics by default
The HTTP server should only be enabled if a user explicitly sets a
port for it.
2021-01-24 09:16:23 -07:00
Ben Johnson
d2eb4fa5ba Remove PR action 2021-01-24 08:54:29 -07:00
Ben Johnson
ca489c5e73 Merge pull request #22 from benbjohnson/notorize
Add signed homebrew install
2021-01-24 08:50:01 -07:00
Ben Johnson
f0ae48af4c Add signed homebrew install 2021-01-24 08:47:16 -07:00
Ben Johnson
9eae39e2fa README 2021-01-21 15:01:30 -07:00
Ben Johnson
42ab293ffb README 2021-01-21 14:53:21 -07:00
Ben Johnson
c8b72bf16b Fix release action 2021-01-21 14:39:23 -07:00
Ben Johnson
9c4de6c520 Debian release action 2021-01-21 14:34:42 -07:00
Ben Johnson
94411923a7 Fix unit test 2021-01-21 13:52:35 -07:00
Ben Johnson
e92db9ef4b Enforce stricter validation on restart.
Previously, the sync would validate the last page written to ensure
that replication picked up from the last position. However, a large
WAL file followed by a series of shorter checkpointed WAL files means
that the last page could be the same even if multiple checkpoints
have occurred.

To fix this, the WAL header must match the shadow WAL header when
starting litestream since there are no guarantees about checkpoints.
2021-01-21 13:44:05 -07:00
Ben Johnson
031a526b9a Only copy committed WAL pages 2021-01-21 12:44:11 -07:00
Ben Johnson
2244be885d README 2021-01-20 17:05:47 -07:00
Ben Johnson
95bcaa5927 Fix file replica compression 2021-01-19 09:25:38 -07:00
Ben Johnson
1935ebd6f0 Fix S3 GET bytes metric 2021-01-19 06:46:13 -07:00
Ben Johnson
7fb98df240 cleanup 2021-01-18 15:58:49 -07:00
Ben Johnson
f31c22af62 Remove s3 bucket lookup log 2021-01-18 15:27:16 -07:00
Ben Johnson
139d836d7a Fix file/dir mode 2021-01-18 15:23:28 -07:00
Ben Johnson
14dad1fd5a Switch from gzip to lz4 2021-01-18 14:45:12 -07:00
Ben Johnson
35d755e7f2 Remove debugging code 2021-01-18 10:33:30 -07:00
Ben Johnson
358dcd4650 Copy shadow WAL immediately after init 2021-01-18 10:01:16 -07:00
Ben Johnson
2ce4052300 Remove write lock during db checksum 2021-01-18 07:05:27 -07:00
Ben Johnson
44af75fa98 Fix missing WAL reader error 2021-01-18 07:05:17 -07:00
Ben Johnson
3c4fd152c9 Add more checksum logging 2021-01-18 06:38:03 -07:00
Ben Johnson
d259d9b9e3 Fix checksum logging 2021-01-17 10:19:39 -07:00
Ben Johnson
90a1d959d4 Remove size from s3 filenames 2021-01-17 10:02:06 -07:00
Ben Johnson
04d75507e3 Fix checksum hex padding 2021-01-17 09:52:09 -07:00
Ben Johnson
4b65e6a88f Log validation position 2021-01-17 07:38:13 -07:00
Ben Johnson
07a65cbac7 Fix crc64 unit test 2021-01-16 10:04:03 -07:00
Ben Johnson
6ac6a8536d Obtain write lock during validation. 2021-01-16 09:27:43 -07:00
Ben Johnson
71ab15e50a Fix S3 GET stats 2021-01-16 09:22:33 -07:00
Ben Johnson
b4e5079760 Add .deb packaging 2021-01-16 09:22:02 -07:00
Ben Johnson
78563f821d Do not require databases when starting replication 2021-01-16 09:15:16 -07:00
Ben Johnson
e65536f81d Stop waiting for replica if generation changes 2021-01-16 07:47:02 -07:00
Ben Johnson
25fec29e1a Clear last position on replica sync error 2021-01-16 07:45:08 -07:00
Ben Johnson
cbc2dce6dc Add busy timeout 2021-01-16 07:33:32 -07:00
Ben Johnson
1b8cfc8a41 Add validation interval 2021-01-15 16:37:04 -07:00
Ben Johnson
290e06e60d Reduce s3 LIST operations 2021-01-15 13:31:04 -07:00
Ben Johnson
b94ee366e5 Fix snapshot only restore 2021-01-15 13:12:15 -07:00
Ben Johnson
743aeb83e1 Revert gzip compression level, fix s3 wal upload 2021-01-15 13:04:21 -07:00
Ben Johnson
a7ec05ad7a Allow global AWS settings in config. 2021-01-15 12:27:41 -07:00
Ben Johnson
28dd7b564e Lookup s3 bucket region if not specified 2021-01-15 12:18:07 -07:00
Ben Johnson
43dda4315f Allow URLs for replica config path 2021-01-15 12:04:23 -07:00
Ben Johnson
0655bf420a Fix unit tests 2021-01-14 16:13:19 -07:00
Ben Johnson
8c113cf260 Add file & s3 replica metrics 2021-01-14 16:10:02 -07:00
Ben Johnson
daa74f87b4 Add file replica metrics 2021-01-14 15:47:58 -07:00
Ben Johnson
e1c9e09161 Update wal segment naming 2021-01-14 15:26:29 -07:00
Ben Johnson
1e4e9633cc Add s3 sync interval 2021-01-14 15:04:26 -07:00
Ben Johnson
294846cce2 Add context to s3 2021-01-13 16:38:00 -07:00
Ben Johnson
9eb7bd41c2 S3 reader & retention enforcement 2021-01-13 16:21:42 -07:00
Ben Johnson
1ac4adb272 Basic s3 replication working 2021-01-13 14:23:41 -07:00
Ben Johnson
a42f83f3cb Add LITESTREAM_CONFIG env var 2021-01-13 13:17:38 -07:00
Ben Johnson
57a02a8628 S3 replica 2021-01-13 10:14:54 -07:00
Ben Johnson
faa5765745 Add retention policy, remove WAL subdir 2021-01-12 15:22:37 -07:00
Ben Johnson
1fa1313b0b Add trace logging. 2021-01-11 11:04:29 -07:00
Ben Johnson
bcdb553267 Use database owner/group 2021-01-11 09:39:08 -07:00
Ben Johnson
9828b4c1dd Rename 'databases' to 'dbs' in config 2021-01-10 10:07:07 -07:00
Ben Johnson
dde9d1042d Update generation lag calc 2021-01-10 09:54:05 -07:00
Ben Johnson
8f30ff7d93 Fix negative duration truncation. 2021-01-10 09:52:04 -07:00
Ben Johnson
aa136a17ee Fix duration truncation. 2021-01-10 09:46:39 -07:00
Ben Johnson
60cb2c97ca Set default max checkpoint. 2021-01-10 09:46:27 -07:00
Ben Johnson
0abe09526d Fix local build 2021-01-09 08:59:08 -07:00
Ben Johnson
b0a3440356 make dist 2021-01-08 16:05:07 -07:00
Ben Johnson
a8d63b54aa README 2021-01-08 10:07:26 -07:00
Ben Johnson
b22f3f100d Add FileReplica.Sync() unit tests. 2021-01-05 15:48:50 -07:00
Ben Johnson
3075b2e92b Fix WAL mod time test 2021-01-05 15:15:28 -07:00
Ben Johnson
7c3272c96f Revert "Test 1.16beta1"
This reverts commit 4294fcf4b4.
2021-01-05 15:11:29 -07:00
Ben Johnson
4294fcf4b4 Test 1.16beta1 2021-01-05 15:10:09 -07:00
Ben Johnson
ae0f51eaa9 Fix setup-go 2021-01-05 14:34:14 -07:00
Ben Johnson
8871d75a8e Fix max checkpoint size check 2021-01-05 14:17:13 -07:00
Ben Johnson
c22eea13ad Add checkpoint tests 2021-01-05 14:07:17 -07:00
Ben Johnson
f4d055916a Add DB sync tests 2021-01-05 13:59:16 -07:00
Ben Johnson
979cabcdb9 Add some DB.Sync() tests 2021-01-01 10:02:03 -07:00
Ben Johnson
5134bc3328 Add test coverage for DB.CRC64 2021-01-01 09:26:23 -07:00
Ben Johnson
78d9de6512 Add DB path tests 2021-01-01 09:00:23 -07:00
Ben Johnson
065f641526 Change validation to use CRC-64 2021-01-01 08:24:11 -07:00
Ben Johnson
f4d0d87fa7 Add DB.UpdatedAt() tests 2021-01-01 08:20:40 -07:00
32 changed files with 3778 additions and 1065 deletions

View File

@@ -5,7 +5,7 @@ on:
name: release
jobs:
release:
linux:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
@@ -16,10 +16,21 @@ jobs:
env:
GITHUB_TOKEN: ${{ github.token }}
- name: Install nfpm
run: |
wget https://github.com/goreleaser/nfpm/releases/download/v2.2.3/nfpm_2.2.3_Linux_x86_64.tar.gz
tar zxvf nfpm_2.2.3_Linux_x86_64.tar.gz
- name: Build litestream
run: |
go build -ldflags "-X 'main.Version=${{ steps.release.outputs.tag_name }}'" -o litestream ./cmd/litestream
mkdir -p dist
cp etc/litestream.yml etc/litestream.service dist
cat etc/nfpm.yml | LITESTREAM_VERSION=${{ steps.release.outputs.tag_name }} envsubst > dist/nfpm.yml
go build -ldflags "-X 'main.Version=${{ steps.release.outputs.tag_name }}'" -o dist/litestream ./cmd/litestream
cd dist
tar -czvf litestream-${{ steps.release.outputs.tag_name }}-linux-amd64.tar.gz litestream
../nfpm pkg --config nfpm.yml --packager deb --target litestream-${{ steps.release.outputs.tag_name }}-linux-amd64.deb
- name: Upload release binary
uses: actions/upload-release-asset@v1.0.2
@@ -27,6 +38,16 @@ jobs:
GITHUB_TOKEN: ${{ github.token }}
with:
upload_url: ${{ steps.release.outputs.upload_url }}
asset_path: ./litestream-${{ steps.release.outputs.tag_name }}-linux-amd64.tar.gz
asset_path: ./dist/litestream-${{ steps.release.outputs.tag_name }}-linux-amd64.tar.gz
asset_name: litestream-${{ steps.release.outputs.tag_name }}-linux-amd64.tar.gz
asset_content_type: application/gzip
- name: Upload debian package
uses: actions/upload-release-asset@v1.0.2
env:
GITHUB_TOKEN: ${{ github.token }}
with:
upload_url: ${{ steps.release.outputs.upload_url }}
asset_path: ./dist/litestream-${{ steps.release.outputs.tag_name }}-linux-amd64.deb
asset_name: litestream-${{ steps.release.outputs.tag_name }}-linux-amd64.deb
asset_content_type: application/octet-stream

View File

@@ -1,10 +1,12 @@
on: [push, pull_request]
on: push
name: test
jobs:
test:
runs-on: ubuntu-latest
steps:
- uses: actions/setup-go@v2
with:
go-version: '1.15'
- uses: actions/checkout@v2

2
.gitignore vendored Normal file
View File

@@ -0,0 +1,2 @@
.DS_Store
/dist

22
Makefile Normal file
View File

@@ -0,0 +1,22 @@
default:
dist-linux:
mkdir -p dist
cp etc/litestream.yml dist/litestream.yml
docker run --rm -v "${PWD}":/usr/src/litestream -w /usr/src/litestream -e GOOS=linux -e GOARCH=amd64 golang:1.15 go build -v -o dist/litestream ./cmd/litestream
tar -cz -f dist/litestream-linux-amd64.tar.gz -C dist litestream
dist-macos:
ifndef LITESTREAM_VERSION
$(error LITESTREAM_VERSION is undefined)
endif
mkdir -p dist
go build -v -ldflags "-X 'main.Version=${LITESTREAM_VERSION}'" -o dist/litestream ./cmd/litestream
gon etc/gon.hcl
mv dist/litestream.zip dist/litestream-${LITESTREAM_VERSION}-darwin-amd64.zip
openssl dgst -sha256 dist/litestream-${LITESTREAM_VERSION}-darwin-amd64.zip
clean:
rm -rf dist
.PHONY: default dist-linux dist-macos clean

297
README.md
View File

@@ -1,17 +1,296 @@
litestream
Litestream
![GitHub release (latest by date)](https://img.shields.io/github/v/release/benbjohnson/litestream)
![Status](https://img.shields.io/badge/status-beta-blue)
![GitHub](https://img.shields.io/github/license/benbjohnson/litestream)
![test](https://github.com/benbjohnson/litestream/workflows/test/badge.svg)
==========
Streaming replication for SQLite.
Litestream is a standalone streaming replication tool for SQLite. It runs as a
background process and safely replicates changes incrementally to another file
or S3. Litestream only communicates with SQLite through the SQLite API so it
will not corrupt your database.
If you need support or have ideas for improving Litestream, please visit the
[GitHub Discussions](https://github.com/benbjohnson/litestream/discussions) to
chat.
If you find this project interesting, please consider starring the project on
GitHub.
## Questions
## Installation
- How to avoid WAL checkpointing on close?
### Mac OS (Homebrew)
To install from homebrew, first add the Litestream tap and then install:
## Notes
```sql
-- Disable autocheckpointing.
PRAGMA wal_autocheckpoint = 0
```sh
$ brew install benbjohnson/litestream/litestream
```
### Linux (Debian)
You can download the `.deb` file from the [Releases page][releases] page and
then run the following:
```sh
$ sudo dpkg -i litestream-v0.3.0-linux-amd64.deb
```
Once installed, you'll need to enable & start the service:
```sh
$ sudo systemctl enable litestream
$ sudo systemctl start litestream
```
### Release binaries
You can also download the release binary for your system from the
[releases page][releases] and run it as a standalone application.
### Building from source
Download and install the [Go toolchain](https://golang.org/) and then run:
```sh
$ go install ./cmd/litestream
```
The `litestream` binary should be in your `$GOPATH/bin` folder.
## Quick Start
Litestream provides a configuration file that can be used for production
deployments but you can also specify a single database and replica on the
command line for testing.
First, you'll need to create an S3 bucket that we'll call `"mybkt"` in this
example. You'll also need to set your AWS credentials:
```sh
$ export AWS_ACCESS_KEY_ID=AKIAxxxxxxxxxxxxxxxx
$ export AWS_SECRET_ACCESS_KEY=xxxxxxxxxxxxxxxxxxxxxxxxxxxxxx/xxxxxxxxx
```
Next you can run the `litestream replicate` command with the path to the
database you want to backup and the URL of your replica destination:
```sh
$ litestream replicate /path/to/db s3://mybkt/db
```
If you make changes to your local database, those changes will be replicated
to S3 every 10 seconds. From another terminal window, you can restore your
database from your S3 replica:
```
$ litestream restore -v -o /path/to/restored/db s3://mybkt/db
```
Voila! 🎉
Your database should be restored to the last replicated state that
was sent to S3. You can adjust your replication frequency and other options by
using a configuration-based approach specified below.
## Configuration
A configuration-based install gives you more replication options. By default,
the config file lives at `/etc/litestream.yml` but you can pass in a different
path to any `litestream` command using the `-config PATH` flag. You can also
set the `LITESTREAM_CONFIG` environment variable to specify a new path.
The configuration specifies one or more `dbs` and a list of one or more replica
locations for each db. Below are some common configurations:
### Replicate to S3
This will replicate the database at `/path/to/db` to the `"/db"` path inside
the S3 bucket named `"mybkt"`.
```yaml
access-key-id: AKIAxxxxxxxxxxxxxxxx
secret-access-key: xxxxxxxxxxxxxxxxxxxxxxxxxxxxxx/xxxxxxxxx
dbs:
- path: /path/to/db
replicas:
- url: s3://mybkt/db
```
### Replicate to another file path
This will replicate the database at `/path/to/db` to a directory named
`/path/to/replica`.
```yaml
dbs:
- path: /path/to/db
replicas:
- path: /path/to/replica
```
### Retention period
By default, replicas will retain a snapshot & subsequent WAL changes for 24
hours. When the snapshot age exceeds the retention threshold, a new snapshot
is taken and uploaded and the previous snapshot and WAL files are removed.
You can configure this setting per-replica. Times are parsed using [Go's
duration](https://golang.org/pkg/time/#ParseDuration) so time units of hours
(`h`), minutes (`m`), and seconds (`s`) are allowed but days, weeks, months, and
years are not.
```yaml
db:
- path: /path/to/db
replicas:
- url: s3://mybkt/db
retention: 1h # 1 hour retention
```
### Monitoring replication
You can also enable a Prometheus metrics endpoint to monitor replication by
specifying a bind address with the `addr` field:
```yml
addr: ":9090"
```
This will make metrics available at: http://localhost:9090/metrics
### Other configuration options
These are some additional configuration options available on replicas:
- `type`—Specify the type of replica (`"file"` or `"s3"`). Derived from `"path"`.
- `name`—Specify an optional name for the replica if you are using multiple replicas.
- `path`—File path to the replica location.
- `url`—URL to the replica location.
- `retention-check-interval`—Time between retention enforcement checks. Defaults to `1h`.
- `validation-interval`—Interval between periodic checks to ensure restored backup matches current database. Disabled by default.
These replica options are only available for S3 replicas:
- `bucket`—S3 bucket name. Derived from `"path"`.
- `region`—S3 bucket region. Looked up on startup if unspecified.
- `sync-interval`—Replication sync frequency.
## Usage
### Replication
Once your configuration is saved, you'll need to begin replication. If you
installed the `.deb` file then run:
```sh
$ sudo systemctl restart litestream
```
To run litestream on its own, run:
```sh
# Replicate using the /etc/litestream.yml configuration.
$ litestream replicate
# Replicate using a different configuration path.
$ litestream replicate -config /path/to/litestream.yml
```
The `litestream` command will initialize and then wait indefinitely for changes.
You should see your destination replica path is now populated with a
`generations` directory. Inside there should be a 16-character hex generation
directory and inside there should be snapshots & WAL files. As you make changes
to your source database, changes will be copied over to your replica incrementally.
### Restoring a backup
Litestream can restore a previous snapshot and replay all replicated WAL files.
By default, it will restore up to the latest WAL file but you can also perform
point-in-time restores.
A database can only be restored to a path that does not exist so you don't need
to worry about accidentally overwriting your current database.
```sh
# Restore database to original path.
$ litestream restore /path/to/db
# Restore database to a new location.
$ litestream restore -o /path/to/restored/db /path/to/db
# Restore from a replica URL.
$ litestream restore -o /path/to/restored/db s3://mybkt/db
# Restore database to a specific point-in-time.
$ litestream restore -timestamp 2020-01-01T00:00:00Z /path/to/db
```
Point-in-time restores only have the resolution of the timestamp of the WAL file
itself. By default, litestream will start a new WAL file every minute so
point-in-time restores are only accurate to the minute.
## How it works
SQLite provides a WAL (write-ahead log) journaling mode which writes pages to
a `-wal` file before eventually being copied over to the original database file.
This copying process is known as checkpointing. The WAL file works as a circular
buffer so when the WAL reaches a certain size then it restarts from the beginning.
Litestream works by taking over the checkpointing process and controlling when
it is restarted to ensure that it copies every new page. Checkpointing is only
allowed when there are no read transactions so Litestream maintains a
long-running read transaction against each database until it is ready to
checkpoint.
The SQLite WAL file is copied to a separate location called the shadow WAL which
ensures that it will not be overwritten by SQLite. This shadow WAL acts as a
temporary buffer so that replicas can replicate to their destination (e.g.
another file path or to S3). The shadow WAL files are removed once they have
been fully replicated. You can find the shadow directory as a hidden directory
next to your database file. If you database file is named `/var/lib/my.db` then
the shadow directory will be `/var/lib/.my.db-litestream`.
Litestream groups a snapshot and all subsequent WAL changes into "generations".
A generation is started on initial replication of a database and a new
generation will be started if litestream detects that the WAL replication is
no longer contiguous. This can occur if the `litestream` process is stopped and
another process is allowed to checkpoint the WAL.
## Open-source, not open-contribution
[Similar to SQLite](https://www.sqlite.org/copyright.html), litestream is open
source but closed to contributions. This keeps the code base free of proprietary
or licensed code but it also helps me continue to maintain and build litestream.
As the author of [BoltDB](https://github.com/boltdb/bolt), I found that
accepting and maintaining third party patches contributed to my burn out and
I eventually archived the project. Writing databases & low-level replication
tools involves nuance and simple one line changes can have profound and
unexpected changes in correctness and performance. Small contributions
typically required hours of my time to properly test and validate them.
I am grateful for community involvement, bug reports, & feature requests. I do
not wish to come off as anything but welcoming, however, I've
made the decision to keep this project closed to contributions for my own
mental health and long term viability of the project.
[releases]: https://github.com/benbjohnson/litestream/releases

View File

@@ -10,8 +10,10 @@ import (
"text/tabwriter"
)
// DatabasesCommand is a command for listing managed databases.
type DatabasesCommand struct{}
// Run executes the command.
func (c *DatabasesCommand) Run(ctx context.Context, args []string) (err error) {
var configPath string
fs := flag.NewFlagSet("litestream-databases", flag.ContinueOnError)
@@ -36,7 +38,7 @@ func (c *DatabasesCommand) Run(ctx context.Context, args []string) (err error) {
w := tabwriter.NewWriter(os.Stdout, 0, 8, 1, '\t', 0)
fmt.Fprintln(w, "path\treplicas")
for _, dbConfig := range config.DBs {
db, err := newDBFromConfig(dbConfig)
db, err := newDBFromConfig(&config, dbConfig)
if err != nil {
return err
}
@@ -56,6 +58,7 @@ func (c *DatabasesCommand) Run(ctx context.Context, args []string) (err error) {
return nil
}
// Usage prints the help screen to STDOUT.
func (c *DatabasesCommand) Usage() {
fmt.Printf(`
The databases command lists all databases in the configuration file.
@@ -71,6 +74,6 @@ Arguments:
Defaults to %s
`[1:],
DefaultConfigPath,
DefaultConfigPath(),
)
}

View File

@@ -7,13 +7,16 @@ import (
"fmt"
"log"
"os"
"path/filepath"
"text/tabwriter"
"time"
"github.com/benbjohnson/litestream"
)
// GenerationsCommand represents a command to list all generations for a database.
type GenerationsCommand struct{}
// Run executes the command.
func (c *GenerationsCommand) Run(ctx context.Context, args []string) (err error) {
var configPath string
fs := flag.NewFlagSet("litestream-generations", flag.ContinueOnError)
@@ -23,50 +26,60 @@ func (c *GenerationsCommand) Run(ctx context.Context, args []string) (err error)
if err := fs.Parse(args); err != nil {
return err
} else if fs.NArg() == 0 || fs.Arg(0) == "" {
return fmt.Errorf("database path required")
return fmt.Errorf("database path or replica URL required")
} else if fs.NArg() > 1 {
return fmt.Errorf("too many arguments")
}
// Load configuration.
if configPath == "" {
return errors.New("-config required")
var db *litestream.DB
var r litestream.Replica
updatedAt := time.Now()
if isURL(fs.Arg(0)) {
if r, err = NewReplicaFromURL(fs.Arg(0)); err != nil {
return err
}
} else if configPath != "" {
// Load configuration.
config, err := ReadConfigFile(configPath)
if err != nil {
return err
}
// Determine absolute path for database.
dbPath, err := filepath.Abs(fs.Arg(0))
if err != nil {
// Lookup database from configuration file by path.
if path, err := expand(fs.Arg(0)); err != nil {
return err
} else if dbc := config.DBConfig(path); dbc == nil {
return fmt.Errorf("database not found in config: %s", path)
} else if db, err = newDBFromConfig(&config, dbc); err != nil {
return err
}
// Instantiate DB from from configuration.
dbConfig := config.DBConfig(dbPath)
if dbConfig == nil {
return fmt.Errorf("database not found in config: %s", dbPath)
// Filter by replica, if specified.
if *replicaName != "" {
if r = db.Replica(*replicaName); r == nil {
return fmt.Errorf("replica %q not found for database %q", *replicaName, db.Path())
}
db, err := newDBFromConfig(dbConfig)
if err != nil {
return err
}
// Determine last time database or WAL was updated.
updatedAt, err := db.UpdatedAt()
if err != nil {
if updatedAt, err = db.UpdatedAt(); err != nil {
return err
}
} else {
return errors.New("config path or replica URL required")
}
var replicas []litestream.Replica
if r != nil {
replicas = []litestream.Replica{r}
} else {
replicas = db.Replicas
}
// List each generation.
w := tabwriter.NewWriter(os.Stdout, 0, 8, 1, '\t', 0)
fmt.Fprintln(w, "name\tgeneration\tlag\tstart\tend")
for _, r := range db.Replicas {
if *replicaName != "" && r.Name() != *replicaName {
continue
}
for _, r := range replicas {
generations, err := r.Generations(ctx)
if err != nil {
log.Printf("%s: cannot list generations: %s", r.Name(), err)
@@ -84,10 +97,11 @@ func (c *GenerationsCommand) Run(ctx context.Context, args []string) (err error)
fmt.Fprintf(w, "%s\t%s\t%s\t%s\t%s\n",
r.Name(),
generation,
truncateDuration(stats.UpdatedAt.Sub(updatedAt)).String(),
truncateDuration(updatedAt.Sub(stats.UpdatedAt)).String(),
stats.CreatedAt.Format(time.RFC3339),
stats.UpdatedAt.Format(time.RFC3339),
)
w.Flush()
}
}
w.Flush()
@@ -95,35 +109,51 @@ func (c *GenerationsCommand) Run(ctx context.Context, args []string) (err error)
return nil
}
// Usage prints the help message to STDOUT.
func (c *GenerationsCommand) Usage() {
fmt.Printf(`
The generations command lists all generations for a database. It also lists
stats about their lag behind the primary database and the time range they cover.
The generations command lists all generations for a database or replica. It also
lists stats about their lag behind the primary database and the time range they
cover.
Usage:
litestream generations [arguments] DB
litestream generations [arguments] DB_PATH
litestream generations [arguments] REPLICA_URL
Arguments:
-config PATH
Specifies the configuration file. Defaults to %s
Specifies the configuration file.
Defaults to %s
-replica NAME
Optional, filters by replica.
`[1:],
DefaultConfigPath,
DefaultConfigPath(),
)
}
func truncateDuration(d time.Duration) time.Duration {
if d > time.Hour {
return d.Truncate(time.Hour)
} else if d > time.Minute {
return d.Truncate(time.Minute)
} else if d > time.Second {
if d < 0 {
if d < -10*time.Second {
return d.Truncate(time.Second)
} else if d < -time.Second {
return d.Truncate(time.Second / 10)
} else if d < -time.Millisecond {
return d.Truncate(time.Millisecond)
} else if d < -time.Microsecond {
return d.Truncate(time.Microsecond)
}
return d
}
if d > 10*time.Second {
return d.Truncate(time.Second)
} else if d > time.Second {
return d.Truncate(time.Second / 10)
} else if d > time.Millisecond {
return d.Truncate(time.Millisecond)
} else if d > time.Microsecond {

View File

@@ -6,12 +6,17 @@ import (
"fmt"
"io/ioutil"
"log"
"net/url"
"os"
"os/user"
"path"
"path/filepath"
"strings"
"time"
"github.com/benbjohnson/litestream"
"github.com/benbjohnson/litestream/s3"
_ "github.com/mattn/go-sqlite3"
"gopkg.in/yaml.v2"
)
@@ -20,9 +25,6 @@ var (
Version = "(development build)"
)
// DefaultConfigPath is the default configuration path.
const DefaultConfigPath = "/etc/litestream.yml"
func main() {
log.SetFlags(0)
@@ -35,12 +37,15 @@ func main() {
}
}
// Main represents the main program execution.
type Main struct{}
// NewMain returns a new instance of Main.
func NewMain() *Main {
return &Main{}
}
// Run executes the program.
func (m *Main) Run(ctx context.Context, args []string) (err error) {
var cmd string
if len(args) > 0 {
@@ -58,8 +63,6 @@ func (m *Main) Run(ctx context.Context, args []string) (err error) {
return (&RestoreCommand{}).Run(ctx, args)
case "snapshots":
return (&SnapshotsCommand{}).Run(ctx, args)
case "validate":
return (&ValidateCommand{}).Run(ctx, args)
case "version":
return (&VersionCommand{}).Run(ctx, args)
case "wal":
@@ -73,6 +76,7 @@ func (m *Main) Run(ctx context.Context, args []string) (err error) {
}
}
// Usage prints the help screen to STDOUT.
func (m *Main) Usage() {
fmt.Println(`
litestream is a tool for replicating SQLite databases.
@@ -83,37 +87,37 @@ Usage:
The commands are:
databases list databases specified in config file
generations list available generations for a database
replicate runs a server to replicate databases
restore recovers database backup from a replica
snapshots list available snapshots for a database
validate checks replica to ensure a consistent state with primary
version prints the version
version prints the binary version
wal list available WAL files for a database
`[1:])
}
// Default configuration settings.
const (
DefaultAddr = ":9090"
)
// Config represents a configuration file for the litestream daemon.
type Config struct {
// Bind address for serving metrics.
Addr string `yaml:"addr"`
// List of databases to manage.
DBs []*DBConfig `yaml:"databases"`
DBs []*DBConfig `yaml:"dbs"`
// Global S3 settings
AccessKeyID string `yaml:"access-key-id"`
SecretAccessKey string `yaml:"secret-access-key"`
Region string `yaml:"region"`
Bucket string `yaml:"bucket"`
}
// DefaultConfig returns a new instance of Config with defaults set.
func DefaultConfig() Config {
return Config{
Addr: DefaultAddr,
}
return Config{}
}
// DBConfig returns database configuration by path.
func (c *Config) DBConfig(path string) *DBConfig {
for _, dbConfig := range c.DBs {
if dbConfig.Path == path {
@@ -124,18 +128,13 @@ func (c *Config) DBConfig(path string) *DBConfig {
}
// ReadConfigFile unmarshals config from filename. Expands path if needed.
func ReadConfigFile(filename string) (Config, error) {
func ReadConfigFile(filename string) (_ Config, err error) {
config := DefaultConfig()
// Expand filename, if necessary.
if prefix := "~" + string(os.PathSeparator); strings.HasPrefix(filename, prefix) {
u, err := user.Current()
filename, err = expand(filename)
if err != nil {
return config, err
} else if u.HomeDir == "" {
return config, fmt.Errorf("home directory unset")
}
filename = filepath.Join(u.HomeDir, strings.TrimPrefix(filename, prefix))
}
// Read & deserialize configuration.
@@ -146,32 +145,123 @@ func ReadConfigFile(filename string) (Config, error) {
} else if err := yaml.Unmarshal(buf, &config); err != nil {
return config, err
}
// Normalize paths.
for _, dbConfig := range config.DBs {
if dbConfig.Path, err = expand(dbConfig.Path); err != nil {
return config, err
}
}
return config, nil
}
// DBConfig represents the configuration for a single database.
type DBConfig struct {
Path string `yaml:"path"`
Replicas []*ReplicaConfig `yaml:"replicas"`
}
// ReplicaConfig represents the configuration for a single replica in a database.
type ReplicaConfig struct {
Type string `yaml:"type"` // "file", "s3"
Name string `yaml:"name"` // name of replica, optional.
Path string `yaml:"path"` // used for file replicas
Path string `yaml:"path"`
URL string `yaml:"url"`
Retention time.Duration `yaml:"retention"`
RetentionCheckInterval time.Duration `yaml:"retention-check-interval"`
SyncInterval time.Duration `yaml:"sync-interval"` // s3 only
ValidationInterval time.Duration `yaml:"validation-interval"`
// S3 settings
AccessKeyID string `yaml:"access-key-id"`
SecretAccessKey string `yaml:"secret-access-key"`
Region string `yaml:"region"`
Bucket string `yaml:"bucket"`
}
// NewReplicaFromURL returns a new Replica instance configured from a URL.
// The replica's database is not set.
func NewReplicaFromURL(s string) (litestream.Replica, error) {
scheme, host, path, err := ParseReplicaURL(s)
if err != nil {
return nil, err
}
switch scheme {
case "file":
return litestream.NewFileReplica(nil, "", path), nil
case "s3":
r := s3.NewReplica(nil, "")
r.Bucket, r.Path = host, path
return r, nil
default:
return nil, fmt.Errorf("invalid replica url type: %s", s)
}
}
// ParseReplicaURL parses a replica URL.
func ParseReplicaURL(s string) (scheme, host, urlpath string, err error) {
u, err := url.Parse(s)
if err != nil {
return "", "", "", err
}
switch u.Scheme {
case "file":
scheme, u.Scheme = u.Scheme, ""
return scheme, "", path.Clean(u.String()), nil
case "":
return u.Scheme, u.Host, u.Path, fmt.Errorf("replica url scheme required: %s", s)
default:
return u.Scheme, u.Host, strings.TrimPrefix(path.Clean(u.Path), "/"), nil
}
}
// isURL returns true if s can be parsed and has a scheme.
func isURL(s string) bool {
u, err := url.Parse(s)
return err == nil && u.Scheme != ""
}
// ReplicaType returns the type based on the type field or extracted from the URL.
func (c *ReplicaConfig) ReplicaType() string {
typ, _, _, _ := ParseReplicaURL(c.URL)
if typ != "" {
return typ
} else if c.Type != "" {
return c.Type
}
return "file"
}
// DefaultConfigPath returns the default config path.
func DefaultConfigPath() string {
if v := os.Getenv("LITESTREAM_CONFIG"); v != "" {
return v
}
return "/etc/litestream.yml"
}
func registerConfigFlag(fs *flag.FlagSet, p *string) {
fs.StringVar(p, "config", DefaultConfigPath, "config path")
fs.StringVar(p, "config", DefaultConfigPath(), "config path")
}
// newDBFromConfig instantiates a DB based on a configuration.
func newDBFromConfig(config *DBConfig) (*litestream.DB, error) {
func newDBFromConfig(c *Config, dbc *DBConfig) (*litestream.DB, error) {
path, err := expand(dbc.Path)
if err != nil {
return nil, err
}
// Initialize database with given path.
db := litestream.NewDB(config.Path)
db := litestream.NewDB(path)
// Instantiate and attach replicas.
for _, rconfig := range config.Replicas {
r, err := newReplicaFromConfig(db, rconfig)
for _, rc := range dbc.Replicas {
r, err := newReplicaFromConfig(db, c, dbc, rc)
if err != nil {
return nil, err
}
@@ -182,19 +272,129 @@ func newDBFromConfig(config *DBConfig) (*litestream.DB, error) {
}
// newReplicaFromConfig instantiates a replica for a DB based on a config.
func newReplicaFromConfig(db *litestream.DB, config *ReplicaConfig) (litestream.Replica, error) {
switch config.Type {
case "", "file":
return newFileReplicaFromConfig(db, config)
func newReplicaFromConfig(db *litestream.DB, c *Config, dbc *DBConfig, rc *ReplicaConfig) (litestream.Replica, error) {
// Ensure user did not specify URL in path.
if isURL(rc.Path) {
return nil, fmt.Errorf("replica path cannot be a url, please use the 'url' field instead: %s", rc.Path)
}
switch rc.ReplicaType() {
case "file":
return newFileReplicaFromConfig(db, c, dbc, rc)
case "s3":
return newS3ReplicaFromConfig(db, c, dbc, rc)
default:
return nil, fmt.Errorf("unknown replica type in config: %q", config.Type)
return nil, fmt.Errorf("unknown replica type in config: %q", rc.Type)
}
}
// newFileReplicaFromConfig returns a new instance of FileReplica build from config.
func newFileReplicaFromConfig(db *litestream.DB, config *ReplicaConfig) (*litestream.FileReplica, error) {
if config.Path == "" {
return nil, fmt.Errorf("file replica path require for db %q", db.Path())
func newFileReplicaFromConfig(db *litestream.DB, c *Config, dbc *DBConfig, rc *ReplicaConfig) (_ *litestream.FileReplica, err error) {
path := rc.Path
if rc.URL != "" {
_, _, path, err = ParseReplicaURL(rc.URL)
if err != nil {
return nil, err
}
return litestream.NewFileReplica(db, config.Name, config.Path), nil
}
if path == "" {
return nil, fmt.Errorf("%s: file replica path required", db.Path())
}
if path, err = expand(path); err != nil {
return nil, err
}
r := litestream.NewFileReplica(db, rc.Name, path)
if v := rc.Retention; v > 0 {
r.Retention = v
}
if v := rc.RetentionCheckInterval; v > 0 {
r.RetentionCheckInterval = v
}
if v := rc.ValidationInterval; v > 0 {
r.ValidationInterval = v
}
return r, nil
}
// newS3ReplicaFromConfig returns a new instance of S3Replica build from config.
func newS3ReplicaFromConfig(db *litestream.DB, c *Config, dbc *DBConfig, rc *ReplicaConfig) (_ *s3.Replica, err error) {
bucket := c.Bucket
if v := rc.Bucket; v != "" {
bucket = v
}
path := rc.Path
if rc.URL != "" {
_, bucket, path, err = ParseReplicaURL(rc.URL)
if err != nil {
return nil, err
}
}
// Use global or replica-specific S3 settings.
accessKeyID := c.AccessKeyID
if v := rc.AccessKeyID; v != "" {
accessKeyID = v
}
secretAccessKey := c.SecretAccessKey
if v := rc.SecretAccessKey; v != "" {
secretAccessKey = v
}
region := c.Region
if v := rc.Region; v != "" {
region = v
}
// Ensure required settings are set.
if bucket == "" {
return nil, fmt.Errorf("%s: s3 bucket required", db.Path())
}
// Build replica.
r := s3.NewReplica(db, rc.Name)
r.AccessKeyID = accessKeyID
r.SecretAccessKey = secretAccessKey
r.Region = region
r.Bucket = bucket
r.Path = path
if v := rc.Retention; v > 0 {
r.Retention = v
}
if v := rc.RetentionCheckInterval; v > 0 {
r.RetentionCheckInterval = v
}
if v := rc.SyncInterval; v > 0 {
r.SyncInterval = v
}
if v := rc.ValidationInterval; v > 0 {
r.ValidationInterval = v
}
return r, nil
}
// expand returns an absolute path for s.
func expand(s string) (string, error) {
// Just expand to absolute path if there is no home directory prefix.
prefix := "~" + string(os.PathSeparator)
if s != "~" && !strings.HasPrefix(s, prefix) {
return filepath.Abs(s)
}
// Look up home directory.
u, err := user.Current()
if err != nil {
return "", err
} else if u.HomeDir == "" {
return "", fmt.Errorf("cannot expand path %s, no home directory available", s)
}
// Return path with tilde replaced by the home directory.
if s == "~" {
return u.HomeDir, nil
}
return filepath.Join(u.HomeDir, strings.TrimPrefix(s, prefix)), nil
}

View File

@@ -5,15 +5,19 @@ import (
"errors"
"flag"
"fmt"
"log"
"net"
"net/http"
_ "net/http/pprof"
"os"
"os/signal"
"github.com/benbjohnson/litestream"
"github.com/benbjohnson/litestream/s3"
"github.com/prometheus/client_golang/prometheus/promhttp"
)
// ReplicateCommand represents a command that continuously replicates SQLite databases.
type ReplicateCommand struct {
ConfigPath string
Config Config
@@ -25,20 +29,36 @@ type ReplicateCommand struct {
// Run loads all databases specified in the configuration.
func (c *ReplicateCommand) Run(ctx context.Context, args []string) (err error) {
fs := flag.NewFlagSet("litestream-replicate", flag.ContinueOnError)
verbose := fs.Bool("v", false, "verbose logging")
registerConfigFlag(fs, &c.ConfigPath)
fs.Usage = c.Usage
if err := fs.Parse(args); err != nil {
return err
}
// Load configuration.
if c.ConfigPath == "" {
return errors.New("-config required")
// Load configuration or use CLI args to build db/replica.
var config Config
if fs.NArg() == 1 {
return fmt.Errorf("must specify at least one replica URL for %s", fs.Arg(0))
} else if fs.NArg() > 1 {
dbConfig := &DBConfig{Path: fs.Arg(0)}
for _, u := range fs.Args()[1:] {
dbConfig.Replicas = append(dbConfig.Replicas, &ReplicaConfig{URL: u})
}
config, err := ReadConfigFile(c.ConfigPath)
config.DBs = []*DBConfig{dbConfig}
} else if c.ConfigPath != "" {
config, err = ReadConfigFile(c.ConfigPath)
if err != nil {
return err
}
} else {
return errors.New("-config flag or database/replica arguments required")
}
// Enable trace logging.
if *verbose {
litestream.Tracef = log.Printf
}
// Setup signal handler.
ctx, cancel := context.WithCancel(ctx)
@@ -50,11 +70,11 @@ func (c *ReplicateCommand) Run(ctx context.Context, args []string) (err error) {
fmt.Printf("litestream %s\n", Version)
if len(config.DBs) == 0 {
return errors.New("configuration must specify at least one database")
fmt.Println("no databases specified in configuration")
}
for _, dbConfig := range config.DBs {
db, err := newDBFromConfig(dbConfig)
db, err := newDBFromConfig(&config, dbConfig)
if err != nil {
return err
}
@@ -67,15 +87,29 @@ func (c *ReplicateCommand) Run(ctx context.Context, args []string) (err error) {
}
// Notify user that initialization is done.
fmt.Printf("Initialized with %d databases.\n", len(c.DBs))
for _, db := range c.DBs {
fmt.Printf("initialized db: %s\n", db.Path())
for _, r := range db.Replicas {
switch r := r.(type) {
case *litestream.FileReplica:
fmt.Printf("replicating to: name=%q type=%q path=%q\n", r.Name(), r.Type(), r.Path())
case *s3.Replica:
fmt.Printf("replicating to: name=%q type=%q bucket=%q path=%q region=%q\n", r.Name(), r.Type(), r.Bucket, r.Path, r.Region)
default:
fmt.Printf("replicating to: name=%q type=%q\n", r.Name(), r.Type())
}
}
}
// Serve metrics over HTTP if enabled.
if config.Addr != "" {
_, port, _ := net.SplitHostPort(config.Addr)
fmt.Printf("Serving metrics on http://localhost:%s/metrics\n", port)
fmt.Printf("serving metrics on http://localhost:%s/metrics\n", port)
go func() {
http.Handle("/metrics", promhttp.Handler())
http.ListenAndServe(config.Addr, nil)
if err := http.ListenAndServe(config.Addr, nil); err != nil {
log.Printf("cannot start metrics server: %s", err)
}
}()
}
@@ -105,19 +139,28 @@ func (c *ReplicateCommand) Close() (err error) {
return err
}
// Usage prints the help screen to STDOUT.
func (c *ReplicateCommand) Usage() {
fmt.Printf(`
The replicate command starts a server to monitor & replicate databases
specified in your configuration file.
The replicate command starts a server to monitor & replicate databases.
You can specify your database & replicas in a configuration file or you can
replicate a single database file by specifying its path and its replicas in the
command line arguments.
Usage:
litestream replicate [arguments]
litestream replicate [arguments] DB_PATH REPLICA_URL [REPLICA_URL...]
Arguments:
-config PATH
Specifies the configuration file. Defaults to %s
Specifies the configuration file.
Defaults to %s
`[1:], DefaultConfigPath)
-v
Enable verbose logging output.
`[1:], DefaultConfigPath())
}

View File

@@ -7,15 +7,15 @@ import (
"fmt"
"log"
"os"
"path/filepath"
"time"
"github.com/benbjohnson/litestream"
)
type RestoreCommand struct {
}
// RestoreCommand represents a command to restore a database from a backup.
type RestoreCommand struct{}
// Run executes the command.
func (c *RestoreCommand) Run(ctx context.Context, args []string) (err error) {
var configPath string
opt := litestream.NewRestoreOptions()
@@ -32,20 +32,11 @@ func (c *RestoreCommand) Run(ctx context.Context, args []string) (err error) {
if err := fs.Parse(args); err != nil {
return err
} else if fs.NArg() == 0 || fs.Arg(0) == "" {
return fmt.Errorf("database path required")
return fmt.Errorf("database path or replica URL required")
} else if fs.NArg() > 1 {
return fmt.Errorf("too many arguments")
}
// Load configuration.
if configPath == "" {
return errors.New("-config required")
}
config, err := ReadConfigFile(configPath)
if err != nil {
return err
}
// Parse timestamp, if specified.
if *timestampStr != "" {
if opt.Timestamp, err = time.Parse(time.RFC3339, *timestampStr); err != nil {
@@ -63,32 +54,84 @@ func (c *RestoreCommand) Run(ctx context.Context, args []string) (err error) {
opt.Logger = log.New(os.Stderr, "", log.LstdFlags)
}
// Determine absolute path for database.
dbPath, err := filepath.Abs(fs.Arg(0))
if err != nil {
// Determine replica & generation to restore from.
var r litestream.Replica
if isURL(fs.Arg(0)) {
if r, err = c.loadFromURL(ctx, fs.Arg(0), &opt); err != nil {
return err
}
} else if configPath != "" {
if r, err = c.loadFromConfig(ctx, fs.Arg(0), configPath, &opt); err != nil {
return err
}
} else {
return errors.New("config path or replica URL required")
}
// Instantiate DB.
// Return an error if no matching targets found.
if opt.Generation == "" {
return fmt.Errorf("no matching backups found")
}
return litestream.RestoreReplica(ctx, r, opt)
}
// loadFromURL creates a replica & updates the restore options from a replica URL.
func (c *RestoreCommand) loadFromURL(ctx context.Context, replicaURL string, opt *litestream.RestoreOptions) (litestream.Replica, error) {
r, err := NewReplicaFromURL(replicaURL)
if err != nil {
return nil, err
}
opt.Generation, _, err = litestream.CalcReplicaRestoreTarget(ctx, r, *opt)
return r, err
}
// loadFromConfig returns a replica & updates the restore options from a DB reference.
func (c *RestoreCommand) loadFromConfig(ctx context.Context, dbPath, configPath string, opt *litestream.RestoreOptions) (litestream.Replica, error) {
// Load configuration.
config, err := ReadConfigFile(configPath)
if err != nil {
return nil, err
}
// Lookup database from configuration file by path.
if dbPath, err = expand(dbPath); err != nil {
return nil, err
}
dbConfig := config.DBConfig(dbPath)
if dbConfig == nil {
return fmt.Errorf("database not found in config: %s", dbPath)
return nil, fmt.Errorf("database not found in config: %s", dbPath)
}
db, err := newDBFromConfig(dbConfig)
db, err := newDBFromConfig(&config, dbConfig)
if err != nil {
return err
return nil, err
}
return db.Restore(ctx, opt)
// Restore into original database path if not specified.
if opt.OutputPath == "" {
opt.OutputPath = dbPath
}
// Determine the appropriate replica & generation to restore from,
r, generation, err := db.CalcRestoreTarget(ctx, *opt)
if err != nil {
return nil, err
}
opt.Generation = generation
return r, nil
}
// Usage prints the help screen to STDOUT.
func (c *RestoreCommand) Usage() {
fmt.Printf(`
The restore command recovers a database from a previous snapshot and WAL.
Usage:
litestream restore [arguments] DB
litestream restore [arguments] DB_PATH
litestream restore [arguments] REPLICA_URL
Arguments:
@@ -142,6 +185,6 @@ Examples:
$ litestream restore -replica s3 -generation xxxxxxxx /path/to/db
`[1:],
DefaultConfigPath,
DefaultConfigPath(),
)
}

View File

@@ -6,15 +6,16 @@ import (
"flag"
"fmt"
"os"
"path/filepath"
"text/tabwriter"
"time"
"github.com/benbjohnson/litestream"
)
// SnapshotsCommand represents a command to list snapshots for a command.
type SnapshotsCommand struct{}
// Run executes the command.
func (c *SnapshotsCommand) Run(ctx context.Context, args []string) (err error) {
var configPath string
fs := flag.NewFlagSet("litestream-snapshots", flag.ContinueOnError)
@@ -29,37 +30,42 @@ func (c *SnapshotsCommand) Run(ctx context.Context, args []string) (err error) {
return fmt.Errorf("too many arguments")
}
// Load configuration.
if configPath == "" {
return errors.New("-config required")
var db *litestream.DB
var r litestream.Replica
if isURL(fs.Arg(0)) {
if r, err = NewReplicaFromURL(fs.Arg(0)); err != nil {
return err
}
} else if configPath != "" {
// Load configuration.
config, err := ReadConfigFile(configPath)
if err != nil {
return err
}
// Determine absolute path for database.
dbPath, err := filepath.Abs(fs.Arg(0))
if err != nil {
// Lookup database from configuration file by path.
if path, err := expand(fs.Arg(0)); err != nil {
return err
} else if dbc := config.DBConfig(path); dbc == nil {
return fmt.Errorf("database not found in config: %s", path)
} else if db, err = newDBFromConfig(&config, dbc); err != nil {
return err
}
// Instantiate DB.
dbConfig := config.DBConfig(dbPath)
if dbConfig == nil {
return fmt.Errorf("database not found in config: %s", dbPath)
// Filter by replica, if specified.
if *replicaName != "" {
if r = db.Replica(*replicaName); r == nil {
return fmt.Errorf("replica %q not found for database %q", *replicaName, db.Path())
}
db, err := newDBFromConfig(dbConfig)
if err != nil {
return err
}
} else {
return errors.New("config path or replica URL required")
}
// Find snapshots by db or replica.
var infos []*litestream.SnapshotInfo
if *replicaName != "" {
if r := db.Replica(*replicaName); r == nil {
return fmt.Errorf("replica %q not found for database %q", *replicaName, dbPath)
} else if infos, err = r.Snapshots(ctx); err != nil {
if r != nil {
if infos, err = r.Snapshots(ctx); err != nil {
return err
}
} else {
@@ -85,13 +91,16 @@ func (c *SnapshotsCommand) Run(ctx context.Context, args []string) (err error) {
return nil
}
// Usage prints the help screen to STDOUT.
func (c *SnapshotsCommand) Usage() {
fmt.Printf(`
The snapshots command lists all snapshots available for a database.
The snapshots command lists all snapshots available for a database or replica.
Usage:
litestream snapshots [arguments] DB
litestream snapshots [arguments] DB_PATH
litestream snapshots [arguments] REPLICA_URL
Arguments:
@@ -102,7 +111,6 @@ Arguments:
-replica NAME
Optional, filter by a specific replica.
Examples:
# List all snapshots for a database.
@@ -111,7 +119,10 @@ Examples:
# List all snapshots on S3.
$ litestream snapshots -replica s3 /path/to/db
# List all snapshots by replica URL.
$ litestream snapshots s3://mybkt/db
`[1:],
DefaultConfigPath,
DefaultConfigPath(),
)
}

View File

@@ -1,136 +0,0 @@
package main
import (
"context"
"errors"
"flag"
"fmt"
"log"
"os"
"path/filepath"
"github.com/benbjohnson/litestream"
)
type ValidateCommand struct{}
func (c *ValidateCommand) Run(ctx context.Context, args []string) (err error) {
var configPath string
opt := litestream.NewRestoreOptions()
fs := flag.NewFlagSet("litestream-validate", flag.ContinueOnError)
registerConfigFlag(fs, &configPath)
fs.StringVar(&opt.ReplicaName, "replica", "", "replica name")
fs.BoolVar(&opt.DryRun, "dry-run", false, "dry run")
verbose := fs.Bool("v", false, "verbose output")
fs.Usage = c.Usage
if err := fs.Parse(args); err != nil {
return err
} else if fs.NArg() == 0 || fs.Arg(0) == "" {
return fmt.Errorf("database path required")
} else if fs.NArg() > 1 {
return fmt.Errorf("too many arguments")
}
// Load configuration.
if configPath == "" {
return errors.New("-config required")
}
config, err := ReadConfigFile(configPath)
if err != nil {
return err
}
// Verbose output is automatically enabled if dry run is specified.
if opt.DryRun {
*verbose = true
}
// Instantiate logger if verbose output is enabled.
if *verbose {
opt.Logger = log.New(os.Stderr, "", log.LstdFlags)
}
// Determine absolute path for database.
dbPath, err := filepath.Abs(fs.Arg(0))
if err != nil {
return err
}
// Instantiate DB.
dbConfig := config.DBConfig(dbPath)
if dbConfig == nil {
return fmt.Errorf("database not found in config: %s", dbPath)
}
db, err := newDBFromConfig(dbConfig)
if err != nil {
return err
}
// Ensure replica exists, if specified.
if opt.ReplicaName != "" && db.Replica(opt.ReplicaName) == nil {
return fmt.Errorf("replica not found: %s", opt.ReplicaName)
}
// Validate all matching replicas.
var hasInvalidReplica bool
for _, r := range db.Replicas {
if opt.ReplicaName != "" && opt.ReplicaName != r.Name() {
continue
}
if err := db.Validate(ctx, r.Name(), opt); err != nil {
fmt.Printf("%s: replica invalid: %s\n", r.Name(), err)
}
}
if hasInvalidReplica {
return fmt.Errorf("one or more invalid replicas found")
}
fmt.Println("ok")
return nil
}
func (c *ValidateCommand) Usage() {
fmt.Printf(`
The validate command compares a checksum of the primary database with a
checksum of the replica at the same point in time. Returns an error if the
databases are not equal.
The restored database must be written to a temporary file so you must ensure
you have enough disk space before performing this operation.
Usage:
litestream validate [arguments] DB
Arguments:
-config PATH
Specifies the configuration file.
Defaults to %s
-replica NAME
Validate a specific replica.
Defaults to validating all replicas.
-dry-run
Prints all log output as if it were running but does
not perform actual validation.
-v
Verbose output.
Examples:
# Validate all replicas for the given database.
$ litestream validate /path/to/db
# Validate only the S3 replica.
$ litestream restore -replica s3 /path/to/db
`[1:],
DefaultConfigPath,
)
}

View File

@@ -6,8 +6,10 @@ import (
"fmt"
)
// VersionCommand represents a command to print the current version.
type VersionCommand struct{}
// Run executes the command.
func (c *VersionCommand) Run(ctx context.Context, args []string) (err error) {
fs := flag.NewFlagSet("litestream-version", flag.ContinueOnError)
fs.Usage = c.Usage
@@ -20,6 +22,7 @@ func (c *VersionCommand) Run(ctx context.Context, args []string) (err error) {
return nil
}
// Usage prints the help screen to STDOUT.
func (c *VersionCommand) Usage() {
fmt.Println(`
Prints the version.

View File

@@ -6,15 +6,16 @@ import (
"flag"
"fmt"
"os"
"path/filepath"
"text/tabwriter"
"time"
"github.com/benbjohnson/litestream"
)
// WALCommand represents a command to list WAL files for a database.
type WALCommand struct{}
// Run executes the command.
func (c *WALCommand) Run(ctx context.Context, args []string) (err error) {
var configPath string
fs := flag.NewFlagSet("litestream-wal", flag.ContinueOnError)
@@ -30,37 +31,42 @@ func (c *WALCommand) Run(ctx context.Context, args []string) (err error) {
return fmt.Errorf("too many arguments")
}
// Load configuration.
if configPath == "" {
return errors.New("-config required")
var db *litestream.DB
var r litestream.Replica
if isURL(fs.Arg(0)) {
if r, err = NewReplicaFromURL(fs.Arg(0)); err != nil {
return err
}
} else if configPath != "" {
// Load configuration.
config, err := ReadConfigFile(configPath)
if err != nil {
return err
}
// Determine absolute path for database.
dbPath, err := filepath.Abs(fs.Arg(0))
if err != nil {
// Lookup database from configuration file by path.
if path, err := expand(fs.Arg(0)); err != nil {
return err
} else if dbc := config.DBConfig(path); dbc == nil {
return fmt.Errorf("database not found in config: %s", path)
} else if db, err = newDBFromConfig(&config, dbc); err != nil {
return err
}
// Instantiate DB.
dbConfig := config.DBConfig(dbPath)
if dbConfig == nil {
return fmt.Errorf("database not found in config: %s", dbPath)
}
db, err := newDBFromConfig(dbConfig)
if err != nil {
return err
}
// Find snapshots by db or replica.
var infos []*litestream.WALInfo
// Filter by replica, if specified.
if *replicaName != "" {
if r := db.Replica(*replicaName); r == nil {
return fmt.Errorf("replica %q not found for database %q", *replicaName, dbPath)
} else if infos, err = r.WALs(ctx); err != nil {
if r = db.Replica(*replicaName); r == nil {
return fmt.Errorf("replica %q not found for database %q", *replicaName, db.Path())
}
}
} else {
return errors.New("config path or replica URL required")
}
// Find WAL files by db or replica.
var infos []*litestream.WALInfo
if r != nil {
if infos, err = r.WALs(ctx); err != nil {
return err
}
} else {
@@ -91,13 +97,16 @@ func (c *WALCommand) Run(ctx context.Context, args []string) (err error) {
return nil
}
// Usage prints the help screen to STDOUT.
func (c *WALCommand) Usage() {
fmt.Printf(`
The wal command lists all wal files available for a database.
Usage:
litestream wal [arguments] DB
litestream wal [arguments] DB_PATH
litestream wal [arguments] REPLICA_URL
Arguments:
@@ -111,16 +120,18 @@ Arguments:
-generation NAME
Optional, filter by a specific generation.
Examples:
# List all WAL files for a database.
$ litestream wal /path/to/db
# List all WAL files on S3 for a specific generation.
$ litestream snapshots -replica s3 -generation xxxxxxxx /path/to/db
$ litestream wal -replica s3 -generation xxxxxxxx /path/to/db
# List all WAL files for replica URL.
$ litestream wal s3://mybkt/db
`[1:],
DefaultConfigPath,
DefaultConfigPath(),
)
}

610
db.go

File diff suppressed because it is too large Load Diff

647
db_test.go Normal file
View File

@@ -0,0 +1,647 @@
package litestream_test
import (
"database/sql"
"io/ioutil"
"os"
"path/filepath"
"testing"
"time"
"github.com/benbjohnson/litestream"
)
func TestDB_Path(t *testing.T) {
db := litestream.NewDB("/tmp/db")
if got, want := db.Path(), `/tmp/db`; got != want {
t.Fatalf("Path()=%v, want %v", got, want)
}
}
func TestDB_WALPath(t *testing.T) {
db := litestream.NewDB("/tmp/db")
if got, want := db.WALPath(), `/tmp/db-wal`; got != want {
t.Fatalf("WALPath()=%v, want %v", got, want)
}
}
func TestDB_MetaPath(t *testing.T) {
t.Run("Absolute", func(t *testing.T) {
db := litestream.NewDB("/tmp/db")
if got, want := db.MetaPath(), `/tmp/.db-litestream`; got != want {
t.Fatalf("MetaPath()=%v, want %v", got, want)
}
})
t.Run("Relative", func(t *testing.T) {
db := litestream.NewDB("db")
if got, want := db.MetaPath(), `.db-litestream`; got != want {
t.Fatalf("MetaPath()=%v, want %v", got, want)
}
})
}
func TestDB_GenerationNamePath(t *testing.T) {
db := litestream.NewDB("/tmp/db")
if got, want := db.GenerationNamePath(), `/tmp/.db-litestream/generation`; got != want {
t.Fatalf("GenerationNamePath()=%v, want %v", got, want)
}
}
func TestDB_GenerationPath(t *testing.T) {
db := litestream.NewDB("/tmp/db")
if got, want := db.GenerationPath("xxxx"), `/tmp/.db-litestream/generations/xxxx`; got != want {
t.Fatalf("GenerationPath()=%v, want %v", got, want)
}
}
func TestDB_ShadowWALDir(t *testing.T) {
db := litestream.NewDB("/tmp/db")
if got, want := db.ShadowWALDir("xxxx"), `/tmp/.db-litestream/generations/xxxx/wal`; got != want {
t.Fatalf("ShadowWALDir()=%v, want %v", got, want)
}
}
func TestDB_ShadowWALPath(t *testing.T) {
db := litestream.NewDB("/tmp/db")
if got, want := db.ShadowWALPath("xxxx", 1000), `/tmp/.db-litestream/generations/xxxx/wal/000003e8.wal`; got != want {
t.Fatalf("ShadowWALPath()=%v, want %v", got, want)
}
}
// Ensure we can check the last modified time of the real database and its WAL.
func TestDB_UpdatedAt(t *testing.T) {
t.Run("ErrNotExist", func(t *testing.T) {
db := MustOpenDB(t)
defer MustCloseDB(t, db)
if _, err := db.UpdatedAt(); !os.IsNotExist(err) {
t.Fatalf("unexpected error: %#v", err)
}
})
t.Run("DB", func(t *testing.T) {
db, sqldb := MustOpenDBs(t)
defer MustCloseDBs(t, db, sqldb)
if t0, err := db.UpdatedAt(); err != nil {
t.Fatal(err)
} else if time.Since(t0) > 10*time.Second {
t.Fatalf("unexpected updated at time: %s", t0)
}
})
t.Run("WAL", func(t *testing.T) {
db, sqldb := MustOpenDBs(t)
defer MustCloseDBs(t, db, sqldb)
t0, err := db.UpdatedAt()
if err != nil {
t.Fatal(err)
}
if os.Getenv("CI") != "" {
time.Sleep(1 * time.Second)
}
if _, err := sqldb.Exec(`CREATE TABLE t (id INT);`); err != nil {
t.Fatal(err)
}
if t1, err := db.UpdatedAt(); err != nil {
t.Fatal(err)
} else if !t1.After(t0) {
t.Fatalf("expected newer updated at time: %s > %s", t1, t0)
}
})
}
// Ensure we can compute a checksum on the real database.
func TestDB_CRC64(t *testing.T) {
t.Run("ErrNotExist", func(t *testing.T) {
db := MustOpenDB(t)
defer MustCloseDB(t, db)
if _, _, err := db.CRC64(); !os.IsNotExist(err) {
t.Fatalf("unexpected error: %#v", err)
}
})
t.Run("DB", func(t *testing.T) {
db, sqldb := MustOpenDBs(t)
defer MustCloseDBs(t, db, sqldb)
if err := db.Sync(); err != nil {
t.Fatal(err)
}
chksum0, _, err := db.CRC64()
if err != nil {
t.Fatal(err)
}
// Issue change that is applied to the WAL. Checksum should not change.
if _, err := sqldb.Exec(`CREATE TABLE t (id INT);`); err != nil {
t.Fatal(err)
} else if chksum1, _, err := db.CRC64(); err != nil {
t.Fatal(err)
} else if chksum0 == chksum1 {
t.Fatal("expected different checksum event after WAL change")
}
// Checkpoint change into database. Checksum should change.
if _, err := sqldb.Exec(`PRAGMA wal_checkpoint(TRUNCATE);`); err != nil {
t.Fatal(err)
}
if chksum2, _, err := db.CRC64(); err != nil {
t.Fatal(err)
} else if chksum0 == chksum2 {
t.Fatal("expected different checksums after checkpoint")
}
})
}
// Ensure we can sync the real WAL to the shadow WAL.
func TestDB_Sync(t *testing.T) {
// Ensure sync is skipped if no database exists.
t.Run("NoDB", func(t *testing.T) {
db := MustOpenDB(t)
defer MustCloseDB(t, db)
if err := db.Sync(); err != nil {
t.Fatal(err)
}
})
// Ensure sync can successfully run on the initial sync.
t.Run("Initial", func(t *testing.T) {
db, sqldb := MustOpenDBs(t)
defer MustCloseDBs(t, db, sqldb)
if err := db.Sync(); err != nil {
t.Fatal(err)
}
// Verify page size if now available.
if db.PageSize() == 0 {
t.Fatal("expected page size after initial sync")
}
// Obtain real WAL size.
fi, err := os.Stat(db.WALPath())
if err != nil {
t.Fatal(err)
}
// Ensure position now available.
if pos, err := db.Pos(); err != nil {
t.Fatal(err)
} else if pos.Generation == "" {
t.Fatal("expected generation")
} else if got, want := pos.Index, 0; got != want {
t.Fatalf("pos.Index=%v, want %v", got, want)
} else if got, want := pos.Offset, fi.Size(); got != want {
t.Fatalf("pos.Offset=%v, want %v", got, want)
}
})
// Ensure DB can keep in sync across multiple Sync() invocations.
t.Run("MultiSync", func(t *testing.T) {
db, sqldb := MustOpenDBs(t)
defer MustCloseDBs(t, db, sqldb)
// Execute a query to force a write to the WAL.
if _, err := sqldb.Exec(`CREATE TABLE foo (bar TEXT);`); err != nil {
t.Fatal(err)
}
// Perform initial sync & grab initial position.
if err := db.Sync(); err != nil {
t.Fatal(err)
}
pos0, err := db.Pos()
if err != nil {
t.Fatal(err)
}
// Insert into table.
if _, err := sqldb.Exec(`INSERT INTO foo (bar) VALUES ('baz');`); err != nil {
t.Fatal(err)
}
// Sync to ensure position moves forward one page.
if err := db.Sync(); err != nil {
t.Fatal(err)
} else if pos1, err := db.Pos(); err != nil {
t.Fatal(err)
} else if pos0.Generation != pos1.Generation {
t.Fatal("expected the same generation")
} else if got, want := pos1.Index, pos0.Index; got != want {
t.Fatalf("Index=%v, want %v", got, want)
} else if got, want := pos1.Offset, pos0.Offset+4096+litestream.WALFrameHeaderSize; got != want {
t.Fatalf("Offset=%v, want %v", got, want)
}
})
// Ensure a WAL file is created if one does not already exist.
t.Run("NoWAL", func(t *testing.T) {
db, sqldb := MustOpenDBs(t)
defer MustCloseDBs(t, db, sqldb)
// Issue initial sync and truncate WAL.
if err := db.Sync(); err != nil {
t.Fatal(err)
}
// Obtain initial position.
pos0, err := db.Pos()
if err != nil {
t.Fatal(err)
}
// Checkpoint & fully close which should close WAL file.
if err := db.Checkpoint(litestream.CheckpointModeTruncate); err != nil {
t.Fatal(err)
} else if err := db.Close(); err != nil {
t.Fatal(err)
} else if err := sqldb.Close(); err != nil {
t.Fatal(err)
}
// Verify WAL does not exist.
if _, err := os.Stat(db.WALPath()); !os.IsNotExist(err) {
t.Fatal(err)
}
// Reopen the managed database.
db = MustOpenDBAt(t, db.Path())
defer MustCloseDB(t, db)
// Re-sync and ensure new generation has been created.
if err := db.Sync(); err != nil {
t.Fatal(err)
}
// Obtain initial position.
if pos1, err := db.Pos(); err != nil {
t.Fatal(err)
} else if pos0.Generation == pos1.Generation {
t.Fatal("expected new generation after truncation")
}
})
// Ensure DB can start new generation if it detects it cannot verify last position.
t.Run("OverwritePrevPosition", func(t *testing.T) {
db, sqldb := MustOpenDBs(t)
defer MustCloseDBs(t, db, sqldb)
// Execute a query to force a write to the WAL.
if _, err := sqldb.Exec(`CREATE TABLE foo (bar TEXT);`); err != nil {
t.Fatal(err)
}
// Issue initial sync and truncate WAL.
if err := db.Sync(); err != nil {
t.Fatal(err)
}
// Obtain initial position.
pos0, err := db.Pos()
if err != nil {
t.Fatal(err)
}
// Fully close which should close WAL file.
if err := db.Close(); err != nil {
t.Fatal(err)
} else if err := sqldb.Close(); err != nil {
t.Fatal(err)
}
// Verify WAL does not exist.
if _, err := os.Stat(db.WALPath()); !os.IsNotExist(err) {
t.Fatal(err)
}
// Insert into table multiple times to move past old offset
sqldb = MustOpenSQLDB(t, db.Path())
defer MustCloseSQLDB(t, sqldb)
for i := 0; i < 100; i++ {
if _, err := sqldb.Exec(`INSERT INTO foo (bar) VALUES ('baz');`); err != nil {
t.Fatal(err)
}
}
// Reopen the managed database.
db = MustOpenDBAt(t, db.Path())
defer MustCloseDB(t, db)
// Re-sync and ensure new generation has been created.
if err := db.Sync(); err != nil {
t.Fatal(err)
}
// Obtain initial position.
if pos1, err := db.Pos(); err != nil {
t.Fatal(err)
} else if pos0.Generation == pos1.Generation {
t.Fatal("expected new generation after truncation")
}
})
// Ensure DB can handle a mismatched header-only and start new generation.
t.Run("WALHeaderMismatch", func(t *testing.T) {
db, sqldb := MustOpenDBs(t)
defer MustCloseDBs(t, db, sqldb)
// Execute a query to force a write to the WAL and then sync.
if _, err := sqldb.Exec(`CREATE TABLE foo (bar TEXT);`); err != nil {
t.Fatal(err)
} else if err := db.Sync(); err != nil {
t.Fatal(err)
}
// Grab initial position & close.
pos0, err := db.Pos()
if err != nil {
t.Fatal(err)
} else if err := db.Close(); err != nil {
t.Fatal(err)
}
// Read existing file, update header checksum, and write back only header
// to simulate a header with a mismatched checksum.
shadowWALPath := db.ShadowWALPath(pos0.Generation, pos0.Index)
if buf, err := ioutil.ReadFile(shadowWALPath); err != nil {
t.Fatal(err)
} else if err := ioutil.WriteFile(shadowWALPath, append(buf[:litestream.WALHeaderSize-8], 0, 0, 0, 0, 0, 0, 0, 0), 0600); err != nil {
t.Fatal(err)
}
// Reopen managed database & ensure sync will still work.
db = MustOpenDBAt(t, db.Path())
defer MustCloseDB(t, db)
if err := db.Sync(); err != nil {
t.Fatal(err)
}
// Verify a new generation was started.
if pos1, err := db.Pos(); err != nil {
t.Fatal(err)
} else if pos0.Generation == pos1.Generation {
t.Fatal("expected new generation")
}
})
// Ensure DB can handle partial shadow WAL header write.
t.Run("PartialShadowWALHeader", func(t *testing.T) {
db, sqldb := MustOpenDBs(t)
defer MustCloseDBs(t, db, sqldb)
// Execute a query to force a write to the WAL and then sync.
if _, err := sqldb.Exec(`CREATE TABLE foo (bar TEXT);`); err != nil {
t.Fatal(err)
} else if err := db.Sync(); err != nil {
t.Fatal(err)
}
pos0, err := db.Pos()
if err != nil {
t.Fatal(err)
}
// Close & truncate shadow WAL to simulate a partial header write.
if err := db.Close(); err != nil {
t.Fatal(err)
} else if err := os.Truncate(db.ShadowWALPath(pos0.Generation, pos0.Index), litestream.WALHeaderSize-1); err != nil {
t.Fatal(err)
}
// Reopen managed database & ensure sync will still work.
db = MustOpenDBAt(t, db.Path())
defer MustCloseDB(t, db)
if err := db.Sync(); err != nil {
t.Fatal(err)
}
// Verify a new generation was started.
if pos1, err := db.Pos(); err != nil {
t.Fatal(err)
} else if pos0.Generation == pos1.Generation {
t.Fatal("expected new generation")
}
})
// Ensure DB can handle partial shadow WAL writes.
t.Run("PartialShadowWALFrame", func(t *testing.T) {
db, sqldb := MustOpenDBs(t)
defer MustCloseDBs(t, db, sqldb)
// Execute a query to force a write to the WAL and then sync.
if _, err := sqldb.Exec(`CREATE TABLE foo (bar TEXT);`); err != nil {
t.Fatal(err)
} else if err := db.Sync(); err != nil {
t.Fatal(err)
}
pos0, err := db.Pos()
if err != nil {
t.Fatal(err)
}
// Obtain current shadow WAL size.
fi, err := os.Stat(db.ShadowWALPath(pos0.Generation, pos0.Index))
if err != nil {
t.Fatal(err)
}
// Close & truncate shadow WAL to simulate a partial frame write.
if err := db.Close(); err != nil {
t.Fatal(err)
} else if err := os.Truncate(db.ShadowWALPath(pos0.Generation, pos0.Index), fi.Size()-1); err != nil {
t.Fatal(err)
}
// Reopen managed database & ensure sync will still work.
db = MustOpenDBAt(t, db.Path())
defer MustCloseDB(t, db)
if err := db.Sync(); err != nil {
t.Fatal(err)
}
// Verify same generation is kept.
if pos1, err := db.Pos(); err != nil {
t.Fatal(err)
} else if got, want := pos1, pos0; got != want {
t.Fatalf("Pos()=%s want %s", got, want)
}
// Ensure shadow WAL has recovered.
if fi0, err := os.Stat(db.ShadowWALPath(pos0.Generation, pos0.Index)); err != nil {
t.Fatal(err)
} else if got, want := fi0.Size(), fi.Size(); got != want {
t.Fatalf("Size()=%v, want %v", got, want)
}
})
// Ensure DB can handle a generation directory with a missing shadow WAL.
t.Run("NoShadowWAL", func(t *testing.T) {
db, sqldb := MustOpenDBs(t)
defer MustCloseDBs(t, db, sqldb)
// Execute a query to force a write to the WAL and then sync.
if _, err := sqldb.Exec(`CREATE TABLE foo (bar TEXT);`); err != nil {
t.Fatal(err)
} else if err := db.Sync(); err != nil {
t.Fatal(err)
}
pos0, err := db.Pos()
if err != nil {
t.Fatal(err)
}
// Close & delete shadow WAL to simulate dir created but not WAL.
if err := db.Close(); err != nil {
t.Fatal(err)
} else if err := os.Remove(db.ShadowWALPath(pos0.Generation, pos0.Index)); err != nil {
t.Fatal(err)
}
// Reopen managed database & ensure sync will still work.
db = MustOpenDBAt(t, db.Path())
defer MustCloseDB(t, db)
if err := db.Sync(); err != nil {
t.Fatal(err)
}
// Verify new generation created but index/offset the same.
if pos1, err := db.Pos(); err != nil {
t.Fatal(err)
} else if pos0.Generation == pos1.Generation {
t.Fatal("expected new generation")
} else if got, want := pos1.Index, pos0.Index; got != want {
t.Fatalf("Index=%v want %v", got, want)
} else if got, want := pos1.Offset, pos0.Offset; got != want {
t.Fatalf("Offset=%v want %v", got, want)
}
})
// Ensure DB checkpoints after minimum number of pages.
t.Run("MinCheckpointPageN", func(t *testing.T) {
db, sqldb := MustOpenDBs(t)
defer MustCloseDBs(t, db, sqldb)
// Execute a query to force a write to the WAL and then sync.
if _, err := sqldb.Exec(`CREATE TABLE foo (bar TEXT);`); err != nil {
t.Fatal(err)
} else if err := db.Sync(); err != nil {
t.Fatal(err)
}
// Write at least minimum number of pages to trigger rollover.
for i := 0; i < db.MinCheckpointPageN; i++ {
if _, err := sqldb.Exec(`INSERT INTO foo (bar) VALUES ('baz');`); err != nil {
t.Fatal(err)
}
}
// Sync to shadow WAL.
if err := db.Sync(); err != nil {
t.Fatal(err)
}
// Ensure position is now on the second index.
if pos, err := db.Pos(); err != nil {
t.Fatal(err)
} else if got, want := pos.Index, 1; got != want {
t.Fatalf("Index=%v, want %v", got, want)
}
})
// Ensure DB checkpoints after interval.
t.Run("CheckpointInterval", func(t *testing.T) {
db, sqldb := MustOpenDBs(t)
defer MustCloseDBs(t, db, sqldb)
// Execute a query to force a write to the WAL and then sync.
if _, err := sqldb.Exec(`CREATE TABLE foo (bar TEXT);`); err != nil {
t.Fatal(err)
} else if err := db.Sync(); err != nil {
t.Fatal(err)
}
// Reduce checkpoint interval to ensure a rollover is triggered.
db.CheckpointInterval = 1 * time.Nanosecond
// Write to WAL & sync.
if _, err := sqldb.Exec(`INSERT INTO foo (bar) VALUES ('baz');`); err != nil {
t.Fatal(err)
} else if err := db.Sync(); err != nil {
t.Fatal(err)
}
// Ensure position is now on the second index.
if pos, err := db.Pos(); err != nil {
t.Fatal(err)
} else if got, want := pos.Index, 1; got != want {
t.Fatalf("Index=%v, want %v", got, want)
}
})
}
// MustOpenDBs returns a new instance of a DB & associated SQL DB.
func MustOpenDBs(tb testing.TB) (*litestream.DB, *sql.DB) {
db := MustOpenDB(tb)
return db, MustOpenSQLDB(tb, db.Path())
}
// MustCloseDBs closes db & sqldb and removes the parent directory.
func MustCloseDBs(tb testing.TB, db *litestream.DB, sqldb *sql.DB) {
MustCloseDB(tb, db)
MustCloseSQLDB(tb, sqldb)
}
// MustOpenDB returns a new instance of a DB.
func MustOpenDB(tb testing.TB) *litestream.DB {
dir := tb.TempDir()
return MustOpenDBAt(tb, filepath.Join(dir, "db"))
}
// MustOpenDBAt returns a new instance of a DB for a given path.
func MustOpenDBAt(tb testing.TB, path string) *litestream.DB {
tb.Helper()
db := litestream.NewDB(path)
db.MonitorInterval = 0 // disable background goroutine
if err := db.Open(); err != nil {
tb.Fatal(err)
}
return db
}
// MustCloseDB closes db and removes its parent directory.
func MustCloseDB(tb testing.TB, db *litestream.DB) {
tb.Helper()
if err := db.Close(); err != nil {
tb.Fatal(err)
} else if err := os.RemoveAll(filepath.Dir(db.Path())); err != nil {
tb.Fatal(err)
}
}
// MustOpenSQLDB returns a database/sql DB.
func MustOpenSQLDB(tb testing.TB, path string) *sql.DB {
tb.Helper()
d, err := sql.Open("sqlite3", path)
if err != nil {
tb.Fatal(err)
} else if _, err := d.Exec(`PRAGMA journal_mode = wal;`); err != nil {
tb.Fatal(err)
}
return d
}
// MustCloseSQLDB closes a database/sql DB.
func MustCloseSQLDB(tb testing.TB, d *sql.DB) {
tb.Helper()
if err := d.Close(); err != nil {
tb.Fatal(err)
}
}

View File

@@ -1,88 +0,0 @@
DESIGN
======
Litestream is a sidecar process that replicates the write ahead log (WAL) for
a SQLite database. To ensure that it can replicate every page, litestream takes
control over the checkpointing process by issuing a long running read
transaction against the database to prevent checkpointing. It then releases
this transaction once it obtains a write lock and issues the checkpoint itself.
The daemon polls the database on an interval to breifly obtain a write
transaction lock and copy over new WAL pages. Once the WAL has reached a
threshold size, litestream will issue a checkpoint and a single page write
to a table called `_litestream` to start the new WAL.
## Workflow
When litestream first loads a database, it checks if there is an existing
sidecar directory which is named `.<DB>-litestream`. If not, it initializes
the directory and starts a new generation.
A generation is a snapshot of the database followed by a continuous stream of
WAL files. A new generation is started on initialization & whenever litestream
cannot verify that it has a continuous record of WAL files. This could happen
if litestream is stopped and another process checkpoints the WAL. In this case,
a new generation ID is randomly created and a snapshot is replicated to the
appropriate destinations.
Generations also prevent two servers from replicating to the same destination
and corrupting each other's data. In this case, each server would replicate
to a different generation directory. On recovery, there will be duplicate
databases and the end user can choose which generation to recover but each
database will be uncorrupted.
## File Layout
Litestream maintains a shadow WAL which is a historical record of all previous
WAL files. These files can be deleted after a time or size threshold but should
be replicated before being deleted.
### Local
Given a database file named `db`, SQLite will create a WAL file called `db-wal`.
Litestream will then create a hidden directory called `.db-litestream` that
contains the historical record of all WAL files for the current generation.
```
db # SQLite database
db-wal # SQLite WAL
.db-litestream/
generation # current generation number
generations/
xxxxxxxx/
wal/ # WAL files
000000000000001.wal
000000000000002.wal
000000000000003.wal # active WAL
```
### Remote (S3)
```
bkt/
db/ # database path
generations/
xxxxxxxx/
snapshots/ # snapshots w/ timestamp+offset
20000101T000000Z-000000000000023.snapshot
wal/ # compressed WAL files
000000000000001-0.wal.gz
000000000000001-<offset>.wal.gz
000000000000002-0.wal.gz
00000002/
snapshot/
000000000000000.snapshot
scheduled/
daily/
20000101T000000Z-000000000000023.snapshot
20000102T000000Z-000000000000036.snapshot
monthly/
20000101T000000Z-000000000000023.snapshot
wal/
000000000000001.wal.gz
```

15
etc/gon.hcl Normal file
View File

@@ -0,0 +1,15 @@
source = ["./dist/litestream"]
bundle_id = "com.middlemost.litestream"
apple_id {
username = "benbjohnson@yahoo.com"
password = "@env:AC_PASSWORD"
}
sign {
application_identity = "Developer ID Application: Middlemost Systems, LLC"
}
zip {
output_path = "dist/litestream.zip"
}

9
etc/litestream.service Normal file
View File

@@ -0,0 +1,9 @@
[Unit]
Description=Litestream
[Service]
Restart=always
ExecStart=/usr/bin/litestream replicate
[Install]
WantedBy=multi-user.target

10
etc/litestream.yml Normal file
View File

@@ -0,0 +1,10 @@
# AWS credentials
# access-key-id: AKIAxxxxxxxxxxxxxxxx
# secret-access-key: xxxxxxxxxxxxxxxxxxxxxxxxxxxxxx/xxxxxxxxx
# dbs:
# - path: /path/to/primary/db # Database to replicate from
# replicas:
# - path: /path/to/replica # File-based replication
# - path: s3://my.bucket.com/db # S3-based replication

19
etc/nfpm.yml Normal file
View File

@@ -0,0 +1,19 @@
name: litestream
arch: amd64
platform: linux
version: "${LITESTREAM_VERSION}"
section: "default"
priority: "extra"
maintainer: "Ben Johnson <benbjohnson@yahoo.com>"
description: Litestream is a tool for real-time replication of SQLite databases.
homepage: "https://github.com/benbjohnson/litestream"
license: "GPLv3"
contents:
- src: ./litestream
dst: /usr/bin/litestream
- src: ./litestream.yml
dst: /etc/litestream.yml
type: config
- src: ./litestream.service
dst: /usr/lib/systemd/system/litestream.service
type: config

3
go.mod
View File

@@ -3,7 +3,10 @@ module github.com/benbjohnson/litestream
go 1.15
require (
github.com/aws/aws-sdk-go v1.27.0
github.com/davecgh/go-spew v1.1.1
github.com/mattn/go-sqlite3 v1.14.5
github.com/pierrec/lz4/v4 v4.1.3
github.com/prometheus/client_golang v1.9.0
gopkg.in/yaml.v2 v2.4.0
)

7
go.sum
View File

@@ -18,6 +18,7 @@ github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da/go.mod h1:Q73ZrmV
github.com/armon/go-radix v0.0.0-20180808171621-7fddfc383310/go.mod h1:ufUuZ+zHj4x4TnLV4JWEpy2hxWSpsRywHrMgIH9cCH8=
github.com/aryann/difflib v0.0.0-20170710044230-e206f873d14a/go.mod h1:DAHtR1m6lCRdSC2Tm3DSWRPvIPr6xNKyeHdqDQSQT+A=
github.com/aws/aws-lambda-go v1.13.3/go.mod h1:4UKl9IzQMoD+QF79YdCuzCwp8VbmG4VAQwij/eHl5CU=
github.com/aws/aws-sdk-go v1.27.0 h1:0xphMHGMLBrPMfxR2AmVjZKcMEESEgWF8Kru94BNByk=
github.com/aws/aws-sdk-go v1.27.0/go.mod h1:KmX6BPdI08NWTb3/sm4ZGu5ShLoqVDhKgpiN924inxo=
github.com/aws/aws-sdk-go-v2 v0.18.0/go.mod h1:JWVYvqSMppoMJC0x5wdwiImzgXTI9FuZwxzkQq9wy+g=
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
@@ -40,6 +41,7 @@ github.com/coreos/pkg v0.0.0-20160727233714-3ac0863d7acf/go.mod h1:E3G3o1h8I7cfc
github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU=
github.com/creack/pty v1.1.7/go.mod h1:lj5s0c3V2DBrqTV7llrYr5NG6My20zk30Fl46Y7DoTY=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ=
github.com/dustin/go-humanize v0.0.0-20171111073723-bb3d318650d4/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk=
@@ -124,6 +126,7 @@ github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpO
github.com/hudl/fargo v1.3.0/go.mod h1:y3CKSmjA+wD2gak7sUSXTAoopbhU08POFhmITJgmKTg=
github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8=
github.com/influxdata/influxdb1-client v0.0.0-20191209144304-8bf82d3c094d/go.mod h1:qj24IKcXYK6Iy9ceXlo3Tc+vtHo9lIhSX5JddghvEPo=
github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af h1:pmfjZENx5imkbgOkpRUYLnmbU7UEFbjtDA2hxJ1ichM=
github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af/go.mod h1:Nht3zPeWKUH0NzdCt2Blrr5ys8VGpn0CEB0cQHVjt7k=
github.com/jonboulle/clockwork v0.1.0/go.mod h1:Ii8DK3G1RaLaWxj9trq07+26W01tbo22gdxWY5EU2bo=
github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4=
@@ -194,7 +197,11 @@ github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c/go.mod h1:lzWF7FI
github.com/pborman/uuid v1.2.0/go.mod h1:X/NO0urCmaxf9VXbdlT7C2Yzkj2IKimNn4k+gtPdI/k=
github.com/performancecopilot/speed v3.0.0+incompatible/go.mod h1:/CLtqpZ5gBg1M9iaPbIdPPGyKcA8hKdoy6hAWba7Yac=
github.com/pierrec/lz4 v1.0.2-0.20190131084431-473cd7ce01a1/go.mod h1:3/3N9NVKO0jef7pBehbT1qWhCMrIgbYNnFAZCqQ5LRc=
github.com/pierrec/lz4 v2.0.5+incompatible h1:2xWsjqPFWcplujydGg4WmhC/6fZqK42wMM8aXeqhl0I=
github.com/pierrec/lz4 v2.0.5+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY=
github.com/pierrec/lz4 v2.6.0+incompatible h1:Ix9yFKn1nSPBLFl/yZknTp8TU5G4Ps0JDmguYK6iH1A=
github.com/pierrec/lz4/v4 v4.1.3 h1:/dvQpkb0o1pVlSgKNQqfkavlnXaIK+hJ0LXsKRUN9D4=
github.com/pierrec/lz4/v4 v4.1.3/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=

32
internal/internal.go Normal file
View File

@@ -0,0 +1,32 @@
package internal
import (
"io"
)
// ReadCloser wraps a reader to also attach a separate closer.
type ReadCloser struct {
r io.Reader
c io.Closer
}
// NewReadCloser returns a new instance of ReadCloser.
func NewReadCloser(r io.Reader, c io.Closer) *ReadCloser {
return &ReadCloser{r, c}
}
// Read reads bytes into the underlying reader.
func (r *ReadCloser) Read(p []byte) (n int, err error) {
return r.r.Read(p)
}
// Close closes the reader (if implementing io.ReadCloser) and the Closer.
func (r *ReadCloser) Close() error {
if rc, ok := r.r.(io.Closer); ok {
if err := rc.Close(); err != nil {
r.c.Close()
return err
}
}
return r.c.Close()
}

44
internal/metrics.go Normal file
View File

@@ -0,0 +1,44 @@
package internal
import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)
// Shared replica metrics.
var (
ReplicaSnapshotTotalGaugeVec = promauto.NewGaugeVec(prometheus.GaugeOpts{
Namespace: "litestream",
Subsystem: "replica",
Name: "snapshot_total",
Help: "The current number of snapshots",
}, []string{"db", "name"})
ReplicaWALBytesCounterVec = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: "litestream",
Subsystem: "replica",
Name: "wal_bytes",
Help: "The number wal bytes written",
}, []string{"db", "name"})
ReplicaWALIndexGaugeVec = promauto.NewGaugeVec(prometheus.GaugeOpts{
Namespace: "litestream",
Subsystem: "replica",
Name: "wal_index",
Help: "The current WAL index",
}, []string{"db", "name"})
ReplicaWALOffsetGaugeVec = promauto.NewGaugeVec(prometheus.GaugeOpts{
Namespace: "litestream",
Subsystem: "replica",
Name: "wal_offset",
Help: "The current WAL offset",
}, []string{"db", "name"})
ReplicaValidationTotalCounterVec = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: "litestream",
Subsystem: "replica",
Name: "validation_total",
Help: "The number of validations performed",
}, []string{"db", "name", "status"})
)

View File

@@ -1,10 +1,8 @@
package litestream
import (
"compress/gzip"
"database/sql"
"encoding/binary"
"encoding/hex"
"errors"
"fmt"
"io"
@@ -13,11 +11,11 @@ import (
"regexp"
"strconv"
"strings"
"syscall"
"time"
_ "github.com/mattn/go-sqlite3"
)
// Naming constants.
const (
MetaDirSuffix = "-litestream"
@@ -52,6 +50,30 @@ type SnapshotInfo struct {
CreatedAt time.Time
}
// FilterSnapshotsAfter returns all snapshots that were created on or after t.
func FilterSnapshotsAfter(a []*SnapshotInfo, t time.Time) []*SnapshotInfo {
other := make([]*SnapshotInfo, 0, len(a))
for _, snapshot := range a {
if !snapshot.CreatedAt.Before(t) {
other = append(other, snapshot)
}
}
return other
}
// FindMinSnapshotByGeneration finds the snapshot with the lowest index in a generation.
func FindMinSnapshotByGeneration(a []*SnapshotInfo, generation string) *SnapshotInfo {
var min *SnapshotInfo
for _, snapshot := range a {
if snapshot.Generation != generation {
continue
} else if min == nil || snapshot.Index < min.Index {
min = snapshot
}
}
return min
}
// WALInfo represents file information about a WAL file.
type WALInfo struct {
Name string
@@ -75,7 +97,7 @@ func (p Pos) String() string {
if p.IsZero() {
return "<>"
}
return fmt.Sprintf("<%s,%d,%d>", p.Generation, p.Index, p.Offset)
return fmt.Sprintf("<%s,%08x,%d>", p.Generation, p.Index, p.Offset)
}
// IsZero returns true if p is the zero value.
@@ -129,10 +151,6 @@ func readWALHeader(filename string) ([]byte, error) {
return buf[:n], err
}
func readCheckpointSeqNo(hdr []byte) uint32 {
return binary.BigEndian.Uint32(hdr[12:])
}
// readFileAt reads a slice from a file.
func readFileAt(filename string, offset, n int64) ([]byte, error) {
f, err := os.Open(filename)
@@ -184,19 +202,19 @@ func IsSnapshotPath(s string) bool {
// ParseSnapshotPath returns the index for the snapshot.
// Returns an error if the path is not a valid snapshot path.
func ParseSnapshotPath(s string) (index int, typ, ext string, err error) {
func ParseSnapshotPath(s string) (index int, ext string, err error) {
s = filepath.Base(s)
a := snapshotPathRegex.FindStringSubmatch(s)
if a == nil {
return 0, "", "", fmt.Errorf("invalid snapshot path: %s", s)
return 0, "", fmt.Errorf("invalid snapshot path: %s", s)
}
i64, _ := strconv.ParseUint(a[1], 16, 64)
return int(i64), a[2], a[3], nil
return int(i64), a[2], nil
}
var snapshotPathRegex = regexp.MustCompile(`^([0-9a-f]{16})(?:-(\w+))?(.snapshot(?:.gz)?)$`)
var snapshotPathRegex = regexp.MustCompile(`^([0-9a-f]{8})(.snapshot(?:.lz4)?)$`)
// IsWALPath returns true if s is a path to a WAL file.
func IsWALPath(s string) bool {
@@ -221,75 +239,82 @@ func ParseWALPath(s string) (index int, offset int64, ext string, err error) {
// FormatWALPath formats a WAL filename with a given index.
func FormatWALPath(index int) string {
assert(index >= 0, "wal index must be non-negative")
return fmt.Sprintf("%016x%s", index, WALExt)
return fmt.Sprintf("%08x%s", index, WALExt)
}
// FormatWALPathWithOffset formats a WAL filename with a given index & offset.
func FormatWALPathWithOffset(index int, offset int64) string {
assert(index >= 0, "wal index must be non-negative")
assert(offset >= 0, "wal offset must be non-negative")
return fmt.Sprintf("%016x_%016x%s", index, offset, WALExt)
return fmt.Sprintf("%08x_%08x%s", index, offset, WALExt)
}
var walPathRegex = regexp.MustCompile(`^([0-9a-f]{16})(?:_([0-9a-f]{16}))?(.wal(?:.gz)?)$`)
var walPathRegex = regexp.MustCompile(`^([0-9a-f]{8})(?:_([0-9a-f]{8}))?(.wal(?:.lz4)?)$`)
// isHexChar returns true if ch is a lowercase hex character.
func isHexChar(ch rune) bool {
return (ch >= '0' && ch <= '9') || (ch >= 'a' && ch <= 'f')
}
// gzipReadCloser wraps gzip.Reader to also close the underlying reader on close.
type gzipReadCloser struct {
r *gzip.Reader
closer io.ReadCloser
// createFile creates the file and attempts to set the UID/GID.
func createFile(filename string, perm os.FileMode, uid, gid int) (*os.File, error) {
f, err := os.OpenFile(filename, os.O_RDWR|os.O_CREATE|os.O_TRUNC, perm)
if err != nil {
return nil, err
}
_ = f.Chown(uid, gid)
return f, nil
}
func (r *gzipReadCloser) Read(p []byte) (n int, err error) {
return r.r.Read(p)
// mkdirAll is a copy of os.MkdirAll() except that it attempts to set the
// uid/gid for each created directory.
func mkdirAll(path string, perm os.FileMode, uid, gid int) error {
// Fast path: if we can tell whether path is a directory or file, stop with success or error.
dir, err := os.Stat(path)
if err == nil {
if dir.IsDir() {
return nil
}
return &os.PathError{Op: "mkdir", Path: path, Err: syscall.ENOTDIR}
}
func (r *gzipReadCloser) Close() error {
if err := r.r.Close(); err != nil {
r.closer.Close()
// Slow path: make sure parent exists and then call Mkdir for path.
i := len(path)
for i > 0 && os.IsPathSeparator(path[i-1]) { // Skip trailing path separator.
i--
}
j := i
for j > 0 && !os.IsPathSeparator(path[j-1]) { // Scan backward over element.
j--
}
if j > 1 {
// Create parent.
err = mkdirAll(fixRootDirectory(path[:j-1]), perm, uid, gid)
if err != nil {
return err
}
return r.closer.Close()
}
// HexDump returns hexdump output but with duplicate lines removed.
func HexDump(b []byte) string {
const prefixN = len("00000000")
var output []string
var prev string
var ellipsis bool
lines := strings.Split(strings.TrimSpace(hex.Dump(b)), "\n")
for i, line := range lines {
// Add line to output if it is not repeating or the last line.
if i == 0 || i == len(lines)-1 || trimPrefixN(line, prefixN) != trimPrefixN(prev, prefixN) {
output = append(output, line)
prev, ellipsis = line, false
continue
// Parent now exists; invoke Mkdir and use its result.
err = os.Mkdir(path, perm)
if err != nil {
// Handle arguments like "foo/." by
// double-checking that directory doesn't exist.
dir, err1 := os.Lstat(path)
if err1 == nil && dir.IsDir() {
_ = os.Chown(path, uid, gid)
return nil
}
return err
}
_ = os.Chown(path, uid, gid)
return nil
}
// Add an ellipsis for the first duplicate line.
if !ellipsis {
output = append(output, "...")
ellipsis = true
continue
}
}
return strings.Join(output, "\n")
}
func trimPrefixN(s string, n int) string {
if len(s) < n {
return ""
}
return s[n:]
}
// Tracef is used for low-level tracing.
var Tracef = func(format string, a ...interface{}) {}
func assert(condition bool, message string) {
if !condition {

View File

@@ -6,6 +6,7 @@ import (
"testing"
"github.com/benbjohnson/litestream"
_ "github.com/mattn/go-sqlite3"
)
func TestChecksum(t *testing.T) {

18
litestream_unix.go Normal file
View File

@@ -0,0 +1,18 @@
// +build aix darwin dragonfly freebsd linux netbsd openbsd solaris
package litestream
import (
"os"
"syscall"
)
// fileinfo returns syscall fields from a FileInfo object.
func fileinfo(fi os.FileInfo) (uid, gid int) {
stat := fi.Sys().(*syscall.Stat_t)
return int(stat.Uid), int(stat.Gid)
}
func fixRootDirectory(p string) string {
return p
}

23
litestream_windows.go Normal file
View File

@@ -0,0 +1,23 @@
// +build windows
package litestream
import (
"os"
"syscall"
)
// fileinfo returns syscall fields from a FileInfo object.
func fileinfo(fi os.FileInfo) (uid, gid int) {
return -1, -1
}
// fixRootDirectory is copied from the standard library for use with mkdirAll()
func fixRootDirectory(p string) string {
if len(p) == len(`\\?\c:`) {
if IsPathSeparator(p[0]) && IsPathSeparator(p[1]) && p[2] == '?' && IsPathSeparator(p[3]) && p[5] == ':' {
return p + `\`
}
}
return p
}

View File

@@ -1,7 +1,6 @@
package litestream
import (
"compress/gzip"
"context"
"fmt"
"io"
@@ -11,9 +10,12 @@ import (
"os"
"path/filepath"
"sort"
"strconv"
"sync"
"time"
"github.com/benbjohnson/litestream/internal"
"github.com/pierrec/lz4/v4"
"github.com/prometheus/client_golang/prometheus"
)
// Replica represents a remote destination to replicate the database & WAL.
@@ -24,6 +26,9 @@ type Replica interface {
// String identifier for the type of replica ("file", "s3", etc).
Type() string
// The parent database.
DB() *DB
// Starts replicating in a background goroutine.
Start(ctx context.Context)
@@ -34,7 +39,7 @@ type Replica interface {
LastPos() Pos
// Returns the computed position of the replica for a given generation.
CalcPos(generation string) (Pos, error)
CalcPos(ctx context.Context, generation string) (Pos, error)
// Returns a list of generation names for the replica.
Generations(ctx context.Context) ([]string, error)
@@ -49,14 +54,6 @@ type Replica interface {
// Returns a list of available WAL files in the replica.
WALs(ctx context.Context) ([]*WALInfo, error)
// Returns the highest index for a snapshot within a generation that occurs
// before timestamp. If timestamp is zero, returns the latest snapshot.
SnapshotIndexAt(ctx context.Context, generation string, timestamp time.Time) (int, error)
// Returns the highest index for a WAL file that occurs before timestamp.
// If timestamp is zero, returns the highest WAL index.
WALIndexAt(ctx context.Context, generation string, maxIndex int, timestamp time.Time) (int, error)
// Returns a reader for snapshot data at the given generation/index.
SnapshotReader(ctx context.Context, generation string, index int) (io.ReadCloser, error)
@@ -75,6 +72,12 @@ type GenerationStats struct {
UpdatedAt time.Time
}
// Default file replica settings.
const (
DefaultRetention = 24 * time.Hour
DefaultRetentionCheckInterval = 1 * time.Hour
)
var _ Replica = (*FileReplica)(nil)
// FileReplica is a replica that replicates a DB to a local file path.
@@ -87,18 +90,51 @@ type FileReplica struct {
pos Pos // last position
wg sync.WaitGroup
ctx context.Context
cancel func()
snapshotTotalGauge prometheus.Gauge
walBytesCounter prometheus.Counter
walIndexGauge prometheus.Gauge
walOffsetGauge prometheus.Gauge
// Time to keep snapshots and related WAL files.
// Database is snapshotted after interval and older WAL files are discarded.
Retention time.Duration
// Time between checks for retention.
RetentionCheckInterval time.Duration
// Time between validation checks.
ValidationInterval time.Duration
// If true, replica monitors database for changes automatically.
// Set to false if replica is being used synchronously (such as in tests).
MonitorEnabled bool
}
// NewFileReplica returns a new instance of FileReplica.
func NewFileReplica(db *DB, name, dst string) *FileReplica {
return &FileReplica{
r := &FileReplica{
db: db,
name: name,
dst: dst,
cancel: func() {},
Retention: DefaultRetention,
RetentionCheckInterval: DefaultRetentionCheckInterval,
MonitorEnabled: true,
}
var dbPath string
if db != nil {
dbPath = db.Path()
}
r.snapshotTotalGauge = internal.ReplicaSnapshotTotalGaugeVec.WithLabelValues(dbPath, r.Name())
r.walBytesCounter = internal.ReplicaWALBytesCounterVec.WithLabelValues(dbPath, r.Name())
r.walIndexGauge = internal.ReplicaWALIndexGaugeVec.WithLabelValues(dbPath, r.Name())
r.walOffsetGauge = internal.ReplicaWALOffsetGaugeVec.WithLabelValues(dbPath, r.Name())
return r
}
// Name returns the name of the replica. Returns the type if no name set.
@@ -114,6 +150,16 @@ func (r *FileReplica) Type() string {
return "file"
}
// DB returns the parent database reference.
func (r *FileReplica) DB() *DB {
return r.db
}
// Path returns the path the replica was initialized with.
func (r *FileReplica) Path() string {
return r.dst
}
// LastPos returns the last successfully replicated position.
func (r *FileReplica) LastPos() Pos {
r.mu.RLock()
@@ -121,14 +167,19 @@ func (r *FileReplica) LastPos() Pos {
return r.pos
}
// GenerationDir returns the path to a generation's root directory.
func (r *FileReplica) GenerationDir(generation string) string {
return filepath.Join(r.dst, "generations", generation)
}
// SnapshotDir returns the path to a generation's snapshot directory.
func (r *FileReplica) SnapshotDir(generation string) string {
return filepath.Join(r.dst, "generations", generation, "snapshots")
return filepath.Join(r.GenerationDir(generation), "snapshots")
}
// SnapshotPath returns the path to a snapshot file.
func (r *FileReplica) SnapshotPath(generation string, index int) string {
return filepath.Join(r.SnapshotDir(generation), fmt.Sprintf("%016x.snapshot.gz", index))
return filepath.Join(r.SnapshotDir(generation), fmt.Sprintf("%08x.snapshot.lz4", index))
}
// MaxSnapshotIndex returns the highest index for the snapshots.
@@ -140,7 +191,7 @@ func (r *FileReplica) MaxSnapshotIndex(generation string) (int, error) {
index := -1
for _, fi := range fis {
if idx, _, _, err := ParseSnapshotPath(fi.Name()); err != nil {
if idx, _, err := ParseSnapshotPath(fi.Name()); err != nil {
continue
} else if index == -1 || idx > index {
index = idx
@@ -154,55 +205,12 @@ func (r *FileReplica) MaxSnapshotIndex(generation string) (int, error) {
// WALDir returns the path to a generation's WAL directory
func (r *FileReplica) WALDir(generation string) string {
return filepath.Join(r.dst, "generations", generation, "wal")
}
// WALSubdir returns the directory used for grouping WAL files.
func (r *FileReplica) WALSubdir(generation string, index int) string {
return filepath.Join(r.WALDir(generation), fmt.Sprintf("%016x", uint64(index)&walDirMask))
}
// WALSubdirNames returns a list of all WAL subdirectory group names.
func (r *FileReplica) WALSubdirNames(generation string) ([]string, error) {
fis, err := ioutil.ReadDir(r.WALDir(generation))
if err != nil && !os.IsNotExist(err) {
return nil, err
}
var names []string
for _, fi := range fis {
if _, err := strconv.ParseUint(fi.Name(), 16, 64); err != nil {
continue
}
names = append(names, fi.Name())
}
return names, nil
}
// MaxWALSubdirName returns the highest WAL subdirectory group name.
func (r *FileReplica) MaxWALSubdirName(generation string) (string, error) {
fis, err := ioutil.ReadDir(r.WALDir(generation))
if err != nil {
return "", err
}
var name string
for _, fi := range fis {
if _, err := strconv.ParseUint(fi.Name(), 16, 64); err != nil {
continue
} else if name == "" || fi.Name() > name {
name = fi.Name()
}
}
if name == "" {
return "", os.ErrNotExist
}
return name, nil
return filepath.Join(r.GenerationDir(generation), "wal")
}
// WALPath returns the path to a WAL file.
func (r *FileReplica) WALPath(generation string, index int) string {
return filepath.Join(r.WALSubdir(generation, index), fmt.Sprintf("%016x.wal", index))
return filepath.Join(r.WALDir(generation), fmt.Sprintf("%08x.wal", index))
}
// Generations returns a list of available generation names.
@@ -280,13 +288,7 @@ func (r *FileReplica) snapshotStats(generation string) (n int, min, max time.Tim
}
func (r *FileReplica) walStats(generation string) (n int, min, max time.Time, err error) {
names, err := r.WALSubdirNames(generation)
if err != nil {
return n, min, max, err
}
for _, name := range names {
fis, err := ioutil.ReadDir(filepath.Join(r.WALDir(generation), name))
fis, err := ioutil.ReadDir(r.WALDir(generation))
if os.IsNotExist(err) {
return n, min, max, nil
} else if err != nil {
@@ -307,7 +309,6 @@ func (r *FileReplica) walStats(generation string) (n int, min, max time.Time, er
max = modTime
}
}
}
return n, min, max, nil
}
@@ -328,13 +329,11 @@ func (r *FileReplica) Snapshots(ctx context.Context) ([]*SnapshotInfo, error) {
}
for _, fi := range fis {
index, _, _, err := ParseSnapshotPath(fi.Name())
index, _, err := ParseSnapshotPath(fi.Name())
if err != nil {
continue
}
// TODO: Add schedule name to snapshot info.
infos = append(infos, &SnapshotInfo{
Name: fi.Name(),
Replica: r.Name(),
@@ -358,23 +357,8 @@ func (r *FileReplica) WALs(ctx context.Context) ([]*WALInfo, error) {
var infos []*WALInfo
for _, generation := range generations {
// Find a list of all directory groups.
dir := r.WALDir(generation)
subfis, err := ioutil.ReadDir(dir)
if os.IsNotExist(err) {
continue
} else if err != nil {
return nil, err
}
// Iterate over WAL group subdirectories.
for _, subfi := range subfis {
if !subfi.IsDir() {
continue
}
// Find a list of all WAL files in the group.
fis, err := ioutil.ReadDir(filepath.Join(dir, subfi.Name()))
// Find a list of all WAL files.
fis, err := ioutil.ReadDir(r.WALDir(generation))
if os.IsNotExist(err) {
continue
} else if err != nil {
@@ -399,13 +383,17 @@ func (r *FileReplica) WALs(ctx context.Context) ([]*WALInfo, error) {
})
}
}
}
return infos, nil
}
// Start starts replication for a given generation.
func (r *FileReplica) Start(ctx context.Context) {
// Ignore if replica is being used sychronously.
if !r.MonitorEnabled {
return
}
// Stop previous replication.
r.Stop()
@@ -413,8 +401,10 @@ func (r *FileReplica) Start(ctx context.Context) {
ctx, r.cancel = context.WithCancel(ctx)
// Start goroutine to replicate data.
r.wg.Add(1)
r.wg.Add(3)
go func() { defer r.wg.Done(); r.monitor(ctx) }()
go func() { defer r.wg.Done(); r.retainer(ctx) }()
go func() { defer r.wg.Done(); r.validator(ctx) }()
}
// Stop cancels any outstanding replication and blocks until finished.
@@ -453,9 +443,55 @@ func (r *FileReplica) monitor(ctx context.Context) {
}
}
// retainer runs in a separate goroutine and handles retention.
func (r *FileReplica) retainer(ctx context.Context) {
ticker := time.NewTicker(r.RetentionCheckInterval)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
if err := r.EnforceRetention(ctx); err != nil {
log.Printf("%s(%s): retain error: %s", r.db.Path(), r.Name(), err)
continue
}
}
}
}
// validator runs in a separate goroutine and handles periodic validation.
func (r *FileReplica) validator(ctx context.Context) {
// Initialize counters since validation occurs infrequently.
for _, status := range []string{"ok", "error"} {
internal.ReplicaValidationTotalCounterVec.WithLabelValues(r.db.Path(), r.Name(), status).Add(0)
}
// Exit validation if interval is not set.
if r.ValidationInterval <= 0 {
return
}
ticker := time.NewTicker(r.ValidationInterval)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
if err := ValidateReplica(ctx, r); err != nil {
log.Printf("%s(%s): validation error: %s", r.db.Path(), r.Name(), err)
continue
}
}
}
}
// CalcPos returns the position for the replica for the current generation.
// Returns a zero value if there is no active generation.
func (r *FileReplica) CalcPos(generation string) (pos Pos, err error) {
func (r *FileReplica) CalcPos(ctx context.Context, generation string) (pos Pos, err error) {
pos.Generation = generation
// Find maximum snapshot index.
@@ -463,16 +499,8 @@ func (r *FileReplica) CalcPos(generation string) (pos Pos, err error) {
return Pos{}, err
}
// Find highest WAL subdirectory group.
subdir, err := r.MaxWALSubdirName(generation)
if os.IsNotExist(err) {
return pos, nil // no replicated wal, start at snapshot index
} else if err != nil {
return Pos{}, err
}
// Find the max WAL file within WAL group.
fis, err := ioutil.ReadDir(filepath.Join(r.WALDir(generation), subdir))
// Find the max WAL file within WAL.
fis, err := ioutil.ReadDir(r.WALDir(generation))
if os.IsNotExist(err) {
return pos, nil // no replicated wal, start at snapshot index.
} else if err != nil {
@@ -509,10 +537,10 @@ func (r *FileReplica) snapshot(ctx context.Context, generation string, index int
if err != nil {
return err
} else if _, err := tx.ExecContext(ctx, `SELECT COUNT(1) FROM _litestream_seq;`); err != nil {
tx.Rollback()
_ = tx.Rollback()
return err
}
defer tx.Rollback()
defer func() { _ = tx.Rollback() }()
// Ignore if we already have a snapshot for the given WAL index.
snapshotPath := r.SnapshotPath(generation, index)
@@ -520,11 +548,11 @@ func (r *FileReplica) snapshot(ctx context.Context, generation string, index int
return nil
}
if err := os.MkdirAll(filepath.Dir(snapshotPath), 0700); err != nil {
if err := mkdirAll(filepath.Dir(snapshotPath), r.db.dirmode, r.db.diruid, r.db.dirgid); err != nil {
return err
}
return compressFile(r.db.Path(), snapshotPath)
return compressFile(r.db.Path(), snapshotPath, r.db.uid, r.db.gid)
}
// snapshotN returns the number of snapshots for a generation.
@@ -538,14 +566,24 @@ func (r *FileReplica) snapshotN(generation string) (int, error) {
var n int
for _, fi := range fis {
if _, _, _, err := ParseSnapshotPath(fi.Name()); err == nil {
if _, _, err := ParseSnapshotPath(fi.Name()); err == nil {
n++
}
}
return n, nil
}
// Sync replays data from the shadow WAL into the file replica.
func (r *FileReplica) Sync(ctx context.Context) (err error) {
// Clear last position if if an error occurs during sync.
defer func() {
if err != nil {
r.mu.Lock()
r.pos = Pos{}
r.mu.Unlock()
}
}()
// Find current position of database.
dpos, err := r.db.Pos()
if err != nil {
@@ -562,11 +600,14 @@ func (r *FileReplica) Sync(ctx context.Context) (err error) {
if err := r.snapshot(ctx, generation, dpos.Index); err != nil {
return err
}
r.snapshotTotalGauge.Set(1.0)
} else {
r.snapshotTotalGauge.Set(float64(n))
}
// Determine position, if necessary.
if r.LastPos().IsZero() {
pos, err := r.CalcPos(generation)
if r.LastPos().Generation != generation {
pos, err := r.CalcPos(ctx, generation)
if err != nil {
return fmt.Errorf("cannot determine replica position: %s", err)
}
@@ -585,7 +626,7 @@ func (r *FileReplica) Sync(ctx context.Context) (err error) {
}
}
// Gzip any old WAL files.
// Compress any old WAL files.
if generation != "" {
if err := r.compress(ctx, generation); err != nil {
return fmt.Errorf("cannot compress: %s", err)
@@ -606,23 +647,30 @@ func (r *FileReplica) syncWAL(ctx context.Context) (err error) {
// Ensure parent directory exists for WAL file.
filename := r.WALPath(rd.Pos().Generation, rd.Pos().Index)
if err := os.MkdirAll(filepath.Dir(filename), 0700); err != nil {
if err := mkdirAll(filepath.Dir(filename), r.db.dirmode, r.db.diruid, r.db.dirgid); err != nil {
return err
}
// Create a temporary file to write into so we don't have partial writes.
w, err := os.OpenFile(filename, os.O_RDWR|os.O_CREATE, 0600)
w, err := os.OpenFile(filename, os.O_RDWR|os.O_CREATE, r.db.mode)
if err != nil {
return err
}
defer w.Close()
_ = os.Chown(filename, r.db.uid, r.db.gid)
// Seek, copy & sync WAL contents.
if _, err := w.Seek(rd.Pos().Offset, io.SeekStart); err != nil {
return err
} else if _, err := io.Copy(w, rd); err != nil {
}
n, err := io.Copy(w, rd)
r.walBytesCounter.Add(float64(n))
if err != nil {
return err
} else if err := w.Sync(); err != nil {
}
if err := w.Sync(); err != nil {
return err
} else if err := w.Close(); err != nil {
return err
@@ -633,12 +681,16 @@ func (r *FileReplica) syncWAL(ctx context.Context) (err error) {
r.pos = rd.Pos()
r.mu.Unlock()
// Track current position
r.walIndexGauge.Set(float64(rd.Pos().Index))
r.walOffsetGauge.Set(float64(rd.Pos().Offset))
return nil
}
// compress gzips all WAL files before the current one.
// compress compresses all WAL files before the current one.
func (r *FileReplica) compress(ctx context.Context, generation string) error {
filenames, err := filepath.Glob(filepath.Join(r.WALDir(generation), "**/*.wal"))
filenames, err := filepath.Glob(filepath.Join(r.WALDir(generation), "*.wal"))
if err != nil {
return err
} else if len(filenames) <= 1 {
@@ -657,8 +709,8 @@ func (r *FileReplica) compress(ctx context.Context, generation string) error {
default:
}
dst := filename + ".gz"
if err := compressFile(filename, dst); err != nil {
dst := filename + ".lz4"
if err := compressFile(filename, dst, r.db.uid, r.db.gid); err != nil {
return err
} else if err := os.Remove(filename); err != nil {
return err
@@ -668,84 +720,6 @@ func (r *FileReplica) compress(ctx context.Context, generation string) error {
return nil
}
// SnapsotIndexAt returns the highest index for a snapshot within a generation
// that occurs before timestamp. If timestamp is zero, returns the latest snapshot.
func (r *FileReplica) SnapshotIndexAt(ctx context.Context, generation string, timestamp time.Time) (int, error) {
fis, err := ioutil.ReadDir(r.SnapshotDir(generation))
if os.IsNotExist(err) {
return 0, ErrNoSnapshots
} else if err != nil {
return 0, err
}
index := -1
var max time.Time
for _, fi := range fis {
// Read index from snapshot filename.
idx, _, _, err := ParseSnapshotPath(fi.Name())
if err != nil {
continue // not a snapshot, skip
} else if !timestamp.IsZero() && fi.ModTime().After(timestamp) {
continue // after timestamp, skip
}
// Use snapshot if it newer.
if max.IsZero() || fi.ModTime().After(max) {
index, max = idx, fi.ModTime()
}
}
if index == -1 {
return 0, ErrNoSnapshots
}
return index, nil
}
// Returns the highest index for a WAL file that occurs before maxIndex & timestamp.
// If timestamp is zero, returns the highest WAL index.
func (r *FileReplica) WALIndexAt(ctx context.Context, generation string, maxIndex int, timestamp time.Time) (int, error) {
names, err := r.WALSubdirNames(generation)
if err != nil {
return 0, err
}
// TODO: Optimize to only read the last group if no timestamp specified.
// TODO: Perform binary search to find correct timestamp.
var index int
for _, name := range names {
fis, err := ioutil.ReadDir(filepath.Join(r.WALDir(generation), name))
if os.IsNotExist(err) {
return 0, nil
} else if err != nil {
return 0, err
}
for _, fi := range fis {
// Read index from snapshot filename.
idx, _, _, err := ParseWALPath(fi.Name())
if err != nil {
continue // not a snapshot, skip
} else if !timestamp.IsZero() && fi.ModTime().After(timestamp) {
continue // after timestamp, skip
} else if idx > maxIndex {
continue // after timestamp, skip
} else if idx < index {
continue // earlier index, skip
}
index = idx
}
}
// If max index is specified but not found, return an error.
if maxIndex != math.MaxInt64 && index != maxIndex {
return index, fmt.Errorf("unable to locate index %d in generation %q, highest index was %d", maxIndex, generation, index)
}
return index, nil
}
// SnapshotReader returns a reader for snapshot data at the given generation/index.
// Returns os.ErrNotExist if no matching index is found.
func (r *FileReplica) SnapshotReader(ctx context.Context, generation string, index int) (io.ReadCloser, error) {
@@ -757,7 +731,7 @@ func (r *FileReplica) SnapshotReader(ctx context.Context, generation string, ind
for _, fi := range fis {
// Parse index from snapshot filename. Skip if no match.
idx, _, ext, err := ParseSnapshotPath(fi.Name())
idx, ext, err := ParseSnapshotPath(fi.Name())
if err != nil || index != idx {
continue
}
@@ -769,16 +743,11 @@ func (r *FileReplica) SnapshotReader(ctx context.Context, generation string, ind
} else if ext == ".snapshot" {
return f, nil // not compressed, return as-is.
}
assert(ext == ".snapshot.gz", "invalid snapshot extension")
assert(ext == ".snapshot.lz4", "invalid snapshot extension")
// If compressed, wrap in a gzip reader and return with wrapper to
// If compressed, wrap in an lz4 reader and return with wrapper to
// ensure that the underlying file is closed.
r, err := gzip.NewReader(f)
if err != nil {
f.Close()
return nil, err
}
return &gzipReadCloser{r: r, closer: f}, nil
return internal.NewReadCloser(lz4.NewReader(f), f), nil
}
return nil, os.ErrNotExist
}
@@ -797,42 +766,215 @@ func (r *FileReplica) WALReader(ctx context.Context, generation string, index in
}
// Otherwise read the compressed file. Return error if file doesn't exist.
f, err = os.Open(filename + ".gz")
f, err = os.Open(filename + ".lz4")
if err != nil {
return nil, err
}
// If compressed, wrap in a gzip reader and return with wrapper to
// If compressed, wrap in an lz4 reader and return with wrapper to
// ensure that the underlying file is closed.
rd, err := gzip.NewReader(f)
if err != nil {
f.Close()
return nil, err
}
return &gzipReadCloser{r: rd, closer: f}, nil
return internal.NewReadCloser(lz4.NewReader(f), f), nil
}
// compressFile compresses a file and replaces it with a new file with a .gz extension.
func compressFile(src, dst string) error {
// EnforceRetention forces a new snapshot once the retention interval has passed.
// Older snapshots and WAL files are then removed.
func (r *FileReplica) EnforceRetention(ctx context.Context) (err error) {
// Find current position of database.
pos, err := r.db.Pos()
if err != nil {
return fmt.Errorf("cannot determine current generation: %w", err)
} else if pos.IsZero() {
return fmt.Errorf("no generation, waiting for data")
}
// Obtain list of snapshots that are within the retention period.
snapshots, err := r.Snapshots(ctx)
if err != nil {
return fmt.Errorf("cannot obtain snapshot list: %w", err)
}
snapshots = FilterSnapshotsAfter(snapshots, time.Now().Add(-r.Retention))
// If no retained snapshots exist, create a new snapshot.
if len(snapshots) == 0 {
log.Printf("%s(%s): snapshots exceeds retention, creating new snapshot", r.db.Path(), r.Name())
if err := r.snapshot(ctx, pos.Generation, pos.Index); err != nil {
return fmt.Errorf("cannot snapshot: %w", err)
}
snapshots = append(snapshots, &SnapshotInfo{Generation: pos.Generation, Index: pos.Index})
}
// Loop over generations and delete unretained snapshots & WAL files.
generations, err := r.Generations(ctx)
if err != nil {
return fmt.Errorf("cannot obtain generations: %w", err)
}
for _, generation := range generations {
// Find earliest retained snapshot for this generation.
snapshot := FindMinSnapshotByGeneration(snapshots, generation)
// Delete generations if it has no snapshots being retained.
if snapshot == nil {
log.Printf("%s(%s): generation %q has no retained snapshots, deleting", r.db.Path(), r.Name(), generation)
if err := os.RemoveAll(r.GenerationDir(generation)); err != nil {
return fmt.Errorf("cannot delete generation %q dir: %w", generation, err)
}
continue
}
// Otherwise delete all snapshots & WAL files before a lowest retained index.
if err := r.deleteGenerationSnapshotsBefore(ctx, generation, snapshot.Index); err != nil {
return fmt.Errorf("cannot delete generation %q snapshots before index %d: %w", generation, snapshot.Index, err)
} else if err := r.deleteGenerationWALBefore(ctx, generation, snapshot.Index); err != nil {
return fmt.Errorf("cannot delete generation %q wal before index %d: %w", generation, snapshot.Index, err)
}
}
return nil
}
// deleteGenerationSnapshotsBefore deletes snapshot before a given index.
func (r *FileReplica) deleteGenerationSnapshotsBefore(ctx context.Context, generation string, index int) (err error) {
dir := r.SnapshotDir(generation)
fis, err := ioutil.ReadDir(dir)
if os.IsNotExist(err) {
return nil
} else if err != nil {
return err
}
for _, fi := range fis {
idx, _, err := ParseSnapshotPath(fi.Name())
if err != nil {
continue
} else if idx >= index {
continue
}
log.Printf("%s(%s): retention exceeded, deleting from generation %q: %s", r.db.Path(), r.Name(), generation, fi.Name())
if err := os.Remove(filepath.Join(dir, fi.Name())); err != nil {
return err
}
}
return nil
}
// deleteGenerationWALBefore deletes WAL files before a given index.
func (r *FileReplica) deleteGenerationWALBefore(ctx context.Context, generation string, index int) (err error) {
dir := r.WALDir(generation)
fis, err := ioutil.ReadDir(dir)
if os.IsNotExist(err) {
return nil
} else if err != nil {
return err
}
for _, fi := range fis {
idx, _, _, err := ParseWALPath(fi.Name())
if err != nil {
continue
} else if idx >= index {
continue
}
log.Printf("%s(%s): generation %q wal no longer retained, deleting %s", r.db.Path(), r.Name(), generation, fi.Name())
if err := os.Remove(filepath.Join(dir, fi.Name())); err != nil {
return err
}
}
return nil
}
// SnapshotIndexAt returns the highest index for a snapshot within a generation
// that occurs before timestamp. If timestamp is zero, returns the latest snapshot.
func SnapshotIndexAt(ctx context.Context, r Replica, generation string, timestamp time.Time) (int, error) {
snapshots, err := r.Snapshots(ctx)
if err != nil {
return 0, err
} else if len(snapshots) == 0 {
return 0, ErrNoSnapshots
}
index := -1
var max time.Time
for _, snapshot := range snapshots {
if !timestamp.IsZero() && snapshot.CreatedAt.After(timestamp) {
continue // after timestamp, skip
}
// Use snapshot if it newer.
if max.IsZero() || snapshot.CreatedAt.After(max) {
index, max = snapshot.Index, snapshot.CreatedAt
}
}
if index == -1 {
return 0, ErrNoSnapshots
}
return index, nil
}
// WALIndexAt returns the highest index for a WAL file that occurs before maxIndex & timestamp.
// If timestamp is zero, returns the highest WAL index.
func WALIndexAt(ctx context.Context, r Replica, generation string, maxIndex int, timestamp time.Time) (int, error) {
wals, err := r.WALs(ctx)
if err != nil {
return 0, err
}
var index int
for _, wal := range wals {
if wal.Generation != generation {
continue
}
if !timestamp.IsZero() && wal.CreatedAt.After(timestamp) {
continue // after timestamp, skip
} else if wal.Index > maxIndex {
continue // after max index, skip
} else if wal.Index < index {
continue // earlier index, skip
}
index = wal.Index
}
// If max index is specified but not found, return an error.
if maxIndex != math.MaxInt64 && index != maxIndex {
return index, fmt.Errorf("unable to locate index %d in generation %q, highest index was %d", maxIndex, generation, index)
}
return index, nil
}
// compressFile compresses a file and replaces it with a new file with a .lz4 extension.
func compressFile(src, dst string, uid, gid int) error {
r, err := os.Open(src)
if err != nil {
return err
}
defer r.Close()
w, err := os.Create(dst + ".tmp")
fi, err := r.Stat()
if err != nil {
return err
}
w, err := createFile(dst+".tmp", fi.Mode(), uid, gid)
if err != nil {
return err
}
defer w.Close()
gz := gzip.NewWriter(w)
defer gz.Close()
zr := lz4.NewWriter(w)
defer zr.Close()
// Copy & compress file contents to temporary file.
if _, err := io.Copy(gz, r); err != nil {
if _, err := io.Copy(zr, r); err != nil {
return err
} else if err := gz.Close(); err != nil {
} else if err := zr.Close(); err != nil {
return err
} else if err := w.Sync(); err != nil {
return err
@@ -844,8 +986,108 @@ func compressFile(src, dst string) error {
return os.Rename(dst+".tmp", dst)
}
// walDirMask is a mask used to group 64K wal files into a directory.
const (
walDirFileN = 0x10000
walDirMask = uint64(0xFFFFFFFFFFFFFFFF ^ (walDirFileN - 1))
)
// ValidateReplica restores the most recent data from a replica and validates
// that the resulting database matches the current database.
func ValidateReplica(ctx context.Context, r Replica) error {
db := r.DB()
// Compute checksum of primary database under lock. This prevents a
// sync from occurring and the database will not be written.
chksum0, pos, err := db.CRC64()
if err != nil {
return fmt.Errorf("cannot compute checksum: %w", err)
}
log.Printf("%s(%s): primary checksum computed: %016x @ %s", db.Path(), r.Name(), chksum0, pos)
// Wait until replica catches up to position.
log.Printf("%s(%s): waiting for replica", db.Path(), r.Name())
if err := waitForReplica(ctx, r, pos); err != nil {
return fmt.Errorf("cannot wait for replica: %w", err)
}
// Restore replica to a temporary directory.
tmpdir, err := ioutil.TempDir("", "*-litestream")
if err != nil {
return err
}
defer os.RemoveAll(tmpdir)
restorePath := filepath.Join(tmpdir, "db")
if err := RestoreReplica(ctx, r, RestoreOptions{
OutputPath: restorePath,
ReplicaName: r.Name(),
Generation: pos.Generation,
Index: pos.Index - 1,
Logger: log.New(os.Stderr, "", 0),
}); err != nil {
return fmt.Errorf("cannot restore: %w", err)
}
// Open file handle for restored database.
chksum1, err := checksumFile(restorePath)
if err != nil {
return err
}
log.Printf("%s(%s): restore complete, replica checksum=%016x", db.Path(), r.Name(), chksum1)
// Validate checksums match.
if chksum0 != chksum1 {
internal.ReplicaValidationTotalCounterVec.WithLabelValues(db.Path(), r.Name(), "error").Inc()
return ErrChecksumMismatch
}
internal.ReplicaValidationTotalCounterVec.WithLabelValues(db.Path(), r.Name(), "ok").Inc()
log.Printf("%s(%s): replica ok", db.Path(), r.Name())
return nil
}
// waitForReplica blocks until replica reaches at least the given position.
func waitForReplica(ctx context.Context, r Replica, pos Pos) error {
db := r.DB()
ticker := time.NewTicker(500 * time.Millisecond)
defer ticker.Stop()
once := make(chan struct{}, 1)
once <- struct{}{}
for {
select {
case <-ctx.Done():
return ctx.Err()
case <-ticker.C:
case <-once: // immediate on first check
}
// Obtain current position of replica, check if past target position.
curr, err := r.CalcPos(ctx, pos.Generation)
if err != nil {
log.Printf("%s(%s): cannot obtain replica position: %s", db.Path(), r.Name(), err)
continue
}
// Exit if the generation has changed while waiting as there will be
// no further progress on the old generation.
if curr.Generation != pos.Generation {
return fmt.Errorf("generation changed")
}
ready := true
if curr.Index < pos.Index {
ready = false
} else if curr.Index == pos.Index && curr.Offset < pos.Offset {
ready = false
}
// If not ready, restart loop.
if !ready {
log.Printf("%s(%s): replica at %s, waiting for %s", db.Path(), r.Name(), curr, pos)
continue
}
// Current position at or after target position.
return nil
}
}

90
replica_test.go Normal file
View File

@@ -0,0 +1,90 @@
package litestream_test
import (
"context"
"testing"
"github.com/benbjohnson/litestream"
)
func TestFileReplica_Sync(t *testing.T) {
// Ensure replica can successfully sync after DB has sync'd.
t.Run("InitialSync", func(t *testing.T) {
db, sqldb := MustOpenDBs(t)
defer MustCloseDBs(t, db, sqldb)
r := NewTestFileReplica(t, db)
// Sync database & then sync replica.
if err := db.Sync(); err != nil {
t.Fatal(err)
} else if err := r.Sync(context.Background()); err != nil {
t.Fatal(err)
}
// Ensure posistions match.
if pos, err := db.Pos(); err != nil {
t.Fatal(err)
} else if got, want := r.LastPos(), pos; got != want {
t.Fatalf("LastPos()=%v, want %v", got, want)
}
})
// Ensure replica can successfully sync multiple times.
t.Run("MultiSync", func(t *testing.T) {
db, sqldb := MustOpenDBs(t)
defer MustCloseDBs(t, db, sqldb)
r := NewTestFileReplica(t, db)
if _, err := sqldb.Exec(`CREATE TABLE foo (bar TEXT);`); err != nil {
t.Fatal(err)
}
// Write to the database multiple times and sync after each write.
for i, n := 0, db.MinCheckpointPageN*2; i < n; i++ {
if _, err := sqldb.Exec(`INSERT INTO foo (bar) VALUES ('baz')`); err != nil {
t.Fatal(err)
}
// Sync periodically.
if i%100 == 0 || i == n-1 {
if err := db.Sync(); err != nil {
t.Fatal(err)
} else if err := r.Sync(context.Background()); err != nil {
t.Fatal(err)
}
}
}
// Ensure posistions match.
if pos, err := db.Pos(); err != nil {
t.Fatal(err)
} else if got, want := pos.Index, 2; got != want {
t.Fatalf("Index=%v, want %v", got, want)
} else if calcPos, err := r.CalcPos(context.Background(), pos.Generation); err != nil {
t.Fatal(err)
} else if got, want := calcPos, pos; got != want {
t.Fatalf("CalcPos()=%v, want %v", got, want)
} else if got, want := r.LastPos(), pos; got != want {
t.Fatalf("LastPos()=%v, want %v", got, want)
}
})
// Ensure replica returns an error if there is no generation available from the DB.
t.Run("ErrNoGeneration", func(t *testing.T) {
db, sqldb := MustOpenDBs(t)
defer MustCloseDBs(t, db, sqldb)
r := NewTestFileReplica(t, db)
if err := r.Sync(context.Background()); err == nil || err.Error() != `no generation, waiting for data` {
t.Fatal(err)
}
})
}
// NewTestFileReplica returns a new replica using a temp directory & with monitoring disabled.
func NewTestFileReplica(tb testing.TB, db *litestream.DB) *litestream.FileReplica {
r := litestream.NewFileReplica(db, "", tb.TempDir())
r.MonitorEnabled = false
db.Replicas = []litestream.Replica{r}
return r
}

1044
s3/s3.go Normal file

File diff suppressed because it is too large Load Diff