Compare commits

..

86 Commits

Author SHA1 Message Date
Ben Johnson
a178ef4714 Merge pull request #27 from benbjohnson/generations-replica-url
Allow replica URLs for generations command
2021-01-27 07:50:29 -07:00
Ben Johnson
7ca2e193b9 Allow replica URLs for generations command 2021-01-27 07:48:56 -07:00
Ben Johnson
39a6fabb9f Fix restore logging. 2021-01-26 17:01:00 -07:00
Ben Johnson
0249b4e4f5 Merge pull request #25 from benbjohnson/replica-url 2021-01-26 16:40:17 -07:00
Ben Johnson
67eeb49101 Allow replica URL to be used for commands
This commit refactors the commands to allow a replica URL when
restoring a database. If the first CLI arg is a URL with a scheme,
the it is treated as a replica URL.
2021-01-26 16:33:16 -07:00
Ben Johnson
f7213ed35c Allow replication without config file.
This commit changes `litestream replicate` to accept a database
path and a replica URL instead of using the config file. This allows
people to quickly try out the tool instead of learning the config
file syntax.
2021-01-25 10:33:50 -07:00
Ben Johnson
a532a0198e README 2021-01-24 10:09:54 -07:00
Ben Johnson
16f79e5814 Merge pull request #24 from benbjohnson/document-retention-period
Document retention period configuration
2021-01-24 09:32:31 -07:00
Ben Johnson
39aefc2c02 Document retention period configuration 2021-01-24 09:28:57 -07:00
Ben Johnson
0b08669bca Merge pull request #23 from benbjohnson/disable-metrics-by-default
Disable prometheus metrics by default
2021-01-24 09:18:37 -07:00
Ben Johnson
8f5761ee13 Disable prometheus metrics by default
The HTTP server should only be enabled if a user explicitly sets a
port for it.
2021-01-24 09:16:23 -07:00
Ben Johnson
d2eb4fa5ba Remove PR action 2021-01-24 08:54:29 -07:00
Ben Johnson
ca489c5e73 Merge pull request #22 from benbjohnson/notorize
Add signed homebrew install
2021-01-24 08:50:01 -07:00
Ben Johnson
f0ae48af4c Add signed homebrew install 2021-01-24 08:47:16 -07:00
Ben Johnson
9eae39e2fa README 2021-01-21 15:01:30 -07:00
Ben Johnson
42ab293ffb README 2021-01-21 14:53:21 -07:00
Ben Johnson
c8b72bf16b Fix release action 2021-01-21 14:39:23 -07:00
Ben Johnson
9c4de6c520 Debian release action 2021-01-21 14:34:42 -07:00
Ben Johnson
94411923a7 Fix unit test 2021-01-21 13:52:35 -07:00
Ben Johnson
e92db9ef4b Enforce stricter validation on restart.
Previously, the sync would validate the last page written to ensure
that replication picked up from the last position. However, a large
WAL file followed by a series of shorter checkpointed WAL files means
that the last page could be the same even if multiple checkpoints
have occurred.

To fix this, the WAL header must match the shadow WAL header when
starting litestream since there are no guarantees about checkpoints.
2021-01-21 13:44:05 -07:00
Ben Johnson
031a526b9a Only copy committed WAL pages 2021-01-21 12:44:11 -07:00
Ben Johnson
2244be885d README 2021-01-20 17:05:47 -07:00
Ben Johnson
95bcaa5927 Fix file replica compression 2021-01-19 09:25:38 -07:00
Ben Johnson
1935ebd6f0 Fix S3 GET bytes metric 2021-01-19 06:46:13 -07:00
Ben Johnson
7fb98df240 cleanup 2021-01-18 15:58:49 -07:00
Ben Johnson
f31c22af62 Remove s3 bucket lookup log 2021-01-18 15:27:16 -07:00
Ben Johnson
139d836d7a Fix file/dir mode 2021-01-18 15:23:28 -07:00
Ben Johnson
14dad1fd5a Switch from gzip to lz4 2021-01-18 14:45:12 -07:00
Ben Johnson
35d755e7f2 Remove debugging code 2021-01-18 10:33:30 -07:00
Ben Johnson
358dcd4650 Copy shadow WAL immediately after init 2021-01-18 10:01:16 -07:00
Ben Johnson
2ce4052300 Remove write lock during db checksum 2021-01-18 07:05:27 -07:00
Ben Johnson
44af75fa98 Fix missing WAL reader error 2021-01-18 07:05:17 -07:00
Ben Johnson
3c4fd152c9 Add more checksum logging 2021-01-18 06:38:03 -07:00
Ben Johnson
d259d9b9e3 Fix checksum logging 2021-01-17 10:19:39 -07:00
Ben Johnson
90a1d959d4 Remove size from s3 filenames 2021-01-17 10:02:06 -07:00
Ben Johnson
04d75507e3 Fix checksum hex padding 2021-01-17 09:52:09 -07:00
Ben Johnson
4b65e6a88f Log validation position 2021-01-17 07:38:13 -07:00
Ben Johnson
07a65cbac7 Fix crc64 unit test 2021-01-16 10:04:03 -07:00
Ben Johnson
6ac6a8536d Obtain write lock during validation. 2021-01-16 09:27:43 -07:00
Ben Johnson
71ab15e50a Fix S3 GET stats 2021-01-16 09:22:33 -07:00
Ben Johnson
b4e5079760 Add .deb packaging 2021-01-16 09:22:02 -07:00
Ben Johnson
78563f821d Do not require databases when starting replication 2021-01-16 09:15:16 -07:00
Ben Johnson
e65536f81d Stop waiting for replica if generation changes 2021-01-16 07:47:02 -07:00
Ben Johnson
25fec29e1a Clear last position on replica sync error 2021-01-16 07:45:08 -07:00
Ben Johnson
cbc2dce6dc Add busy timeout 2021-01-16 07:33:32 -07:00
Ben Johnson
1b8cfc8a41 Add validation interval 2021-01-15 16:37:04 -07:00
Ben Johnson
290e06e60d Reduce s3 LIST operations 2021-01-15 13:31:04 -07:00
Ben Johnson
b94ee366e5 Fix snapshot only restore 2021-01-15 13:12:15 -07:00
Ben Johnson
743aeb83e1 Revert gzip compression level, fix s3 wal upload 2021-01-15 13:04:21 -07:00
Ben Johnson
a7ec05ad7a Allow global AWS settings in config. 2021-01-15 12:27:41 -07:00
Ben Johnson
28dd7b564e Lookup s3 bucket region if not specified 2021-01-15 12:18:07 -07:00
Ben Johnson
43dda4315f Allow URLs for replica config path 2021-01-15 12:04:23 -07:00
Ben Johnson
0655bf420a Fix unit tests 2021-01-14 16:13:19 -07:00
Ben Johnson
8c113cf260 Add file & s3 replica metrics 2021-01-14 16:10:02 -07:00
Ben Johnson
daa74f87b4 Add file replica metrics 2021-01-14 15:47:58 -07:00
Ben Johnson
e1c9e09161 Update wal segment naming 2021-01-14 15:26:29 -07:00
Ben Johnson
1e4e9633cc Add s3 sync interval 2021-01-14 15:04:26 -07:00
Ben Johnson
294846cce2 Add context to s3 2021-01-13 16:38:00 -07:00
Ben Johnson
9eb7bd41c2 S3 reader & retention enforcement 2021-01-13 16:21:42 -07:00
Ben Johnson
1ac4adb272 Basic s3 replication working 2021-01-13 14:23:41 -07:00
Ben Johnson
a42f83f3cb Add LITESTREAM_CONFIG env var 2021-01-13 13:17:38 -07:00
Ben Johnson
57a02a8628 S3 replica 2021-01-13 10:14:54 -07:00
Ben Johnson
faa5765745 Add retention policy, remove WAL subdir 2021-01-12 15:22:37 -07:00
Ben Johnson
1fa1313b0b Add trace logging. 2021-01-11 11:04:29 -07:00
Ben Johnson
bcdb553267 Use database owner/group 2021-01-11 09:39:08 -07:00
Ben Johnson
9828b4c1dd Rename 'databases' to 'dbs' in config 2021-01-10 10:07:07 -07:00
Ben Johnson
dde9d1042d Update generation lag calc 2021-01-10 09:54:05 -07:00
Ben Johnson
8f30ff7d93 Fix negative duration truncation. 2021-01-10 09:52:04 -07:00
Ben Johnson
aa136a17ee Fix duration truncation. 2021-01-10 09:46:39 -07:00
Ben Johnson
60cb2c97ca Set default max checkpoint. 2021-01-10 09:46:27 -07:00
Ben Johnson
0abe09526d Fix local build 2021-01-09 08:59:08 -07:00
Ben Johnson
b0a3440356 make dist 2021-01-08 16:05:07 -07:00
Ben Johnson
a8d63b54aa README 2021-01-08 10:07:26 -07:00
Ben Johnson
b22f3f100d Add FileReplica.Sync() unit tests. 2021-01-05 15:48:50 -07:00
Ben Johnson
3075b2e92b Fix WAL mod time test 2021-01-05 15:15:28 -07:00
Ben Johnson
7c3272c96f Revert "Test 1.16beta1"
This reverts commit 4294fcf4b4.
2021-01-05 15:11:29 -07:00
Ben Johnson
4294fcf4b4 Test 1.16beta1 2021-01-05 15:10:09 -07:00
Ben Johnson
ae0f51eaa9 Fix setup-go 2021-01-05 14:34:14 -07:00
Ben Johnson
8871d75a8e Fix max checkpoint size check 2021-01-05 14:17:13 -07:00
Ben Johnson
c22eea13ad Add checkpoint tests 2021-01-05 14:07:17 -07:00
Ben Johnson
f4d055916a Add DB sync tests 2021-01-05 13:59:16 -07:00
Ben Johnson
979cabcdb9 Add some DB.Sync() tests 2021-01-01 10:02:03 -07:00
Ben Johnson
5134bc3328 Add test coverage for DB.CRC64 2021-01-01 09:26:23 -07:00
Ben Johnson
78d9de6512 Add DB path tests 2021-01-01 09:00:23 -07:00
Ben Johnson
065f641526 Change validation to use CRC-64 2021-01-01 08:24:11 -07:00
Ben Johnson
f4d0d87fa7 Add DB.UpdatedAt() tests 2021-01-01 08:20:40 -07:00
32 changed files with 3778 additions and 1065 deletions

View File

@@ -5,7 +5,7 @@ on:
name: release
jobs:
release:
linux:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
@@ -16,10 +16,21 @@ jobs:
env:
GITHUB_TOKEN: ${{ github.token }}
- name: Install nfpm
run: |
wget https://github.com/goreleaser/nfpm/releases/download/v2.2.3/nfpm_2.2.3_Linux_x86_64.tar.gz
tar zxvf nfpm_2.2.3_Linux_x86_64.tar.gz
- name: Build litestream
run: |
go build -ldflags "-X 'main.Version=${{ steps.release.outputs.tag_name }}'" -o litestream ./cmd/litestream
mkdir -p dist
cp etc/litestream.yml etc/litestream.service dist
cat etc/nfpm.yml | LITESTREAM_VERSION=${{ steps.release.outputs.tag_name }} envsubst > dist/nfpm.yml
go build -ldflags "-X 'main.Version=${{ steps.release.outputs.tag_name }}'" -o dist/litestream ./cmd/litestream
cd dist
tar -czvf litestream-${{ steps.release.outputs.tag_name }}-linux-amd64.tar.gz litestream
../nfpm pkg --config nfpm.yml --packager deb --target litestream-${{ steps.release.outputs.tag_name }}-linux-amd64.deb
- name: Upload release binary
uses: actions/upload-release-asset@v1.0.2
@@ -27,6 +38,16 @@ jobs:
GITHUB_TOKEN: ${{ github.token }}
with:
upload_url: ${{ steps.release.outputs.upload_url }}
asset_path: ./litestream-${{ steps.release.outputs.tag_name }}-linux-amd64.tar.gz
asset_path: ./dist/litestream-${{ steps.release.outputs.tag_name }}-linux-amd64.tar.gz
asset_name: litestream-${{ steps.release.outputs.tag_name }}-linux-amd64.tar.gz
asset_content_type: application/gzip
- name: Upload debian package
uses: actions/upload-release-asset@v1.0.2
env:
GITHUB_TOKEN: ${{ github.token }}
with:
upload_url: ${{ steps.release.outputs.upload_url }}
asset_path: ./dist/litestream-${{ steps.release.outputs.tag_name }}-linux-amd64.deb
asset_name: litestream-${{ steps.release.outputs.tag_name }}-linux-amd64.deb
asset_content_type: application/octet-stream

View File

@@ -1,10 +1,12 @@
on: [push, pull_request]
on: push
name: test
jobs:
test:
runs-on: ubuntu-latest
steps:
- uses: actions/setup-go@v2
with:
go-version: '1.15'
- uses: actions/checkout@v2

2
.gitignore vendored Normal file
View File

@@ -0,0 +1,2 @@
.DS_Store
/dist

22
Makefile Normal file
View File

@@ -0,0 +1,22 @@
default:
dist-linux:
mkdir -p dist
cp etc/litestream.yml dist/litestream.yml
docker run --rm -v "${PWD}":/usr/src/litestream -w /usr/src/litestream -e GOOS=linux -e GOARCH=amd64 golang:1.15 go build -v -o dist/litestream ./cmd/litestream
tar -cz -f dist/litestream-linux-amd64.tar.gz -C dist litestream
dist-macos:
ifndef LITESTREAM_VERSION
$(error LITESTREAM_VERSION is undefined)
endif
mkdir -p dist
go build -v -ldflags "-X 'main.Version=${LITESTREAM_VERSION}'" -o dist/litestream ./cmd/litestream
gon etc/gon.hcl
mv dist/litestream.zip dist/litestream-${LITESTREAM_VERSION}-darwin-amd64.zip
openssl dgst -sha256 dist/litestream-${LITESTREAM_VERSION}-darwin-amd64.zip
clean:
rm -rf dist
.PHONY: default dist-linux dist-macos clean

297
README.md
View File

@@ -1,17 +1,296 @@
litestream
Litestream
![GitHub release (latest by date)](https://img.shields.io/github/v/release/benbjohnson/litestream)
![Status](https://img.shields.io/badge/status-beta-blue)
![GitHub](https://img.shields.io/github/license/benbjohnson/litestream)
![test](https://github.com/benbjohnson/litestream/workflows/test/badge.svg)
==========
Streaming replication for SQLite.
Litestream is a standalone streaming replication tool for SQLite. It runs as a
background process and safely replicates changes incrementally to another file
or S3. Litestream only communicates with SQLite through the SQLite API so it
will not corrupt your database.
If you need support or have ideas for improving Litestream, please visit the
[GitHub Discussions](https://github.com/benbjohnson/litestream/discussions) to
chat.
If you find this project interesting, please consider starring the project on
GitHub.
## Questions
## Installation
- How to avoid WAL checkpointing on close?
### Mac OS (Homebrew)
To install from homebrew, first add the Litestream tap and then install:
## Notes
```sql
-- Disable autocheckpointing.
PRAGMA wal_autocheckpoint = 0
```sh
$ brew install benbjohnson/litestream/litestream
```
### Linux (Debian)
You can download the `.deb` file from the [Releases page][releases] page and
then run the following:
```sh
$ sudo dpkg -i litestream-v0.3.0-linux-amd64.deb
```
Once installed, you'll need to enable & start the service:
```sh
$ sudo systemctl enable litestream
$ sudo systemctl start litestream
```
### Release binaries
You can also download the release binary for your system from the
[releases page][releases] and run it as a standalone application.
### Building from source
Download and install the [Go toolchain](https://golang.org/) and then run:
```sh
$ go install ./cmd/litestream
```
The `litestream` binary should be in your `$GOPATH/bin` folder.
## Quick Start
Litestream provides a configuration file that can be used for production
deployments but you can also specify a single database and replica on the
command line for testing.
First, you'll need to create an S3 bucket that we'll call `"mybkt"` in this
example. You'll also need to set your AWS credentials:
```sh
$ export AWS_ACCESS_KEY_ID=AKIAxxxxxxxxxxxxxxxx
$ export AWS_SECRET_ACCESS_KEY=xxxxxxxxxxxxxxxxxxxxxxxxxxxxxx/xxxxxxxxx
```
Next you can run the `litestream replicate` command with the path to the
database you want to backup and the URL of your replica destination:
```sh
$ litestream replicate /path/to/db s3://mybkt/db
```
If you make changes to your local database, those changes will be replicated
to S3 every 10 seconds. From another terminal window, you can restore your
database from your S3 replica:
```
$ litestream restore -v -o /path/to/restored/db s3://mybkt/db
```
Voila! 🎉
Your database should be restored to the last replicated state that
was sent to S3. You can adjust your replication frequency and other options by
using a configuration-based approach specified below.
## Configuration
A configuration-based install gives you more replication options. By default,
the config file lives at `/etc/litestream.yml` but you can pass in a different
path to any `litestream` command using the `-config PATH` flag. You can also
set the `LITESTREAM_CONFIG` environment variable to specify a new path.
The configuration specifies one or more `dbs` and a list of one or more replica
locations for each db. Below are some common configurations:
### Replicate to S3
This will replicate the database at `/path/to/db` to the `"/db"` path inside
the S3 bucket named `"mybkt"`.
```yaml
access-key-id: AKIAxxxxxxxxxxxxxxxx
secret-access-key: xxxxxxxxxxxxxxxxxxxxxxxxxxxxxx/xxxxxxxxx
dbs:
- path: /path/to/db
replicas:
- url: s3://mybkt/db
```
### Replicate to another file path
This will replicate the database at `/path/to/db` to a directory named
`/path/to/replica`.
```yaml
dbs:
- path: /path/to/db
replicas:
- path: /path/to/replica
```
### Retention period
By default, replicas will retain a snapshot & subsequent WAL changes for 24
hours. When the snapshot age exceeds the retention threshold, a new snapshot
is taken and uploaded and the previous snapshot and WAL files are removed.
You can configure this setting per-replica. Times are parsed using [Go's
duration](https://golang.org/pkg/time/#ParseDuration) so time units of hours
(`h`), minutes (`m`), and seconds (`s`) are allowed but days, weeks, months, and
years are not.
```yaml
db:
- path: /path/to/db
replicas:
- url: s3://mybkt/db
retention: 1h # 1 hour retention
```
### Monitoring replication
You can also enable a Prometheus metrics endpoint to monitor replication by
specifying a bind address with the `addr` field:
```yml
addr: ":9090"
```
This will make metrics available at: http://localhost:9090/metrics
### Other configuration options
These are some additional configuration options available on replicas:
- `type`—Specify the type of replica (`"file"` or `"s3"`). Derived from `"path"`.
- `name`—Specify an optional name for the replica if you are using multiple replicas.
- `path`—File path to the replica location.
- `url`—URL to the replica location.
- `retention-check-interval`—Time between retention enforcement checks. Defaults to `1h`.
- `validation-interval`—Interval between periodic checks to ensure restored backup matches current database. Disabled by default.
These replica options are only available for S3 replicas:
- `bucket`—S3 bucket name. Derived from `"path"`.
- `region`—S3 bucket region. Looked up on startup if unspecified.
- `sync-interval`—Replication sync frequency.
## Usage
### Replication
Once your configuration is saved, you'll need to begin replication. If you
installed the `.deb` file then run:
```sh
$ sudo systemctl restart litestream
```
To run litestream on its own, run:
```sh
# Replicate using the /etc/litestream.yml configuration.
$ litestream replicate
# Replicate using a different configuration path.
$ litestream replicate -config /path/to/litestream.yml
```
The `litestream` command will initialize and then wait indefinitely for changes.
You should see your destination replica path is now populated with a
`generations` directory. Inside there should be a 16-character hex generation
directory and inside there should be snapshots & WAL files. As you make changes
to your source database, changes will be copied over to your replica incrementally.
### Restoring a backup
Litestream can restore a previous snapshot and replay all replicated WAL files.
By default, it will restore up to the latest WAL file but you can also perform
point-in-time restores.
A database can only be restored to a path that does not exist so you don't need
to worry about accidentally overwriting your current database.
```sh
# Restore database to original path.
$ litestream restore /path/to/db
# Restore database to a new location.
$ litestream restore -o /path/to/restored/db /path/to/db
# Restore from a replica URL.
$ litestream restore -o /path/to/restored/db s3://mybkt/db
# Restore database to a specific point-in-time.
$ litestream restore -timestamp 2020-01-01T00:00:00Z /path/to/db
```
Point-in-time restores only have the resolution of the timestamp of the WAL file
itself. By default, litestream will start a new WAL file every minute so
point-in-time restores are only accurate to the minute.
## How it works
SQLite provides a WAL (write-ahead log) journaling mode which writes pages to
a `-wal` file before eventually being copied over to the original database file.
This copying process is known as checkpointing. The WAL file works as a circular
buffer so when the WAL reaches a certain size then it restarts from the beginning.
Litestream works by taking over the checkpointing process and controlling when
it is restarted to ensure that it copies every new page. Checkpointing is only
allowed when there are no read transactions so Litestream maintains a
long-running read transaction against each database until it is ready to
checkpoint.
The SQLite WAL file is copied to a separate location called the shadow WAL which
ensures that it will not be overwritten by SQLite. This shadow WAL acts as a
temporary buffer so that replicas can replicate to their destination (e.g.
another file path or to S3). The shadow WAL files are removed once they have
been fully replicated. You can find the shadow directory as a hidden directory
next to your database file. If you database file is named `/var/lib/my.db` then
the shadow directory will be `/var/lib/.my.db-litestream`.
Litestream groups a snapshot and all subsequent WAL changes into "generations".
A generation is started on initial replication of a database and a new
generation will be started if litestream detects that the WAL replication is
no longer contiguous. This can occur if the `litestream` process is stopped and
another process is allowed to checkpoint the WAL.
## Open-source, not open-contribution
[Similar to SQLite](https://www.sqlite.org/copyright.html), litestream is open
source but closed to contributions. This keeps the code base free of proprietary
or licensed code but it also helps me continue to maintain and build litestream.
As the author of [BoltDB](https://github.com/boltdb/bolt), I found that
accepting and maintaining third party patches contributed to my burn out and
I eventually archived the project. Writing databases & low-level replication
tools involves nuance and simple one line changes can have profound and
unexpected changes in correctness and performance. Small contributions
typically required hours of my time to properly test and validate them.
I am grateful for community involvement, bug reports, & feature requests. I do
not wish to come off as anything but welcoming, however, I've
made the decision to keep this project closed to contributions for my own
mental health and long term viability of the project.
[releases]: https://github.com/benbjohnson/litestream/releases

View File

@@ -10,8 +10,10 @@ import (
"text/tabwriter"
)
// DatabasesCommand is a command for listing managed databases.
type DatabasesCommand struct{}
// Run executes the command.
func (c *DatabasesCommand) Run(ctx context.Context, args []string) (err error) {
var configPath string
fs := flag.NewFlagSet("litestream-databases", flag.ContinueOnError)
@@ -36,7 +38,7 @@ func (c *DatabasesCommand) Run(ctx context.Context, args []string) (err error) {
w := tabwriter.NewWriter(os.Stdout, 0, 8, 1, '\t', 0)
fmt.Fprintln(w, "path\treplicas")
for _, dbConfig := range config.DBs {
db, err := newDBFromConfig(dbConfig)
db, err := newDBFromConfig(&config, dbConfig)
if err != nil {
return err
}
@@ -56,6 +58,7 @@ func (c *DatabasesCommand) Run(ctx context.Context, args []string) (err error) {
return nil
}
// Usage prints the help screen to STDOUT.
func (c *DatabasesCommand) Usage() {
fmt.Printf(`
The databases command lists all databases in the configuration file.
@@ -71,6 +74,6 @@ Arguments:
Defaults to %s
`[1:],
DefaultConfigPath,
DefaultConfigPath(),
)
}

View File

@@ -7,13 +7,16 @@ import (
"fmt"
"log"
"os"
"path/filepath"
"text/tabwriter"
"time"
"github.com/benbjohnson/litestream"
)
// GenerationsCommand represents a command to list all generations for a database.
type GenerationsCommand struct{}
// Run executes the command.
func (c *GenerationsCommand) Run(ctx context.Context, args []string) (err error) {
var configPath string
fs := flag.NewFlagSet("litestream-generations", flag.ContinueOnError)
@@ -23,50 +26,60 @@ func (c *GenerationsCommand) Run(ctx context.Context, args []string) (err error)
if err := fs.Parse(args); err != nil {
return err
} else if fs.NArg() == 0 || fs.Arg(0) == "" {
return fmt.Errorf("database path required")
return fmt.Errorf("database path or replica URL required")
} else if fs.NArg() > 1 {
return fmt.Errorf("too many arguments")
}
// Load configuration.
if configPath == "" {
return errors.New("-config required")
}
config, err := ReadConfigFile(configPath)
if err != nil {
return err
var db *litestream.DB
var r litestream.Replica
updatedAt := time.Now()
if isURL(fs.Arg(0)) {
if r, err = NewReplicaFromURL(fs.Arg(0)); err != nil {
return err
}
} else if configPath != "" {
// Load configuration.
config, err := ReadConfigFile(configPath)
if err != nil {
return err
}
// Lookup database from configuration file by path.
if path, err := expand(fs.Arg(0)); err != nil {
return err
} else if dbc := config.DBConfig(path); dbc == nil {
return fmt.Errorf("database not found in config: %s", path)
} else if db, err = newDBFromConfig(&config, dbc); err != nil {
return err
}
// Filter by replica, if specified.
if *replicaName != "" {
if r = db.Replica(*replicaName); r == nil {
return fmt.Errorf("replica %q not found for database %q", *replicaName, db.Path())
}
}
// Determine last time database or WAL was updated.
if updatedAt, err = db.UpdatedAt(); err != nil {
return err
}
} else {
return errors.New("config path or replica URL required")
}
// Determine absolute path for database.
dbPath, err := filepath.Abs(fs.Arg(0))
if err != nil {
return err
}
// Instantiate DB from from configuration.
dbConfig := config.DBConfig(dbPath)
if dbConfig == nil {
return fmt.Errorf("database not found in config: %s", dbPath)
}
db, err := newDBFromConfig(dbConfig)
if err != nil {
return err
}
// Determine last time database or WAL was updated.
updatedAt, err := db.UpdatedAt()
if err != nil {
return err
var replicas []litestream.Replica
if r != nil {
replicas = []litestream.Replica{r}
} else {
replicas = db.Replicas
}
// List each generation.
w := tabwriter.NewWriter(os.Stdout, 0, 8, 1, '\t', 0)
fmt.Fprintln(w, "name\tgeneration\tlag\tstart\tend")
for _, r := range db.Replicas {
if *replicaName != "" && r.Name() != *replicaName {
continue
}
for _, r := range replicas {
generations, err := r.Generations(ctx)
if err != nil {
log.Printf("%s: cannot list generations: %s", r.Name(), err)
@@ -84,10 +97,11 @@ func (c *GenerationsCommand) Run(ctx context.Context, args []string) (err error)
fmt.Fprintf(w, "%s\t%s\t%s\t%s\t%s\n",
r.Name(),
generation,
truncateDuration(stats.UpdatedAt.Sub(updatedAt)).String(),
truncateDuration(updatedAt.Sub(stats.UpdatedAt)).String(),
stats.CreatedAt.Format(time.RFC3339),
stats.UpdatedAt.Format(time.RFC3339),
)
w.Flush()
}
}
w.Flush()
@@ -95,35 +109,51 @@ func (c *GenerationsCommand) Run(ctx context.Context, args []string) (err error)
return nil
}
// Usage prints the help message to STDOUT.
func (c *GenerationsCommand) Usage() {
fmt.Printf(`
The generations command lists all generations for a database. It also lists
stats about their lag behind the primary database and the time range they cover.
The generations command lists all generations for a database or replica. It also
lists stats about their lag behind the primary database and the time range they
cover.
Usage:
litestream generations [arguments] DB
litestream generations [arguments] DB_PATH
litestream generations [arguments] REPLICA_URL
Arguments:
-config PATH
Specifies the configuration file. Defaults to %s
Specifies the configuration file.
Defaults to %s
-replica NAME
Optional, filters by replica.
`[1:],
DefaultConfigPath,
DefaultConfigPath(),
)
}
func truncateDuration(d time.Duration) time.Duration {
if d > time.Hour {
return d.Truncate(time.Hour)
} else if d > time.Minute {
return d.Truncate(time.Minute)
} else if d > time.Second {
if d < 0 {
if d < -10*time.Second {
return d.Truncate(time.Second)
} else if d < -time.Second {
return d.Truncate(time.Second / 10)
} else if d < -time.Millisecond {
return d.Truncate(time.Millisecond)
} else if d < -time.Microsecond {
return d.Truncate(time.Microsecond)
}
return d
}
if d > 10*time.Second {
return d.Truncate(time.Second)
} else if d > time.Second {
return d.Truncate(time.Second / 10)
} else if d > time.Millisecond {
return d.Truncate(time.Millisecond)
} else if d > time.Microsecond {

View File

@@ -6,12 +6,17 @@ import (
"fmt"
"io/ioutil"
"log"
"net/url"
"os"
"os/user"
"path"
"path/filepath"
"strings"
"time"
"github.com/benbjohnson/litestream"
"github.com/benbjohnson/litestream/s3"
_ "github.com/mattn/go-sqlite3"
"gopkg.in/yaml.v2"
)
@@ -20,9 +25,6 @@ var (
Version = "(development build)"
)
// DefaultConfigPath is the default configuration path.
const DefaultConfigPath = "/etc/litestream.yml"
func main() {
log.SetFlags(0)
@@ -35,12 +37,15 @@ func main() {
}
}
// Main represents the main program execution.
type Main struct{}
// NewMain returns a new instance of Main.
func NewMain() *Main {
return &Main{}
}
// Run executes the program.
func (m *Main) Run(ctx context.Context, args []string) (err error) {
var cmd string
if len(args) > 0 {
@@ -58,8 +63,6 @@ func (m *Main) Run(ctx context.Context, args []string) (err error) {
return (&RestoreCommand{}).Run(ctx, args)
case "snapshots":
return (&SnapshotsCommand{}).Run(ctx, args)
case "validate":
return (&ValidateCommand{}).Run(ctx, args)
case "version":
return (&VersionCommand{}).Run(ctx, args)
case "wal":
@@ -73,6 +76,7 @@ func (m *Main) Run(ctx context.Context, args []string) (err error) {
}
}
// Usage prints the help screen to STDOUT.
func (m *Main) Usage() {
fmt.Println(`
litestream is a tool for replicating SQLite databases.
@@ -83,37 +87,37 @@ Usage:
The commands are:
databases list databases specified in config file
generations list available generations for a database
replicate runs a server to replicate databases
restore recovers database backup from a replica
snapshots list available snapshots for a database
validate checks replica to ensure a consistent state with primary
version prints the version
version prints the binary version
wal list available WAL files for a database
`[1:])
}
// Default configuration settings.
const (
DefaultAddr = ":9090"
)
// Config represents a configuration file for the litestream daemon.
type Config struct {
// Bind address for serving metrics.
Addr string `yaml:"addr"`
// List of databases to manage.
DBs []*DBConfig `yaml:"databases"`
DBs []*DBConfig `yaml:"dbs"`
// Global S3 settings
AccessKeyID string `yaml:"access-key-id"`
SecretAccessKey string `yaml:"secret-access-key"`
Region string `yaml:"region"`
Bucket string `yaml:"bucket"`
}
// DefaultConfig returns a new instance of Config with defaults set.
func DefaultConfig() Config {
return Config{
Addr: DefaultAddr,
}
return Config{}
}
// DBConfig returns database configuration by path.
func (c *Config) DBConfig(path string) *DBConfig {
for _, dbConfig := range c.DBs {
if dbConfig.Path == path {
@@ -124,18 +128,13 @@ func (c *Config) DBConfig(path string) *DBConfig {
}
// ReadConfigFile unmarshals config from filename. Expands path if needed.
func ReadConfigFile(filename string) (Config, error) {
func ReadConfigFile(filename string) (_ Config, err error) {
config := DefaultConfig()
// Expand filename, if necessary.
if prefix := "~" + string(os.PathSeparator); strings.HasPrefix(filename, prefix) {
u, err := user.Current()
if err != nil {
return config, err
} else if u.HomeDir == "" {
return config, fmt.Errorf("home directory unset")
}
filename = filepath.Join(u.HomeDir, strings.TrimPrefix(filename, prefix))
filename, err = expand(filename)
if err != nil {
return config, err
}
// Read & deserialize configuration.
@@ -146,32 +145,123 @@ func ReadConfigFile(filename string) (Config, error) {
} else if err := yaml.Unmarshal(buf, &config); err != nil {
return config, err
}
// Normalize paths.
for _, dbConfig := range config.DBs {
if dbConfig.Path, err = expand(dbConfig.Path); err != nil {
return config, err
}
}
return config, nil
}
// DBConfig represents the configuration for a single database.
type DBConfig struct {
Path string `yaml:"path"`
Replicas []*ReplicaConfig `yaml:"replicas"`
}
// ReplicaConfig represents the configuration for a single replica in a database.
type ReplicaConfig struct {
Type string `yaml:"type"` // "file", "s3"
Name string `yaml:"name"` // name of replica, optional.
Path string `yaml:"path"` // used for file replicas
Type string `yaml:"type"` // "file", "s3"
Name string `yaml:"name"` // name of replica, optional.
Path string `yaml:"path"`
URL string `yaml:"url"`
Retention time.Duration `yaml:"retention"`
RetentionCheckInterval time.Duration `yaml:"retention-check-interval"`
SyncInterval time.Duration `yaml:"sync-interval"` // s3 only
ValidationInterval time.Duration `yaml:"validation-interval"`
// S3 settings
AccessKeyID string `yaml:"access-key-id"`
SecretAccessKey string `yaml:"secret-access-key"`
Region string `yaml:"region"`
Bucket string `yaml:"bucket"`
}
// NewReplicaFromURL returns a new Replica instance configured from a URL.
// The replica's database is not set.
func NewReplicaFromURL(s string) (litestream.Replica, error) {
scheme, host, path, err := ParseReplicaURL(s)
if err != nil {
return nil, err
}
switch scheme {
case "file":
return litestream.NewFileReplica(nil, "", path), nil
case "s3":
r := s3.NewReplica(nil, "")
r.Bucket, r.Path = host, path
return r, nil
default:
return nil, fmt.Errorf("invalid replica url type: %s", s)
}
}
// ParseReplicaURL parses a replica URL.
func ParseReplicaURL(s string) (scheme, host, urlpath string, err error) {
u, err := url.Parse(s)
if err != nil {
return "", "", "", err
}
switch u.Scheme {
case "file":
scheme, u.Scheme = u.Scheme, ""
return scheme, "", path.Clean(u.String()), nil
case "":
return u.Scheme, u.Host, u.Path, fmt.Errorf("replica url scheme required: %s", s)
default:
return u.Scheme, u.Host, strings.TrimPrefix(path.Clean(u.Path), "/"), nil
}
}
// isURL returns true if s can be parsed and has a scheme.
func isURL(s string) bool {
u, err := url.Parse(s)
return err == nil && u.Scheme != ""
}
// ReplicaType returns the type based on the type field or extracted from the URL.
func (c *ReplicaConfig) ReplicaType() string {
typ, _, _, _ := ParseReplicaURL(c.URL)
if typ != "" {
return typ
} else if c.Type != "" {
return c.Type
}
return "file"
}
// DefaultConfigPath returns the default config path.
func DefaultConfigPath() string {
if v := os.Getenv("LITESTREAM_CONFIG"); v != "" {
return v
}
return "/etc/litestream.yml"
}
func registerConfigFlag(fs *flag.FlagSet, p *string) {
fs.StringVar(p, "config", DefaultConfigPath, "config path")
fs.StringVar(p, "config", DefaultConfigPath(), "config path")
}
// newDBFromConfig instantiates a DB based on a configuration.
func newDBFromConfig(config *DBConfig) (*litestream.DB, error) {
func newDBFromConfig(c *Config, dbc *DBConfig) (*litestream.DB, error) {
path, err := expand(dbc.Path)
if err != nil {
return nil, err
}
// Initialize database with given path.
db := litestream.NewDB(config.Path)
db := litestream.NewDB(path)
// Instantiate and attach replicas.
for _, rconfig := range config.Replicas {
r, err := newReplicaFromConfig(db, rconfig)
for _, rc := range dbc.Replicas {
r, err := newReplicaFromConfig(db, c, dbc, rc)
if err != nil {
return nil, err
}
@@ -182,19 +272,129 @@ func newDBFromConfig(config *DBConfig) (*litestream.DB, error) {
}
// newReplicaFromConfig instantiates a replica for a DB based on a config.
func newReplicaFromConfig(db *litestream.DB, config *ReplicaConfig) (litestream.Replica, error) {
switch config.Type {
case "", "file":
return newFileReplicaFromConfig(db, config)
func newReplicaFromConfig(db *litestream.DB, c *Config, dbc *DBConfig, rc *ReplicaConfig) (litestream.Replica, error) {
// Ensure user did not specify URL in path.
if isURL(rc.Path) {
return nil, fmt.Errorf("replica path cannot be a url, please use the 'url' field instead: %s", rc.Path)
}
switch rc.ReplicaType() {
case "file":
return newFileReplicaFromConfig(db, c, dbc, rc)
case "s3":
return newS3ReplicaFromConfig(db, c, dbc, rc)
default:
return nil, fmt.Errorf("unknown replica type in config: %q", config.Type)
return nil, fmt.Errorf("unknown replica type in config: %q", rc.Type)
}
}
// newFileReplicaFromConfig returns a new instance of FileReplica build from config.
func newFileReplicaFromConfig(db *litestream.DB, config *ReplicaConfig) (*litestream.FileReplica, error) {
if config.Path == "" {
return nil, fmt.Errorf("file replica path require for db %q", db.Path())
func newFileReplicaFromConfig(db *litestream.DB, c *Config, dbc *DBConfig, rc *ReplicaConfig) (_ *litestream.FileReplica, err error) {
path := rc.Path
if rc.URL != "" {
_, _, path, err = ParseReplicaURL(rc.URL)
if err != nil {
return nil, err
}
}
return litestream.NewFileReplica(db, config.Name, config.Path), nil
if path == "" {
return nil, fmt.Errorf("%s: file replica path required", db.Path())
}
if path, err = expand(path); err != nil {
return nil, err
}
r := litestream.NewFileReplica(db, rc.Name, path)
if v := rc.Retention; v > 0 {
r.Retention = v
}
if v := rc.RetentionCheckInterval; v > 0 {
r.RetentionCheckInterval = v
}
if v := rc.ValidationInterval; v > 0 {
r.ValidationInterval = v
}
return r, nil
}
// newS3ReplicaFromConfig returns a new instance of S3Replica build from config.
func newS3ReplicaFromConfig(db *litestream.DB, c *Config, dbc *DBConfig, rc *ReplicaConfig) (_ *s3.Replica, err error) {
bucket := c.Bucket
if v := rc.Bucket; v != "" {
bucket = v
}
path := rc.Path
if rc.URL != "" {
_, bucket, path, err = ParseReplicaURL(rc.URL)
if err != nil {
return nil, err
}
}
// Use global or replica-specific S3 settings.
accessKeyID := c.AccessKeyID
if v := rc.AccessKeyID; v != "" {
accessKeyID = v
}
secretAccessKey := c.SecretAccessKey
if v := rc.SecretAccessKey; v != "" {
secretAccessKey = v
}
region := c.Region
if v := rc.Region; v != "" {
region = v
}
// Ensure required settings are set.
if bucket == "" {
return nil, fmt.Errorf("%s: s3 bucket required", db.Path())
}
// Build replica.
r := s3.NewReplica(db, rc.Name)
r.AccessKeyID = accessKeyID
r.SecretAccessKey = secretAccessKey
r.Region = region
r.Bucket = bucket
r.Path = path
if v := rc.Retention; v > 0 {
r.Retention = v
}
if v := rc.RetentionCheckInterval; v > 0 {
r.RetentionCheckInterval = v
}
if v := rc.SyncInterval; v > 0 {
r.SyncInterval = v
}
if v := rc.ValidationInterval; v > 0 {
r.ValidationInterval = v
}
return r, nil
}
// expand returns an absolute path for s.
func expand(s string) (string, error) {
// Just expand to absolute path if there is no home directory prefix.
prefix := "~" + string(os.PathSeparator)
if s != "~" && !strings.HasPrefix(s, prefix) {
return filepath.Abs(s)
}
// Look up home directory.
u, err := user.Current()
if err != nil {
return "", err
} else if u.HomeDir == "" {
return "", fmt.Errorf("cannot expand path %s, no home directory available", s)
}
// Return path with tilde replaced by the home directory.
if s == "~" {
return u.HomeDir, nil
}
return filepath.Join(u.HomeDir, strings.TrimPrefix(s, prefix)), nil
}

View File

@@ -5,15 +5,19 @@ import (
"errors"
"flag"
"fmt"
"log"
"net"
"net/http"
_ "net/http/pprof"
"os"
"os/signal"
"github.com/benbjohnson/litestream"
"github.com/benbjohnson/litestream/s3"
"github.com/prometheus/client_golang/prometheus/promhttp"
)
// ReplicateCommand represents a command that continuously replicates SQLite databases.
type ReplicateCommand struct {
ConfigPath string
Config Config
@@ -25,19 +29,35 @@ type ReplicateCommand struct {
// Run loads all databases specified in the configuration.
func (c *ReplicateCommand) Run(ctx context.Context, args []string) (err error) {
fs := flag.NewFlagSet("litestream-replicate", flag.ContinueOnError)
verbose := fs.Bool("v", false, "verbose logging")
registerConfigFlag(fs, &c.ConfigPath)
fs.Usage = c.Usage
if err := fs.Parse(args); err != nil {
return err
}
// Load configuration.
if c.ConfigPath == "" {
return errors.New("-config required")
// Load configuration or use CLI args to build db/replica.
var config Config
if fs.NArg() == 1 {
return fmt.Errorf("must specify at least one replica URL for %s", fs.Arg(0))
} else if fs.NArg() > 1 {
dbConfig := &DBConfig{Path: fs.Arg(0)}
for _, u := range fs.Args()[1:] {
dbConfig.Replicas = append(dbConfig.Replicas, &ReplicaConfig{URL: u})
}
config.DBs = []*DBConfig{dbConfig}
} else if c.ConfigPath != "" {
config, err = ReadConfigFile(c.ConfigPath)
if err != nil {
return err
}
} else {
return errors.New("-config flag or database/replica arguments required")
}
config, err := ReadConfigFile(c.ConfigPath)
if err != nil {
return err
// Enable trace logging.
if *verbose {
litestream.Tracef = log.Printf
}
// Setup signal handler.
@@ -50,11 +70,11 @@ func (c *ReplicateCommand) Run(ctx context.Context, args []string) (err error) {
fmt.Printf("litestream %s\n", Version)
if len(config.DBs) == 0 {
return errors.New("configuration must specify at least one database")
fmt.Println("no databases specified in configuration")
}
for _, dbConfig := range config.DBs {
db, err := newDBFromConfig(dbConfig)
db, err := newDBFromConfig(&config, dbConfig)
if err != nil {
return err
}
@@ -67,15 +87,29 @@ func (c *ReplicateCommand) Run(ctx context.Context, args []string) (err error) {
}
// Notify user that initialization is done.
fmt.Printf("Initialized with %d databases.\n", len(c.DBs))
for _, db := range c.DBs {
fmt.Printf("initialized db: %s\n", db.Path())
for _, r := range db.Replicas {
switch r := r.(type) {
case *litestream.FileReplica:
fmt.Printf("replicating to: name=%q type=%q path=%q\n", r.Name(), r.Type(), r.Path())
case *s3.Replica:
fmt.Printf("replicating to: name=%q type=%q bucket=%q path=%q region=%q\n", r.Name(), r.Type(), r.Bucket, r.Path, r.Region)
default:
fmt.Printf("replicating to: name=%q type=%q\n", r.Name(), r.Type())
}
}
}
// Serve metrics over HTTP if enabled.
if config.Addr != "" {
_, port, _ := net.SplitHostPort(config.Addr)
fmt.Printf("Serving metrics on http://localhost:%s/metrics\n", port)
fmt.Printf("serving metrics on http://localhost:%s/metrics\n", port)
go func() {
http.Handle("/metrics", promhttp.Handler())
http.ListenAndServe(config.Addr, nil)
if err := http.ListenAndServe(config.Addr, nil); err != nil {
log.Printf("cannot start metrics server: %s", err)
}
}()
}
@@ -105,19 +139,28 @@ func (c *ReplicateCommand) Close() (err error) {
return err
}
// Usage prints the help screen to STDOUT.
func (c *ReplicateCommand) Usage() {
fmt.Printf(`
The replicate command starts a server to monitor & replicate databases
specified in your configuration file.
The replicate command starts a server to monitor & replicate databases.
You can specify your database & replicas in a configuration file or you can
replicate a single database file by specifying its path and its replicas in the
command line arguments.
Usage:
litestream replicate [arguments]
litestream replicate [arguments] DB_PATH REPLICA_URL [REPLICA_URL...]
Arguments:
-config PATH
Specifies the configuration file. Defaults to %s
Specifies the configuration file.
Defaults to %s
`[1:], DefaultConfigPath)
-v
Enable verbose logging output.
`[1:], DefaultConfigPath())
}

View File

@@ -7,15 +7,15 @@ import (
"fmt"
"log"
"os"
"path/filepath"
"time"
"github.com/benbjohnson/litestream"
)
type RestoreCommand struct {
}
// RestoreCommand represents a command to restore a database from a backup.
type RestoreCommand struct{}
// Run executes the command.
func (c *RestoreCommand) Run(ctx context.Context, args []string) (err error) {
var configPath string
opt := litestream.NewRestoreOptions()
@@ -32,20 +32,11 @@ func (c *RestoreCommand) Run(ctx context.Context, args []string) (err error) {
if err := fs.Parse(args); err != nil {
return err
} else if fs.NArg() == 0 || fs.Arg(0) == "" {
return fmt.Errorf("database path required")
return fmt.Errorf("database path or replica URL required")
} else if fs.NArg() > 1 {
return fmt.Errorf("too many arguments")
}
// Load configuration.
if configPath == "" {
return errors.New("-config required")
}
config, err := ReadConfigFile(configPath)
if err != nil {
return err
}
// Parse timestamp, if specified.
if *timestampStr != "" {
if opt.Timestamp, err = time.Parse(time.RFC3339, *timestampStr); err != nil {
@@ -63,32 +54,84 @@ func (c *RestoreCommand) Run(ctx context.Context, args []string) (err error) {
opt.Logger = log.New(os.Stderr, "", log.LstdFlags)
}
// Determine absolute path for database.
dbPath, err := filepath.Abs(fs.Arg(0))
if err != nil {
return err
// Determine replica & generation to restore from.
var r litestream.Replica
if isURL(fs.Arg(0)) {
if r, err = c.loadFromURL(ctx, fs.Arg(0), &opt); err != nil {
return err
}
} else if configPath != "" {
if r, err = c.loadFromConfig(ctx, fs.Arg(0), configPath, &opt); err != nil {
return err
}
} else {
return errors.New("config path or replica URL required")
}
// Instantiate DB.
dbConfig := config.DBConfig(dbPath)
if dbConfig == nil {
return fmt.Errorf("database not found in config: %s", dbPath)
}
db, err := newDBFromConfig(dbConfig)
if err != nil {
return err
// Return an error if no matching targets found.
if opt.Generation == "" {
return fmt.Errorf("no matching backups found")
}
return db.Restore(ctx, opt)
return litestream.RestoreReplica(ctx, r, opt)
}
// loadFromURL creates a replica & updates the restore options from a replica URL.
func (c *RestoreCommand) loadFromURL(ctx context.Context, replicaURL string, opt *litestream.RestoreOptions) (litestream.Replica, error) {
r, err := NewReplicaFromURL(replicaURL)
if err != nil {
return nil, err
}
opt.Generation, _, err = litestream.CalcReplicaRestoreTarget(ctx, r, *opt)
return r, err
}
// loadFromConfig returns a replica & updates the restore options from a DB reference.
func (c *RestoreCommand) loadFromConfig(ctx context.Context, dbPath, configPath string, opt *litestream.RestoreOptions) (litestream.Replica, error) {
// Load configuration.
config, err := ReadConfigFile(configPath)
if err != nil {
return nil, err
}
// Lookup database from configuration file by path.
if dbPath, err = expand(dbPath); err != nil {
return nil, err
}
dbConfig := config.DBConfig(dbPath)
if dbConfig == nil {
return nil, fmt.Errorf("database not found in config: %s", dbPath)
}
db, err := newDBFromConfig(&config, dbConfig)
if err != nil {
return nil, err
}
// Restore into original database path if not specified.
if opt.OutputPath == "" {
opt.OutputPath = dbPath
}
// Determine the appropriate replica & generation to restore from,
r, generation, err := db.CalcRestoreTarget(ctx, *opt)
if err != nil {
return nil, err
}
opt.Generation = generation
return r, nil
}
// Usage prints the help screen to STDOUT.
func (c *RestoreCommand) Usage() {
fmt.Printf(`
The restore command recovers a database from a previous snapshot and WAL.
Usage:
litestream restore [arguments] DB
litestream restore [arguments] DB_PATH
litestream restore [arguments] REPLICA_URL
Arguments:
@@ -142,6 +185,6 @@ Examples:
$ litestream restore -replica s3 -generation xxxxxxxx /path/to/db
`[1:],
DefaultConfigPath,
DefaultConfigPath(),
)
}

View File

@@ -6,15 +6,16 @@ import (
"flag"
"fmt"
"os"
"path/filepath"
"text/tabwriter"
"time"
"github.com/benbjohnson/litestream"
)
// SnapshotsCommand represents a command to list snapshots for a command.
type SnapshotsCommand struct{}
// Run executes the command.
func (c *SnapshotsCommand) Run(ctx context.Context, args []string) (err error) {
var configPath string
fs := flag.NewFlagSet("litestream-snapshots", flag.ContinueOnError)
@@ -29,37 +30,42 @@ func (c *SnapshotsCommand) Run(ctx context.Context, args []string) (err error) {
return fmt.Errorf("too many arguments")
}
// Load configuration.
if configPath == "" {
return errors.New("-config required")
}
config, err := ReadConfigFile(configPath)
if err != nil {
return err
}
var db *litestream.DB
var r litestream.Replica
if isURL(fs.Arg(0)) {
if r, err = NewReplicaFromURL(fs.Arg(0)); err != nil {
return err
}
} else if configPath != "" {
// Load configuration.
config, err := ReadConfigFile(configPath)
if err != nil {
return err
}
// Determine absolute path for database.
dbPath, err := filepath.Abs(fs.Arg(0))
if err != nil {
return err
}
// Lookup database from configuration file by path.
if path, err := expand(fs.Arg(0)); err != nil {
return err
} else if dbc := config.DBConfig(path); dbc == nil {
return fmt.Errorf("database not found in config: %s", path)
} else if db, err = newDBFromConfig(&config, dbc); err != nil {
return err
}
// Instantiate DB.
dbConfig := config.DBConfig(dbPath)
if dbConfig == nil {
return fmt.Errorf("database not found in config: %s", dbPath)
}
db, err := newDBFromConfig(dbConfig)
if err != nil {
return err
// Filter by replica, if specified.
if *replicaName != "" {
if r = db.Replica(*replicaName); r == nil {
return fmt.Errorf("replica %q not found for database %q", *replicaName, db.Path())
}
}
} else {
return errors.New("config path or replica URL required")
}
// Find snapshots by db or replica.
var infos []*litestream.SnapshotInfo
if *replicaName != "" {
if r := db.Replica(*replicaName); r == nil {
return fmt.Errorf("replica %q not found for database %q", *replicaName, dbPath)
} else if infos, err = r.Snapshots(ctx); err != nil {
if r != nil {
if infos, err = r.Snapshots(ctx); err != nil {
return err
}
} else {
@@ -85,13 +91,16 @@ func (c *SnapshotsCommand) Run(ctx context.Context, args []string) (err error) {
return nil
}
// Usage prints the help screen to STDOUT.
func (c *SnapshotsCommand) Usage() {
fmt.Printf(`
The snapshots command lists all snapshots available for a database.
The snapshots command lists all snapshots available for a database or replica.
Usage:
litestream snapshots [arguments] DB
litestream snapshots [arguments] DB_PATH
litestream snapshots [arguments] REPLICA_URL
Arguments:
@@ -102,7 +111,6 @@ Arguments:
-replica NAME
Optional, filter by a specific replica.
Examples:
# List all snapshots for a database.
@@ -111,7 +119,10 @@ Examples:
# List all snapshots on S3.
$ litestream snapshots -replica s3 /path/to/db
# List all snapshots by replica URL.
$ litestream snapshots s3://mybkt/db
`[1:],
DefaultConfigPath,
DefaultConfigPath(),
)
}

View File

@@ -1,136 +0,0 @@
package main
import (
"context"
"errors"
"flag"
"fmt"
"log"
"os"
"path/filepath"
"github.com/benbjohnson/litestream"
)
type ValidateCommand struct{}
func (c *ValidateCommand) Run(ctx context.Context, args []string) (err error) {
var configPath string
opt := litestream.NewRestoreOptions()
fs := flag.NewFlagSet("litestream-validate", flag.ContinueOnError)
registerConfigFlag(fs, &configPath)
fs.StringVar(&opt.ReplicaName, "replica", "", "replica name")
fs.BoolVar(&opt.DryRun, "dry-run", false, "dry run")
verbose := fs.Bool("v", false, "verbose output")
fs.Usage = c.Usage
if err := fs.Parse(args); err != nil {
return err
} else if fs.NArg() == 0 || fs.Arg(0) == "" {
return fmt.Errorf("database path required")
} else if fs.NArg() > 1 {
return fmt.Errorf("too many arguments")
}
// Load configuration.
if configPath == "" {
return errors.New("-config required")
}
config, err := ReadConfigFile(configPath)
if err != nil {
return err
}
// Verbose output is automatically enabled if dry run is specified.
if opt.DryRun {
*verbose = true
}
// Instantiate logger if verbose output is enabled.
if *verbose {
opt.Logger = log.New(os.Stderr, "", log.LstdFlags)
}
// Determine absolute path for database.
dbPath, err := filepath.Abs(fs.Arg(0))
if err != nil {
return err
}
// Instantiate DB.
dbConfig := config.DBConfig(dbPath)
if dbConfig == nil {
return fmt.Errorf("database not found in config: %s", dbPath)
}
db, err := newDBFromConfig(dbConfig)
if err != nil {
return err
}
// Ensure replica exists, if specified.
if opt.ReplicaName != "" && db.Replica(opt.ReplicaName) == nil {
return fmt.Errorf("replica not found: %s", opt.ReplicaName)
}
// Validate all matching replicas.
var hasInvalidReplica bool
for _, r := range db.Replicas {
if opt.ReplicaName != "" && opt.ReplicaName != r.Name() {
continue
}
if err := db.Validate(ctx, r.Name(), opt); err != nil {
fmt.Printf("%s: replica invalid: %s\n", r.Name(), err)
}
}
if hasInvalidReplica {
return fmt.Errorf("one or more invalid replicas found")
}
fmt.Println("ok")
return nil
}
func (c *ValidateCommand) Usage() {
fmt.Printf(`
The validate command compares a checksum of the primary database with a
checksum of the replica at the same point in time. Returns an error if the
databases are not equal.
The restored database must be written to a temporary file so you must ensure
you have enough disk space before performing this operation.
Usage:
litestream validate [arguments] DB
Arguments:
-config PATH
Specifies the configuration file.
Defaults to %s
-replica NAME
Validate a specific replica.
Defaults to validating all replicas.
-dry-run
Prints all log output as if it were running but does
not perform actual validation.
-v
Verbose output.
Examples:
# Validate all replicas for the given database.
$ litestream validate /path/to/db
# Validate only the S3 replica.
$ litestream restore -replica s3 /path/to/db
`[1:],
DefaultConfigPath,
)
}

View File

@@ -6,8 +6,10 @@ import (
"fmt"
)
// VersionCommand represents a command to print the current version.
type VersionCommand struct{}
// Run executes the command.
func (c *VersionCommand) Run(ctx context.Context, args []string) (err error) {
fs := flag.NewFlagSet("litestream-version", flag.ContinueOnError)
fs.Usage = c.Usage
@@ -20,6 +22,7 @@ func (c *VersionCommand) Run(ctx context.Context, args []string) (err error) {
return nil
}
// Usage prints the help screen to STDOUT.
func (c *VersionCommand) Usage() {
fmt.Println(`
Prints the version.

View File

@@ -6,15 +6,16 @@ import (
"flag"
"fmt"
"os"
"path/filepath"
"text/tabwriter"
"time"
"github.com/benbjohnson/litestream"
)
// WALCommand represents a command to list WAL files for a database.
type WALCommand struct{}
// Run executes the command.
func (c *WALCommand) Run(ctx context.Context, args []string) (err error) {
var configPath string
fs := flag.NewFlagSet("litestream-wal", flag.ContinueOnError)
@@ -30,37 +31,42 @@ func (c *WALCommand) Run(ctx context.Context, args []string) (err error) {
return fmt.Errorf("too many arguments")
}
// Load configuration.
if configPath == "" {
return errors.New("-config required")
}
config, err := ReadConfigFile(configPath)
if err != nil {
return err
var db *litestream.DB
var r litestream.Replica
if isURL(fs.Arg(0)) {
if r, err = NewReplicaFromURL(fs.Arg(0)); err != nil {
return err
}
} else if configPath != "" {
// Load configuration.
config, err := ReadConfigFile(configPath)
if err != nil {
return err
}
// Lookup database from configuration file by path.
if path, err := expand(fs.Arg(0)); err != nil {
return err
} else if dbc := config.DBConfig(path); dbc == nil {
return fmt.Errorf("database not found in config: %s", path)
} else if db, err = newDBFromConfig(&config, dbc); err != nil {
return err
}
// Filter by replica, if specified.
if *replicaName != "" {
if r = db.Replica(*replicaName); r == nil {
return fmt.Errorf("replica %q not found for database %q", *replicaName, db.Path())
}
}
} else {
return errors.New("config path or replica URL required")
}
// Determine absolute path for database.
dbPath, err := filepath.Abs(fs.Arg(0))
if err != nil {
return err
}
// Instantiate DB.
dbConfig := config.DBConfig(dbPath)
if dbConfig == nil {
return fmt.Errorf("database not found in config: %s", dbPath)
}
db, err := newDBFromConfig(dbConfig)
if err != nil {
return err
}
// Find snapshots by db or replica.
// Find WAL files by db or replica.
var infos []*litestream.WALInfo
if *replicaName != "" {
if r := db.Replica(*replicaName); r == nil {
return fmt.Errorf("replica %q not found for database %q", *replicaName, dbPath)
} else if infos, err = r.WALs(ctx); err != nil {
if r != nil {
if infos, err = r.WALs(ctx); err != nil {
return err
}
} else {
@@ -91,13 +97,16 @@ func (c *WALCommand) Run(ctx context.Context, args []string) (err error) {
return nil
}
// Usage prints the help screen to STDOUT.
func (c *WALCommand) Usage() {
fmt.Printf(`
The wal command lists all wal files available for a database.
Usage:
litestream wal [arguments] DB
litestream wal [arguments] DB_PATH
litestream wal [arguments] REPLICA_URL
Arguments:
@@ -111,16 +120,18 @@ Arguments:
-generation NAME
Optional, filter by a specific generation.
Examples:
# List all WAL files for a database.
$ litestream wal /path/to/db
# List all WAL files on S3 for a specific generation.
$ litestream snapshots -replica s3 -generation xxxxxxxx /path/to/db
$ litestream wal -replica s3 -generation xxxxxxxx /path/to/db
# List all WAL files for replica URL.
$ litestream wal s3://mybkt/db
`[1:],
DefaultConfigPath,
DefaultConfigPath(),
)
}

678
db.go

File diff suppressed because it is too large Load Diff

647
db_test.go Normal file
View File

@@ -0,0 +1,647 @@
package litestream_test
import (
"database/sql"
"io/ioutil"
"os"
"path/filepath"
"testing"
"time"
"github.com/benbjohnson/litestream"
)
func TestDB_Path(t *testing.T) {
db := litestream.NewDB("/tmp/db")
if got, want := db.Path(), `/tmp/db`; got != want {
t.Fatalf("Path()=%v, want %v", got, want)
}
}
func TestDB_WALPath(t *testing.T) {
db := litestream.NewDB("/tmp/db")
if got, want := db.WALPath(), `/tmp/db-wal`; got != want {
t.Fatalf("WALPath()=%v, want %v", got, want)
}
}
func TestDB_MetaPath(t *testing.T) {
t.Run("Absolute", func(t *testing.T) {
db := litestream.NewDB("/tmp/db")
if got, want := db.MetaPath(), `/tmp/.db-litestream`; got != want {
t.Fatalf("MetaPath()=%v, want %v", got, want)
}
})
t.Run("Relative", func(t *testing.T) {
db := litestream.NewDB("db")
if got, want := db.MetaPath(), `.db-litestream`; got != want {
t.Fatalf("MetaPath()=%v, want %v", got, want)
}
})
}
func TestDB_GenerationNamePath(t *testing.T) {
db := litestream.NewDB("/tmp/db")
if got, want := db.GenerationNamePath(), `/tmp/.db-litestream/generation`; got != want {
t.Fatalf("GenerationNamePath()=%v, want %v", got, want)
}
}
func TestDB_GenerationPath(t *testing.T) {
db := litestream.NewDB("/tmp/db")
if got, want := db.GenerationPath("xxxx"), `/tmp/.db-litestream/generations/xxxx`; got != want {
t.Fatalf("GenerationPath()=%v, want %v", got, want)
}
}
func TestDB_ShadowWALDir(t *testing.T) {
db := litestream.NewDB("/tmp/db")
if got, want := db.ShadowWALDir("xxxx"), `/tmp/.db-litestream/generations/xxxx/wal`; got != want {
t.Fatalf("ShadowWALDir()=%v, want %v", got, want)
}
}
func TestDB_ShadowWALPath(t *testing.T) {
db := litestream.NewDB("/tmp/db")
if got, want := db.ShadowWALPath("xxxx", 1000), `/tmp/.db-litestream/generations/xxxx/wal/000003e8.wal`; got != want {
t.Fatalf("ShadowWALPath()=%v, want %v", got, want)
}
}
// Ensure we can check the last modified time of the real database and its WAL.
func TestDB_UpdatedAt(t *testing.T) {
t.Run("ErrNotExist", func(t *testing.T) {
db := MustOpenDB(t)
defer MustCloseDB(t, db)
if _, err := db.UpdatedAt(); !os.IsNotExist(err) {
t.Fatalf("unexpected error: %#v", err)
}
})
t.Run("DB", func(t *testing.T) {
db, sqldb := MustOpenDBs(t)
defer MustCloseDBs(t, db, sqldb)
if t0, err := db.UpdatedAt(); err != nil {
t.Fatal(err)
} else if time.Since(t0) > 10*time.Second {
t.Fatalf("unexpected updated at time: %s", t0)
}
})
t.Run("WAL", func(t *testing.T) {
db, sqldb := MustOpenDBs(t)
defer MustCloseDBs(t, db, sqldb)
t0, err := db.UpdatedAt()
if err != nil {
t.Fatal(err)
}
if os.Getenv("CI") != "" {
time.Sleep(1 * time.Second)
}
if _, err := sqldb.Exec(`CREATE TABLE t (id INT);`); err != nil {
t.Fatal(err)
}
if t1, err := db.UpdatedAt(); err != nil {
t.Fatal(err)
} else if !t1.After(t0) {
t.Fatalf("expected newer updated at time: %s > %s", t1, t0)
}
})
}
// Ensure we can compute a checksum on the real database.
func TestDB_CRC64(t *testing.T) {
t.Run("ErrNotExist", func(t *testing.T) {
db := MustOpenDB(t)
defer MustCloseDB(t, db)
if _, _, err := db.CRC64(); !os.IsNotExist(err) {
t.Fatalf("unexpected error: %#v", err)
}
})
t.Run("DB", func(t *testing.T) {
db, sqldb := MustOpenDBs(t)
defer MustCloseDBs(t, db, sqldb)
if err := db.Sync(); err != nil {
t.Fatal(err)
}
chksum0, _, err := db.CRC64()
if err != nil {
t.Fatal(err)
}
// Issue change that is applied to the WAL. Checksum should not change.
if _, err := sqldb.Exec(`CREATE TABLE t (id INT);`); err != nil {
t.Fatal(err)
} else if chksum1, _, err := db.CRC64(); err != nil {
t.Fatal(err)
} else if chksum0 == chksum1 {
t.Fatal("expected different checksum event after WAL change")
}
// Checkpoint change into database. Checksum should change.
if _, err := sqldb.Exec(`PRAGMA wal_checkpoint(TRUNCATE);`); err != nil {
t.Fatal(err)
}
if chksum2, _, err := db.CRC64(); err != nil {
t.Fatal(err)
} else if chksum0 == chksum2 {
t.Fatal("expected different checksums after checkpoint")
}
})
}
// Ensure we can sync the real WAL to the shadow WAL.
func TestDB_Sync(t *testing.T) {
// Ensure sync is skipped if no database exists.
t.Run("NoDB", func(t *testing.T) {
db := MustOpenDB(t)
defer MustCloseDB(t, db)
if err := db.Sync(); err != nil {
t.Fatal(err)
}
})
// Ensure sync can successfully run on the initial sync.
t.Run("Initial", func(t *testing.T) {
db, sqldb := MustOpenDBs(t)
defer MustCloseDBs(t, db, sqldb)
if err := db.Sync(); err != nil {
t.Fatal(err)
}
// Verify page size if now available.
if db.PageSize() == 0 {
t.Fatal("expected page size after initial sync")
}
// Obtain real WAL size.
fi, err := os.Stat(db.WALPath())
if err != nil {
t.Fatal(err)
}
// Ensure position now available.
if pos, err := db.Pos(); err != nil {
t.Fatal(err)
} else if pos.Generation == "" {
t.Fatal("expected generation")
} else if got, want := pos.Index, 0; got != want {
t.Fatalf("pos.Index=%v, want %v", got, want)
} else if got, want := pos.Offset, fi.Size(); got != want {
t.Fatalf("pos.Offset=%v, want %v", got, want)
}
})
// Ensure DB can keep in sync across multiple Sync() invocations.
t.Run("MultiSync", func(t *testing.T) {
db, sqldb := MustOpenDBs(t)
defer MustCloseDBs(t, db, sqldb)
// Execute a query to force a write to the WAL.
if _, err := sqldb.Exec(`CREATE TABLE foo (bar TEXT);`); err != nil {
t.Fatal(err)
}
// Perform initial sync & grab initial position.
if err := db.Sync(); err != nil {
t.Fatal(err)
}
pos0, err := db.Pos()
if err != nil {
t.Fatal(err)
}
// Insert into table.
if _, err := sqldb.Exec(`INSERT INTO foo (bar) VALUES ('baz');`); err != nil {
t.Fatal(err)
}
// Sync to ensure position moves forward one page.
if err := db.Sync(); err != nil {
t.Fatal(err)
} else if pos1, err := db.Pos(); err != nil {
t.Fatal(err)
} else if pos0.Generation != pos1.Generation {
t.Fatal("expected the same generation")
} else if got, want := pos1.Index, pos0.Index; got != want {
t.Fatalf("Index=%v, want %v", got, want)
} else if got, want := pos1.Offset, pos0.Offset+4096+litestream.WALFrameHeaderSize; got != want {
t.Fatalf("Offset=%v, want %v", got, want)
}
})
// Ensure a WAL file is created if one does not already exist.
t.Run("NoWAL", func(t *testing.T) {
db, sqldb := MustOpenDBs(t)
defer MustCloseDBs(t, db, sqldb)
// Issue initial sync and truncate WAL.
if err := db.Sync(); err != nil {
t.Fatal(err)
}
// Obtain initial position.
pos0, err := db.Pos()
if err != nil {
t.Fatal(err)
}
// Checkpoint & fully close which should close WAL file.
if err := db.Checkpoint(litestream.CheckpointModeTruncate); err != nil {
t.Fatal(err)
} else if err := db.Close(); err != nil {
t.Fatal(err)
} else if err := sqldb.Close(); err != nil {
t.Fatal(err)
}
// Verify WAL does not exist.
if _, err := os.Stat(db.WALPath()); !os.IsNotExist(err) {
t.Fatal(err)
}
// Reopen the managed database.
db = MustOpenDBAt(t, db.Path())
defer MustCloseDB(t, db)
// Re-sync and ensure new generation has been created.
if err := db.Sync(); err != nil {
t.Fatal(err)
}
// Obtain initial position.
if pos1, err := db.Pos(); err != nil {
t.Fatal(err)
} else if pos0.Generation == pos1.Generation {
t.Fatal("expected new generation after truncation")
}
})
// Ensure DB can start new generation if it detects it cannot verify last position.
t.Run("OverwritePrevPosition", func(t *testing.T) {
db, sqldb := MustOpenDBs(t)
defer MustCloseDBs(t, db, sqldb)
// Execute a query to force a write to the WAL.
if _, err := sqldb.Exec(`CREATE TABLE foo (bar TEXT);`); err != nil {
t.Fatal(err)
}
// Issue initial sync and truncate WAL.
if err := db.Sync(); err != nil {
t.Fatal(err)
}
// Obtain initial position.
pos0, err := db.Pos()
if err != nil {
t.Fatal(err)
}
// Fully close which should close WAL file.
if err := db.Close(); err != nil {
t.Fatal(err)
} else if err := sqldb.Close(); err != nil {
t.Fatal(err)
}
// Verify WAL does not exist.
if _, err := os.Stat(db.WALPath()); !os.IsNotExist(err) {
t.Fatal(err)
}
// Insert into table multiple times to move past old offset
sqldb = MustOpenSQLDB(t, db.Path())
defer MustCloseSQLDB(t, sqldb)
for i := 0; i < 100; i++ {
if _, err := sqldb.Exec(`INSERT INTO foo (bar) VALUES ('baz');`); err != nil {
t.Fatal(err)
}
}
// Reopen the managed database.
db = MustOpenDBAt(t, db.Path())
defer MustCloseDB(t, db)
// Re-sync and ensure new generation has been created.
if err := db.Sync(); err != nil {
t.Fatal(err)
}
// Obtain initial position.
if pos1, err := db.Pos(); err != nil {
t.Fatal(err)
} else if pos0.Generation == pos1.Generation {
t.Fatal("expected new generation after truncation")
}
})
// Ensure DB can handle a mismatched header-only and start new generation.
t.Run("WALHeaderMismatch", func(t *testing.T) {
db, sqldb := MustOpenDBs(t)
defer MustCloseDBs(t, db, sqldb)
// Execute a query to force a write to the WAL and then sync.
if _, err := sqldb.Exec(`CREATE TABLE foo (bar TEXT);`); err != nil {
t.Fatal(err)
} else if err := db.Sync(); err != nil {
t.Fatal(err)
}
// Grab initial position & close.
pos0, err := db.Pos()
if err != nil {
t.Fatal(err)
} else if err := db.Close(); err != nil {
t.Fatal(err)
}
// Read existing file, update header checksum, and write back only header
// to simulate a header with a mismatched checksum.
shadowWALPath := db.ShadowWALPath(pos0.Generation, pos0.Index)
if buf, err := ioutil.ReadFile(shadowWALPath); err != nil {
t.Fatal(err)
} else if err := ioutil.WriteFile(shadowWALPath, append(buf[:litestream.WALHeaderSize-8], 0, 0, 0, 0, 0, 0, 0, 0), 0600); err != nil {
t.Fatal(err)
}
// Reopen managed database & ensure sync will still work.
db = MustOpenDBAt(t, db.Path())
defer MustCloseDB(t, db)
if err := db.Sync(); err != nil {
t.Fatal(err)
}
// Verify a new generation was started.
if pos1, err := db.Pos(); err != nil {
t.Fatal(err)
} else if pos0.Generation == pos1.Generation {
t.Fatal("expected new generation")
}
})
// Ensure DB can handle partial shadow WAL header write.
t.Run("PartialShadowWALHeader", func(t *testing.T) {
db, sqldb := MustOpenDBs(t)
defer MustCloseDBs(t, db, sqldb)
// Execute a query to force a write to the WAL and then sync.
if _, err := sqldb.Exec(`CREATE TABLE foo (bar TEXT);`); err != nil {
t.Fatal(err)
} else if err := db.Sync(); err != nil {
t.Fatal(err)
}
pos0, err := db.Pos()
if err != nil {
t.Fatal(err)
}
// Close & truncate shadow WAL to simulate a partial header write.
if err := db.Close(); err != nil {
t.Fatal(err)
} else if err := os.Truncate(db.ShadowWALPath(pos0.Generation, pos0.Index), litestream.WALHeaderSize-1); err != nil {
t.Fatal(err)
}
// Reopen managed database & ensure sync will still work.
db = MustOpenDBAt(t, db.Path())
defer MustCloseDB(t, db)
if err := db.Sync(); err != nil {
t.Fatal(err)
}
// Verify a new generation was started.
if pos1, err := db.Pos(); err != nil {
t.Fatal(err)
} else if pos0.Generation == pos1.Generation {
t.Fatal("expected new generation")
}
})
// Ensure DB can handle partial shadow WAL writes.
t.Run("PartialShadowWALFrame", func(t *testing.T) {
db, sqldb := MustOpenDBs(t)
defer MustCloseDBs(t, db, sqldb)
// Execute a query to force a write to the WAL and then sync.
if _, err := sqldb.Exec(`CREATE TABLE foo (bar TEXT);`); err != nil {
t.Fatal(err)
} else if err := db.Sync(); err != nil {
t.Fatal(err)
}
pos0, err := db.Pos()
if err != nil {
t.Fatal(err)
}
// Obtain current shadow WAL size.
fi, err := os.Stat(db.ShadowWALPath(pos0.Generation, pos0.Index))
if err != nil {
t.Fatal(err)
}
// Close & truncate shadow WAL to simulate a partial frame write.
if err := db.Close(); err != nil {
t.Fatal(err)
} else if err := os.Truncate(db.ShadowWALPath(pos0.Generation, pos0.Index), fi.Size()-1); err != nil {
t.Fatal(err)
}
// Reopen managed database & ensure sync will still work.
db = MustOpenDBAt(t, db.Path())
defer MustCloseDB(t, db)
if err := db.Sync(); err != nil {
t.Fatal(err)
}
// Verify same generation is kept.
if pos1, err := db.Pos(); err != nil {
t.Fatal(err)
} else if got, want := pos1, pos0; got != want {
t.Fatalf("Pos()=%s want %s", got, want)
}
// Ensure shadow WAL has recovered.
if fi0, err := os.Stat(db.ShadowWALPath(pos0.Generation, pos0.Index)); err != nil {
t.Fatal(err)
} else if got, want := fi0.Size(), fi.Size(); got != want {
t.Fatalf("Size()=%v, want %v", got, want)
}
})
// Ensure DB can handle a generation directory with a missing shadow WAL.
t.Run("NoShadowWAL", func(t *testing.T) {
db, sqldb := MustOpenDBs(t)
defer MustCloseDBs(t, db, sqldb)
// Execute a query to force a write to the WAL and then sync.
if _, err := sqldb.Exec(`CREATE TABLE foo (bar TEXT);`); err != nil {
t.Fatal(err)
} else if err := db.Sync(); err != nil {
t.Fatal(err)
}
pos0, err := db.Pos()
if err != nil {
t.Fatal(err)
}
// Close & delete shadow WAL to simulate dir created but not WAL.
if err := db.Close(); err != nil {
t.Fatal(err)
} else if err := os.Remove(db.ShadowWALPath(pos0.Generation, pos0.Index)); err != nil {
t.Fatal(err)
}
// Reopen managed database & ensure sync will still work.
db = MustOpenDBAt(t, db.Path())
defer MustCloseDB(t, db)
if err := db.Sync(); err != nil {
t.Fatal(err)
}
// Verify new generation created but index/offset the same.
if pos1, err := db.Pos(); err != nil {
t.Fatal(err)
} else if pos0.Generation == pos1.Generation {
t.Fatal("expected new generation")
} else if got, want := pos1.Index, pos0.Index; got != want {
t.Fatalf("Index=%v want %v", got, want)
} else if got, want := pos1.Offset, pos0.Offset; got != want {
t.Fatalf("Offset=%v want %v", got, want)
}
})
// Ensure DB checkpoints after minimum number of pages.
t.Run("MinCheckpointPageN", func(t *testing.T) {
db, sqldb := MustOpenDBs(t)
defer MustCloseDBs(t, db, sqldb)
// Execute a query to force a write to the WAL and then sync.
if _, err := sqldb.Exec(`CREATE TABLE foo (bar TEXT);`); err != nil {
t.Fatal(err)
} else if err := db.Sync(); err != nil {
t.Fatal(err)
}
// Write at least minimum number of pages to trigger rollover.
for i := 0; i < db.MinCheckpointPageN; i++ {
if _, err := sqldb.Exec(`INSERT INTO foo (bar) VALUES ('baz');`); err != nil {
t.Fatal(err)
}
}
// Sync to shadow WAL.
if err := db.Sync(); err != nil {
t.Fatal(err)
}
// Ensure position is now on the second index.
if pos, err := db.Pos(); err != nil {
t.Fatal(err)
} else if got, want := pos.Index, 1; got != want {
t.Fatalf("Index=%v, want %v", got, want)
}
})
// Ensure DB checkpoints after interval.
t.Run("CheckpointInterval", func(t *testing.T) {
db, sqldb := MustOpenDBs(t)
defer MustCloseDBs(t, db, sqldb)
// Execute a query to force a write to the WAL and then sync.
if _, err := sqldb.Exec(`CREATE TABLE foo (bar TEXT);`); err != nil {
t.Fatal(err)
} else if err := db.Sync(); err != nil {
t.Fatal(err)
}
// Reduce checkpoint interval to ensure a rollover is triggered.
db.CheckpointInterval = 1 * time.Nanosecond
// Write to WAL & sync.
if _, err := sqldb.Exec(`INSERT INTO foo (bar) VALUES ('baz');`); err != nil {
t.Fatal(err)
} else if err := db.Sync(); err != nil {
t.Fatal(err)
}
// Ensure position is now on the second index.
if pos, err := db.Pos(); err != nil {
t.Fatal(err)
} else if got, want := pos.Index, 1; got != want {
t.Fatalf("Index=%v, want %v", got, want)
}
})
}
// MustOpenDBs returns a new instance of a DB & associated SQL DB.
func MustOpenDBs(tb testing.TB) (*litestream.DB, *sql.DB) {
db := MustOpenDB(tb)
return db, MustOpenSQLDB(tb, db.Path())
}
// MustCloseDBs closes db & sqldb and removes the parent directory.
func MustCloseDBs(tb testing.TB, db *litestream.DB, sqldb *sql.DB) {
MustCloseDB(tb, db)
MustCloseSQLDB(tb, sqldb)
}
// MustOpenDB returns a new instance of a DB.
func MustOpenDB(tb testing.TB) *litestream.DB {
dir := tb.TempDir()
return MustOpenDBAt(tb, filepath.Join(dir, "db"))
}
// MustOpenDBAt returns a new instance of a DB for a given path.
func MustOpenDBAt(tb testing.TB, path string) *litestream.DB {
tb.Helper()
db := litestream.NewDB(path)
db.MonitorInterval = 0 // disable background goroutine
if err := db.Open(); err != nil {
tb.Fatal(err)
}
return db
}
// MustCloseDB closes db and removes its parent directory.
func MustCloseDB(tb testing.TB, db *litestream.DB) {
tb.Helper()
if err := db.Close(); err != nil {
tb.Fatal(err)
} else if err := os.RemoveAll(filepath.Dir(db.Path())); err != nil {
tb.Fatal(err)
}
}
// MustOpenSQLDB returns a database/sql DB.
func MustOpenSQLDB(tb testing.TB, path string) *sql.DB {
tb.Helper()
d, err := sql.Open("sqlite3", path)
if err != nil {
tb.Fatal(err)
} else if _, err := d.Exec(`PRAGMA journal_mode = wal;`); err != nil {
tb.Fatal(err)
}
return d
}
// MustCloseSQLDB closes a database/sql DB.
func MustCloseSQLDB(tb testing.TB, d *sql.DB) {
tb.Helper()
if err := d.Close(); err != nil {
tb.Fatal(err)
}
}

View File

@@ -1,88 +0,0 @@
DESIGN
======
Litestream is a sidecar process that replicates the write ahead log (WAL) for
a SQLite database. To ensure that it can replicate every page, litestream takes
control over the checkpointing process by issuing a long running read
transaction against the database to prevent checkpointing. It then releases
this transaction once it obtains a write lock and issues the checkpoint itself.
The daemon polls the database on an interval to breifly obtain a write
transaction lock and copy over new WAL pages. Once the WAL has reached a
threshold size, litestream will issue a checkpoint and a single page write
to a table called `_litestream` to start the new WAL.
## Workflow
When litestream first loads a database, it checks if there is an existing
sidecar directory which is named `.<DB>-litestream`. If not, it initializes
the directory and starts a new generation.
A generation is a snapshot of the database followed by a continuous stream of
WAL files. A new generation is started on initialization & whenever litestream
cannot verify that it has a continuous record of WAL files. This could happen
if litestream is stopped and another process checkpoints the WAL. In this case,
a new generation ID is randomly created and a snapshot is replicated to the
appropriate destinations.
Generations also prevent two servers from replicating to the same destination
and corrupting each other's data. In this case, each server would replicate
to a different generation directory. On recovery, there will be duplicate
databases and the end user can choose which generation to recover but each
database will be uncorrupted.
## File Layout
Litestream maintains a shadow WAL which is a historical record of all previous
WAL files. These files can be deleted after a time or size threshold but should
be replicated before being deleted.
### Local
Given a database file named `db`, SQLite will create a WAL file called `db-wal`.
Litestream will then create a hidden directory called `.db-litestream` that
contains the historical record of all WAL files for the current generation.
```
db # SQLite database
db-wal # SQLite WAL
.db-litestream/
generation # current generation number
generations/
xxxxxxxx/
wal/ # WAL files
000000000000001.wal
000000000000002.wal
000000000000003.wal # active WAL
```
### Remote (S3)
```
bkt/
db/ # database path
generations/
xxxxxxxx/
snapshots/ # snapshots w/ timestamp+offset
20000101T000000Z-000000000000023.snapshot
wal/ # compressed WAL files
000000000000001-0.wal.gz
000000000000001-<offset>.wal.gz
000000000000002-0.wal.gz
00000002/
snapshot/
000000000000000.snapshot
scheduled/
daily/
20000101T000000Z-000000000000023.snapshot
20000102T000000Z-000000000000036.snapshot
monthly/
20000101T000000Z-000000000000023.snapshot
wal/
000000000000001.wal.gz
```

15
etc/gon.hcl Normal file
View File

@@ -0,0 +1,15 @@
source = ["./dist/litestream"]
bundle_id = "com.middlemost.litestream"
apple_id {
username = "benbjohnson@yahoo.com"
password = "@env:AC_PASSWORD"
}
sign {
application_identity = "Developer ID Application: Middlemost Systems, LLC"
}
zip {
output_path = "dist/litestream.zip"
}

9
etc/litestream.service Normal file
View File

@@ -0,0 +1,9 @@
[Unit]
Description=Litestream
[Service]
Restart=always
ExecStart=/usr/bin/litestream replicate
[Install]
WantedBy=multi-user.target

10
etc/litestream.yml Normal file
View File

@@ -0,0 +1,10 @@
# AWS credentials
# access-key-id: AKIAxxxxxxxxxxxxxxxx
# secret-access-key: xxxxxxxxxxxxxxxxxxxxxxxxxxxxxx/xxxxxxxxx
# dbs:
# - path: /path/to/primary/db # Database to replicate from
# replicas:
# - path: /path/to/replica # File-based replication
# - path: s3://my.bucket.com/db # S3-based replication

19
etc/nfpm.yml Normal file
View File

@@ -0,0 +1,19 @@
name: litestream
arch: amd64
platform: linux
version: "${LITESTREAM_VERSION}"
section: "default"
priority: "extra"
maintainer: "Ben Johnson <benbjohnson@yahoo.com>"
description: Litestream is a tool for real-time replication of SQLite databases.
homepage: "https://github.com/benbjohnson/litestream"
license: "GPLv3"
contents:
- src: ./litestream
dst: /usr/bin/litestream
- src: ./litestream.yml
dst: /etc/litestream.yml
type: config
- src: ./litestream.service
dst: /usr/lib/systemd/system/litestream.service
type: config

3
go.mod
View File

@@ -3,7 +3,10 @@ module github.com/benbjohnson/litestream
go 1.15
require (
github.com/aws/aws-sdk-go v1.27.0
github.com/davecgh/go-spew v1.1.1
github.com/mattn/go-sqlite3 v1.14.5
github.com/pierrec/lz4/v4 v4.1.3
github.com/prometheus/client_golang v1.9.0
gopkg.in/yaml.v2 v2.4.0
)

7
go.sum
View File

@@ -18,6 +18,7 @@ github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da/go.mod h1:Q73ZrmV
github.com/armon/go-radix v0.0.0-20180808171621-7fddfc383310/go.mod h1:ufUuZ+zHj4x4TnLV4JWEpy2hxWSpsRywHrMgIH9cCH8=
github.com/aryann/difflib v0.0.0-20170710044230-e206f873d14a/go.mod h1:DAHtR1m6lCRdSC2Tm3DSWRPvIPr6xNKyeHdqDQSQT+A=
github.com/aws/aws-lambda-go v1.13.3/go.mod h1:4UKl9IzQMoD+QF79YdCuzCwp8VbmG4VAQwij/eHl5CU=
github.com/aws/aws-sdk-go v1.27.0 h1:0xphMHGMLBrPMfxR2AmVjZKcMEESEgWF8Kru94BNByk=
github.com/aws/aws-sdk-go v1.27.0/go.mod h1:KmX6BPdI08NWTb3/sm4ZGu5ShLoqVDhKgpiN924inxo=
github.com/aws/aws-sdk-go-v2 v0.18.0/go.mod h1:JWVYvqSMppoMJC0x5wdwiImzgXTI9FuZwxzkQq9wy+g=
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
@@ -40,6 +41,7 @@ github.com/coreos/pkg v0.0.0-20160727233714-3ac0863d7acf/go.mod h1:E3G3o1h8I7cfc
github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU=
github.com/creack/pty v1.1.7/go.mod h1:lj5s0c3V2DBrqTV7llrYr5NG6My20zk30Fl46Y7DoTY=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ=
github.com/dustin/go-humanize v0.0.0-20171111073723-bb3d318650d4/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk=
@@ -124,6 +126,7 @@ github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpO
github.com/hudl/fargo v1.3.0/go.mod h1:y3CKSmjA+wD2gak7sUSXTAoopbhU08POFhmITJgmKTg=
github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8=
github.com/influxdata/influxdb1-client v0.0.0-20191209144304-8bf82d3c094d/go.mod h1:qj24IKcXYK6Iy9ceXlo3Tc+vtHo9lIhSX5JddghvEPo=
github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af h1:pmfjZENx5imkbgOkpRUYLnmbU7UEFbjtDA2hxJ1ichM=
github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af/go.mod h1:Nht3zPeWKUH0NzdCt2Blrr5ys8VGpn0CEB0cQHVjt7k=
github.com/jonboulle/clockwork v0.1.0/go.mod h1:Ii8DK3G1RaLaWxj9trq07+26W01tbo22gdxWY5EU2bo=
github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4=
@@ -194,7 +197,11 @@ github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c/go.mod h1:lzWF7FI
github.com/pborman/uuid v1.2.0/go.mod h1:X/NO0urCmaxf9VXbdlT7C2Yzkj2IKimNn4k+gtPdI/k=
github.com/performancecopilot/speed v3.0.0+incompatible/go.mod h1:/CLtqpZ5gBg1M9iaPbIdPPGyKcA8hKdoy6hAWba7Yac=
github.com/pierrec/lz4 v1.0.2-0.20190131084431-473cd7ce01a1/go.mod h1:3/3N9NVKO0jef7pBehbT1qWhCMrIgbYNnFAZCqQ5LRc=
github.com/pierrec/lz4 v2.0.5+incompatible h1:2xWsjqPFWcplujydGg4WmhC/6fZqK42wMM8aXeqhl0I=
github.com/pierrec/lz4 v2.0.5+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY=
github.com/pierrec/lz4 v2.6.0+incompatible h1:Ix9yFKn1nSPBLFl/yZknTp8TU5G4Ps0JDmguYK6iH1A=
github.com/pierrec/lz4/v4 v4.1.3 h1:/dvQpkb0o1pVlSgKNQqfkavlnXaIK+hJ0LXsKRUN9D4=
github.com/pierrec/lz4/v4 v4.1.3/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=

32
internal/internal.go Normal file
View File

@@ -0,0 +1,32 @@
package internal
import (
"io"
)
// ReadCloser wraps a reader to also attach a separate closer.
type ReadCloser struct {
r io.Reader
c io.Closer
}
// NewReadCloser returns a new instance of ReadCloser.
func NewReadCloser(r io.Reader, c io.Closer) *ReadCloser {
return &ReadCloser{r, c}
}
// Read reads bytes into the underlying reader.
func (r *ReadCloser) Read(p []byte) (n int, err error) {
return r.r.Read(p)
}
// Close closes the reader (if implementing io.ReadCloser) and the Closer.
func (r *ReadCloser) Close() error {
if rc, ok := r.r.(io.Closer); ok {
if err := rc.Close(); err != nil {
r.c.Close()
return err
}
}
return r.c.Close()
}

44
internal/metrics.go Normal file
View File

@@ -0,0 +1,44 @@
package internal
import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)
// Shared replica metrics.
var (
ReplicaSnapshotTotalGaugeVec = promauto.NewGaugeVec(prometheus.GaugeOpts{
Namespace: "litestream",
Subsystem: "replica",
Name: "snapshot_total",
Help: "The current number of snapshots",
}, []string{"db", "name"})
ReplicaWALBytesCounterVec = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: "litestream",
Subsystem: "replica",
Name: "wal_bytes",
Help: "The number wal bytes written",
}, []string{"db", "name"})
ReplicaWALIndexGaugeVec = promauto.NewGaugeVec(prometheus.GaugeOpts{
Namespace: "litestream",
Subsystem: "replica",
Name: "wal_index",
Help: "The current WAL index",
}, []string{"db", "name"})
ReplicaWALOffsetGaugeVec = promauto.NewGaugeVec(prometheus.GaugeOpts{
Namespace: "litestream",
Subsystem: "replica",
Name: "wal_offset",
Help: "The current WAL offset",
}, []string{"db", "name"})
ReplicaValidationTotalCounterVec = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: "litestream",
Subsystem: "replica",
Name: "validation_total",
Help: "The number of validations performed",
}, []string{"db", "name", "status"})
)

View File

@@ -1,10 +1,8 @@
package litestream
import (
"compress/gzip"
"database/sql"
"encoding/binary"
"encoding/hex"
"errors"
"fmt"
"io"
@@ -13,11 +11,11 @@ import (
"regexp"
"strconv"
"strings"
"syscall"
"time"
_ "github.com/mattn/go-sqlite3"
)
// Naming constants.
const (
MetaDirSuffix = "-litestream"
@@ -52,6 +50,30 @@ type SnapshotInfo struct {
CreatedAt time.Time
}
// FilterSnapshotsAfter returns all snapshots that were created on or after t.
func FilterSnapshotsAfter(a []*SnapshotInfo, t time.Time) []*SnapshotInfo {
other := make([]*SnapshotInfo, 0, len(a))
for _, snapshot := range a {
if !snapshot.CreatedAt.Before(t) {
other = append(other, snapshot)
}
}
return other
}
// FindMinSnapshotByGeneration finds the snapshot with the lowest index in a generation.
func FindMinSnapshotByGeneration(a []*SnapshotInfo, generation string) *SnapshotInfo {
var min *SnapshotInfo
for _, snapshot := range a {
if snapshot.Generation != generation {
continue
} else if min == nil || snapshot.Index < min.Index {
min = snapshot
}
}
return min
}
// WALInfo represents file information about a WAL file.
type WALInfo struct {
Name string
@@ -75,7 +97,7 @@ func (p Pos) String() string {
if p.IsZero() {
return "<>"
}
return fmt.Sprintf("<%s,%d,%d>", p.Generation, p.Index, p.Offset)
return fmt.Sprintf("<%s,%08x,%d>", p.Generation, p.Index, p.Offset)
}
// IsZero returns true if p is the zero value.
@@ -129,10 +151,6 @@ func readWALHeader(filename string) ([]byte, error) {
return buf[:n], err
}
func readCheckpointSeqNo(hdr []byte) uint32 {
return binary.BigEndian.Uint32(hdr[12:])
}
// readFileAt reads a slice from a file.
func readFileAt(filename string, offset, n int64) ([]byte, error) {
f, err := os.Open(filename)
@@ -184,19 +202,19 @@ func IsSnapshotPath(s string) bool {
// ParseSnapshotPath returns the index for the snapshot.
// Returns an error if the path is not a valid snapshot path.
func ParseSnapshotPath(s string) (index int, typ, ext string, err error) {
func ParseSnapshotPath(s string) (index int, ext string, err error) {
s = filepath.Base(s)
a := snapshotPathRegex.FindStringSubmatch(s)
if a == nil {
return 0, "", "", fmt.Errorf("invalid snapshot path: %s", s)
return 0, "", fmt.Errorf("invalid snapshot path: %s", s)
}
i64, _ := strconv.ParseUint(a[1], 16, 64)
return int(i64), a[2], a[3], nil
return int(i64), a[2], nil
}
var snapshotPathRegex = regexp.MustCompile(`^([0-9a-f]{16})(?:-(\w+))?(.snapshot(?:.gz)?)$`)
var snapshotPathRegex = regexp.MustCompile(`^([0-9a-f]{8})(.snapshot(?:.lz4)?)$`)
// IsWALPath returns true if s is a path to a WAL file.
func IsWALPath(s string) bool {
@@ -221,75 +239,82 @@ func ParseWALPath(s string) (index int, offset int64, ext string, err error) {
// FormatWALPath formats a WAL filename with a given index.
func FormatWALPath(index int) string {
assert(index >= 0, "wal index must be non-negative")
return fmt.Sprintf("%016x%s", index, WALExt)
return fmt.Sprintf("%08x%s", index, WALExt)
}
// FormatWALPathWithOffset formats a WAL filename with a given index & offset.
func FormatWALPathWithOffset(index int, offset int64) string {
assert(index >= 0, "wal index must be non-negative")
assert(offset >= 0, "wal offset must be non-negative")
return fmt.Sprintf("%016x_%016x%s", index, offset, WALExt)
return fmt.Sprintf("%08x_%08x%s", index, offset, WALExt)
}
var walPathRegex = regexp.MustCompile(`^([0-9a-f]{16})(?:_([0-9a-f]{16}))?(.wal(?:.gz)?)$`)
var walPathRegex = regexp.MustCompile(`^([0-9a-f]{8})(?:_([0-9a-f]{8}))?(.wal(?:.lz4)?)$`)
// isHexChar returns true if ch is a lowercase hex character.
func isHexChar(ch rune) bool {
return (ch >= '0' && ch <= '9') || (ch >= 'a' && ch <= 'f')
}
// gzipReadCloser wraps gzip.Reader to also close the underlying reader on close.
type gzipReadCloser struct {
r *gzip.Reader
closer io.ReadCloser
// createFile creates the file and attempts to set the UID/GID.
func createFile(filename string, perm os.FileMode, uid, gid int) (*os.File, error) {
f, err := os.OpenFile(filename, os.O_RDWR|os.O_CREATE|os.O_TRUNC, perm)
if err != nil {
return nil, err
}
_ = f.Chown(uid, gid)
return f, nil
}
func (r *gzipReadCloser) Read(p []byte) (n int, err error) {
return r.r.Read(p)
}
// mkdirAll is a copy of os.MkdirAll() except that it attempts to set the
// uid/gid for each created directory.
func mkdirAll(path string, perm os.FileMode, uid, gid int) error {
// Fast path: if we can tell whether path is a directory or file, stop with success or error.
dir, err := os.Stat(path)
if err == nil {
if dir.IsDir() {
return nil
}
return &os.PathError{Op: "mkdir", Path: path, Err: syscall.ENOTDIR}
}
func (r *gzipReadCloser) Close() error {
if err := r.r.Close(); err != nil {
r.closer.Close()
// Slow path: make sure parent exists and then call Mkdir for path.
i := len(path)
for i > 0 && os.IsPathSeparator(path[i-1]) { // Skip trailing path separator.
i--
}
j := i
for j > 0 && !os.IsPathSeparator(path[j-1]) { // Scan backward over element.
j--
}
if j > 1 {
// Create parent.
err = mkdirAll(fixRootDirectory(path[:j-1]), perm, uid, gid)
if err != nil {
return err
}
}
// Parent now exists; invoke Mkdir and use its result.
err = os.Mkdir(path, perm)
if err != nil {
// Handle arguments like "foo/." by
// double-checking that directory doesn't exist.
dir, err1 := os.Lstat(path)
if err1 == nil && dir.IsDir() {
_ = os.Chown(path, uid, gid)
return nil
}
return err
}
return r.closer.Close()
_ = os.Chown(path, uid, gid)
return nil
}
// HexDump returns hexdump output but with duplicate lines removed.
func HexDump(b []byte) string {
const prefixN = len("00000000")
var output []string
var prev string
var ellipsis bool
lines := strings.Split(strings.TrimSpace(hex.Dump(b)), "\n")
for i, line := range lines {
// Add line to output if it is not repeating or the last line.
if i == 0 || i == len(lines)-1 || trimPrefixN(line, prefixN) != trimPrefixN(prev, prefixN) {
output = append(output, line)
prev, ellipsis = line, false
continue
}
// Add an ellipsis for the first duplicate line.
if !ellipsis {
output = append(output, "...")
ellipsis = true
continue
}
}
return strings.Join(output, "\n")
}
func trimPrefixN(s string, n int) string {
if len(s) < n {
return ""
}
return s[n:]
}
// Tracef is used for low-level tracing.
var Tracef = func(format string, a ...interface{}) {}
func assert(condition bool, message string) {
if !condition {

View File

@@ -6,6 +6,7 @@ import (
"testing"
"github.com/benbjohnson/litestream"
_ "github.com/mattn/go-sqlite3"
)
func TestChecksum(t *testing.T) {

18
litestream_unix.go Normal file
View File

@@ -0,0 +1,18 @@
// +build aix darwin dragonfly freebsd linux netbsd openbsd solaris
package litestream
import (
"os"
"syscall"
)
// fileinfo returns syscall fields from a FileInfo object.
func fileinfo(fi os.FileInfo) (uid, gid int) {
stat := fi.Sys().(*syscall.Stat_t)
return int(stat.Uid), int(stat.Gid)
}
func fixRootDirectory(p string) string {
return p
}

23
litestream_windows.go Normal file
View File

@@ -0,0 +1,23 @@
// +build windows
package litestream
import (
"os"
"syscall"
)
// fileinfo returns syscall fields from a FileInfo object.
func fileinfo(fi os.FileInfo) (uid, gid int) {
return -1, -1
}
// fixRootDirectory is copied from the standard library for use with mkdirAll()
func fixRootDirectory(p string) string {
if len(p) == len(`\\?\c:`) {
if IsPathSeparator(p[0]) && IsPathSeparator(p[1]) && p[2] == '?' && IsPathSeparator(p[3]) && p[5] == ':' {
return p + `\`
}
}
return p
}

File diff suppressed because it is too large Load Diff

90
replica_test.go Normal file
View File

@@ -0,0 +1,90 @@
package litestream_test
import (
"context"
"testing"
"github.com/benbjohnson/litestream"
)
func TestFileReplica_Sync(t *testing.T) {
// Ensure replica can successfully sync after DB has sync'd.
t.Run("InitialSync", func(t *testing.T) {
db, sqldb := MustOpenDBs(t)
defer MustCloseDBs(t, db, sqldb)
r := NewTestFileReplica(t, db)
// Sync database & then sync replica.
if err := db.Sync(); err != nil {
t.Fatal(err)
} else if err := r.Sync(context.Background()); err != nil {
t.Fatal(err)
}
// Ensure posistions match.
if pos, err := db.Pos(); err != nil {
t.Fatal(err)
} else if got, want := r.LastPos(), pos; got != want {
t.Fatalf("LastPos()=%v, want %v", got, want)
}
})
// Ensure replica can successfully sync multiple times.
t.Run("MultiSync", func(t *testing.T) {
db, sqldb := MustOpenDBs(t)
defer MustCloseDBs(t, db, sqldb)
r := NewTestFileReplica(t, db)
if _, err := sqldb.Exec(`CREATE TABLE foo (bar TEXT);`); err != nil {
t.Fatal(err)
}
// Write to the database multiple times and sync after each write.
for i, n := 0, db.MinCheckpointPageN*2; i < n; i++ {
if _, err := sqldb.Exec(`INSERT INTO foo (bar) VALUES ('baz')`); err != nil {
t.Fatal(err)
}
// Sync periodically.
if i%100 == 0 || i == n-1 {
if err := db.Sync(); err != nil {
t.Fatal(err)
} else if err := r.Sync(context.Background()); err != nil {
t.Fatal(err)
}
}
}
// Ensure posistions match.
if pos, err := db.Pos(); err != nil {
t.Fatal(err)
} else if got, want := pos.Index, 2; got != want {
t.Fatalf("Index=%v, want %v", got, want)
} else if calcPos, err := r.CalcPos(context.Background(), pos.Generation); err != nil {
t.Fatal(err)
} else if got, want := calcPos, pos; got != want {
t.Fatalf("CalcPos()=%v, want %v", got, want)
} else if got, want := r.LastPos(), pos; got != want {
t.Fatalf("LastPos()=%v, want %v", got, want)
}
})
// Ensure replica returns an error if there is no generation available from the DB.
t.Run("ErrNoGeneration", func(t *testing.T) {
db, sqldb := MustOpenDBs(t)
defer MustCloseDBs(t, db, sqldb)
r := NewTestFileReplica(t, db)
if err := r.Sync(context.Background()); err == nil || err.Error() != `no generation, waiting for data` {
t.Fatal(err)
}
})
}
// NewTestFileReplica returns a new replica using a temp directory & with monitoring disabled.
func NewTestFileReplica(tb testing.TB, db *litestream.DB) *litestream.FileReplica {
r := litestream.NewFileReplica(db, "", tb.TempDir())
r.MonitorEnabled = false
db.Replicas = []litestream.Replica{r}
return r
}

1044
s3/s3.go Normal file

File diff suppressed because it is too large Load Diff