Compare commits

..

101 Commits

Author SHA1 Message Date
Tyler Davis
91e57a6156 fix: remove deprecated ioutil and rand calls
- Change ioutil to io and os calls per Go 1.16 deprecation
- Move global random to localized random source
2023-03-11 19:16:12 -07:00
Tyler Davis
6bbced3d46 fix: update go versions in mod and docker
- Update Go module to v1.19 format
- Docker builder pinned to Go v1.20.1
- Alpine image pinned to 3.17.2 (rather than `latest`)
2023-03-11 19:16:12 -07:00
Erik Kristensen
8dbdf2b91c fix: remove debug code 2023-01-17 18:47:29 -07:00
Erik Kristensen
530930465d fix: aws credential chain by using aws.Config 2023-01-17 18:47:29 -07:00
Lincoln Stoll
64e535e23b Handle errors when deleting objects from S3
I recently noticed that the cost for ListBucket calls was increasing for an
application that was using Litestream. After investigating it seemed that the
bucket had retained the entire history of data, while Litestream was
continually logging that it was deleting the same data:

```
2022-10-30T12:00:27Z (s3): wal segmented deleted before 0792d3393bf79ced/00000233: n=1428
<snip>
2022-10-30T13:00:24Z (s3): wal segmented deleted before 0792d3393bf79ced/00000233: n=1428
```

This is occuring because the DeleteObjects call is a batch item, that returns
the individual object deletion errors in the response[1]. The S3 replica client
discards the response, and only handles errors in the original API call. I had
a misconfigured IAM policy that meant all deletes were failing, but this never
actually bubbled up as a real error.

To fix this, I added a check for the response body to handle any errors the
operation might have encountered. Because this may include a large number of
errors (in this case 1428 of them), the output is summarized to avoid an overly
large error message. When items are not found, they will not return an error[2]
- they will still be marked as deleted, so this change should be in-line with
the original intentions of this code.

1: https://docs.aws.amazon.com/AmazonS3/latest/API/API_DeleteObjects.html#API_DeleteObjects_Example_2
2: https://docs.aws.amazon.com/AmazonS3/latest/API/API_DeleteObjects.html
2022-11-03 10:02:42 -06:00
Jose Diaz-Gonzalez
f50f03d8fc Update readme to note that this tool is for disaster recovery, not streaming replication
Refs #411
2022-10-14 15:12:58 -06:00
Ben Johnson
868d564988 Remove streaming replication implementation 2022-08-08 16:34:17 -06:00
Ben Johnson
a8ab14cca2 Update dependencies 2022-08-08 15:24:46 -06:00
Ryan Russell
80cd049ae7 Revert to correct wal_downloader.go
Signed-off-by: Ryan Russell <git@ryanrussell.org>
2022-06-23 08:42:37 -06:00
Ryan Russell
2acdab02c8 Improve readability
Signed-off-by: Ryan Russell <ryanrussell@users.noreply.github.com>
2022-06-23 08:42:37 -06:00
Yasuhiro Matsumoto
31aa5b34f6 Fix build tag 2022-06-07 15:09:52 -06:00
Yasuhiro Matsumoto
4522c7bce5 implement Fileinfo for Windows and non-Windows 2022-06-07 15:09:52 -06:00
Ben Johnson
e9dbf83a45 Re-add Fileinfo() 2022-06-07 15:09:52 -06:00
Yasuhiro Matsumoto
7d0167f10a Unwatch directory 2022-06-07 15:09:52 -06:00
Yasuhiro Matsumoto
2c0dce21fa Use fsnotify 2022-06-07 15:09:52 -06:00
Hiroaki Nakamura
98673c6785 Add environment variables for scheme and forcePathStyle 2022-05-16 15:12:39 -06:00
Hiroaki Nakamura
46597ab22f Fix wal internal error log 2022-05-13 15:25:43 -06:00
Hiroaki Nakamura
e6f7c6052d Add two environments for overriding endpoint and region
export LITESTREAM_ACCESS_KEY_ID=your_key_id
export LITESTREAM_SECRET_ACCESS_KEY=your_access_key
export LITESTREAM_ENDPOINT=your_endpoint
export LITESTREAM_REGION=your_region
litestream replicate fruits.db s3://mybkt/fruits.db
2022-05-10 08:48:48 -06:00
Ben Johnson
7d8b8c6ec0 Remove verbose flag from restore docs 2022-05-03 06:33:53 -07:00
Michael Lynch
88737d7164 Add a unit test for internal.MD5Hash 2022-04-17 15:02:37 -06:00
Michael Lynch
6763e9218c Fix path to coverage file 2022-04-17 15:02:32 -06:00
Michael Lynch
301e1172fd Add Go code coverage to CI 2022-04-17 15:02:32 -06:00
Ben Johnson
ca07137d32 Re-add point-in-time restore 2022-04-14 20:03:52 -06:00
Ben Johnson
80f8de4d9e Fix release workflow 2022-04-09 10:34:26 -06:00
Ben Johnson
5d394bbc57 Document -addr flag on replicate command 2022-04-05 13:48:15 -06:00
Ben Johnson
f53857e1ad Add minimum shadow WAL retention 2022-04-04 21:25:31 -06:00
Ben Johnson
44662022fa Allow read replication recovery from last position 2022-04-04 20:19:02 -06:00
Ben Johnson
2c3e28c786 Improve http error logging 2022-04-03 11:55:42 -06:00
Ben Johnson
46888530b2 Default upstream path if not specified 2022-04-03 09:15:54 -06:00
Ben Johnson
6aba416656 Remove CI task for executing long running test runner on each build 2022-04-02 11:53:22 -06:00
Ben Johnson
8d10881278 Use database page size in read replication 2022-04-02 11:50:30 -06:00
Ben Johnson
00bad4308d Set permission on file replica client on init 2022-03-06 08:38:07 -07:00
Ben Johnson
d5792c42b9 Prevent double-close for SFTP client 2022-03-05 11:33:34 -07:00
Ben Johnson
07d220028a Rename 'gcs' to 'gs' for consistency 2022-03-05 11:17:42 -07:00
Ben Johnson
8ee5fcb591 Read config file from present working directory, if present 2022-03-05 11:07:49 -07:00
Ben Johnson
7fe79d3883 Add -addr flag to replicate command 2022-03-05 09:55:15 -07:00
Ben Johnson
14026421b2 Disable dependabot 2022-03-05 08:55:50 -07:00
Ben Johnson
59de3a01ba Upgrade mattn/go-sqlite3 to v1.14.12 2022-03-05 08:53:03 -07:00
Ben Johnson
c435b6b672 Pass first DB path to child process 2022-03-05 08:44:11 -07:00
Ben Johnson
62e301afd0 Change dependabot from weekly to monthly 2022-02-26 08:45:34 -07:00
Tobias Nießen
06ea1b13c1 Improve iterator Next() descriptions 2022-02-26 08:41:30 -07:00
Ben Johnson
a090706421 Implement live read replication
This commit adds an http server and client for streaming snapshots
and WAL pages from an upstream Litestream primary to a read-only
replica.
2022-02-19 09:06:49 -07:00
Ben Johnson
4898fc2fc1 Remove Docker linux/arm64 for PR builds 2022-02-18 14:39:59 -07:00
Ben Johnson
6f8cd5a9c4 Configurable monitor-delay-interval
The `monitor-delay-interval` has been added to the DB config so that
users can change the time period between WAL checks after a file
change notification has occurred. This can be useful to batch up
changes in larger files in the shadow WAL or to reduce or eliminate
the delay in propagating changes during read replication.

Setting the interval to zero or less will disable it.
2022-02-18 14:38:50 -07:00
Ben Johnson
4027c87a02 Fix Docker arch mismatch 2022-02-15 16:02:02 -07:00
Ben Johnson
fde17d0e62 Upgrade dependencies 2022-02-15 13:53:59 -07:00
Ben Johnson
fc42576e47 Add Docker arm/v7 to CI 2022-02-15 12:31:39 -07:00
Campbell Vertesi
1a630aed04 Add docker multiarch build and push to release
Co-authored-by: Ben Johnson <benbjohnson@yahoo.com>
2022-02-15 12:08:21 -07:00
Ben Johnson
8589111717 Implement streaming WAL segment iterator
Currently, WALSegmentIterator implementations read to the end of
the end of their list of segments and return EOF. This commit adds
the ability to push additional segments to in-process iterators and
notify their callers that new segments are available. This is only
implemented for the file-based iterator but other segment iterators
may get this implementation in the future or have a wrapping
iterator provide a polling-based implementation.
2022-02-11 13:50:44 -07:00
Ben Johnson
006e4b7155 Update index & offset encoding
Previously, the index & offsets were encoded as 8-character hex
strings, however, this limits the maximum value to a `uint32`. This
is normally not an issue, however, indices could go over the maximum
value of 4 billion over time and the offset could exceed this value
for an especially large WAL update. For safety, these encodings have
been updated to 16-character hex encodings.
2022-02-08 13:14:49 -07:00
Ben Johnson
54f3b94d3f Upgrade dependencies
- github.com/aws/aws-sdk-go v1.42.44 => v1.42.48
- cloud.google.com/go/storage v1.19.0 => v1.20.0
- github.com/pierrec/lz4/v4 v4.1.12 => v4.1.14
- google.golang.org/api v0.66.0 => v0.67.0
2022-02-07 14:21:52 -07:00
Ben Johnson
30a8d07a81 Add WAL overrun validation
Under high write load, it is possible for write transactions from
another process to overrun the WAL between the time when Litestream
performs a RESTART checkpoint and when it obtains the write lock
immediately after. This change adds validation that an overrun has
not occurred and, if it has, it will start a new generation.
2022-02-07 13:35:20 -07:00
Ben Johnson
76e53dc6ea Remove built-in validation option
Previously, Litestream had a validator that worked most of the time
but also caused some false positives. It is difficult to provide
validation from with Litestream without controlling outside processes
that can also affect the database. As such, validation has been moved
out to the external CI test runner which provides a more consistent
validation process.
2022-02-06 11:37:06 -07:00
Ben Johnson
762c7ae531 Implement FileWatcher 2022-02-06 09:51:04 -07:00
Ben Johnson
8009bcf654 Remove Windows support
Unfortunately, I don't have the expertise or bandwidth to maintain
the Windows support in Litestream. I'm open to re-adding support in
the future but right now it is hindering development and is not
well-tested or well-used.
2022-02-05 08:19:31 -07:00
Ben Johnson
4349398ff5 Remove shadow WAL iterator
This commit removes the shadow WAL iterator and replaces it with a
fileWalSegmentIterator instead. This works since the shadow WAL now
has the same structure as the replica WAL. This reduces duplicate
code and will make it so read replication can be daisy chained in
the future.
2022-01-31 16:09:02 -07:00
dependabot[bot]
89560c8632 Bump github.com/prometheus/client_golang from 1.12.0 to 1.12.1
Bumps [github.com/prometheus/client_golang](https://github.com/prometheus/client_golang) from 1.12.0 to 1.12.1.
- [Release notes](https://github.com/prometheus/client_golang/releases)
- [Changelog](https://github.com/prometheus/client_golang/blob/main/CHANGELOG.md)
- [Commits](https://github.com/prometheus/client_golang/compare/v1.12.0...v1.12.1)

---
updated-dependencies:
- dependency-name: github.com/prometheus/client_golang
  dependency-type: direct:production
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>
2022-01-31 14:27:38 -07:00
dependabot[bot]
5f38134032 Bump cloud.google.com/go/storage from 1.18.2 to 1.19.0
Bumps [cloud.google.com/go/storage](https://github.com/googleapis/google-cloud-go) from 1.18.2 to 1.19.0.
- [Release notes](https://github.com/googleapis/google-cloud-go/releases)
- [Changelog](https://github.com/googleapis/google-cloud-go/blob/main/CHANGES.md)
- [Commits](https://github.com/googleapis/google-cloud-go/compare/storage/v1.18.2...spanner/v1.19.0)

---
updated-dependencies:
- dependency-name: cloud.google.com/go/storage
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
2022-01-31 13:55:52 -07:00
dependabot[bot]
d5c15593bb Bump google.golang.org/api from 0.65.0 to 0.66.0
Bumps [google.golang.org/api](https://github.com/googleapis/google-api-go-client) from 0.65.0 to 0.66.0.
- [Release notes](https://github.com/googleapis/google-api-go-client/releases)
- [Changelog](https://github.com/googleapis/google-api-go-client/blob/main/CHANGES.md)
- [Commits](https://github.com/googleapis/google-api-go-client/compare/v0.65.0...v0.66.0)

---
updated-dependencies:
- dependency-name: google.golang.org/api
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
2022-01-31 13:50:18 -07:00
dependabot[bot]
fb3a3d904f Bump github.com/aws/aws-sdk-go from 1.42.40 to 1.42.44
Bumps [github.com/aws/aws-sdk-go](https://github.com/aws/aws-sdk-go) from 1.42.40 to 1.42.44.
- [Release notes](https://github.com/aws/aws-sdk-go/releases)
- [Changelog](https://github.com/aws/aws-sdk-go/blob/main/CHANGELOG.md)
- [Commits](https://github.com/aws/aws-sdk-go/compare/v1.42.40...v1.42.44)

---
updated-dependencies:
- dependency-name: github.com/aws/aws-sdk-go
  dependency-type: direct:production
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>
2022-01-31 13:15:13 -07:00
Ben Johnson
ee77592d7e Skip dependabot CI using branches 2022-01-31 13:01:24 -07:00
Ben Johnson
a2cf2e260b Skip some CI jobs for dependabot 2022-01-31 12:46:06 -07:00
Ben Johnson
5d811f2e39 Fix golangci-lint issues 2022-01-31 09:21:20 -07:00
Ben Johnson
e84994ad95 Add golangci-lint to CI 2022-01-31 09:21:20 -07:00
Ben Johnson
f6c859061b Fix CodeQL warnings 2022-01-31 08:53:21 -07:00
Ben Johnson
0dfa5f98d1 Re-enable SFTP integration tests 2022-01-30 09:15:31 -07:00
Ben Johnson
906ed9b3ca Revert "Add test runner request action"
This reverts commit 26f219da1d.
2022-01-30 08:57:46 -07:00
Ben Johnson
26f219da1d Add test runner request action 2022-01-30 08:51:55 -07:00
Ben Johnson
f8382cfa15 Dispatch test runner in CI 2022-01-28 15:59:02 -07:00
Ben Johnson
dbdde21341 Use sqlite3_file_control(SQLITE_FCNTL_PERSIST_WAL) to persist WAL
Previously, Litestream would avoid closing the SQLite3 connection
in order to ensure that the WAL file was not cleaned up by the
database if it was the last connection. This commit changes the
behavior by introducing a file control call to perform the same
action. This allows us to close the database file normally in all
cases.
2022-01-28 15:12:43 -07:00
Ben Johnson
1741c82839 Produce build for every pull request 2022-01-26 16:10:39 -07:00
Ben Johnson
ffaba87b40 Separate out GitHub Actions 2022-01-25 16:20:38 -07:00
dependabot[bot]
8d759bb0b8 Bump github.com/aws/aws-sdk-go from 1.42.39 to 1.42.40
Bumps [github.com/aws/aws-sdk-go](https://github.com/aws/aws-sdk-go) from 1.42.39 to 1.42.40.
- [Release notes](https://github.com/aws/aws-sdk-go/releases)
- [Changelog](https://github.com/aws/aws-sdk-go/blob/main/CHANGELOG.md)
- [Commits](https://github.com/aws/aws-sdk-go/compare/v1.42.39...v1.42.40)

---
updated-dependencies:
- dependency-name: github.com/aws/aws-sdk-go
  dependency-type: direct:production
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>
2022-01-24 13:10:29 -07:00
Ben Johnson
8950de8f7e Update dependabot.yml 2022-01-24 13:09:32 -07:00
Ben Johnson
39114502f3 Create codeql-analysis.yml 2022-01-23 09:30:17 -07:00
Ben Johnson
5d24f91ea7 Upgrade github.com/prometheus/client_golang@v1.12.0 2022-01-23 09:30:03 -07:00
Ben Johnson
55c475e3fe Upgrade github.com/pkg/sftp@v1.13.4 2022-01-23 09:24:32 -07:00
Ben Johnson
500cfd8bf4 Upgrade shellwords, golang.org/x 2022-01-23 09:19:10 -07:00
Ben Johnson
90715ef8f3 Upgrade azure-storage-blob-go to v0.14.0 2022-01-23 09:08:21 -07:00
Matt Joiner
79b50c6944 Update sqlite 3.36 2022-01-23 08:39:22 -07:00
dependabot[bot]
d045b7bef0 Bump google.golang.org/api from 0.45.0 to 0.65.0
Bumps [google.golang.org/api](https://github.com/googleapis/google-api-go-client) from 0.45.0 to 0.65.0.
- [Release notes](https://github.com/googleapis/google-api-go-client/releases)
- [Changelog](https://github.com/googleapis/google-api-go-client/blob/main/CHANGES.md)
- [Commits](https://github.com/googleapis/google-api-go-client/compare/v0.45.0...v0.65.0)

---
updated-dependencies:
- dependency-name: google.golang.org/api
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
2022-01-22 11:34:08 -07:00
dependabot[bot]
6c5fb2c446 Bump cloud.google.com/go/storage from 1.15.0 to 1.18.2
Bumps [cloud.google.com/go/storage](https://github.com/googleapis/google-cloud-go) from 1.15.0 to 1.18.2.
- [Release notes](https://github.com/googleapis/google-cloud-go/releases)
- [Changelog](https://github.com/googleapis/google-cloud-go/blob/main/CHANGES.md)
- [Commits](https://github.com/googleapis/google-cloud-go/compare/pubsub/v1.15.0...storage/v1.18.2)

---
updated-dependencies:
- dependency-name: cloud.google.com/go/storage
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
2022-01-22 11:28:33 -07:00
dependabot[bot]
0b533e5d7b Bump github.com/aws/aws-sdk-go from 1.27.0 to 1.42.39
Bumps [github.com/aws/aws-sdk-go](https://github.com/aws/aws-sdk-go) from 1.27.0 to 1.42.39.
- [Release notes](https://github.com/aws/aws-sdk-go/releases)
- [Changelog](https://github.com/aws/aws-sdk-go/blob/main/CHANGELOG.md)
- [Commits](https://github.com/aws/aws-sdk-go/compare/v1.27.0...v1.42.39)

---
updated-dependencies:
- dependency-name: github.com/aws/aws-sdk-go
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
2022-01-22 11:24:24 -07:00
dependabot[bot]
17831c7025 Bump github.com/pierrec/lz4/v4 from 4.1.3 to 4.1.12
Bumps [github.com/pierrec/lz4/v4](https://github.com/pierrec/lz4) from 4.1.3 to 4.1.12.
- [Release notes](https://github.com/pierrec/lz4/releases)
- [Commits](https://github.com/pierrec/lz4/compare/v4.1.3...v4.1.12)

---
updated-dependencies:
- dependency-name: github.com/pierrec/lz4/v4
  dependency-type: direct:production
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>
2022-01-22 11:19:48 -07:00
Ben Johnson
b8d04957a2 Update CI 2022-01-22 11:14:59 -07:00
Ben Johnson
0a6474fb28 Restrict CI jobs 2022-01-22 11:00:38 -07:00
Ben Johnson
c7aa3635fd Parallelize GitHub Actions 2022-01-22 10:28:57 -07:00
Ben Johnson
b8536fa4f3 dependabot.yml 2022-01-22 10:02:32 -07:00
Ben Johnson
84d08f547a Add end-to-end replication/restore testing 2022-01-15 09:05:46 -07:00
Ben Johnson
f308e0b154 CLI test coverage 2022-01-11 13:29:20 -07:00
Ben Johnson
3f0ec9fa9f Refactor Restore()
This commit refactors out the complexity of downloading ordered WAL
files in parallel to a type called `WALDownloader`. This makes it
easier to test the restore separately from the download.
2022-01-04 15:03:59 -07:00
Ben Johnson
531e19ed6f Refactor checksum calculation; improve test coverage 2021-12-12 10:25:20 -07:00
Michael Lynch
ba6e13b5d0 Sort output of snapshots in descending timestamp order
By default, the snapshots command seems to output in alphabetical order of hash, which isn't meaningful, as far as I can tell.

This change modifies the order of the command output so that ./litestream snapshots returns snapshots from newest to oldest.
2021-12-08 18:52:49 -07:00
Ben Johnson
d09f4ef618 Fix FindMinSnapshotByGeneration() loop ref bug
This commit fixes an issue where the reference is taken
on the loop variable rather than the slice element when
computing the minimum snapshot within a generation so
it can cause the wrong snapshot to be chosen.
2021-12-08 18:51:10 -07:00
Ben Johnson
61c80cbfc2 README 2021-12-05 08:44:19 -07:00
Ben Johnson
755f54f4d9 Update CONTRIBUTING & remove pull request template 2021-10-10 08:35:29 -06:00
Colin Arnott
cb33d8c6a9 Replica.Restore fallback to DB.path
Per the godoc on Replica.Restore and RestoreOptions.OutputPath,
Replica.db.path should be used when RestoreOptions.OutputPath is empty.

Fixes #233
2021-10-06 16:00:31 -06:00
Ben Johnson
aa2c684c81 Update contribution policy 2021-10-02 09:21:24 -06:00
Ben Johnson
6db06067b5 README 2021-09-21 15:31:11 -06:00
Ben Johnson
77274abf81 Refactor shadow WAL to use segments 2021-07-23 07:46:21 -06:00
Ben Johnson
fc897b481f Group replica wal segments by index
This commit changes the replica path format to group segments within
a single index in the same directory. This is to eventually add the
ability to seek to a record on file-based systems without having
to iterate over the records. The DB shadow WAL will also be changed
to this same format to support live replicas.
2021-06-14 15:24:05 -06:00
343 changed files with 9275 additions and 4141 deletions

30
.github/workflows/build_and_test.yml vendored Normal file
View File

@@ -0,0 +1,30 @@
name: "Build and Unit Test"
on: pull_request
jobs:
build:
name: Build
runs-on: ubuntu-18.04
steps:
- uses: actions/checkout@v2
- uses: actions/setup-go@v2
with:
go-version: '1.17'
- uses: actions/cache@v2
with:
path: ~/go/pkg/mod
key: ${{ inputs.os }}-go-${{ hashFiles('**/go.sum') }}
restore-keys: ${{ inputs.os }}-go-
- name: Build binary
run: go install ./cmd/litestream
- name: Run unit tests
run: make testdata && go test -v --coverprofile=.coverage.out ./... && go tool cover -html .coverage.out -o .coverage.html
- uses: actions/upload-artifact@v3
with:
name: code-coverage
path: .coverage.html

38
.github/workflows/codeql-analysis.yml vendored Normal file
View File

@@ -0,0 +1,38 @@
name: "CodeQL"
on:
push:
branches: [ main ]
pull_request:
branches: [ main ]
schedule:
- cron: '20 16 * * 4'
jobs:
analyze:
name: Analyze
runs-on: ubuntu-latest
permissions:
actions: read
contents: read
security-events: write
strategy:
fail-fast: false
matrix:
language: [ 'go' ]
steps:
- name: Checkout repository
uses: actions/checkout@v2
- name: Initialize CodeQL
uses: github/codeql-action/init@v1
with:
languages: ${{ matrix.language }}
- name: Autobuild
uses: github/codeql-action/autobuild@v1
- name: Perform CodeQL Analysis
uses: github/codeql-action/analyze@v1

View File

@@ -1,121 +0,0 @@
on: push
jobs:
build:
name: Build & Unit Test
runs-on: ubuntu-22.04
steps:
- uses: actions/checkout@v2
- uses: actions/setup-go@v2
with:
go-version: '1.20'
- uses: actions/cache@v2
with:
path: ~/go/pkg/mod
key: ${{ inputs.os }}-go-${{ hashFiles('**/go.sum') }}
restore-keys: ${{ inputs.os }}-go-
- run: go env
- run: go install ./cmd/litestream
- run: go test -v ./...
# - name: Build integration test
# run: go test -c ./integration
#
# - uses: actions/upload-artifact@v2
# with:
# name: integration.test
# path: integration.test
# if-no-files-found: error
# long-running-test:
# name: Run Long Running Unit Test
# runs-on: ubuntu-22.04
# steps:
# - uses: actions/checkout@v2
# - uses: actions/setup-go@v2
# with:
# go-version: '1.20'
# - uses: actions/cache@v2
# with:
# path: ~/go/pkg/mod
# key: ${{ inputs.os }}-go-${{ hashFiles('**/go.sum') }}
# restore-keys: ${{ inputs.os }}-go-
#
# - run: go install ./cmd/litestream
# - run: go test -v -run=TestCmd_Replicate_LongRunning ./integration -long-running-duration 1m
# s3-integration-test:
# name: Run S3 Integration Tests
# runs-on: ubuntu-18.04
# needs: build
# steps:
# - uses: actions/download-artifact@v2
# with:
# name: integration.test
# - run: chmod +x integration.test
#
# - run: ./integration.test -test.v -test.run=TestReplicaClient -replica-type s3
# env:
# LITESTREAM_S3_ACCESS_KEY_ID: ${{ secrets.LITESTREAM_S3_ACCESS_KEY_ID }}
# LITESTREAM_S3_SECRET_ACCESS_KEY: ${{ secrets.LITESTREAM_S3_SECRET_ACCESS_KEY }}
# LITESTREAM_S3_REGION: us-east-1
# LITESTREAM_S3_BUCKET: integration.litestream.io
# gcp-integration-test:
# name: Run GCP Integration Tests
# runs-on: ubuntu-18.04
# needs: build
# steps:
# - name: Extract GCP credentials
# run: 'echo "$GOOGLE_APPLICATION_CREDENTIALS" > /opt/gcp.json'
# shell: bash
# env:
# GOOGLE_APPLICATION_CREDENTIALS: ${{secrets.GOOGLE_APPLICATION_CREDENTIALS}}
#
# - uses: actions/download-artifact@v2
# with:
# name: integration.test
# - run: chmod +x integration.test
#
# - run: ./integration.test -test.v -test.run=TestReplicaClient -replica-type gcs
# env:
# GOOGLE_APPLICATION_CREDENTIALS: /opt/gcp.json
# LITESTREAM_GCS_BUCKET: integration.litestream.io
# abs-integration-test:
# name: Run Azure Blob Store Integration Tests
# runs-on: ubuntu-18.04
# needs: build
# steps:
# - uses: actions/download-artifact@v2
# with:
# name: integration.test
# - run: chmod +x integration.test
#
# - run: ./integration.test -test.v -test.run=TestReplicaClient -replica-type abs
# env:
# LITESTREAM_ABS_ACCOUNT_NAME: ${{ secrets.LITESTREAM_ABS_ACCOUNT_NAME }}
# LITESTREAM_ABS_ACCOUNT_KEY: ${{ secrets.LITESTREAM_ABS_ACCOUNT_KEY }}
# LITESTREAM_ABS_BUCKET: integration
# sftp-integration-test:
# name: Run SFTP Integration Tests
# runs-on: ubuntu-18.04
# needs: build
# steps:
# - name: Extract SSH key
# run: 'echo "$LITESTREAM_SFTP_KEY" > /opt/id_ed25519'
# shell: bash
# env:
# LITESTREAM_SFTP_KEY: ${{secrets.LITESTREAM_SFTP_KEY}}
#
# - name: Run sftp tests
# run: go test -v -run=TestReplicaClient ./integration -replica-type sftp
# env:
# LITESTREAM_SFTP_HOST: ${{ secrets.LITESTREAM_SFTP_HOST }}
# LITESTREAM_SFTP_USER: ${{ secrets.LITESTREAM_SFTP_USER }}
# LITESTREAM_SFTP_KEY_PATH: /opt/id_ed25519
# LITESTREAM_SFTP_PATH: ${{ secrets.LITESTREAM_SFTP_PATH }}

18
.github/workflows/golangci-lint.yml vendored Normal file
View File

@@ -0,0 +1,18 @@
name: golangci-lint
on:
pull_request:
permissions:
contents: read
jobs:
golangci:
name: lint
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- uses: golangci/golangci-lint-action@v2
with:
version: latest
args: --timeout=10m

138
.github/workflows/integration_test.yml vendored Normal file
View File

@@ -0,0 +1,138 @@
name: Integration Tests
on:
pull_request:
branches-ignore:
- "dependabot/**"
jobs:
s3-integration-test:
name: Run S3 Integration Tests
runs-on: ubuntu-18.04
steps:
- uses: actions/checkout@v2
- uses: actions/setup-go@v2
with:
go-version: '1.17'
- uses: actions/cache@v2
with:
path: ~/go/pkg/mod
key: ${{ inputs.os }}-go-${{ hashFiles('**/go.sum') }}
restore-keys: ${{ inputs.os }}-go-
- run: go install ./cmd/litestream
- run: go test -v -run=TestReplicaClient ./integration -replica-type s3
env:
LITESTREAM_S3_ACCESS_KEY_ID: ${{ secrets.LITESTREAM_S3_ACCESS_KEY_ID }}
LITESTREAM_S3_SECRET_ACCESS_KEY: ${{ secrets.LITESTREAM_S3_SECRET_ACCESS_KEY }}
LITESTREAM_S3_REGION: us-east-1
LITESTREAM_S3_BUCKET: integration.litestream.io
gcp-integration-test:
name: Run GCP Integration Tests
runs-on: ubuntu-18.04
steps:
- uses: actions/checkout@v2
- uses: actions/setup-go@v2
with:
go-version: '1.17'
- uses: actions/cache@v2
with:
path: ~/go/pkg/mod
key: ${{ inputs.os }}-go-${{ hashFiles('**/go.sum') }}
restore-keys: ${{ inputs.os }}-go-
- name: Extract GCP credentials
run: 'echo "$GOOGLE_APPLICATION_CREDENTIALS" > /opt/gcp.json'
shell: bash
env:
GOOGLE_APPLICATION_CREDENTIALS: ${{secrets.GOOGLE_APPLICATION_CREDENTIALS}}
- run: go test -v -run=TestReplicaClient ./integration -replica-type gs
env:
GOOGLE_APPLICATION_CREDENTIALS: /opt/gcp.json
LITESTREAM_GS_BUCKET: integration.litestream.io
abs-integration-test:
name: Run Azure Blob Store Integration Tests
runs-on: ubuntu-18.04
steps:
- uses: actions/checkout@v2
- uses: actions/setup-go@v2
with:
go-version: '1.17'
- uses: actions/cache@v2
with:
path: ~/go/pkg/mod
key: ${{ inputs.os }}-go-${{ hashFiles('**/go.sum') }}
restore-keys: ${{ inputs.os }}-go-
- run: go test -v -run=TestReplicaClient ./integration -replica-type abs
env:
LITESTREAM_ABS_ACCOUNT_NAME: ${{ secrets.LITESTREAM_ABS_ACCOUNT_NAME }}
LITESTREAM_ABS_ACCOUNT_KEY: ${{ secrets.LITESTREAM_ABS_ACCOUNT_KEY }}
LITESTREAM_ABS_BUCKET: integration
sftp-integration-test:
name: Run SFTP Integration Tests
runs-on: ubuntu-18.04
steps:
- uses: actions/checkout@v2
- uses: actions/setup-go@v2
with:
go-version: '1.17'
- uses: actions/cache@v2
with:
path: ~/go/pkg/mod
key: ${{ inputs.os }}-go-${{ hashFiles('**/go.sum') }}
restore-keys: ${{ inputs.os }}-go-
- name: Extract SSH key
run: 'echo "$LITESTREAM_SFTP_KEY" > /opt/id_ed25519'
shell: bash
env:
LITESTREAM_SFTP_KEY: ${{secrets.LITESTREAM_SFTP_KEY}}
- name: Run sftp tests w/ key
run: go test -v -run=TestReplicaClient ./integration -replica-type sftp
env:
LITESTREAM_SFTP_HOST: litestream-test-sftp.fly.dev:2222
LITESTREAM_SFTP_USER: litestream
LITESTREAM_SFTP_PATH: /litestream
LITESTREAM_SFTP_KEY_PATH: /opt/id_ed25519
- name: Run sftp tests w/ password
run: go test -v -run=TestReplicaClient ./integration -replica-type sftp
env:
LITESTREAM_SFTP_HOST: litestream-test-sftp.fly.dev:2222
LITESTREAM_SFTP_USER: litestream
LITESTREAM_SFTP_PASSWORD: ${{ secrets.LITESTREAM_SFTP_PASSWORD }}
LITESTREAM_SFTP_PATH: /litestream
long-running-test:
name: Run Long-Running Test
runs-on: ubuntu-18.04
steps:
- uses: actions/checkout@v2
- uses: actions/setup-go@v2
with:
go-version: '1.17'
- uses: actions/cache@v2
with:
path: ~/go/pkg/mod
key: ${{ inputs.os }}-go-${{ hashFiles('**/go.sum') }}
restore-keys: ${{ inputs.os }}-go-
- run: go install ./cmd/litestream
- run: go test -v -run=TestCmd_Replicate_LongRunning ./integration -long-running-duration 1m

View File

@@ -15,7 +15,7 @@ jobs:
docker:
runs-on: ubuntu-latest
env:
PLATFORMS: "linux/amd64,linux/arm64,linux/arm/v7"
PLATFORMS: "${{ github.event_name == 'release' && 'linux/amd64,linux/arm64,linux/arm/v7' || 'linux/amd64' }}"
VERSION: "${{ github.event_name == 'release' && github.event.release.name || github.sha }}"
steps:

View File

@@ -1,42 +1,71 @@
on:
release:
types:
- created
- published
pull_request:
types:
- opened
- synchronize
- reopened
branches-ignore:
- "dependabot/**"
name: release (linux)
name: Release (Linux)
jobs:
build:
runs-on: ubuntu-18.04
strategy:
matrix:
include:
- arch: amd64
cc: gcc
- arch: amd64
cc: gcc
- arch: amd64
cc: gcc
static: true
- arch: arm64
cc: aarch64-linux-gnu-gcc
- arch: arm64
cc: aarch64-linux-gnu-gcc
static: true
- arch: arm
arm: 6
cc: arm-linux-gnueabi-gcc
- arch: arm
arm: 6
cc: arm-linux-gnueabi-gcc
static: true
- arch: arm
arm: 7
cc: arm-linux-gnueabihf-gcc
- arch: arm
arm: 7
cc: arm-linux-gnueabihf-gcc
static: true
env:
GOOS: linux
GOARCH: ${{ matrix.arch }}
GOARM: ${{ matrix.arm }}
CC: ${{ matrix.cc }}
LDFLAGS: ${{ matrix.static && '-extldflags "-static"' || '' }}
TAGS: ${{ matrix.static && 'osusergo,netgo,sqlite_omit_load_extension' || '' }}
SUFFIX: "${{ matrix.static && '-static' || ''}}"
VERSION: "${{ github.event_name == 'release' && github.event.release.name || github.sha }}"
steps:
- uses: actions/checkout@v2
- uses: actions/setup-go@v2
with:
go-version: '1.16'
- id: release
uses: bruceadams/get-release@v1.2.2
env:
GITHUB_TOKEN: ${{ github.token }}
go-version: '1.17'
- name: Install cross-compilers
run: |
@@ -50,32 +79,56 @@ jobs:
- name: Build litestream
run: |
rm -rf dist
mkdir -p dist
rm -rf dist && mkdir -p dist
cp etc/litestream.yml etc/litestream.service dist
cat etc/nfpm.yml | LITESTREAM_VERSION=${{ steps.release.outputs.tag_name }} envsubst > dist/nfpm.yml
CGO_ENABLED=1 go build -ldflags "-s -w -X 'main.Version=${{ steps.release.outputs.tag_name }}'" -o dist/litestream ./cmd/litestream
cat etc/nfpm.yml | LITESTREAM_VERSION=${{ env.VERSION }} envsubst > dist/nfpm.yml
CGO_ENABLED=1 go build -ldflags "-s -w ${{ env.LDFLAGS }} -X 'main.Version=${{ env.VERSION }}'" -tags "${{ env.TAGS }}" -o dist/litestream ./cmd/litestream
cd dist
tar -czvf litestream-${{ steps.release.outputs.tag_name }}-${{ env.GOOS }}-${{ env.GOARCH }}${{ env.GOARM }}.tar.gz litestream
../nfpm pkg --config nfpm.yml --packager deb --target litestream-${{ steps.release.outputs.tag_name }}-${{ env.GOOS }}-${{ env.GOARCH }}${{ env.GOARM }}.deb
tar -czvf litestream-${{ env.VERSION }}-${{ env.GOOS }}-${{ env.GOARCH }}${{ env.GOARM }}${{ env.SUFFIX }}.tar.gz litestream
../nfpm pkg --config nfpm.yml --packager deb --target litestream-${{ env.VERSION }}-${{ env.GOOS }}-${{ env.GOARCH }}${{ env.GOARM }}${{ env.SUFFIX }}.deb
- name: Upload binary artifact
uses: actions/upload-artifact@v2
with:
name: litestream-${{ env.VERSION }}-${{ env.GOOS }}-${{ env.GOARCH }}${{ env.GOARM }}${{ env.SUFFIX }}.tar.gz
path: dist/litestream-${{ env.VERSION }}-${{ env.GOOS }}-${{ env.GOARCH }}${{ env.GOARM }}${{ env.SUFFIX }}.tar.gz
if-no-files-found: error
- name: Upload debian artifact
uses: actions/upload-artifact@v2
with:
name: litestream-${{ env.VERSION }}-${{ env.GOOS }}-${{ env.GOARCH }}${{ env.GOARM }}${{ env.SUFFIX }}.deb
path: dist/litestream-${{ env.VERSION }}-${{ env.GOOS }}-${{ env.GOARCH }}${{ env.GOARM }}${{ env.SUFFIX }}.deb
if-no-files-found: error
- name: Get release
id: release
uses: bruceadams/get-release@v1.2.3
if: github.event_name == 'release'
env:
GITHUB_TOKEN: ${{ github.token }}
- name: Upload release tarball
uses: actions/upload-release-asset@v1.0.2
if: github.event_name == 'release'
env:
GITHUB_TOKEN: ${{ github.token }}
with:
upload_url: ${{ steps.release.outputs.upload_url }}
asset_path: ./dist/litestream-${{ steps.release.outputs.tag_name }}-${{ env.GOOS }}-${{ env.GOARCH }}${{ env.GOARM }}.tar.gz
asset_name: litestream-${{ steps.release.outputs.tag_name }}-${{ env.GOOS }}-${{ env.GOARCH }}${{ env.GOARM }}.tar.gz
asset_path: ./dist/litestream-${{ env.VERSION }}-${{ env.GOOS }}-${{ env.GOARCH }}${{ env.GOARM }}${{ env.SUFFIX }}.tar.gz
asset_name: litestream-${{ env.VERSION }}-${{ env.GOOS }}-${{ env.GOARCH }}${{ env.GOARM }}${{ env.SUFFIX }}.tar.gz
asset_content_type: application/gzip
- name: Upload debian package
uses: actions/upload-release-asset@v1.0.2
if: github.event_name == 'release'
env:
GITHUB_TOKEN: ${{ github.token }}
with:
upload_url: ${{ steps.release.outputs.upload_url }}
asset_path: ./dist/litestream-${{ steps.release.outputs.tag_name }}-${{ env.GOOS }}-${{ env.GOARCH }}${{ env.GOARM }}.deb
asset_name: litestream-${{ steps.release.outputs.tag_name }}-${{ env.GOOS }}-${{ env.GOARCH }}${{ env.GOARM }}.deb
asset_path: ./dist/litestream-${{ env.VERSION }}-${{ env.GOOS }}-${{ env.GOARCH }}${{ env.GOARM }}${{ env.SUFFIX }}.deb
asset_name: litestream-${{ env.VERSION }}-${{ env.GOOS }}-${{ env.GOARCH }}${{ env.GOARM }}${{ env.SUFFIX }}.deb
asset_content_type: application/octet-stream

View File

@@ -1,62 +0,0 @@
on:
release:
types:
- created
name: release (linux/static)
jobs:
build:
runs-on: ubuntu-18.04
strategy:
matrix:
include:
- arch: amd64
cc: gcc
- arch: arm64
cc: aarch64-linux-gnu-gcc
- arch: arm
arm: 6
cc: arm-linux-gnueabi-gcc
- arch: arm
arm: 7
cc: arm-linux-gnueabihf-gcc
env:
GOOS: linux
GOARCH: ${{ matrix.arch }}
GOARM: ${{ matrix.arm }}
CC: ${{ matrix.cc }}
steps:
- uses: actions/checkout@v2
- uses: actions/setup-go@v2
with:
go-version: '1.16'
- id: release
uses: bruceadams/get-release@v1.2.2
env:
GITHUB_TOKEN: ${{ github.token }}
- name: Install cross-compilers
run: |
sudo apt-get update
sudo apt-get install -y gcc-aarch64-linux-gnu gcc-arm-linux-gnueabihf gcc-arm-linux-gnueabi
- name: Build litestream
run: |
rm -rf dist
mkdir -p dist
CGO_ENABLED=1 go build -ldflags "-s -w -extldflags "-static" -X 'main.Version=${{ steps.release.outputs.tag_name }}'" -tags osusergo,netgo,sqlite_omit_load_extension -o dist/litestream ./cmd/litestream
cd dist
tar -czvf litestream-${{ steps.release.outputs.tag_name }}-${{ env.GOOS }}-${{ env.GOARCH }}${{ env.GOARM }}-static.tar.gz litestream
- name: Upload release tarball
uses: actions/upload-release-asset@v1.0.2
env:
GITHUB_TOKEN: ${{ github.token }}
with:
upload_url: ${{ steps.release.outputs.upload_url }}
asset_path: ./dist/litestream-${{ steps.release.outputs.tag_name }}-${{ env.GOOS }}-${{ env.GOARCH }}${{ env.GOARM }}-static.tar.gz
asset_name: litestream-${{ steps.release.outputs.tag_name }}-${{ env.GOOS }}-${{ env.GOARCH }}${{ env.GOARM }}-static.tar.gz
asset_content_type: application/gzip

1
.gitignore vendored
View File

@@ -1,2 +1,3 @@
.coverage.*
.DS_Store
/dist

View File

@@ -1,5 +1,11 @@
.PHONY: default
default:
.PHONY: testdata
testdata:
make -C testdata
make -C cmd/litestream testdata
docker:
docker build -t litestream .

View File

@@ -102,7 +102,7 @@ func (c *ReplicaClient) Generations(ctx context.Context) ([]string, error) {
internal.OperationTotalCounterVec.WithLabelValues(ReplicaClientType, "LIST").Inc()
resp, err := c.containerURL.ListBlobsHierarchySegment(ctx, marker, "/", azblob.ListBlobsSegmentOptions{
Prefix: litestream.GenerationsPath(c.Path) + "/",
Prefix: path.Join(c.Path, "generations") + "/",
})
if err != nil {
return nil, err
@@ -125,18 +125,17 @@ func (c *ReplicaClient) Generations(ctx context.Context) ([]string, error) {
func (c *ReplicaClient) DeleteGeneration(ctx context.Context, generation string) error {
if err := c.Init(ctx); err != nil {
return err
} else if generation == "" {
return fmt.Errorf("generation required")
}
dir, err := litestream.GenerationPath(c.Path, generation)
if err != nil {
return fmt.Errorf("cannot determine generation path: %w", err)
}
prefix := path.Join(c.Path, "generations", generation) + "/"
var marker azblob.Marker
for marker.NotDone() {
internal.OperationTotalCounterVec.WithLabelValues(ReplicaClientType, "LIST").Inc()
resp, err := c.containerURL.ListBlobsFlatSegment(ctx, marker, azblob.ListBlobsSegmentOptions{Prefix: dir + "/"})
resp, err := c.containerURL.ListBlobsFlatSegment(ctx, marker, azblob.ListBlobsSegmentOptions{Prefix: prefix})
if err != nil {
return err
}
@@ -171,12 +170,11 @@ func (c *ReplicaClient) Snapshots(ctx context.Context, generation string) (lites
func (c *ReplicaClient) WriteSnapshot(ctx context.Context, generation string, index int, rd io.Reader) (info litestream.SnapshotInfo, err error) {
if err := c.Init(ctx); err != nil {
return info, err
} else if generation == "" {
return info, fmt.Errorf("generation required")
}
key, err := litestream.SnapshotPath(c.Path, generation, index)
if err != nil {
return info, fmt.Errorf("cannot determine snapshot path: %w", err)
}
key := path.Join(c.Path, "generations", generation, "snapshots", litestream.FormatIndex(index)+".snapshot.lz4")
startTime := time.Now()
rc := internal.NewReadCounter(rd)
@@ -192,8 +190,6 @@ func (c *ReplicaClient) WriteSnapshot(ctx context.Context, generation string, in
internal.OperationTotalCounterVec.WithLabelValues(ReplicaClientType, "PUT").Inc()
internal.OperationBytesCounterVec.WithLabelValues(ReplicaClientType, "PUT").Add(float64(rc.N()))
// log.Printf("%s(%s): snapshot: creating %s/%08x t=%s", r.db.Path(), r.Name(), generation, index, time.Since(startTime).Truncate(time.Millisecond))
return litestream.SnapshotInfo{
Generation: generation,
Index: index,
@@ -206,12 +202,11 @@ func (c *ReplicaClient) WriteSnapshot(ctx context.Context, generation string, in
func (c *ReplicaClient) SnapshotReader(ctx context.Context, generation string, index int) (io.ReadCloser, error) {
if err := c.Init(ctx); err != nil {
return nil, err
} else if generation == "" {
return nil, fmt.Errorf("generation required")
}
key, err := litestream.SnapshotPath(c.Path, generation, index)
if err != nil {
return nil, fmt.Errorf("cannot determine snapshot path: %w", err)
}
key := path.Join(c.Path, "generations", generation, "snapshots", litestream.FormatIndex(index)+".snapshot.lz4")
blobURL := c.containerURL.NewBlobURL(key)
resp, err := blobURL.Download(ctx, 0, 0, azblob.BlobAccessConditions{}, false, azblob.ClientProvidedKeyOptions{})
@@ -231,12 +226,11 @@ func (c *ReplicaClient) SnapshotReader(ctx context.Context, generation string, i
func (c *ReplicaClient) DeleteSnapshot(ctx context.Context, generation string, index int) error {
if err := c.Init(ctx); err != nil {
return err
} else if generation == "" {
return fmt.Errorf("generation required")
}
key, err := litestream.SnapshotPath(c.Path, generation, index)
if err != nil {
return fmt.Errorf("cannot determine snapshot path: %w", err)
}
key := path.Join(c.Path, "generations", generation, "snapshots", litestream.FormatIndex(index)+".snapshot.lz4")
internal.OperationTotalCounterVec.WithLabelValues(ReplicaClientType, "DELETE").Inc()
@@ -261,12 +255,11 @@ func (c *ReplicaClient) WALSegments(ctx context.Context, generation string) (lit
func (c *ReplicaClient) WriteWALSegment(ctx context.Context, pos litestream.Pos, rd io.Reader) (info litestream.WALSegmentInfo, err error) {
if err := c.Init(ctx); err != nil {
return info, err
} else if pos.Generation == "" {
return info, fmt.Errorf("generation required")
}
key, err := litestream.WALSegmentPath(c.Path, pos.Generation, pos.Index, pos.Offset)
if err != nil {
return info, fmt.Errorf("cannot determine wal segment path: %w", err)
}
key := path.Join(c.Path, "generations", pos.Generation, "wal", litestream.FormatIndex(pos.Index), litestream.FormatOffset(pos.Offset)+".wal.lz4")
startTime := time.Now()
rc := internal.NewReadCounter(rd)
@@ -296,12 +289,11 @@ func (c *ReplicaClient) WriteWALSegment(ctx context.Context, pos litestream.Pos,
func (c *ReplicaClient) WALSegmentReader(ctx context.Context, pos litestream.Pos) (io.ReadCloser, error) {
if err := c.Init(ctx); err != nil {
return nil, err
} else if pos.Generation == "" {
return nil, fmt.Errorf("generation required")
}
key, err := litestream.WALSegmentPath(c.Path, pos.Generation, pos.Index, pos.Offset)
if err != nil {
return nil, fmt.Errorf("cannot determine wal segment path: %w", err)
}
key := path.Join(c.Path, "generations", pos.Generation, "wal", litestream.FormatIndex(pos.Index), litestream.FormatOffset(pos.Offset)+".wal.lz4")
blobURL := c.containerURL.NewBlobURL(key)
resp, err := blobURL.Download(ctx, 0, 0, azblob.BlobAccessConditions{}, false, azblob.ClientProvidedKeyOptions{})
@@ -324,11 +316,12 @@ func (c *ReplicaClient) DeleteWALSegments(ctx context.Context, a []litestream.Po
}
for _, pos := range a {
key, err := litestream.WALSegmentPath(c.Path, pos.Generation, pos.Index, pos.Offset)
if err != nil {
return fmt.Errorf("cannot determine wal segment path: %w", err)
if pos.Generation == "" {
return fmt.Errorf("generation required")
}
key := path.Join(c.Path, "generations", pos.Generation, "wal", litestream.FormatIndex(pos.Index), litestream.FormatOffset(pos.Offset)+".wal.lz4")
internal.OperationTotalCounterVec.WithLabelValues(ReplicaClientType, "DELETE").Inc()
blobURL := c.containerURL.NewBlobURL(key)
@@ -372,24 +365,24 @@ func newSnapshotIterator(ctx context.Context, generation string, client *Replica
func (itr *snapshotIterator) fetch() error {
defer close(itr.ch)
dir, err := litestream.SnapshotsPath(itr.client.Path, itr.generation)
if err != nil {
return fmt.Errorf("cannot determine snapshots path: %w", err)
if itr.generation == "" {
return fmt.Errorf("generation required")
}
prefix := path.Join(itr.client.Path, "generations", itr.generation) + "/"
var marker azblob.Marker
for marker.NotDone() {
internal.OperationTotalCounterVec.WithLabelValues(ReplicaClientType, "LIST").Inc()
resp, err := itr.client.containerURL.ListBlobsFlatSegment(itr.ctx, marker, azblob.ListBlobsSegmentOptions{Prefix: dir + "/"})
resp, err := itr.client.containerURL.ListBlobsFlatSegment(itr.ctx, marker, azblob.ListBlobsSegmentOptions{Prefix: prefix})
if err != nil {
return err
}
marker = resp.NextMarker
for _, item := range resp.Segment.BlobItems {
key := path.Base(item.Name)
index, err := litestream.ParseSnapshotPath(key)
index, err := internal.ParseSnapshotPath(path.Base(item.Name))
if err != nil {
continue
}
@@ -478,24 +471,24 @@ func newWALSegmentIterator(ctx context.Context, generation string, client *Repli
func (itr *walSegmentIterator) fetch() error {
defer close(itr.ch)
dir, err := litestream.WALPath(itr.client.Path, itr.generation)
if err != nil {
return fmt.Errorf("cannot determine wal path: %w", err)
if itr.generation == "" {
return fmt.Errorf("generation required")
}
prefix := path.Join(itr.client.Path, "generations", itr.generation, "wal")
var marker azblob.Marker
for marker.NotDone() {
internal.OperationTotalCounterVec.WithLabelValues(ReplicaClientType, "LIST").Inc()
resp, err := itr.client.containerURL.ListBlobsFlatSegment(itr.ctx, marker, azblob.ListBlobsSegmentOptions{Prefix: dir + "/"})
resp, err := itr.client.containerURL.ListBlobsFlatSegment(itr.ctx, marker, azblob.ListBlobsSegmentOptions{Prefix: prefix})
if err != nil {
return err
}
marker = resp.NextMarker
for _, item := range resp.Segment.BlobItems {
key := path.Base(item.Name)
index, offset, err := litestream.ParseWALSegmentPath(key)
key := strings.TrimPrefix(item.Name, prefix+"/")
index, offset, err := internal.ParseWALSegmentPath(key)
if err != nil {
continue
}

6
cmd/litestream/Makefile Normal file
View File

@@ -0,0 +1,6 @@
.PHONY: default
default:
.PHONY: testdata
testdata:
make -C testdata

View File

@@ -4,18 +4,34 @@ import (
"context"
"flag"
"fmt"
"os"
"io"
"strings"
"text/tabwriter"
)
// DatabasesCommand is a command for listing managed databases.
type DatabasesCommand struct{}
type DatabasesCommand struct {
stdin io.Reader
stdout io.Writer
stderr io.Writer
configPath string
noExpandEnv bool
}
// NewDatabasesCommand returns a new instance of DatabasesCommand.
func NewDatabasesCommand(stdin io.Reader, stdout, stderr io.Writer) *DatabasesCommand {
return &DatabasesCommand{
stdin: stdin,
stdout: stdout,
stderr: stderr,
}
}
// Run executes the command.
func (c *DatabasesCommand) Run(ctx context.Context, args []string) (err error) {
fs := flag.NewFlagSet("litestream-databases", flag.ContinueOnError)
configPath, noExpandEnv := registerConfigFlag(fs)
registerConfigFlag(fs, &c.configPath, &c.noExpandEnv)
fs.Usage = c.Usage
if err := fs.Parse(args); err != nil {
return err
@@ -24,16 +40,16 @@ func (c *DatabasesCommand) Run(ctx context.Context, args []string) (err error) {
}
// Load configuration.
if *configPath == "" {
*configPath = DefaultConfigPath()
}
config, err := ReadConfigFile(*configPath, !*noExpandEnv)
config, err := ReadConfigFile(c.configPath, !c.noExpandEnv)
if err != nil {
return err
} else if len(config.DBs) == 0 {
fmt.Fprintln(c.stdout, "No databases found in config file.")
return nil
}
// List all databases.
w := tabwriter.NewWriter(os.Stdout, 0, 8, 2, ' ', 0)
w := tabwriter.NewWriter(c.stdout, 0, 8, 2, ' ', 0)
defer w.Flush()
fmt.Fprintln(w, "path\treplicas")
@@ -59,7 +75,7 @@ func (c *DatabasesCommand) Run(ctx context.Context, args []string) (err error) {
// Usage prints the help screen to STDOUT.
func (c *DatabasesCommand) Usage() {
fmt.Printf(`
fmt.Fprintf(c.stdout, `
The databases command lists all databases in the configuration file.
Usage:

View File

@@ -0,0 +1,66 @@
package main_test
import (
"context"
"flag"
"path/filepath"
"strings"
"testing"
"github.com/benbjohnson/litestream/internal/testingutil"
)
func TestDatabasesCommand(t *testing.T) {
t.Run("OK", func(t *testing.T) {
testDir := filepath.Join("testdata", "databases", "ok")
m, _, stdout, _ := newMain()
if err := m.Run(context.Background(), []string{"databases", "-config", filepath.Join(testDir, "litestream.yml")}); err != nil {
t.Fatal(err)
} else if got, want := stdout.String(), string(testingutil.ReadFile(t, filepath.Join(testDir, "stdout"))); got != want {
t.Fatalf("stdout=%q, want %q", got, want)
}
})
t.Run("NoDatabases", func(t *testing.T) {
testDir := filepath.Join("testdata", "databases", "no-databases")
m, _, stdout, _ := newMain()
if err := m.Run(context.Background(), []string{"databases", "-config", filepath.Join(testDir, "litestream.yml")}); err != nil {
t.Fatal(err)
} else if got, want := stdout.String(), string(testingutil.ReadFile(t, filepath.Join(testDir, "stdout"))); got != want {
t.Fatalf("stdout=%q, want %q", got, want)
}
})
t.Run("ErrConfigNotFound", func(t *testing.T) {
testDir := filepath.Join("testdata", "databases", "no-config")
m, _, _, _ := newMain()
err := m.Run(context.Background(), []string{"databases", "-config", filepath.Join(testDir, "litestream.yml")})
if err == nil || !strings.Contains(err.Error(), `config file not found:`) {
t.Fatalf("unexpected error: %s", err)
}
})
t.Run("ErrInvalidConfig", func(t *testing.T) {
testDir := filepath.Join("testdata", "databases", "invalid-config")
m, _, _, _ := newMain()
err := m.Run(context.Background(), []string{"databases", "-config", filepath.Join(testDir, "litestream.yml")})
if err == nil || !strings.Contains(err.Error(), `replica path cannot be a url`) {
t.Fatalf("unexpected error: %s", err)
}
})
t.Run("ErrTooManyArguments", func(t *testing.T) {
m, _, _, _ := newMain()
err := m.Run(context.Background(), []string{"databases", "xyz"})
if err == nil || err.Error() != `too many arguments` {
t.Fatalf("unexpected error: %s", err)
}
})
t.Run("Usage", func(t *testing.T) {
m, _, _, _ := newMain()
if err := m.Run(context.Background(), []string{"databases", "-h"}); err != flag.ErrHelp {
t.Fatalf("unexpected error: %s", err)
}
})
}

View File

@@ -4,117 +4,116 @@ import (
"context"
"flag"
"fmt"
"log"
"io"
"os"
"text/tabwriter"
"time"
"github.com/benbjohnson/litestream"
"github.com/benbjohnson/litestream/internal"
)
// GenerationsCommand represents a command to list all generations for a database.
type GenerationsCommand struct{}
type GenerationsCommand struct {
stdin io.Reader
stdout io.Writer
stderr io.Writer
configPath string
noExpandEnv bool
replicaName string
}
// NewGenerationsCommand returns a new instance of GenerationsCommand.
func NewGenerationsCommand(stdin io.Reader, stdout, stderr io.Writer) *GenerationsCommand {
return &GenerationsCommand{
stdin: stdin,
stdout: stdout,
stderr: stderr,
}
}
// Run executes the command.
func (c *GenerationsCommand) Run(ctx context.Context, args []string) (err error) {
func (c *GenerationsCommand) Run(ctx context.Context, args []string) (ret error) {
fs := flag.NewFlagSet("litestream-generations", flag.ContinueOnError)
configPath, noExpandEnv := registerConfigFlag(fs)
replicaName := fs.String("replica", "", "replica name")
registerConfigFlag(fs, &c.configPath, &c.noExpandEnv)
fs.StringVar(&c.replicaName, "replica", "", "replica name")
fs.Usage = c.Usage
if err := fs.Parse(args); err != nil {
return err
} else if fs.NArg() == 0 || fs.Arg(0) == "" {
} else if fs.Arg(0) == "" {
return fmt.Errorf("database path or replica URL required")
} else if fs.NArg() > 1 {
return fmt.Errorf("too many arguments")
}
var db *litestream.DB
var r *litestream.Replica
dbUpdatedAt := time.Now()
if isURL(fs.Arg(0)) {
if *configPath != "" {
return fmt.Errorf("cannot specify a replica URL and the -config flag")
}
if r, err = NewReplicaFromConfig(&ReplicaConfig{URL: fs.Arg(0)}, nil); err != nil {
return err
}
} else {
if *configPath == "" {
*configPath = DefaultConfigPath()
}
// Load configuration.
config, err := ReadConfigFile(*configPath, !*noExpandEnv)
if err != nil {
return err
}
// Lookup database from configuration file by path.
if path, err := expand(fs.Arg(0)); err != nil {
return err
} else if dbc := config.DBConfig(path); dbc == nil {
return fmt.Errorf("database not found in config: %s", path)
} else if db, err = NewDBFromConfig(dbc); err != nil {
return err
}
// Filter by replica, if specified.
if *replicaName != "" {
if r = db.Replica(*replicaName); r == nil {
return fmt.Errorf("replica %q not found for database %q", *replicaName, db.Path())
}
}
// Determine last time database or WAL was updated.
if dbUpdatedAt, err = db.UpdatedAt(); err != nil {
return err
}
// Load configuration.
config, err := ReadConfigFile(c.configPath, !c.noExpandEnv)
if err != nil {
return err
}
var replicas []*litestream.Replica
if r != nil {
replicas = []*litestream.Replica{r}
} else {
replicas = db.Replicas
replicas, db, err := loadReplicas(ctx, config, fs.Arg(0), c.replicaName)
if err != nil {
return err
}
// Determine last time database or WAL was updated.
var dbUpdatedAt time.Time
if db != nil {
if dbUpdatedAt, err = db.UpdatedAt(); err != nil && !os.IsNotExist(err) {
return err
}
}
// List each generation.
w := tabwriter.NewWriter(os.Stdout, 0, 8, 2, ' ', 0)
w := tabwriter.NewWriter(c.stdout, 0, 8, 2, ' ', 0)
defer w.Flush()
fmt.Fprintln(w, "name\tgeneration\tlag\tstart\tend")
for _, r := range replicas {
generations, err := r.Client.Generations(ctx)
generations, err := r.Client().Generations(ctx)
if err != nil {
log.Printf("%s: cannot list generations: %s", r.Name(), err)
fmt.Fprintf(c.stderr, "%s: cannot list generations: %s", r.Name(), err)
ret = errExit // signal error return without printing message
continue
}
// Iterate over each generation for the replica.
for _, generation := range generations {
createdAt, updatedAt, err := r.GenerationTimeBounds(ctx, generation)
createdAt, updatedAt, err := litestream.GenerationTimeBounds(ctx, r.Client(), generation)
if err != nil {
log.Printf("%s: cannot determine generation time bounds: %s", r.Name(), err)
fmt.Fprintf(c.stderr, "%s: cannot determine generation time bounds: %s", r.Name(), err)
ret = errExit // signal error return without printing message
continue
}
// Calculate lag from database mod time to the replica mod time.
// This is ignored if the database mod time is unavailable such as
// when specifying the replica URL or if the database file is missing.
lag := "-"
if !dbUpdatedAt.IsZero() {
lag = internal.TruncateDuration(dbUpdatedAt.Sub(updatedAt)).String()
}
fmt.Fprintf(w, "%s\t%s\t%s\t%s\t%s\n",
r.Name(),
generation,
truncateDuration(dbUpdatedAt.Sub(updatedAt)).String(),
lag,
createdAt.Format(time.RFC3339),
updatedAt.Format(time.RFC3339),
)
}
}
return nil
return ret
}
// Usage prints the help message to STDOUT.
func (c *GenerationsCommand) Usage() {
fmt.Printf(`
fmt.Fprintf(c.stdout, `
The generations command lists all generations for a database or replica. It also
lists stats about their lag behind the primary database and the time range they
cover.
@@ -141,29 +140,3 @@ Arguments:
DefaultConfigPath(),
)
}
func truncateDuration(d time.Duration) time.Duration {
if d < 0 {
if d < -10*time.Second {
return d.Truncate(time.Second)
} else if d < -time.Second {
return d.Truncate(time.Second / 10)
} else if d < -time.Millisecond {
return d.Truncate(time.Millisecond)
} else if d < -time.Microsecond {
return d.Truncate(time.Microsecond)
}
return d
}
if d > 10*time.Second {
return d.Truncate(time.Second)
} else if d > time.Second {
return d.Truncate(time.Second / 10)
} else if d > time.Millisecond {
return d.Truncate(time.Millisecond)
} else if d > time.Microsecond {
return d.Truncate(time.Microsecond)
}
return d
}

View File

@@ -0,0 +1,140 @@
package main_test
import (
"context"
"flag"
"path/filepath"
"strings"
"testing"
"github.com/benbjohnson/litestream/internal/testingutil"
)
func TestGenerationsCommand(t *testing.T) {
t.Run("OK", func(t *testing.T) {
testDir := filepath.Join("testdata", "generations", "ok")
defer testingutil.Setenv(t, "LITESTREAM_TESTDIR", testDir)()
m, _, stdout, _ := newMain()
if err := m.Run(context.Background(), []string{"generations", "-config", filepath.Join(testDir, "litestream.yml"), filepath.Join(testDir, "db")}); err != nil {
t.Fatal(err)
} else if got, want := stdout.String(), string(testingutil.ReadFile(t, filepath.Join(testDir, "stdout"))); got != want {
t.Fatalf("stdout=%q, want %q", got, want)
}
})
t.Run("ReplicaName", func(t *testing.T) {
testDir := filepath.Join("testdata", "generations", "replica-name")
defer testingutil.Setenv(t, "LITESTREAM_TESTDIR", testDir)()
m, _, stdout, _ := newMain()
if err := m.Run(context.Background(), []string{"generations", "-config", filepath.Join(testDir, "litestream.yml"), "-replica", "replica1", filepath.Join(testDir, "db")}); err != nil {
t.Fatal(err)
} else if got, want := stdout.String(), string(testingutil.ReadFile(t, filepath.Join(testDir, "stdout"))); got != want {
t.Fatalf("stdout=%q, want %q", got, want)
}
})
t.Run("ReplicaURL", func(t *testing.T) {
testDir := filepath.Join(testingutil.Getwd(t), "testdata", "generations", "replica-url")
defer testingutil.Setenv(t, "LITESTREAM_TESTDIR", testDir)()
replicaURL := "file://" + filepath.ToSlash(testDir) + "/replica"
m, _, stdout, _ := newMain()
if err := m.Run(context.Background(), []string{"generations", replicaURL}); err != nil {
t.Fatal(err)
} else if got, want := stdout.String(), string(testingutil.ReadFile(t, filepath.Join(testDir, "stdout"))); got != want {
t.Fatalf("stdout=%q, want %q", got, want)
}
})
t.Run("NoDatabase", func(t *testing.T) {
testDir := filepath.Join("testdata", "generations", "no-database")
defer testingutil.Setenv(t, "LITESTREAM_TESTDIR", testDir)()
m, _, stdout, _ := newMain()
if err := m.Run(context.Background(), []string{"generations", "-config", filepath.Join(testDir, "litestream.yml"), filepath.Join(testDir, "db")}); err != nil {
t.Fatal(err)
} else if got, want := stdout.String(), string(testingutil.ReadFile(t, filepath.Join(testDir, "stdout"))); got != want {
t.Fatalf("stdout=%q, want %q", got, want)
}
})
t.Run("ErrDatabaseOrReplicaRequired", func(t *testing.T) {
m, _, _, _ := newMain()
err := m.Run(context.Background(), []string{"generations"})
if err == nil || err.Error() != `database path or replica URL required` {
t.Fatalf("unexpected error: %s", err)
}
})
t.Run("ErrTooManyArguments", func(t *testing.T) {
m, _, _, _ := newMain()
err := m.Run(context.Background(), []string{"generations", "abc", "123"})
if err == nil || err.Error() != `too many arguments` {
t.Fatalf("unexpected error: %s", err)
}
})
t.Run("ErrInvalidFlags", func(t *testing.T) {
m, _, _, _ := newMain()
err := m.Run(context.Background(), []string{"generations", "-no-such-flag"})
if err == nil || err.Error() != `flag provided but not defined: -no-such-flag` {
t.Fatalf("unexpected error: %s", err)
}
})
t.Run("ErrConfigFileNotFound", func(t *testing.T) {
m, _, _, _ := newMain()
err := m.Run(context.Background(), []string{"generations", "-config", "/no/such/file", "/var/lib/db"})
if err == nil || err.Error() != `config file not found: /no/such/file` {
t.Fatalf("unexpected error: %s", err)
}
})
t.Run("ErrInvalidConfig", func(t *testing.T) {
testDir := filepath.Join("testdata", "generations", "invalid-config")
m, _, _, _ := newMain()
err := m.Run(context.Background(), []string{"generations", "-config", filepath.Join(testDir, "litestream.yml"), "/var/lib/db"})
if err == nil || !strings.Contains(err.Error(), `replica path cannot be a url`) {
t.Fatalf("unexpected error: %s", err)
}
})
t.Run("ErrDatabaseNotFound", func(t *testing.T) {
testDir := filepath.Join("testdata", "generations", "database-not-found")
defer testingutil.Setenv(t, "LITESTREAM_TESTDIR", testDir)()
m, _, _, _ := newMain()
err := m.Run(context.Background(), []string{"generations", "-config", filepath.Join(testDir, "litestream.yml"), "/no/such/db"})
if err == nil || err.Error() != `database not found in config: /no/such/db` {
t.Fatalf("unexpected error: %s", err)
}
})
t.Run("ErrReplicaNotFound", func(t *testing.T) {
testDir := filepath.Join(testingutil.Getwd(t), "testdata", "generations", "replica-not-found")
defer testingutil.Setenv(t, "LITESTREAM_TESTDIR", testDir)()
m, _, _, _ := newMain()
err := m.Run(context.Background(), []string{"generations", "-config", filepath.Join(testDir, "litestream.yml"), "-replica", "no_such_replica", filepath.Join(testDir, "db")})
if err == nil || err.Error() != `replica "no_such_replica" not found for database "`+filepath.Join(testDir, "db")+`"` {
t.Fatalf("unexpected error: %s", err)
}
})
t.Run("ErrInvalidReplicaURL", func(t *testing.T) {
m, _, _, _ := newMain()
err := m.Run(context.Background(), []string{"generations", "xyz://xyz"})
if err == nil || !strings.Contains(err.Error(), `unknown replica type in config: "xyz"`) {
t.Fatalf("unexpected error: %s", err)
}
})
t.Run("Usage", func(t *testing.T) {
m, _, _, _ := newMain()
if err := m.Run(context.Background(), []string{"generations", "-h"}); err != flag.ErrHelp {
t.Fatalf("unexpected error: %s", err)
}
})
}

View File

@@ -5,23 +5,23 @@ import (
"errors"
"flag"
"fmt"
"io/ioutil"
"io"
"log"
"net/url"
"os"
"os/signal"
"os/user"
"path"
"path/filepath"
"regexp"
"strconv"
"strings"
"syscall"
"time"
"filippo.io/age"
"github.com/benbjohnson/litestream"
"github.com/benbjohnson/litestream/abs"
"github.com/benbjohnson/litestream/file"
"github.com/benbjohnson/litestream/gcs"
"github.com/benbjohnson/litestream/gs"
"github.com/benbjohnson/litestream/s3"
"github.com/benbjohnson/litestream/sftp"
_ "github.com/mattn/go-sqlite3"
@@ -33,14 +33,15 @@ var (
Version = "(development build)"
)
// errStop is a terminal error for indicating program should quit.
var errStop = errors.New("stop")
// errExit is a terminal error for indicating program should quit.
var errExit = errors.New("exit")
func main() {
log.SetFlags(0)
log.SetOutput(os.Stdout)
m := NewMain()
if err := m.Run(context.Background(), os.Args[1:]); err == flag.ErrHelp || err == errStop {
m := NewMain(os.Stdin, os.Stdout, os.Stderr)
if err := m.Run(context.Background(), os.Args[1:]); err == flag.ErrHelp || err == errExit {
os.Exit(1)
} else if err != nil {
log.Println(err)
@@ -49,22 +50,23 @@ func main() {
}
// Main represents the main program execution.
type Main struct{}
type Main struct {
stdin io.Reader
stdout io.Writer
stderr io.Writer
}
// NewMain returns a new instance of Main.
func NewMain() *Main {
return &Main{}
func NewMain(stdin io.Reader, stdout, stderr io.Writer) *Main {
return &Main{
stdin: stdin,
stdout: stdout,
stderr: stderr,
}
}
// Run executes the program.
func (m *Main) Run(ctx context.Context, args []string) (err error) {
// Execute replication command if running as a Windows service.
if isService, err := isWindowsService(); err != nil {
return err
} else if isService {
return runWindowsService(ctx)
}
// Copy "LITESTEAM" environment credentials.
applyLitestreamEnv()
@@ -76,36 +78,43 @@ func (m *Main) Run(ctx context.Context, args []string) (err error) {
switch cmd {
case "databases":
return (&DatabasesCommand{}).Run(ctx, args)
return NewDatabasesCommand(m.stdin, m.stdout, m.stderr).Run(ctx, args)
case "generations":
return (&GenerationsCommand{}).Run(ctx, args)
return NewGenerationsCommand(m.stdin, m.stdout, m.stderr).Run(ctx, args)
case "replicate":
c := NewReplicateCommand()
c := NewReplicateCommand(m.stdin, m.stdout, m.stderr)
if err := c.ParseFlags(ctx, args); err != nil {
return err
}
// Setup signal handler.
signalCh := signalChan()
ctx, cancel := context.WithCancel(ctx)
defer cancel()
signalCh := make(chan os.Signal, 1)
signal.Notify(signalCh, syscall.SIGINT, syscall.SIGTERM)
if err := c.Run(); err != nil {
if err := c.Run(ctx); err != nil {
return err
}
// Wait for signal to stop program.
select {
case <-ctx.Done():
fmt.Fprintln(m.stdout, "context done, litestream shutting down")
case err = <-c.execCh:
fmt.Println("subprocess exited, litestream shutting down")
cancel()
fmt.Fprintln(m.stdout, "subprocess exited, litestream shutting down")
case sig := <-signalCh:
fmt.Println("signal received, litestream shutting down")
cancel()
fmt.Fprintln(m.stdout, "signal received, litestream shutting down")
if c.cmd != nil {
fmt.Println("sending signal to exec process")
fmt.Fprintln(m.stdout, "sending signal to exec process")
if err := c.cmd.Process.Signal(sig); err != nil {
return fmt.Errorf("cannot signal exec process: %w", err)
}
fmt.Println("waiting for exec process to close")
fmt.Fprintln(m.stdout, "waiting for exec process to close")
if err := <-c.execCh; err != nil && !strings.HasPrefix(err.Error(), "signal:") {
return fmt.Errorf("cannot wait for exec process: %w", err)
}
@@ -116,17 +125,17 @@ func (m *Main) Run(ctx context.Context, args []string) (err error) {
if e := c.Close(); e != nil && err == nil {
err = e
}
fmt.Println("litestream shut down")
fmt.Fprintln(m.stdout, "litestream shut down")
return err
case "restore":
return (&RestoreCommand{}).Run(ctx, args)
return NewRestoreCommand(m.stdin, m.stdout, m.stderr).Run(ctx, args)
case "snapshots":
return (&SnapshotsCommand{}).Run(ctx, args)
return NewSnapshotsCommand(m.stdin, m.stdout, m.stderr).Run(ctx, args)
case "version":
return (&VersionCommand{}).Run(ctx, args)
return NewVersionCommand(m.stdin, m.stdout, m.stderr).Run(ctx, args)
case "wal":
return (&WALCommand{}).Run(ctx, args)
return NewWALCommand(m.stdin, m.stdout, m.stderr).Run(ctx, args)
default:
if cmd == "" || cmd == "help" || strings.HasPrefix(cmd, "-") {
m.Usage()
@@ -138,7 +147,7 @@ func (m *Main) Run(ctx context.Context, args []string) (err error) {
// Usage prints the help screen to STDOUT.
func (m *Main) Usage() {
fmt.Println(`
fmt.Fprintln(m.stdout, `
litestream is a tool for replicating SQLite databases.
Usage:
@@ -205,7 +214,34 @@ func (c *Config) DBConfig(path string) *DBConfig {
// ReadConfigFile unmarshals config from filename. Expands path if needed.
// If expandEnv is true then environment variables are expanded in the config.
func ReadConfigFile(filename string, expandEnv bool) (_ Config, err error) {
// If filename is blank then the default config path is used.
func ReadConfigFile(filename string, expandEnv bool) (config Config, err error) {
var filenames []string
if filename != "" {
filenames = append(filenames, filename)
}
filenames = append(filenames, "./litestream.yml")
filenames = append(filenames, DefaultConfigPath())
for _, name := range filenames {
isDefaultPath := name != filename
if config, err = readConfigFile(name, expandEnv); os.IsNotExist(err) {
if isDefaultPath {
continue
}
return config, fmt.Errorf("config file not found: %s", filename)
} else if err != nil {
return config, err
}
break
}
return config, nil
}
// readConfigFile unmarshals config from filename. Expands path if needed.
// If expandEnv is true then environment variables are expanded in the config.
func readConfigFile(filename string, expandEnv bool) (_ Config, err error) {
config := DefaultConfig()
// Expand filename, if necessary.
@@ -215,10 +251,9 @@ func ReadConfigFile(filename string, expandEnv bool) (_ Config, err error) {
}
// Read configuration.
buf, err := ioutil.ReadFile(filename)
if os.IsNotExist(err) {
return config, fmt.Errorf("config file not found: %s", filename)
} else if err != nil {
// Do not return an error if using default path and file is missing.
buf, err := os.ReadFile(filename)
if err != nil {
return config, err
}
@@ -246,11 +281,12 @@ func ReadConfigFile(filename string, expandEnv bool) (_ Config, err error) {
// DBConfig represents the configuration for a single database.
type DBConfig struct {
Path string `yaml:"path"`
MonitorInterval *time.Duration `yaml:"monitor-interval"`
CheckpointInterval *time.Duration `yaml:"checkpoint-interval"`
MinCheckpointPageN *int `yaml:"min-checkpoint-page-count"`
MaxCheckpointPageN *int `yaml:"max-checkpoint-page-count"`
Path string `yaml:"path"`
MonitorDelayInterval *time.Duration `yaml:"monitor-delay-interval"`
CheckpointInterval *time.Duration `yaml:"checkpoint-interval"`
MinCheckpointPageN *int `yaml:"min-checkpoint-page-count"`
MaxCheckpointPageN *int `yaml:"max-checkpoint-page-count"`
ShadowRetentionN *int `yaml:"shadow-retention-count"`
Replicas []*ReplicaConfig `yaml:"replicas"`
}
@@ -261,13 +297,17 @@ func NewDBFromConfig(dbc *DBConfig) (*litestream.DB, error) {
if err != nil {
return nil, err
}
return NewDBFromConfigWithPath(dbc, path)
}
// NewDBFromConfigWithPath instantiates a DB based on a configuration and using a given path.
func NewDBFromConfigWithPath(dbc *DBConfig, path string) (*litestream.DB, error) {
// Initialize database with given path.
db := litestream.NewDB(path)
// Override default database settings if specified in configuration.
if dbc.MonitorInterval != nil {
db.MonitorInterval = *dbc.MonitorInterval
if dbc.MonitorDelayInterval != nil {
db.MonitorDelayInterval = *dbc.MonitorDelayInterval
}
if dbc.CheckpointInterval != nil {
db.CheckpointInterval = *dbc.CheckpointInterval
@@ -278,6 +318,9 @@ func NewDBFromConfig(dbc *DBConfig) (*litestream.DB, error) {
if dbc.MaxCheckpointPageN != nil {
db.MaxCheckpointPageN = *dbc.MaxCheckpointPageN
}
if dbc.ShadowRetentionN != nil {
db.ShadowRetentionN = *dbc.ShadowRetentionN
}
// Instantiate and attach replicas.
for _, rc := range dbc.Replicas {
@@ -321,12 +364,6 @@ type ReplicaConfig struct {
User string `yaml:"user"`
Password string `yaml:"password"`
KeyPath string `yaml:"key-path"`
// Encryption identities and recipients
Age struct {
Identities []string `yaml:"identities"`
Recipients []string `yaml:"recipients"`
} `yaml:"age"`
}
// NewReplicaFromConfig instantiates a replica for a DB based on a config.
@@ -336,8 +373,35 @@ func NewReplicaFromConfig(c *ReplicaConfig, db *litestream.DB) (_ *litestream.Re
return nil, fmt.Errorf("replica path cannot be a url, please use the 'url' field instead: %s", c.Path)
}
// Build and set client on replica.
var client litestream.ReplicaClient
switch typ := c.ReplicaType(); typ {
case "file":
if client, err = newFileReplicaClientFromConfig(c); err != nil {
return nil, err
}
case "s3":
if client, err = newS3ReplicaClientFromConfig(c); err != nil {
return nil, err
}
case "gs":
if client, err = newGSReplicaClientFromConfig(c); err != nil {
return nil, err
}
case "abs":
if client, err = newABSReplicaClientFromConfig(c); err != nil {
return nil, err
}
case "sftp":
if client, err = newSFTPReplicaClientFromConfig(c); err != nil {
return nil, err
}
default:
return nil, fmt.Errorf("unknown replica type in config: %q", typ)
}
// Build replica.
r := litestream.NewReplica(db, c.Name)
r := litestream.NewReplica(db, c.Name, client)
if v := c.Retention; v != nil {
r.Retention = *v
}
@@ -353,54 +417,12 @@ func NewReplicaFromConfig(c *ReplicaConfig, db *litestream.DB) (_ *litestream.Re
if v := c.ValidationInterval; v != nil {
r.ValidationInterval = *v
}
for _, str := range c.Age.Identities {
identities, err := age.ParseIdentities(strings.NewReader(str))
if err != nil {
return nil, err
}
r.AgeIdentities = append(r.AgeIdentities, identities...)
}
for _, str := range c.Age.Recipients {
recipients, err := age.ParseRecipients(strings.NewReader(str))
if err != nil {
return nil, err
}
r.AgeRecipients = append(r.AgeRecipients, recipients...)
}
// Build and set client on replica.
switch c.ReplicaType() {
case "file":
if r.Client, err = newFileReplicaClientFromConfig(c, r); err != nil {
return nil, err
}
case "s3":
if r.Client, err = newS3ReplicaClientFromConfig(c, r); err != nil {
return nil, err
}
case "gcs":
if r.Client, err = newGCSReplicaClientFromConfig(c, r); err != nil {
return nil, err
}
case "abs":
if r.Client, err = newABSReplicaClientFromConfig(c, r); err != nil {
return nil, err
}
case "sftp":
if r.Client, err = newSFTPReplicaClientFromConfig(c, r); err != nil {
return nil, err
}
default:
return nil, fmt.Errorf("unknown replica type in config: %q", c.Type)
}
return r, nil
}
// newFileReplicaClientFromConfig returns a new instance of file.ReplicaClient built from config.
func newFileReplicaClientFromConfig(c *ReplicaConfig, r *litestream.Replica) (_ *file.ReplicaClient, err error) {
// newFileReplicaClientFromConfig returns a new instance of FileReplicaClient built from config.
func newFileReplicaClientFromConfig(c *ReplicaConfig) (_ *litestream.FileReplicaClient, err error) {
// Ensure URL & path are not both specified.
if c.URL != "" && c.Path != "" {
return nil, fmt.Errorf("cannot specify url & path for file replica")
@@ -425,13 +447,11 @@ func newFileReplicaClientFromConfig(c *ReplicaConfig, r *litestream.Replica) (_
}
// Instantiate replica and apply time fields, if set.
client := file.NewReplicaClient(path)
client.Replica = r
return client, nil
return litestream.NewFileReplicaClient(path), nil
}
// newS3ReplicaClientFromConfig returns a new instance of s3.ReplicaClient built from config.
func newS3ReplicaClientFromConfig(c *ReplicaConfig, r *litestream.Replica) (_ *s3.ReplicaClient, err error) {
func newS3ReplicaClientFromConfig(c *ReplicaConfig) (_ *s3.ReplicaClient, err error) {
// Ensure URL & constituent parts are not both specified.
if c.URL != "" && c.Path != "" {
return nil, fmt.Errorf("cannot specify url & path for s3 replica")
@@ -493,13 +513,13 @@ func newS3ReplicaClientFromConfig(c *ReplicaConfig, r *litestream.Replica) (_ *s
return client, nil
}
// newGCSReplicaClientFromConfig returns a new instance of gcs.ReplicaClient built from config.
func newGCSReplicaClientFromConfig(c *ReplicaConfig, r *litestream.Replica) (_ *gcs.ReplicaClient, err error) {
// newGSReplicaClientFromConfig returns a new instance of gs.ReplicaClient built from config.
func newGSReplicaClientFromConfig(c *ReplicaConfig) (_ *gs.ReplicaClient, err error) {
// Ensure URL & constituent parts are not both specified.
if c.URL != "" && c.Path != "" {
return nil, fmt.Errorf("cannot specify url & path for gcs replica")
return nil, fmt.Errorf("cannot specify url & path for gs replica")
} else if c.URL != "" && c.Bucket != "" {
return nil, fmt.Errorf("cannot specify url & bucket for gcs replica")
return nil, fmt.Errorf("cannot specify url & bucket for gs replica")
}
bucket, path := c.Bucket, c.Path
@@ -522,18 +542,18 @@ func newGCSReplicaClientFromConfig(c *ReplicaConfig, r *litestream.Replica) (_ *
// Ensure required settings are set.
if bucket == "" {
return nil, fmt.Errorf("bucket required for gcs replica")
return nil, fmt.Errorf("bucket required for gs replica")
}
// Build replica.
client := gcs.NewReplicaClient()
client := gs.NewReplicaClient()
client.Bucket = bucket
client.Path = path
return client, nil
}
// newABSReplicaClientFromConfig returns a new instance of abs.ReplicaClient built from config.
func newABSReplicaClientFromConfig(c *ReplicaConfig, r *litestream.Replica) (_ *abs.ReplicaClient, err error) {
func newABSReplicaClientFromConfig(c *ReplicaConfig) (_ *abs.ReplicaClient, err error) {
// Ensure URL & constituent parts are not both specified.
if c.URL != "" && c.Path != "" {
return nil, fmt.Errorf("cannot specify url & path for abs replica")
@@ -576,7 +596,7 @@ func newABSReplicaClientFromConfig(c *ReplicaConfig, r *litestream.Replica) (_ *
}
// newSFTPReplicaClientFromConfig returns a new instance of sftp.ReplicaClient built from config.
func newSFTPReplicaClientFromConfig(c *ReplicaConfig, r *litestream.Replica) (_ *sftp.ReplicaClient, err error) {
func newSFTPReplicaClientFromConfig(c *ReplicaConfig) (_ *sftp.ReplicaClient, err error) {
// Ensure URL & constituent parts are not both specified.
if c.URL != "" && c.Path != "" {
return nil, fmt.Errorf("cannot specify url & path for sftp replica")
@@ -682,12 +702,12 @@ func DefaultConfigPath() string {
if v := os.Getenv("LITESTREAM_CONFIG"); v != "" {
return v
}
return defaultConfigPath
return "/etc/litestream.yml"
}
func registerConfigFlag(fs *flag.FlagSet) (configPath *string, noExpandEnv *bool) {
return fs.String("config", "", "config path"),
fs.Bool("no-expand-env", false, "do not expand env vars in config")
func registerConfigFlag(fs *flag.FlagSet, configPath *string, noExpandEnv *bool) {
fs.StringVar(configPath, "config", "", "config path")
fs.BoolVar(noExpandEnv, "no-expand-env", false, "do not expand env vars in config")
}
// expand returns an absolute path for s.
@@ -721,7 +741,7 @@ var _ flag.Value = (*indexVar)(nil)
// String returns an 8-character hexadecimal value.
func (v *indexVar) String() string {
return fmt.Sprintf("%08x", int(*v))
return litestream.FormatIndex(int(*v))
}
// Set parses s into an integer from a hexadecimal value.
@@ -733,3 +753,45 @@ func (v *indexVar) Set(s string) error {
*v = indexVar(i)
return nil
}
// loadReplicas returns a list of replicas to use based on CLI flags. Filters
// by replicaName, if not blank. The DB is returned if pathOrURL is not a replica URL.
func loadReplicas(ctx context.Context, config Config, pathOrURL, replicaName string) ([]*litestream.Replica, *litestream.DB, error) {
// Build a replica based on URL, if specified.
if isURL(pathOrURL) {
r, err := NewReplicaFromConfig(&ReplicaConfig{
URL: pathOrURL,
AccessKeyID: config.AccessKeyID,
SecretAccessKey: config.SecretAccessKey,
}, nil)
if err != nil {
return nil, nil, err
}
return []*litestream.Replica{r}, nil, nil
}
// Otherwise use replicas from the database configuration file.
path, err := expand(pathOrURL)
if err != nil {
return nil, nil, err
}
dbc := config.DBConfig(path)
if dbc == nil {
return nil, nil, fmt.Errorf("database not found in config: %s", path)
}
db, err := NewDBFromConfig(dbc)
if err != nil {
return nil, nil, err
}
// Filter by replica, if specified.
if replicaName != "" {
r := db.Replica(replicaName)
if r == nil {
return nil, nil, fmt.Errorf("replica %q not found for database %q", replicaName, db.Path())
}
return []*litestream.Replica{r}, db, nil
}
return db.Replicas, db, nil
}

View File

@@ -1,26 +0,0 @@
// +build !windows
package main
import (
"context"
"os"
"os/signal"
"syscall"
)
const defaultConfigPath = "/etc/litestream.yml"
func isWindowsService() (bool, error) {
return false, nil
}
func runWindowsService(ctx context.Context) error {
panic("cannot run windows service as unix process")
}
func signalChan() <-chan os.Signal {
ch := make(chan os.Signal, 2)
signal.Notify(ch, syscall.SIGINT, syscall.SIGTERM)
return ch
}

View File

@@ -1,22 +1,28 @@
package main_test
import (
"io/ioutil"
"bytes"
"io"
"log"
"os"
"path/filepath"
"testing"
"github.com/benbjohnson/litestream"
main "github.com/benbjohnson/litestream/cmd/litestream"
"github.com/benbjohnson/litestream/file"
"github.com/benbjohnson/litestream/gcs"
"github.com/benbjohnson/litestream/gs"
"github.com/benbjohnson/litestream/s3"
)
func init() {
litestream.LogFlags = log.Lmsgprefix | log.Ldate | log.Ltime | log.Lmicroseconds | log.LUTC | log.Lshortfile
}
func TestReadConfigFile(t *testing.T) {
// Ensure global AWS settings are propagated down to replica configurations.
t.Run("PropagateGlobalSettings", func(t *testing.T) {
filename := filepath.Join(t.TempDir(), "litestream.yml")
if err := ioutil.WriteFile(filename, []byte(`
if err := os.WriteFile(filename, []byte(`
access-key-id: XXX
secret-access-key: YYY
@@ -48,7 +54,7 @@ dbs:
os.Setenv("LITESTREAM_TEST_1872363", "s3://foo/bar")
filename := filepath.Join(t.TempDir(), "litestream.yml")
if err := ioutil.WriteFile(filename, []byte(`
if err := os.WriteFile(filename, []byte(`
dbs:
- path: $LITESTREAM_TEST_0129380
replicas:
@@ -75,7 +81,7 @@ dbs:
os.Setenv("LITESTREAM_TEST_9847533", "s3://foo/bar")
filename := filepath.Join(t.TempDir(), "litestream.yml")
if err := ioutil.WriteFile(filename, []byte(`
if err := os.WriteFile(filename, []byte(`
dbs:
- path: /path/to/db
replicas:
@@ -97,7 +103,7 @@ func TestNewFileReplicaFromConfig(t *testing.T) {
r, err := main.NewReplicaFromConfig(&main.ReplicaConfig{Path: "/foo"}, nil)
if err != nil {
t.Fatal(err)
} else if client, ok := r.Client.(*file.ReplicaClient); !ok {
} else if client, ok := r.Client().(*litestream.FileReplicaClient); !ok {
t.Fatal("unexpected replica type")
} else if got, want := client.Path(), "/foo"; got != want {
t.Fatalf("Path=%s, want %s", got, want)
@@ -109,7 +115,7 @@ func TestNewS3ReplicaFromConfig(t *testing.T) {
r, err := main.NewReplicaFromConfig(&main.ReplicaConfig{URL: "s3://foo/bar"}, nil)
if err != nil {
t.Fatal(err)
} else if client, ok := r.Client.(*s3.ReplicaClient); !ok {
} else if client, ok := r.Client().(*s3.ReplicaClient); !ok {
t.Fatal("unexpected replica type")
} else if got, want := client.Bucket, "foo"; got != want {
t.Fatalf("Bucket=%s, want %s", got, want)
@@ -128,7 +134,7 @@ func TestNewS3ReplicaFromConfig(t *testing.T) {
r, err := main.NewReplicaFromConfig(&main.ReplicaConfig{URL: "s3://foo.localhost:9000/bar"}, nil)
if err != nil {
t.Fatal(err)
} else if client, ok := r.Client.(*s3.ReplicaClient); !ok {
} else if client, ok := r.Client().(*s3.ReplicaClient); !ok {
t.Fatal("unexpected replica type")
} else if got, want := client.Bucket, "foo"; got != want {
t.Fatalf("Bucket=%s, want %s", got, want)
@@ -147,7 +153,7 @@ func TestNewS3ReplicaFromConfig(t *testing.T) {
r, err := main.NewReplicaFromConfig(&main.ReplicaConfig{URL: "s3://foo.s3.us-west-000.backblazeb2.com/bar"}, nil)
if err != nil {
t.Fatal(err)
} else if client, ok := r.Client.(*s3.ReplicaClient); !ok {
} else if client, ok := r.Client().(*s3.ReplicaClient); !ok {
t.Fatal("unexpected replica type")
} else if got, want := client.Bucket, "foo"; got != want {
t.Fatalf("Bucket=%s, want %s", got, want)
@@ -163,11 +169,11 @@ func TestNewS3ReplicaFromConfig(t *testing.T) {
})
}
func TestNewGCSReplicaFromConfig(t *testing.T) {
r, err := main.NewReplicaFromConfig(&main.ReplicaConfig{URL: "gcs://foo/bar"}, nil)
func TestNewGSReplicaFromConfig(t *testing.T) {
r, err := main.NewReplicaFromConfig(&main.ReplicaConfig{URL: "gs://foo/bar"}, nil)
if err != nil {
t.Fatal(err)
} else if client, ok := r.Client.(*gcs.ReplicaClient); !ok {
} else if client, ok := r.Client().(*gs.ReplicaClient); !ok {
t.Fatal("unexpected replica type")
} else if got, want := client.Bucket, "foo"; got != want {
t.Fatalf("Bucket=%s, want %s", got, want)
@@ -175,3 +181,17 @@ func TestNewGCSReplicaFromConfig(t *testing.T) {
t.Fatalf("Path=%s, want %s", got, want)
}
}
// newMain returns a new instance of Main and associated buffers.
func newMain() (m *main.Main, stdin, stdout, stderr *bytes.Buffer) {
stdin, stdout, stderr = &bytes.Buffer{}, &bytes.Buffer{}, &bytes.Buffer{}
// Split stdout/stderr to terminal if verbose flag set.
out, err := io.Writer(stdout), io.Writer(stderr)
if testing.Verbose() {
out = io.MultiWriter(out, os.Stdout)
err = io.MultiWriter(err, os.Stderr)
}
return main.NewMain(stdin, out, err), stdin, stdout, stderr
}

View File

@@ -1,112 +0,0 @@
// +build windows
package main
import (
"context"
"io"
"log"
"os"
"os/signal"
"golang.org/x/sys/windows"
"golang.org/x/sys/windows/svc"
"golang.org/x/sys/windows/svc/eventlog"
)
const defaultConfigPath = `C:\Litestream\litestream.yml`
// serviceName is the Windows Service name.
const serviceName = "Litestream"
// isWindowsService returns true if currently executing within a Windows service.
func isWindowsService() (bool, error) {
return svc.IsWindowsService()
}
func runWindowsService(ctx context.Context) error {
// Attempt to install new log service. This will fail if already installed.
// We don't log the error because we don't have anywhere to log until we open the log.
_ = eventlog.InstallAsEventCreate(serviceName, eventlog.Error|eventlog.Warning|eventlog.Info)
elog, err := eventlog.Open(serviceName)
if err != nil {
return err
}
defer elog.Close()
// Set eventlog as log writer while running.
log.SetOutput((*eventlogWriter)(elog))
defer log.SetOutput(os.Stderr)
log.Print("Litestream service starting")
if err := svc.Run(serviceName, &windowsService{ctx: ctx}); err != nil {
return errStop
}
log.Print("Litestream service stopped")
return nil
}
// windowsService is an interface adapter for svc.Handler.
type windowsService struct {
ctx context.Context
}
func (s *windowsService) Execute(args []string, r <-chan svc.ChangeRequest, statusCh chan<- svc.Status) (svcSpecificEC bool, exitCode uint32) {
var err error
// Notify Windows that the service is starting up.
statusCh <- svc.Status{State: svc.StartPending}
// Instantiate replication command and load configuration.
c := NewReplicateCommand()
if c.Config, err = ReadConfigFile(DefaultConfigPath(), true); err != nil {
log.Printf("cannot load configuration: %s", err)
return true, 1
}
// Execute replication command.
if err := c.Run(s.ctx); err != nil {
log.Printf("cannot replicate: %s", err)
statusCh <- svc.Status{State: svc.StopPending}
return true, 2
}
// Notify Windows that the service is now running.
statusCh <- svc.Status{State: svc.Running, Accepts: svc.AcceptStop}
for {
select {
case req := <-r:
switch req.Cmd {
case svc.Stop:
c.Close()
statusCh <- svc.Status{State: svc.StopPending}
return false, windows.NO_ERROR
case svc.Interrogate:
statusCh <- req.CurrentStatus
default:
log.Printf("Litestream service received unexpected change request cmd: %d", req.Cmd)
}
}
}
}
// Ensure implementation implements io.Writer interface.
var _ io.Writer = (*eventlogWriter)(nil)
// eventlogWriter is an adapter for using eventlog.Log as an io.Writer.
type eventlogWriter eventlog.Log
func (w *eventlogWriter) Write(p []byte) (n int, err error) {
elog := (*eventlog.Log)(w)
return 0, elog.Info(1, string(p))
}
func signalChan() <-chan os.Signal {
ch := make(chan os.Signal, 1)
signal.Notify(ch, os.Interrupt)
return ch
}

View File

@@ -4,36 +4,45 @@ import (
"context"
"flag"
"fmt"
"io"
"log"
"net"
"net/http"
_ "net/http/pprof"
"os"
"os/exec"
"github.com/benbjohnson/litestream"
"github.com/benbjohnson/litestream/abs"
"github.com/benbjohnson/litestream/file"
"github.com/benbjohnson/litestream/gcs"
"github.com/benbjohnson/litestream/gs"
"github.com/benbjohnson/litestream/http"
"github.com/benbjohnson/litestream/s3"
"github.com/benbjohnson/litestream/sftp"
"github.com/mattn/go-shellwords"
"github.com/prometheus/client_golang/prometheus/promhttp"
)
// ReplicateCommand represents a command that continuously replicates SQLite databases.
type ReplicateCommand struct {
stdin io.Reader
stdout io.Writer
stderr io.Writer
configPath string
noExpandEnv bool
cmd *exec.Cmd // subcommand
execCh chan error // subcommand error channel
Config Config
// List of managed databases specified in the config.
DBs []*litestream.DB
server *litestream.Server
httpServer *http.Server
}
func NewReplicateCommand() *ReplicateCommand {
// NewReplicateCommand returns a new instance of ReplicateCommand.
func NewReplicateCommand(stdin io.Reader, stdout, stderr io.Writer) *ReplicateCommand {
return &ReplicateCommand{
stdin: stdin,
stdout: stdout,
stderr: stderr,
execCh: make(chan error),
}
}
@@ -42,8 +51,8 @@ func NewReplicateCommand() *ReplicateCommand {
func (c *ReplicateCommand) ParseFlags(ctx context.Context, args []string) (err error) {
fs := flag.NewFlagSet("litestream-replicate", flag.ContinueOnError)
execFlag := fs.String("exec", "", "execute subcommand")
tracePath := fs.String("trace", "", "trace path")
configPath, noExpandEnv := registerConfigFlag(fs)
addr := fs.String("addr", "", "HTTP bind address (host:port)")
registerConfigFlag(fs, &c.configPath, &c.noExpandEnv)
fs.Usage = c.Usage
if err := fs.Parse(args); err != nil {
return err
@@ -53,7 +62,7 @@ func (c *ReplicateCommand) ParseFlags(ctx context.Context, args []string) (err e
if fs.NArg() == 1 {
return fmt.Errorf("must specify at least one replica URL for %s", fs.Arg(0))
} else if fs.NArg() > 1 {
if *configPath != "" {
if c.configPath != "" {
return fmt.Errorf("cannot specify a replica URL and the -config flag")
}
@@ -67,34 +76,27 @@ func (c *ReplicateCommand) ParseFlags(ctx context.Context, args []string) (err e
}
c.Config.DBs = []*DBConfig{dbConfig}
} else {
if *configPath == "" {
*configPath = DefaultConfigPath()
if c.configPath == "" {
c.configPath = DefaultConfigPath()
}
if c.Config, err = ReadConfigFile(*configPath, !*noExpandEnv); err != nil {
if c.Config, err = ReadConfigFile(c.configPath, !c.noExpandEnv); err != nil {
return err
}
}
// Override config exec command, if specified.
// Override config with flags, if specified.
if *addr != "" {
c.Config.Addr = *addr
}
if *execFlag != "" {
c.Config.Exec = *execFlag
}
// Enable trace logging.
if *tracePath != "" {
f, err := os.Create(*tracePath)
if err != nil {
return err
}
defer f.Close()
litestream.Tracef = log.New(f, "", log.LstdFlags|log.Lmicroseconds|log.LUTC|log.Lshortfile).Printf
}
return nil
}
// Run loads all databases specified in the configuration.
func (c *ReplicateCommand) Run() (err error) {
func (c *ReplicateCommand) Run(ctx context.Context) (err error) {
// Display version information.
log.Printf("litestream %s", Version)
@@ -103,29 +105,35 @@ func (c *ReplicateCommand) Run() (err error) {
log.Println("no databases specified in configuration")
}
c.server = litestream.NewServer()
if err := c.server.Open(); err != nil {
return fmt.Errorf("open server: %w", err)
}
// Add databases to the server.
for _, dbConfig := range c.Config.DBs {
db, err := NewDBFromConfig(dbConfig)
path, err := expand(dbConfig.Path)
if err != nil {
return err
}
// Open database & attach to program.
if err := db.Open(); err != nil {
if err := c.server.Watch(path, func(path string) (*litestream.DB, error) {
return NewDBFromConfigWithPath(dbConfig, path)
}); err != nil {
return err
}
c.DBs = append(c.DBs, db)
}
// Notify user that initialization is done.
for _, db := range c.DBs {
for _, db := range c.server.DBs() {
log.Printf("initialized db: %s", db.Path())
for _, r := range db.Replicas {
switch client := r.Client.(type) {
case *file.ReplicaClient:
switch client := r.Client().(type) {
case *litestream.FileReplicaClient:
log.Printf("replicating to: name=%q type=%q path=%q", r.Name(), client.Type(), client.Path())
case *s3.ReplicaClient:
log.Printf("replicating to: name=%q type=%q bucket=%q path=%q region=%q endpoint=%q sync-interval=%s", r.Name(), client.Type(), client.Bucket, client.Path, client.Region, client.Endpoint, r.SyncInterval)
case *gcs.ReplicaClient:
case *gs.ReplicaClient:
log.Printf("replicating to: name=%q type=%q bucket=%q path=%q sync-interval=%s", r.Name(), client.Type(), client.Bucket, client.Path, r.SyncInterval)
case *abs.ReplicaClient:
log.Printf("replicating to: name=%q type=%q bucket=%q path=%q endpoint=%q sync-interval=%s", r.Name(), client.Type(), client.Bucket, client.Path, client.Endpoint, r.SyncInterval)
@@ -137,22 +145,13 @@ func (c *ReplicateCommand) Run() (err error) {
}
}
// Serve metrics over HTTP if enabled.
// Serve HTTP if enabled.
if c.Config.Addr != "" {
hostport := c.Config.Addr
if host, port, _ := net.SplitHostPort(c.Config.Addr); port == "" {
return fmt.Errorf("must specify port for bind address: %q", c.Config.Addr)
} else if host == "" {
hostport = net.JoinHostPort("localhost", port)
c.httpServer = http.NewServer(c.server, c.Config.Addr)
if err := c.httpServer.Open(); err != nil {
return fmt.Errorf("cannot start http server: %w", err)
}
log.Printf("serving metrics on http://%s/metrics", hostport)
go func() {
http.Handle("/metrics", promhttp.Handler())
if err := http.ListenAndServe(c.Config.Addr, nil); err != nil {
log.Printf("cannot start metrics server: %s", err)
}
}()
log.Printf("http server running at %s", c.httpServer.URL())
}
// Parse exec commands args & start subprocess.
@@ -162,8 +161,14 @@ func (c *ReplicateCommand) Run() (err error) {
return fmt.Errorf("cannot parse exec command: %w", err)
}
c.cmd = exec.Command(execArgs[0], execArgs[1:]...)
c.cmd.Env = os.Environ()
// Pass first database path to child process.
env := os.Environ()
if dbs := c.server.DBs(); len(dbs) > 0 {
env = append(env, fmt.Sprintf("LITESTREAM_DB_PATH=%s", dbs[0].Path()))
}
c.cmd = exec.CommandContext(ctx, execArgs[0], execArgs[1:]...)
c.cmd.Env = env
c.cmd.Stdout = os.Stdout
c.cmd.Stderr = os.Stderr
if err := c.cmd.Start(); err != nil {
@@ -172,17 +177,21 @@ func (c *ReplicateCommand) Run() (err error) {
go func() { c.execCh <- c.cmd.Wait() }()
}
log.Printf("litestream initialization complete")
return nil
}
// Close closes all open databases.
// Close closes the HTTP server & all open databases.
func (c *ReplicateCommand) Close() (err error) {
for _, db := range c.DBs {
if e := db.Close(); e != nil {
log.Printf("error closing db: path=%s err=%s", db.Path(), e)
if err == nil {
err = e
}
if c.httpServer != nil {
if e := c.httpServer.Close(); e != nil && err == nil {
err = e
}
}
if c.server != nil {
if e := c.server.Close(); e != nil && err == nil {
err = e
}
}
return err
@@ -190,7 +199,7 @@ func (c *ReplicateCommand) Close() (err error) {
// Usage prints the help screen to STDOUT.
func (c *ReplicateCommand) Usage() {
fmt.Printf(`
fmt.Fprintf(c.stdout, `
The replicate command starts a server to monitor & replicate databases.
You can specify your database & replicas in a configuration file or you can
replicate a single database file by specifying its path and its replicas in the
@@ -212,11 +221,12 @@ Arguments:
Executes a subcommand. Litestream will exit when the child
process exits. Useful for simple process management.
-addr BIND_ADDR
Starts an HTTP server that reports prometheus metrics and provides
an endpoint for live read replication. (e.g. ":9090")
-no-expand-env
Disables environment variable expansion in configuration file.
-trace PATH
Write verbose trace logging to PATH.
`[1:], DefaultConfigPath())
}

View File

@@ -0,0 +1,136 @@
package main_test
import (
"context"
"database/sql"
"errors"
"fmt"
"hash/crc64"
"io"
"os"
"path/filepath"
"runtime"
"testing"
"time"
"golang.org/x/sync/errgroup"
)
func TestReplicateCommand(t *testing.T) {
if testing.Short() {
t.Skip("long running test, skipping")
} else if runtime.GOOS != "linux" {
t.Skip("must run system tests on Linux, skipping")
}
const writeTime = 10 * time.Second
dir := t.TempDir()
configPath := filepath.Join(dir, "litestream.yml")
dbPath := filepath.Join(dir, "db")
restorePath := filepath.Join(dir, "restored")
replicaPath := filepath.Join(dir, "replica")
if err := os.WriteFile(configPath, []byte(`
dbs:
- path: `+dbPath+`
replicas:
- path: `+replicaPath+`
`), 0666); err != nil {
t.Fatal(err)
}
// Generate data into SQLite database from separate goroutine.
g, ctx := errgroup.WithContext(context.Background())
mainctx, cancel := context.WithCancel(ctx)
g.Go(func() error {
defer cancel()
db, err := sql.Open("sqlite3", dbPath)
if err != nil {
return err
}
defer db.Close()
if _, err := db.ExecContext(ctx, `PRAGMA journal_mode = WAL`); err != nil {
return fmt.Errorf("cannot enable wal: %w", err)
} else if _, err := db.ExecContext(ctx, `PRAGMA synchronous = NORMAL`); err != nil {
return fmt.Errorf("cannot enable wal: %w", err)
} else if _, err := db.ExecContext(ctx, `CREATE TABLE t (id INTEGER PRIMARY KEY)`); err != nil {
return fmt.Errorf("cannot create table: %w", err)
}
ticker := time.NewTicker(1 * time.Millisecond)
defer ticker.Stop()
timer := time.NewTimer(writeTime)
defer timer.Stop()
for i := 0; ; i++ {
select {
case <-ctx.Done():
return ctx.Err()
case <-timer.C:
return nil
case <-ticker.C:
if _, err := db.ExecContext(ctx, `INSERT INTO t (id) VALUES (?);`, i); err != nil {
return fmt.Errorf("cannot insert: i=%d err=%w", i, err)
}
}
}
})
// Replicate database unless the context is canceled.
g.Go(func() error {
m, _, _, _ := newMain()
return m.Run(mainctx, []string{"replicate", "-config", configPath})
})
if err := g.Wait(); err != nil {
t.Fatal(err)
}
// Checkpoint database.
mustCheckpoint(t, dbPath)
chksum0 := mustChecksum(t, dbPath)
// Restore to another path.
m, _, _, _ := newMain()
if err := m.Run(context.Background(), []string{"restore", "-config", configPath, "-o", restorePath, dbPath}); err != nil && !errors.Is(err, context.Canceled) {
t.Fatal(err)
}
// Verify contents match.
if chksum1 := mustChecksum(t, restorePath); chksum0 != chksum1 {
t.Fatal("restore mismatch")
}
}
func mustCheckpoint(tb testing.TB, path string) {
tb.Helper()
db, err := sql.Open("sqlite3", path)
if err != nil {
tb.Fatal(err)
}
defer db.Close()
if _, err := db.Exec(`PRAGMA wal_checkpoint(TRUNCATE)`); err != nil {
tb.Fatal(err)
}
}
func mustChecksum(tb testing.TB, path string) uint64 {
tb.Helper()
f, err := os.Open(path)
if err != nil {
tb.Fatal(err)
}
defer f.Close()
h := crc64.New(crc64.MakeTable(crc64.ISO))
if _, err := io.Copy(h, f); err != nil {
tb.Fatal(err)
}
return h.Sum64()
}

View File

@@ -2,11 +2,12 @@ package main
import (
"context"
"errors"
"flag"
"fmt"
"io"
"log"
"os"
"path/filepath"
"strconv"
"time"
@@ -14,24 +15,50 @@ import (
)
// RestoreCommand represents a command to restore a database from a backup.
type RestoreCommand struct{}
type RestoreCommand struct {
stdin io.Reader
stdout io.Writer
stderr io.Writer
snapshotIndex int // index of snapshot to start from
// CLI options
configPath string // path to config file
noExpandEnv bool // if true, do not expand env variables in config
outputPath string // path to restore database to
replicaName string // optional, name of replica to restore from
generation string // optional, generation to restore
targetIndex int // optional, last WAL index to replay
timestamp time.Time // optional, restore to point-in-time (ISO 8601)
ifDBNotExists bool // if true, skips restore if output path already exists
ifReplicaExists bool // if true, skips if no backups exist
opt litestream.RestoreOptions
}
// NewRestoreCommand returns a new instance of RestoreCommand.
func NewRestoreCommand(stdin io.Reader, stdout, stderr io.Writer) *RestoreCommand {
return &RestoreCommand{
stdin: stdin,
stdout: stdout,
stderr: stderr,
targetIndex: -1,
opt: litestream.NewRestoreOptions(),
}
}
// Run executes the command.
func (c *RestoreCommand) Run(ctx context.Context, args []string) (err error) {
opt := litestream.NewRestoreOptions()
opt.Verbose = true
fs := flag.NewFlagSet("litestream-restore", flag.ContinueOnError)
configPath, noExpandEnv := registerConfigFlag(fs)
fs.StringVar(&opt.OutputPath, "o", "", "output path")
fs.StringVar(&opt.ReplicaName, "replica", "", "replica name")
fs.StringVar(&opt.Generation, "generation", "", "generation name")
fs.Var((*indexVar)(&opt.Index), "index", "wal index")
fs.IntVar(&opt.Parallelism, "parallelism", opt.Parallelism, "parallelism")
ifDBNotExists := fs.Bool("if-db-not-exists", false, "")
ifReplicaExists := fs.Bool("if-replica-exists", false, "")
timestampStr := fs.String("timestamp", "", "timestamp")
verbose := fs.Bool("v", false, "verbose output")
registerConfigFlag(fs, &c.configPath, &c.noExpandEnv)
fs.StringVar(&c.outputPath, "o", "", "output path")
fs.StringVar(&c.replicaName, "replica", "", "replica name")
fs.StringVar(&c.generation, "generation", "", "generation name")
fs.Var((*indexVar)(&c.targetIndex), "index", "wal index")
timestampStr := fs.String("timestamp", "", "point-in-time restore (ISO 8601)")
fs.IntVar(&c.opt.Parallelism, "parallelism", c.opt.Parallelism, "parallelism")
fs.BoolVar(&c.ifDBNotExists, "if-db-not-exists", false, "")
fs.BoolVar(&c.ifReplicaExists, "if-replica-exists", false, "")
fs.Usage = c.Usage
if err := fs.Parse(args); err != nil {
return err
@@ -40,87 +67,122 @@ func (c *RestoreCommand) Run(ctx context.Context, args []string) (err error) {
} else if fs.NArg() > 1 {
return fmt.Errorf("too many arguments")
}
pathOrURL := fs.Arg(0)
// Parse timestamp, if specified.
// Parse timestamp.
if *timestampStr != "" {
if opt.Timestamp, err = time.Parse(time.RFC3339, *timestampStr); err != nil {
return errors.New("invalid -timestamp, must specify in ISO 8601 format (e.g. 2000-01-01T00:00:00Z)")
if c.timestamp, err = time.Parse(time.RFC3339Nano, *timestampStr); err != nil {
return fmt.Errorf("invalid -timestamp, expected ISO 8601: %w", err)
}
}
// Instantiate logger if verbose output is enabled.
if *verbose {
opt.Logger = log.New(os.Stderr, "", log.LstdFlags|log.Lmicroseconds)
// Ensure a generation is specified if target index is specified.
if c.targetIndex != -1 && !c.timestamp.IsZero() {
return fmt.Errorf("cannot specify both -index flag and -timestamp flag")
} else if c.targetIndex != -1 && c.generation == "" {
return fmt.Errorf("must specify -generation flag when using -index flag")
} else if !c.timestamp.IsZero() && c.generation == "" {
return fmt.Errorf("must specify -generation flag when using -timestamp flag")
}
// Determine replica & generation to restore from.
var r *litestream.Replica
if isURL(fs.Arg(0)) {
if *configPath != "" {
return fmt.Errorf("cannot specify a replica URL and the -config flag")
}
if r, err = c.loadFromURL(ctx, fs.Arg(0), *ifDBNotExists, &opt); err == errSkipDBExists {
fmt.Println("database already exists, skipping")
// Default to original database path if output path not specified.
if !isURL(pathOrURL) && c.outputPath == "" {
c.outputPath = pathOrURL
}
// Exit successfully if the output file already exists and flag is set.
if _, err := os.Stat(c.outputPath); os.IsNotExist(err) {
// file doesn't exist, continue
} else if err != nil {
return err
} else if err == nil {
if c.ifDBNotExists {
fmt.Fprintln(c.stdout, "database already exists, skipping")
return nil
}
return fmt.Errorf("output file already exists: %s", c.outputPath)
}
// Load configuration.
config, err := ReadConfigFile(c.configPath, !c.noExpandEnv)
if err != nil {
return err
}
// Build replica from either a URL or config.
r, err := c.loadReplica(ctx, config, pathOrURL)
if err != nil {
return err
}
// Determine latest generation if one is not specified.
if c.generation == "" {
if c.generation, err = litestream.FindLatestGeneration(ctx, r.Client()); err == litestream.ErrNoGeneration {
// Return an error if no matching targets found.
// If optional flag set, return success. Useful for automated recovery.
if c.ifReplicaExists {
fmt.Fprintln(c.stdout, "no matching backups found, skipping")
return nil
}
return fmt.Errorf("no matching backups found")
} else if err != nil {
return err
}
} else {
if *configPath == "" {
*configPath = DefaultConfigPath()
}
if r, err = c.loadFromConfig(ctx, fs.Arg(0), *configPath, !*noExpandEnv, *ifDBNotExists, &opt); err == errSkipDBExists {
fmt.Println("database already exists, skipping")
return nil
} else if err != nil {
return err
return fmt.Errorf("cannot determine latest generation: %w", err)
}
}
// Return an error if no matching targets found.
// If optional flag set, return success. Useful for automated recovery.
if opt.Generation == "" {
if *ifReplicaExists {
fmt.Println("no matching backups found")
return nil
// Determine the maximum available index for the generation if one is not specified.
if !c.timestamp.IsZero() {
if c.targetIndex, err = litestream.FindIndexByTimestamp(ctx, r.Client(), c.generation, c.timestamp); err != nil {
return fmt.Errorf("cannot find index for timestamp in generation %q: %w", c.generation, err)
}
} else if c.targetIndex == -1 {
if c.targetIndex, err = litestream.FindMaxIndexByGeneration(ctx, r.Client(), c.generation); err != nil {
return fmt.Errorf("cannot determine latest index in generation %q: %w", c.generation, err)
}
return fmt.Errorf("no matching backups found")
}
return r.Restore(ctx, opt)
// Find lastest snapshot that occurs before the index.
// TODO: Optionally allow -snapshot-index
if c.snapshotIndex, err = litestream.FindSnapshotForIndex(ctx, r.Client(), c.generation, c.targetIndex); err != nil {
return fmt.Errorf("cannot find snapshot index: %w", err)
}
// Create parent directory if it doesn't already exist.
if err := os.MkdirAll(filepath.Dir(c.outputPath), 0700); err != nil {
return fmt.Errorf("cannot create parent directory: %w", err)
}
c.opt.Logger = log.New(c.stdout, "", log.LstdFlags|log.Lmicroseconds)
return litestream.Restore(ctx, r.Client(), c.outputPath, c.generation, c.snapshotIndex, c.targetIndex, c.opt)
}
// loadFromURL creates a replica & updates the restore options from a replica URL.
func (c *RestoreCommand) loadFromURL(ctx context.Context, replicaURL string, ifDBNotExists bool, opt *litestream.RestoreOptions) (*litestream.Replica, error) {
if opt.OutputPath == "" {
return nil, fmt.Errorf("output path required")
func (c *RestoreCommand) loadReplica(ctx context.Context, config Config, arg string) (*litestream.Replica, error) {
if isURL(arg) {
return c.loadReplicaFromURL(ctx, config, arg)
}
return c.loadReplicaFromConfig(ctx, config, arg)
}
// Exit successfully if the output file already exists.
if _, err := os.Stat(opt.OutputPath); !os.IsNotExist(err) && ifDBNotExists {
return nil, errSkipDBExists
// loadReplicaFromURL creates a replica & updates the restore options from a replica URL.
func (c *RestoreCommand) loadReplicaFromURL(ctx context.Context, config Config, replicaURL string) (*litestream.Replica, error) {
if c.replicaName != "" {
return nil, fmt.Errorf("cannot specify both the replica URL and the -replica flag")
} else if c.outputPath == "" {
return nil, fmt.Errorf("output path required when using a replica URL")
}
syncInterval := litestream.DefaultSyncInterval
r, err := NewReplicaFromConfig(&ReplicaConfig{
URL: replicaURL,
SyncInterval: &syncInterval,
return NewReplicaFromConfig(&ReplicaConfig{
URL: replicaURL,
AccessKeyID: config.AccessKeyID,
SecretAccessKey: config.SecretAccessKey,
SyncInterval: &syncInterval,
}, nil)
if err != nil {
return nil, err
}
opt.Generation, _, err = r.CalcRestoreTarget(ctx, *opt)
return r, err
}
// loadFromConfig returns a replica & updates the restore options from a DB reference.
func (c *RestoreCommand) loadFromConfig(ctx context.Context, dbPath, configPath string, expandEnv, ifDBNotExists bool, opt *litestream.RestoreOptions) (*litestream.Replica, error) {
// Load configuration.
config, err := ReadConfigFile(configPath, expandEnv)
if err != nil {
return nil, err
}
// loadReplicaFromConfig returns replicas based on the specific config path.
func (c *RestoreCommand) loadReplicaFromConfig(ctx context.Context, config Config, dbPath string) (_ *litestream.Replica, err error) {
// Lookup database from configuration file by path.
if dbPath, err = expand(dbPath); err != nil {
return nil, err
@@ -132,31 +194,40 @@ func (c *RestoreCommand) loadFromConfig(ctx context.Context, dbPath, configPath
db, err := NewDBFromConfig(dbConfig)
if err != nil {
return nil, err
} else if len(db.Replicas) == 0 {
return nil, fmt.Errorf("database has no replicas: %s", dbPath)
}
// Restore into original database path if not specified.
if opt.OutputPath == "" {
opt.OutputPath = dbPath
// Filter by replica name if specified.
if c.replicaName != "" {
r := db.Replica(c.replicaName)
if r == nil {
return nil, fmt.Errorf("replica %q not found", c.replicaName)
}
return r, nil
}
// Exit successfully if the output file already exists.
if _, err := os.Stat(opt.OutputPath); !os.IsNotExist(err) && ifDBNotExists {
return nil, errSkipDBExists
// Choose only replica if only one available and no name is specified.
if len(db.Replicas) == 1 {
return db.Replicas[0], nil
}
// Determine the appropriate replica & generation to restore from,
r, generation, err := db.CalcRestoreTarget(ctx, *opt)
// A replica must be specified when restoring a specific generation with multiple replicas.
if c.generation != "" {
return nil, fmt.Errorf("must specify -replica flag when restoring from a specific generation")
}
// Determine latest replica to restore from.
r, err := litestream.LatestReplica(ctx, db.Replicas)
if err != nil {
return nil, err
return nil, fmt.Errorf("cannot determine latest replica: %w", err)
}
opt.Generation = generation
return r, nil
}
// Usage prints the help screen to STDOUT.
func (c *RestoreCommand) Usage() {
fmt.Printf(`
fmt.Fprintf(c.stdout, `
The restore command recovers a database from a previous snapshot and WAL.
Usage:
@@ -186,9 +257,9 @@ Arguments:
Restore up to a specific hex-encoded WAL index (inclusive).
Defaults to use the highest available index.
-timestamp TIMESTAMP
Restore to a specific point-in-time.
Defaults to use the latest available backup.
-timestamp DATETIME
Restore up to a specific point-in-time. Must be ISO 8601.
Cannot be specified with -index flag.
-o PATH
Output path of the restored database.
@@ -204,18 +275,12 @@ Arguments:
Determines the number of WAL files downloaded in parallel.
Defaults to `+strconv.Itoa(litestream.DefaultRestoreParallelism)+`.
-v
Verbose output.
Examples:
# Restore latest replica for database to original location.
$ litestream restore /path/to/db
# Restore replica for database to a given point in time.
$ litestream restore -timestamp 2020-01-01T00:00:00Z /path/to/db
# Restore latest replica for database to new /tmp directory
$ litestream restore -o /tmp/db /path/to/db
@@ -225,9 +290,10 @@ Examples:
# Restore database from specific generation on S3.
$ litestream restore -replica s3 -generation xxxxxxxx /path/to/db
# Restore database to a specific point in time.
$ litestream restore -generation xxxxxxxx -timestamp 2000-01-01T00:00:00Z /path/to/db
`[1:],
DefaultConfigPath(),
)
}
var errSkipDBExists = errors.New("database already exists, skipping")

View File

@@ -0,0 +1,330 @@
package main_test
import (
"context"
"flag"
"os"
"path/filepath"
"strings"
"testing"
"github.com/benbjohnson/litestream/internal/testingutil"
)
func TestRestoreCommand(t *testing.T) {
t.Run("OK", func(t *testing.T) {
testDir := filepath.Join("testdata", "restore", "ok")
defer testingutil.Setenv(t, "LITESTREAM_TESTDIR", testDir)()
tempDir := t.TempDir()
m, _, stdout, stderr := newMain()
if err := m.Run(context.Background(), []string{"restore", "-config", filepath.Join(testDir, "litestream.yml"), "-o", filepath.Join(tempDir, "db"), filepath.Join(testDir, "db")}); err != nil {
t.Fatal(err)
} else if got, want := stderr.String(), ""; got != want {
t.Fatalf("stderr=%q, want %q", got, want)
}
// STDOUT has timing info so we need to grep per line.
lines := strings.Split(stdout.String(), "\n")
for i, substr := range []string{
`restoring snapshot 0000000000000000/0000000000000000 to ` + filepath.Join(tempDir, "db.tmp"),
`applied wal 0000000000000000/0000000000000000 elapsed=`,
`applied wal 0000000000000000/0000000000000001 elapsed=`,
`applied wal 0000000000000000/0000000000000002 elapsed=`,
`renaming database from temporary location`,
} {
if !strings.Contains(lines[i], substr) {
t.Fatalf("stdout: unexpected line %d:\n%s", i+1, stdout)
}
}
})
t.Run("ReplicaName", func(t *testing.T) {
testDir := filepath.Join("testdata", "restore", "replica-name")
defer testingutil.Setenv(t, "LITESTREAM_TESTDIR", testDir)()
tempDir := t.TempDir()
m, _, stdout, stderr := newMain()
if err := m.Run(context.Background(), []string{"restore", "-config", filepath.Join(testDir, "litestream.yml"), "-o", filepath.Join(tempDir, "db"), "-replica", "replica1", filepath.Join(testDir, "db")}); err != nil {
t.Fatal(err)
} else if got, want := stderr.String(), ""; got != want {
t.Fatalf("stderr=%q, want %q", got, want)
}
// STDOUT has timing info so we need to grep per line.
lines := strings.Split(stdout.String(), "\n")
for i, substr := range []string{
`restoring snapshot 0000000000000001/0000000000000001 to ` + filepath.Join(tempDir, "db.tmp"),
`no wal files found, snapshot only`,
`renaming database from temporary location`,
} {
if !strings.Contains(lines[i], substr) {
t.Fatalf("stdout: unexpected line %d:\n%s", i+1, stdout)
}
}
})
t.Run("ReplicaURL", func(t *testing.T) {
testDir := filepath.Join(testingutil.Getwd(t), "testdata", "restore", "replica-url")
tempDir := t.TempDir()
replicaURL := "file://" + filepath.ToSlash(testDir) + "/replica"
m, _, stdout, stderr := newMain()
if err := m.Run(context.Background(), []string{"restore", "-o", filepath.Join(tempDir, "db"), replicaURL}); err != nil {
t.Fatal(err)
} else if got, want := stderr.String(), ""; got != want {
t.Fatalf("stderr=%q, want %q", got, want)
}
lines := strings.Split(stdout.String(), "\n")
for i, substr := range []string{
`restoring snapshot 0000000000000000/0000000000000000 to ` + filepath.Join(tempDir, "db.tmp"),
`no wal files found, snapshot only`,
`renaming database from temporary location`,
} {
if !strings.Contains(lines[i], substr) {
t.Fatalf("stdout: unexpected line %d:\n%s", i+1, stdout)
}
}
})
t.Run("LatestReplica", func(t *testing.T) {
testDir := filepath.Join("testdata", "restore", "latest-replica")
defer testingutil.Setenv(t, "LITESTREAM_TESTDIR", testDir)()
tempDir := t.TempDir()
m, _, stdout, stderr := newMain()
if err := m.Run(context.Background(), []string{"restore", "-config", filepath.Join(testDir, "litestream.yml"), "-o", filepath.Join(tempDir, "db"), filepath.Join(testDir, "db")}); err != nil {
t.Fatal(err)
} else if got, want := stderr.String(), ""; got != want {
t.Fatalf("stderr=%q, want %q", got, want)
}
lines := strings.Split(stdout.String(), "\n")
for i, substr := range []string{
`restoring snapshot 0000000000000001/0000000000000000 to ` + filepath.Join(tempDir, "db.tmp"),
`no wal files found, snapshot only`,
`renaming database from temporary location`,
} {
if !strings.Contains(lines[i], substr) {
t.Fatalf("stdout: unexpected line %d:\n%s", i+1, stdout)
}
}
})
t.Run("IfDBNotExistsFlag", func(t *testing.T) {
testDir := filepath.Join("testdata", "restore", "if-db-not-exists-flag")
defer testingutil.Setenv(t, "LITESTREAM_TESTDIR", testDir)()
m, _, stdout, _ := newMain()
err := m.Run(context.Background(), []string{"restore", "-config", filepath.Join(testDir, "litestream.yml"), "-if-db-not-exists", filepath.Join(testDir, "db")})
if err != nil {
t.Fatal(err)
} else if got, want := stdout.String(), string(testingutil.ReadFile(t, filepath.Join(testDir, "stdout"))); got != want {
t.Fatalf("stdout=%q, want %q", got, want)
}
})
t.Run("IfReplicaExists", func(t *testing.T) {
testDir := filepath.Join("testdata", "restore", "if-replica-exists-flag")
defer testingutil.Setenv(t, "LITESTREAM_TESTDIR", testDir)()
m, _, stdout, _ := newMain()
err := m.Run(context.Background(), []string{"restore", "-config", filepath.Join(testDir, "litestream.yml"), "-if-replica-exists", filepath.Join(testDir, "db")})
if err != nil {
t.Fatal(err)
} else if got, want := stdout.String(), string(testingutil.ReadFile(t, filepath.Join(testDir, "stdout"))); got != want {
t.Fatalf("stdout=%q, want %q", got, want)
}
})
t.Run("ErrNoBackups", func(t *testing.T) {
testDir := filepath.Join("testdata", "restore", "no-backups")
defer testingutil.Setenv(t, "LITESTREAM_TESTDIR", testDir)()
tempDir := t.TempDir()
m, _, stdout, stderr := newMain()
err := m.Run(context.Background(), []string{"restore", "-config", filepath.Join(testDir, "litestream.yml"), "-o", filepath.Join(tempDir, "db"), filepath.Join(testDir, "db")})
if err == nil || err.Error() != `no matching backups found` {
t.Fatalf("unexpected error: %s", err)
} else if got, want := stdout.String(), string(testingutil.ReadFile(t, filepath.Join(testDir, "stdout"))); got != want {
t.Fatalf("stdout=%q, want %q", got, want)
} else if got, want := stderr.String(), string(testingutil.ReadFile(t, filepath.Join(testDir, "stderr"))); got != want {
t.Fatalf("stderr=%q, want %q", got, want)
}
})
t.Run("ErrNoGeneration", func(t *testing.T) {
testDir := filepath.Join("testdata", "restore", "no-generation")
defer testingutil.Setenv(t, "LITESTREAM_TESTDIR", testDir)()
m, _, _, _ := newMain()
err := m.Run(context.Background(), []string{"restore", "-config", filepath.Join(testDir, "litestream.yml"), filepath.Join(testDir, "db")})
if err == nil || err.Error() != `no matching backups found` {
t.Fatalf("unexpected error: %s", err)
}
})
t.Run("ErrOutputPathExists", func(t *testing.T) {
testDir := filepath.Join("testdata", "restore", "output-path-exists")
defer testingutil.Setenv(t, "LITESTREAM_TESTDIR", testDir)()
m, _, _, _ := newMain()
err := m.Run(context.Background(), []string{"restore", "-config", filepath.Join(testDir, "litestream.yml"), filepath.Join(testDir, "db")})
if err == nil || err.Error() != `output file already exists: `+filepath.Join(testDir, "db") {
t.Fatalf("unexpected error: %s", err)
}
})
t.Run("ErrDatabaseOrReplicaRequired", func(t *testing.T) {
m, _, _, _ := newMain()
err := m.Run(context.Background(), []string{"restore"})
if err == nil || err.Error() != `database path or replica URL required` {
t.Fatalf("unexpected error: %s", err)
}
})
t.Run("ErrTooManyArguments", func(t *testing.T) {
m, _, _, _ := newMain()
err := m.Run(context.Background(), []string{"restore", "abc", "123"})
if err == nil || err.Error() != `too many arguments` {
t.Fatalf("unexpected error: %s", err)
}
})
t.Run("ErrInvalidFlags", func(t *testing.T) {
m, _, _, _ := newMain()
err := m.Run(context.Background(), []string{"restore", "-no-such-flag"})
if err == nil || err.Error() != `flag provided but not defined: -no-such-flag` {
t.Fatalf("unexpected error: %s", err)
}
})
t.Run("ErrIndexFlagOnly", func(t *testing.T) {
m, _, _, _ := newMain()
err := m.Run(context.Background(), []string{"restore", "-index", "0", "/var/lib/db"})
if err == nil || err.Error() != `must specify -generation flag when using -index flag` {
t.Fatalf("unexpected error: %s", err)
}
})
t.Run("ErrConfigFileNotFound", func(t *testing.T) {
m, _, _, _ := newMain()
err := m.Run(context.Background(), []string{"restore", "-config", "/no/such/file", "/var/lib/db"})
if err == nil || err.Error() != `config file not found: /no/such/file` {
t.Fatalf("unexpected error: %s", err)
}
})
t.Run("ErrInvalidConfig", func(t *testing.T) {
testDir := filepath.Join("testdata", "restore", "invalid-config")
m, _, _, _ := newMain()
err := m.Run(context.Background(), []string{"restore", "-config", filepath.Join(testDir, "litestream.yml"), "/var/lib/db"})
if err == nil || !strings.Contains(err.Error(), `replica path cannot be a url`) {
t.Fatalf("unexpected error: %s", err)
}
})
t.Run("ErrMkdir", func(t *testing.T) {
tempDir := t.TempDir()
if err := os.Mkdir(filepath.Join(tempDir, "noperm"), 0000); err != nil {
t.Fatal(err)
}
m, _, _, _ := newMain()
err := m.Run(context.Background(), []string{"restore", "-o", filepath.Join(tempDir, "noperm", "subdir", "db"), "/var/lib/db"})
if err == nil || !strings.Contains(err.Error(), `permission denied`) {
t.Fatalf("unexpected error: %s", err)
}
})
t.Run("ErrNoOutputPathWithReplicaURL", func(t *testing.T) {
m, _, _, _ := newMain()
err := m.Run(context.Background(), []string{"restore", "file://path/to/replica"})
if err == nil || err.Error() != `output path required when using a replica URL` {
t.Fatalf("unexpected error: %s", err)
}
})
t.Run("ErrReplicaNameWithReplicaURL", func(t *testing.T) {
m, _, _, _ := newMain()
err := m.Run(context.Background(), []string{"restore", "-replica", "replica0", "file://path/to/replica"})
if err == nil || err.Error() != `cannot specify both the replica URL and the -replica flag` {
t.Fatalf("unexpected error: %s", err)
}
})
t.Run("ErrInvalidReplicaURL", func(t *testing.T) {
m, _, _, _ := newMain()
err := m.Run(context.Background(), []string{"restore", "-o", filepath.Join(t.TempDir(), "db"), "xyz://xyz"})
if err == nil || err.Error() != `unknown replica type in config: "xyz"` {
t.Fatalf("unexpected error: %s", err)
}
})
t.Run("ErrDatabaseNotFound", func(t *testing.T) {
testDir := filepath.Join("testdata", "restore", "database-not-found")
defer testingutil.Setenv(t, "LITESTREAM_TESTDIR", testDir)()
m, _, _, _ := newMain()
err := m.Run(context.Background(), []string{"restore", "-config", filepath.Join(testDir, "litestream.yml"), "/no/such/db"})
if err == nil || err.Error() != `database not found in config: /no/such/db` {
t.Fatalf("unexpected error: %s", err)
}
})
t.Run("ErrNoReplicas", func(t *testing.T) {
testDir := filepath.Join("testdata", "restore", "no-replicas")
defer testingutil.Setenv(t, "LITESTREAM_TESTDIR", testDir)()
tempDir := t.TempDir()
m, _, _, _ := newMain()
err := m.Run(context.Background(), []string{"restore", "-config", filepath.Join(testDir, "litestream.yml"), "-o", filepath.Join(tempDir, "db"), filepath.Join(testDir, "db")})
if err == nil || err.Error() != `database has no replicas: `+filepath.Join(testingutil.Getwd(t), testDir, "db") {
t.Fatalf("unexpected error: %s", err)
}
})
t.Run("ErrReplicaNotFound", func(t *testing.T) {
testDir := filepath.Join("testdata", "restore", "replica-not-found")
defer testingutil.Setenv(t, "LITESTREAM_TESTDIR", testDir)()
tempDir := t.TempDir()
m, _, _, _ := newMain()
err := m.Run(context.Background(), []string{"restore", "-config", filepath.Join(testDir, "litestream.yml"), "-o", filepath.Join(tempDir, "db"), "-replica", "no_such_replica", filepath.Join(testDir, "db")})
if err == nil || err.Error() != `replica "no_such_replica" not found` {
t.Fatalf("unexpected error: %s", err)
}
})
t.Run("ErrGenerationWithNoReplicaName", func(t *testing.T) {
testDir := filepath.Join("testdata", "restore", "generation-with-no-replica")
defer testingutil.Setenv(t, "LITESTREAM_TESTDIR", testDir)()
tempDir := t.TempDir()
m, _, _, _ := newMain()
err := m.Run(context.Background(), []string{"restore", "-config", filepath.Join(testDir, "litestream.yml"), "-o", filepath.Join(tempDir, "db"), "-generation", "0000000000000000", filepath.Join(testDir, "db")})
if err == nil || err.Error() != `must specify -replica flag when restoring from a specific generation` {
t.Fatalf("unexpected error: %s", err)
}
})
t.Run("ErrNoSnapshotsAvailable", func(t *testing.T) {
testDir := filepath.Join("testdata", "restore", "no-snapshots")
defer testingutil.Setenv(t, "LITESTREAM_TESTDIR", testDir)()
tempDir := t.TempDir()
m, _, _, _ := newMain()
err := m.Run(context.Background(), []string{"restore", "-config", filepath.Join(testDir, "litestream.yml"), "-o", filepath.Join(tempDir, "db"), "-generation", "0000000000000000", filepath.Join(testDir, "db")})
if err == nil || err.Error() != `cannot determine latest index in generation "0000000000000000": no snapshots available` {
t.Fatalf("unexpected error: %s", err)
}
})
t.Run("Usage", func(t *testing.T) {
m, _, _, _ := newMain()
if err := m.Run(context.Background(), []string{"restore", "-h"}); err != flag.ErrHelp {
t.Fatalf("unexpected error: %s", err)
}
})
}

View File

@@ -4,8 +4,9 @@ import (
"context"
"flag"
"fmt"
"io"
"log"
"os"
"sort"
"text/tabwriter"
"time"
@@ -13,95 +14,90 @@ import (
)
// SnapshotsCommand represents a command to list snapshots for a command.
type SnapshotsCommand struct{}
type SnapshotsCommand struct {
stdin io.Reader
stdout io.Writer
stderr io.Writer
configPath string
noExpandEnv bool
replicaName string
}
// NewSnapshotsCommand returns a new instance of SnapshotsCommand.
func NewSnapshotsCommand(stdin io.Reader, stdout, stderr io.Writer) *SnapshotsCommand {
return &SnapshotsCommand{
stdin: stdin,
stdout: stdout,
stderr: stderr,
}
}
// Run executes the command.
func (c *SnapshotsCommand) Run(ctx context.Context, args []string) (err error) {
func (c *SnapshotsCommand) Run(ctx context.Context, args []string) (ret error) {
fs := flag.NewFlagSet("litestream-snapshots", flag.ContinueOnError)
configPath, noExpandEnv := registerConfigFlag(fs)
replicaName := fs.String("replica", "", "replica name")
registerConfigFlag(fs, &c.configPath, &c.noExpandEnv)
fs.StringVar(&c.replicaName, "replica", "", "replica name")
fs.Usage = c.Usage
if err := fs.Parse(args); err != nil {
return err
} else if fs.NArg() == 0 || fs.Arg(0) == "" {
return fmt.Errorf("database path required")
return fmt.Errorf("database path or replica URL required")
} else if fs.NArg() > 1 {
return fmt.Errorf("too many arguments")
}
var db *litestream.DB
var r *litestream.Replica
if isURL(fs.Arg(0)) {
if *configPath != "" {
return fmt.Errorf("cannot specify a replica URL and the -config flag")
}
if r, err = NewReplicaFromConfig(&ReplicaConfig{URL: fs.Arg(0)}, nil); err != nil {
return err
}
} else {
if *configPath == "" {
*configPath = DefaultConfigPath()
}
// Load configuration.
config, err := ReadConfigFile(c.configPath, !c.noExpandEnv)
if err != nil {
return err
}
// Load configuration.
config, err := ReadConfigFile(*configPath, !*noExpandEnv)
// Determine list of replicas to pull snapshots from.
replicas, _, err := loadReplicas(ctx, config, fs.Arg(0), c.replicaName)
if err != nil {
return err
}
// Build list of snapshot metadata with associated replica.
var infos []replicaSnapshotInfo
for _, r := range replicas {
a, err := r.Snapshots(ctx)
if err != nil {
return err
log.Printf("cannot determine snapshots: %s", err)
ret = errExit // signal error return without printing message
continue
}
// Lookup database from configuration file by path.
if path, err := expand(fs.Arg(0)); err != nil {
return err
} else if dbc := config.DBConfig(path); dbc == nil {
return fmt.Errorf("database not found in config: %s", path)
} else if db, err = NewDBFromConfig(dbc); err != nil {
return err
}
// Filter by replica, if specified.
if *replicaName != "" {
if r = db.Replica(*replicaName); r == nil {
return fmt.Errorf("replica %q not found for database %q", *replicaName, db.Path())
}
for i := range a {
infos = append(infos, replicaSnapshotInfo{SnapshotInfo: a[i], replicaName: r.Name()})
}
}
// Find snapshots by db or replica.
var replicas []*litestream.Replica
if r != nil {
replicas = []*litestream.Replica{r}
} else {
replicas = db.Replicas
}
// Sort snapshots by creation time from newest to oldest.
sort.Slice(infos, func(i, j int) bool { return infos[i].CreatedAt.After(infos[j].CreatedAt) })
// List all snapshots.
w := tabwriter.NewWriter(os.Stdout, 0, 8, 2, ' ', 0)
w := tabwriter.NewWriter(c.stdout, 0, 8, 2, ' ', 0)
defer w.Flush()
fmt.Fprintln(w, "replica\tgeneration\tindex\tsize\tcreated")
for _, r := range replicas {
infos, err := r.Snapshots(ctx)
if err != nil {
log.Printf("cannot determine snapshots: %s", err)
continue
}
for _, info := range infos {
fmt.Fprintf(w, "%s\t%s\t%d\t%d\t%s\n",
r.Name(),
info.Generation,
info.Index,
info.Size,
info.CreatedAt.Format(time.RFC3339),
)
}
for _, info := range infos {
fmt.Fprintf(w, "%s\t%s\t%s\t%d\t%s\n",
info.replicaName,
info.Generation,
litestream.FormatIndex(info.Index),
info.Size,
info.CreatedAt.Format(time.RFC3339),
)
}
return nil
return ret
}
// Usage prints the help screen to STDOUT.
func (c *SnapshotsCommand) Usage() {
fmt.Printf(`
fmt.Fprintf(c.stdout, `
The snapshots command lists all snapshots available for a database or replica.
Usage:
@@ -137,3 +133,9 @@ Examples:
DefaultConfigPath(),
)
}
// replicaSnapshotInfo represents snapshot metadata with associated replica name.
type replicaSnapshotInfo struct {
litestream.SnapshotInfo
replicaName string
}

View File

@@ -0,0 +1,128 @@
package main_test
import (
"context"
"flag"
"path/filepath"
"strings"
"testing"
"github.com/benbjohnson/litestream/internal/testingutil"
)
func TestSnapshotsCommand(t *testing.T) {
t.Run("OK", func(t *testing.T) {
testDir := filepath.Join("testdata", "snapshots", "ok")
defer testingutil.Setenv(t, "LITESTREAM_TESTDIR", testDir)()
m, _, stdout, _ := newMain()
if err := m.Run(context.Background(), []string{"snapshots", "-config", filepath.Join(testDir, "litestream.yml"), filepath.Join(testDir, "db")}); err != nil {
t.Fatal(err)
} else if got, want := stdout.String(), string(testingutil.ReadFile(t, filepath.Join(testDir, "stdout"))); got != want {
t.Fatalf("stdout=%q, want %q", got, want)
}
})
t.Run("ReplicaName", func(t *testing.T) {
testDir := filepath.Join("testdata", "snapshots", "replica-name")
defer testingutil.Setenv(t, "LITESTREAM_TESTDIR", testDir)()
m, _, stdout, _ := newMain()
if err := m.Run(context.Background(), []string{"snapshots", "-config", filepath.Join(testDir, "litestream.yml"), "-replica", "replica1", filepath.Join(testDir, "db")}); err != nil {
t.Fatal(err)
} else if got, want := stdout.String(), string(testingutil.ReadFile(t, filepath.Join(testDir, "stdout"))); got != want {
t.Fatalf("stdout=%q, want %q", got, want)
}
})
t.Run("ReplicaURL", func(t *testing.T) {
testDir := filepath.Join(testingutil.Getwd(t), "testdata", "snapshots", "replica-url")
defer testingutil.Setenv(t, "LITESTREAM_TESTDIR", testDir)()
replicaURL := "file://" + filepath.ToSlash(testDir) + "/replica"
m, _, stdout, _ := newMain()
if err := m.Run(context.Background(), []string{"snapshots", replicaURL}); err != nil {
t.Fatal(err)
} else if got, want := stdout.String(), string(testingutil.ReadFile(t, filepath.Join(testDir, "stdout"))); got != want {
t.Fatalf("stdout=%q, want %q", got, want)
}
})
t.Run("ErrDatabaseOrReplicaRequired", func(t *testing.T) {
m, _, _, _ := newMain()
err := m.Run(context.Background(), []string{"snapshots"})
if err == nil || err.Error() != `database path or replica URL required` {
t.Fatalf("unexpected error: %s", err)
}
})
t.Run("ErrTooManyArguments", func(t *testing.T) {
m, _, _, _ := newMain()
err := m.Run(context.Background(), []string{"snapshots", "abc", "123"})
if err == nil || err.Error() != `too many arguments` {
t.Fatalf("unexpected error: %s", err)
}
})
t.Run("ErrInvalidFlags", func(t *testing.T) {
m, _, _, _ := newMain()
err := m.Run(context.Background(), []string{"snapshots", "-no-such-flag"})
if err == nil || err.Error() != `flag provided but not defined: -no-such-flag` {
t.Fatalf("unexpected error: %s", err)
}
})
t.Run("ErrConfigFileNotFound", func(t *testing.T) {
m, _, _, _ := newMain()
err := m.Run(context.Background(), []string{"snapshots", "-config", "/no/such/file", "/var/lib/db"})
if err == nil || err.Error() != `config file not found: /no/such/file` {
t.Fatalf("unexpected error: %s", err)
}
})
t.Run("ErrInvalidConfig", func(t *testing.T) {
testDir := filepath.Join("testdata", "snapshots", "invalid-config")
m, _, _, _ := newMain()
err := m.Run(context.Background(), []string{"snapshots", "-config", filepath.Join(testDir, "litestream.yml"), "/var/lib/db"})
if err == nil || !strings.Contains(err.Error(), `replica path cannot be a url`) {
t.Fatalf("unexpected error: %s", err)
}
})
t.Run("ErrDatabaseNotFound", func(t *testing.T) {
testDir := filepath.Join("testdata", "snapshots", "database-not-found")
defer testingutil.Setenv(t, "LITESTREAM_TESTDIR", testDir)()
m, _, _, _ := newMain()
err := m.Run(context.Background(), []string{"snapshots", "-config", filepath.Join(testDir, "litestream.yml"), "/no/such/db"})
if err == nil || err.Error() != `database not found in config: /no/such/db` {
t.Fatalf("unexpected error: %s", err)
}
})
t.Run("ErrReplicaNotFound", func(t *testing.T) {
testDir := filepath.Join(testingutil.Getwd(t), "testdata", "snapshots", "replica-not-found")
defer testingutil.Setenv(t, "LITESTREAM_TESTDIR", testDir)()
m, _, _, _ := newMain()
err := m.Run(context.Background(), []string{"snapshots", "-config", filepath.Join(testDir, "litestream.yml"), "-replica", "no_such_replica", filepath.Join(testDir, "db")})
if err == nil || err.Error() != `replica "no_such_replica" not found for database "`+filepath.Join(testDir, "db")+`"` {
t.Fatalf("unexpected error: %s", err)
}
})
t.Run("ErrInvalidReplicaURL", func(t *testing.T) {
m, _, _, _ := newMain()
err := m.Run(context.Background(), []string{"snapshots", "xyz://xyz"})
if err == nil || !strings.Contains(err.Error(), `unknown replica type in config: "xyz"`) {
t.Fatalf("unexpected error: %s", err)
}
})
t.Run("Usage", func(t *testing.T) {
m, _, _, _ := newMain()
if err := m.Run(context.Background(), []string{"snapshots", "-h"}); err != flag.ErrHelp {
t.Fatalf("unexpected error: %s", err)
}
})
}

13
cmd/litestream/testdata/Makefile vendored Normal file
View File

@@ -0,0 +1,13 @@
.PHONY: default
default:
make -C generations/ok
make -C generations/no-database
make -C generations/replica-name
make -C generations/replica-url
make -C restore/latest-replica
make -C snapshots/ok
make -C snapshots/replica-name
make -C snapshots/replica-url
make -C wal/ok
make -C wal/replica-name
make -C wal/replica-url

View File

@@ -0,0 +1,4 @@
dbs:
- path: /var/lib/db
replicas:
- path: s3://bkt/db

View File

View File

@@ -0,0 +1 @@
dbs:

View File

@@ -0,0 +1 @@
No databases found in config file.

View File

@@ -0,0 +1,7 @@
dbs:
- path: /var/lib/db
replicas:
- path: /var/lib/replica
- url: s3://mybkt/db
- path: /my/other/db

View File

@@ -0,0 +1,3 @@
path replicas
/var/lib/db file,s3
/my/other/db

View File

@@ -0,0 +1,2 @@
dbs:
- path: $LITESTREAM_TESTDIR/db

View File

@@ -0,0 +1,4 @@
dbs:
- path: /var/lib/db
replicas:
- path: s3://bkt/db

View File

@@ -0,0 +1,4 @@
.PHONY: default
default:
TZ=UTC touch -ct 200001010000 replica/generations/0000000000000000/snapshots/0000000000000000.snapshot.lz4
TZ=UTC touch -ct 200001020000 replica/generations/0000000000000001/snapshots/0000000000000000.snapshot.lz4

View File

@@ -0,0 +1,4 @@
dbs:
- path: $LITESTREAM_TESTDIR/db
replicas:
- path: $LITESTREAM_TESTDIR/replica

View File

@@ -0,0 +1,3 @@
name generation lag start end
file 0000000000000000 - 2000-01-01T00:00:00Z 2000-01-01T00:00:00Z
file 0000000000000001 - 2000-01-02T00:00:00Z 2000-01-02T00:00:00Z

View File

@@ -0,0 +1,9 @@
.PHONY: default
default:
TZ=UTC touch -ct 200001030000 db
TZ=UTC touch -ct 200001010000 replica/generations/0000000000000000/snapshots/0000000000000000.snapshot.lz4
TZ=UTC touch -ct 200001020000 replica/generations/0000000000000000/snapshots/0000000000000001.snapshot.lz4
TZ=UTC touch -ct 200001010000 replica/generations/0000000000000000/wal/0000000000000000/0000000000000000.wal.lz4
TZ=UTC touch -ct 200001020000 replica/generations/0000000000000000/wal/0000000000000000/0000000000000001.wal.lz4
TZ=UTC touch -ct 200001030000 replica/generations/0000000000000000/wal/0000000000000001/0000000000000000.wal.lz4
TZ=UTC touch -ct 200001010000 replica/generations/0000000000000001/snapshots/0000000000000000.snapshot.lz4

View File

View File

@@ -0,0 +1,4 @@
dbs:
- path: $LITESTREAM_TESTDIR/db
replicas:
- path: $LITESTREAM_TESTDIR/replica

View File

View File

@@ -0,0 +1,3 @@
name generation lag start end
file 0000000000000000 0s 2000-01-01T00:00:00Z 2000-01-03T00:00:00Z
file 0000000000000001 48h0m0s 2000-01-01T00:00:00Z 2000-01-01T00:00:00Z

View File

@@ -0,0 +1,5 @@
.PHONY: default
default:
TZ=UTC touch -ct 200001030000 db
TZ=UTC touch -ct 200001010000 replica0/generations/0000000000000000/snapshots/0000000000000000.snapshot.lz4
TZ=UTC touch -ct 200001020000 replica1/generations/0000000000000001/snapshots/0000000000000000.snapshot.lz4

View File

View File

@@ -0,0 +1,7 @@
dbs:
- path: $LITESTREAM_TESTDIR/db
replicas:
- name: replica0
path: $LITESTREAM_TESTDIR/replica0
- name: replica1
path: $LITESTREAM_TESTDIR/replica1

View File

@@ -0,0 +1,2 @@
name generation lag start end
replica1 0000000000000001 24h0m0s 2000-01-02T00:00:00Z 2000-01-02T00:00:00Z

View File

@@ -0,0 +1,4 @@
dbs:
- path: $LITESTREAM_TESTDIR/db
replicas:
- url: s3://bkt/db

View File

@@ -0,0 +1,9 @@
.PHONY: default
default:
TZ=UTC touch -ct 200001010000 replica/generations/0000000000000000/snapshots/0000000000000000.snapshot.lz4
TZ=UTC touch -ct 200001020000 replica/generations/0000000000000000/snapshots/0000000000000001.snapshot.lz4
TZ=UTC touch -ct 200001010000 replica/generations/0000000000000000/wal/0000000000000000/0000000000000000.wal.lz4
TZ=UTC touch -ct 200001020000 replica/generations/0000000000000000/wal/0000000000000000/0000000000000001.wal.lz4
TZ=UTC touch -ct 200001030000 replica/generations/0000000000000000/wal/0000000000000001/0000000000000000.wal.lz4
TZ=UTC touch -ct 200001020000 replica/generations/0000000000000001/snapshots/0000000000000000.snapshot.lz4

View File

@@ -0,0 +1,4 @@
dbs:
- path: $LITESTREAM_TESTDIR/db
replicas:
- path: $LITESTREAM_TESTDIR/replica

View File

@@ -0,0 +1,3 @@
name generation lag start end
file 0000000000000000 - 2000-01-01T00:00:00Z 2000-01-03T00:00:00Z
file 0000000000000001 - 2000-01-02T00:00:00Z 2000-01-02T00:00:00Z

View File

@@ -0,0 +1,4 @@
dbs:
- path: $LITESTREAM_TESTDIR/db
replicas:
- path: $LITESTREAM_TESTDIR/replica

View File

@@ -0,0 +1,5 @@
dbs:
- path: $LITESTREAM_TESTDIR/db
replicas:
- path: $LITESTREAM_TESTDIR/replica0
- path: $LITESTREAM_TESTDIR/replica1

Binary file not shown.

View File

@@ -0,0 +1,4 @@
dbs:
- path: $LITESTREAM_TESTDIR/db
replicas:
- path: $LITESTREAM_TESTDIR/replica

View File

@@ -0,0 +1 @@
database already exists, skipping

View File

@@ -0,0 +1,4 @@
dbs:
- path: $LITESTREAM_TESTDIR/db
replicas:
- path: $LITESTREAM_TESTDIR/replica

View File

@@ -0,0 +1 @@
no matching backups found, skipping

View File

@@ -0,0 +1,4 @@
dbs:
- path: /var/lib/db
replicas:
- path: s3://bkt/db

View File

@@ -0,0 +1,6 @@
.PHONY: default
default:
TZ=UTC touch -ct 200001010000 replica0/generations/0000000000000000/snapshots/0000000000000000.snapshot.lz4
TZ=UTC touch -ct 200001020000 replica1/generations/0000000000000002/snapshots/0000000000000000.snapshot.lz4
TZ=UTC touch -ct 200001030000 replica0/generations/0000000000000001/snapshots/0000000000000000.snapshot.lz4

View File

@@ -0,0 +1,7 @@
dbs:
- path: $LITESTREAM_TESTDIR/db
replicas:
- name: replica0
path: $LITESTREAM_TESTDIR/replica0
- name: replica1
path: $LITESTREAM_TESTDIR/replica1

View File

@@ -0,0 +1,4 @@
dbs:
- path: $LITESTREAM_TESTDIR/db
replicas:
- path: $LITESTREAM_TESTDIR/replica

View File

View File

View File

@@ -0,0 +1,4 @@
dbs:
- path: $LITESTREAM_TESTDIR/db
replicas:
- path: $LITESTREAM_TESTDIR/replica

View File

@@ -0,0 +1,2 @@
dbs:
- path: $LITESTREAM_TESTDIR/db

View File

@@ -0,0 +1,4 @@
dbs:
- path: $LITESTREAM_TESTDIR/db
replicas:
- path: $LITESTREAM_TESTDIR/replica

Binary file not shown.

View File

@@ -0,0 +1,36 @@
To reproduce this testdata, run sqlite3 and execute:
PRAGMA journal_mode = WAL;
CREATE TABLE t (x);
INSERT INTO t (x) VALUES (1);
INSERT INTO t (x) VALUES (2);
sl3 split -o generations/0000000000000000/wal/0000000000000000 db-wal
cp db generations/0000000000000000/snapshots/0000000000000000.snapshot
lz4 -c --rm generations/0000000000000000/snapshots/0000000000000000.snapshot
Then execute:
PRAGMA wal_checkpoint(TRUNCATE);
INSERT INTO t (x) VALUES (3);
sl3 split -o generations/0000000000000000/wal/0000000000000001 db-wal
Then execute:
PRAGMA wal_checkpoint(TRUNCATE);
INSERT INTO t (x) VALUES (4);
INSERT INTO t (x) VALUES (5);
sl3 split -o generations/0000000000000000/wal/0000000000000002 db-wal
Finally, obtain the final snapshot:
PRAGMA wal_checkpoint(TRUNCATE);
cp db 0000000000000002.db
rm db*

View File

@@ -0,0 +1,4 @@
dbs:
- path: $LITESTREAM_TESTDIR/db
replicas:
- path: $LITESTREAM_TESTDIR/replica

Some files were not shown because too many files have changed in this diff Show More