Compare commits

...

147 Commits

Author SHA1 Message Date
Ben Johnson
16c50d1d2e Merge pull request #120 from benbjohnson/default-force-path-style
Default to force path style if endpoint set
2021-03-11 15:29:18 -07:00
Ben Johnson
929a66314c Default to force path style if endpoint set
This commit changes the replica configuration behavior to default
the `force-path-style` field to `true` when an `endpoint` is set.
This works because the only service that does not use path-style
addressing is AWS S3, which does not require an endpoint.
2021-03-11 15:26:41 -07:00
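A minimal sketch of the defaulting rule described in this commit; `resolveForcePathStyle` is a hypothetical helper, while the real logic lives inline in `newS3ReplicaFromConfig` (see the diff further down):

```go
package main

import "fmt"

// resolveForcePathStyle mirrors the behavior described above: path-style
// addressing is assumed whenever a custom endpoint is configured, unless
// the force-path-style field is set explicitly.
func resolveForcePathStyle(endpoint string, override *bool) bool {
	forcePathStyle := endpoint != "" // default: only AWS S3 omits the endpoint
	if override != nil {
		forcePathStyle = *override
	}
	return forcePathStyle
}

func main() {
	fmt.Println(resolveForcePathStyle("", nil))                      // false: AWS S3
	fmt.Println(resolveForcePathStyle("http://localhost:9000", nil)) // true: MinIO-style endpoint
}
```
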
Ben Johnson
2e7a6ae715 Merge pull request #118 from benbjohnson/default-region 2021-03-09 15:42:04 -07:00
Ben Johnson
896aef070c Default region if endpoint specified 2021-03-09 15:38:32 -07:00
Ben Johnson
3598d8b572 Merge pull request #111 from benbjohnson/linode
Add support for Linode Object Storage replica URLs
2021-03-07 08:55:29 -07:00
Ben Johnson
3183cf0e2e Add support for Linode Object Storage replica URLs
This commit adds the ability to specify Linode Object Storage
replica URLs on the command line and in the configuration file:

	s3://MYBKT.us-east-1.linodeobjects.com/MYPATH
2021-03-07 08:47:24 -07:00
Ben Johnson
a59ee6ed63 Merge pull request #110 from benbjohnson/digitalocean
Add support for DigitalOcean Spaces replica URLs
2021-03-07 08:29:23 -07:00
Ben Johnson
e4c1a82eb2 Add support for DigitalOcean Spaces replica URLs
This commit adds the ability to specify DigitalOcean Spaces
replica URLs on the command line and in the configuration file:

	s3://mybkt.nyc3.digitaloceanspaces.com/mypath
2021-03-07 08:25:26 -07:00
Ben Johnson
aa54e4698d Merge pull request #109 from benbjohnson/wal-mismatch-validation-info
Add WAL validation debug information
2021-03-07 07:55:02 -07:00
Ben Johnson
43e40ce8d3 Merge pull request #108 from benbjohnson/revert-lock-removal
Revert sync lock removal
2021-03-07 07:52:49 -07:00
Ben Johnson
0bd1b13b94 Add wal validation debug information on error
This commit adds the WAL header and shadow path to "wal header mismatch"
errors to help debug issues. The mismatch seems to happen more often
than I would expect on restart. This error doesn't cause any corruption;
it simply causes a generation to restart, which requires a snapshot.
2021-03-07 07:48:43 -07:00
Ben Johnson
1c16aae550 Revert sync lock removal
This commit reverts the removal of the SQLite write lock during
WAL sync (998e831c5c). The change
caused validation mismatch errors during the long-running test,
although the restored database did not appear to be corrupted, so it
may simply be a locking issue during validation.
2021-03-07 07:30:25 -07:00
Ben Johnson
49f47ea87f Merge pull request #105 from benbjohnson/db-config-fields
Expose additional DB configuration settings
2021-03-06 08:37:02 -07:00
Ben Johnson
8947adc312 Expose additional DB configuration settings
This commit exposes the monitor interval, checkpoint interval,
minimum checkpoint page count, and maximum checkpoint page count
via the YAML configuration file.
2021-03-06 08:33:19 -07:00
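A minimal sketch of what the newly exposed settings map to in code; the struct below is a stand-in for the real `DBConfig` in `cmd/litestream` (its fields and YAML tags match the diff further down), and the values are illustrative:

```go
package main

import (
	"fmt"
	"time"
)

// DBConfig is a stand-in for the command's config type; pointer fields let the
// YAML loader distinguish "unset" (use DB defaults) from an explicit zero.
type DBConfig struct {
	Path               string         `yaml:"path"`
	MonitorInterval    *time.Duration `yaml:"monitor-interval"`
	CheckpointInterval *time.Duration `yaml:"checkpoint-interval"`
	MinCheckpointPageN *int           `yaml:"min-checkpoint-page-count"`
	MaxCheckpointPageN *int           `yaml:"max-checkpoint-page-count"`
}

func main() {
	monitor := 5 * time.Second
	minPages := 1000
	cfg := DBConfig{Path: "/var/lib/app/db.sqlite", MonitorInterval: &monitor, MinCheckpointPageN: &minPages}
	fmt.Printf("monitor=%s min-checkpoint-pages=%d\n", *cfg.MonitorInterval, *cfg.MinCheckpointPageN)
}
```
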
Ben Johnson
9341863bdb Merge pull request #104 from benbjohnson/remove-sync-lock
Remove SQLite write lock during WAL sync
2021-03-06 08:08:23 -07:00
Ben Johnson
998e831c5c Remove SQLite write lock during WAL sync
Originally, Litestream relied on a SQLite write lock to ensure
transactions were atomically replicated. However, this was changed
so that Litestream itself now validates the transaction boundaries.
As such, the write lock on the database is no longer needed. The
read lock is sufficient to prevent WAL rollover and the WAL is
append only so it is safe to read up to a known position calculated
via fstat().

WAL validation change was made in 031a526b9a

The locking code, however, was moved in this commit to the
post-checkpoint copy to ensure the end-of-file is not overwritten
by an aggressive writer.
2021-03-06 07:51:35 -07:00
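A self-contained sketch of the "read up to a known position" idea, not the actual Litestream code: the file size is captured once via fstat() and any appends made afterwards are excluded from the copy.

```go
package main

import (
	"fmt"
	"io"
	"io/ioutil"
	"log"
	"os"
)

func main() {
	f, err := ioutil.TempFile("", "example-wal")
	if err != nil {
		log.Fatal(err)
	}
	defer os.Remove(f.Name())
	defer f.Close()

	f.WriteString("frames written before the sync pass")

	fi, err := f.Stat() // fstat(): fixed upper bound for this pass
	if err != nil {
		log.Fatal(err)
	}
	limit := fi.Size()

	// An aggressive writer appends after the size was captured.
	f.WriteString(" -- appended later, must not be copied")

	if _, err := f.Seek(0, io.SeekStart); err != nil {
		log.Fatal(err)
	}
	n, err := io.Copy(ioutil.Discard, io.LimitReader(f, limit))
	if err != nil {
		log.Fatal(err)
	}
	fmt.Printf("copied %d bytes, stopping at the snapshotted offset\n", n)
}
```
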
Ben Johnson
b2ca113fb5 Merge pull request #103 from benbjohnson/fix-addr-log
Fix logged hostport for metrics endpoint
2021-03-06 07:30:36 -07:00
Ben Johnson
b211e82ed2 Fix logged hostport for metrics endpoint
This commit fixes a bug where the bind address is not reported
correctly in the log if a hostname is specified. Previously it
would always report the host as "localhost" even if a host was
specified (such as "0.0.0.0:9090").

This commit also adds validation requiring the port to be specified;
passing only a hostname now returns an error.
2021-03-06 07:23:09 -07:00
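A small sketch of the address handling this commit describes, assuming a hypothetical `displayAddr` helper rather than the actual replicate-command code (which applies the same rule inline, as shown in the diff further down):

```go
package main

import (
	"fmt"
	"log"
	"net"
)

// displayAddr requires a port and reports an empty host as "localhost"
// without changing the address the server actually binds to.
func displayAddr(addr string) (string, error) {
	host, port, err := net.SplitHostPort(addr)
	if err != nil || port == "" {
		return "", fmt.Errorf("must specify port for bind address: %q", addr)
	}
	if host == "" {
		return net.JoinHostPort("localhost", port), nil
	}
	return addr, nil
}

func main() {
	for _, addr := range []string{":9090", "0.0.0.0:9090", "0.0.0.0"} {
		hostport, err := displayAddr(addr)
		if err != nil {
			log.Println(err)
			continue
		}
		fmt.Printf("serving metrics on http://%s/metrics\n", hostport)
	}
}
```
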
Ben Johnson
e2779169a0 README 2021-03-02 08:11:51 -07:00
Ben Johnson
ec2f9c84d5 Merge pull request #96 from benbjohnson/acknowledgements
Acknowledgments
2021-02-28 09:19:44 -07:00
Ben Johnson
78eb8dcc53 Acknowledgments 2021-02-28 09:16:35 -07:00
Ben Johnson
cafa0f5942 Merge pull request #94 from benbjohnson/prevent-config-and-replica-url
Prevent user from specifying replica URL & config flag
2021-02-28 08:31:29 -07:00
Ben Johnson
325482a97c Prevent user from specifying replica URL & config flag
Previously, if a replica URL was specified then the `-config` flag
was silently ignored. This commit changes that behavior so that
specifying both the URL and the config flag returns an error.
2021-02-28 08:09:24 -07:00
Ben Johnson
9cee1285b9 Merge pull request #93 from benbjohnson/non-ofd-locks
Fix release of non-OFD locks
2021-02-28 07:28:00 -07:00
Ben Johnson
a14a74d678 Fix release of non-OFD locks
This commit removes short-lived `os.Open()` calls on the database
file because this can cause locks to be released when `os.File.Close()`
is later called if the operating system does not support OFD
(Open File Descriptor) locks.
2021-02-28 06:44:02 -07:00
Ben Johnson
f652186adf Merge pull request #84 from benbjohnson/snapshot-interval 2021-02-25 15:41:58 -07:00
Ben Johnson
afb8731ead Add snapshot interval
This commit adds the ability to periodically perform snapshots on
an interval that is separate from retention. For example, you can retain
backups for 24 hours while snapshotting your database every six hours to
improve recovery time.
2021-02-25 15:34:13 -07:00
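As a sketch, the example from this message expressed against stand-in replica config fields (the field names follow the `ReplicaConfig` struct shown in the diff further down; the URL and values are illustrative):

```go
package main

import (
	"fmt"
	"time"
)

// ReplicaConfig is a stand-in with only the fields relevant here.
type ReplicaConfig struct {
	URL              string
	Retention        time.Duration
	SnapshotInterval time.Duration
}

func main() {
	rc := ReplicaConfig{
		URL:              "s3://mybkt/db",
		Retention:        24 * time.Hour, // keep 24 hours of backups
		SnapshotInterval: 6 * time.Hour,  // but snapshot every six hours
	}
	fmt.Printf("%+v\n", rc)
}
```
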
Ben Johnson
ce2d54cc20 Merge pull request #82 from benbjohnson/fix-db-init-failure
Fix error handling when DB.init() fails
2021-02-24 15:47:58 -07:00
Ben Johnson
d802e15b4f Fix error handling when DB.init() fails
`DB.init()` can fail temporarily for a variety of reasons, such
as the database being locked. Previously, the DB would save the
`*sql.DB` connection even if a step failed, which prevented the
database from attempting initialization again. This change saves the
connection only if initialization succeeds; on failure, initialization
is retried on the next sync.
2021-02-24 15:43:28 -07:00
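A minimal sketch of the pattern described here, not the actual `DB.init()` implementation; the `DB` type and the PRAGMA step are stand-ins, and the driver import matches the one the project already uses:

```go
package main

import (
	"database/sql"
	"fmt"
	"log"

	_ "github.com/mattn/go-sqlite3"
)

// DB is a stand-in with only the fields needed for the sketch.
type DB struct {
	path  string
	sqldb *sql.DB
}

// init saves the connection only after every step succeeds, so a transient
// failure (e.g. a locked database) can be retried on the next sync.
func (db *DB) init() error {
	if db.sqldb != nil {
		return nil // already initialized
	}
	d, err := sql.Open("sqlite3", db.path)
	if err != nil {
		return err
	}
	if _, err := d.Exec(`PRAGMA journal_mode = wal;`); err != nil {
		d.Close()
		return err // handle not retained; initialization retried later
	}
	db.sqldb = d // save only on success
	return nil
}

func main() {
	db := &DB{path: "/tmp/example.db"}
	if err := db.init(); err != nil {
		log.Printf("init failed, will retry on next sync: %s", err)
		return
	}
	fmt.Println("initialized")
}
```
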
Ben Johnson
d6ece0b826 Merge pull request #73 from benbjohnson/fix-example-yml
Fix example litestream.yml replica configuration
2021-02-22 07:54:31 -07:00
Ben Johnson
cb007762be Fix example litestream.yml replica configuration 2021-02-22 07:52:56 -07:00
Ben Johnson
6a90714bbe Merge pull request #70 from benbjohnson/fix-global-settings
Fix global settings propagation
2021-02-22 06:40:16 -07:00
Ben Johnson
622ba82ebb Fix global settings propagation
This commit fixes an issue, introduced by a refactor, where setting
global or local AWS credentials in a config file failed.
2021-02-22 06:37:40 -07:00
Ben Johnson
6ca010e9db Merge pull request #66 from benbjohnson/s3-compatible 2021-02-21 10:00:43 -07:00
Ben Johnson
ad9ce43127 Add support for S3-compatible object storage.
This commit adds support for non-AWS S3-compatible storage such as
MinIO, Backblaze B2, & Google Cloud Storage (GCS). Other backends
should also work, but code has been added to make URL-based
configuration of these services easier.
2021-02-21 09:40:48 -07:00
Ben Johnson
167d333fcd Merge pull request #65 from benbjohnson/windows
Add Windows Service & MSI builds
2021-02-19 16:25:10 -07:00
Ben Johnson
c5390dec1d Add Windows Service & MSI builds 2021-02-19 16:21:04 -07:00
Ben Johnson
e2cbd5fb63 README 2021-02-12 08:14:55 -07:00
Ben Johnson
8d083f7a2d README 2021-02-09 07:07:23 -07:00
Ben Johnson
37442babfb Revert validation mismatch temp file persistence
This commit reverts 4e469f8, which was used for debugging the validation
stall corruption issue. It can cause the disk to fill with temporary
files, so it is being reverted.
2021-02-09 06:44:42 -07:00
Ben Johnson
962a2a894b Fix tabwriter 2021-02-08 15:55:15 -07:00
Ben Johnson
0c61c9f7fe Merge pull request #39 from benbjohnson/replicate-s3-sync-interval
Reduce s3 sync interval when using replica URL
2021-02-08 14:14:20 -07:00
Ben Johnson
267b140fab Reduce s3 sync interval when using replica URL
This commit changes the default sync interval from 10s to 1s
when replicating using the inline replica URL. This approach is
used when users are first testing the software, so a faster
replication interval makes it easier to see results.
2021-02-08 14:09:01 -07:00
Ben Johnson
1b194535e6 Merge pull request #38 from benbjohnson/trace-flag
Add trace file to replicate command
2021-02-06 07:34:02 -07:00
Ben Johnson
58a6c765fe Add trace file to replicate command
This commit removes the verbose flag (`-v`) and replaces it with
the trace flag (`-trace PATH`). This moves tracing to a separate
file instead of writing to STDOUT.
2021-02-06 07:31:19 -07:00
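A minimal sketch of what `-trace PATH` wires up, based on the trace setup shown in the replicate command's diff further down; the output path is illustrative:

```go
package main

import (
	"log"
	"os"

	"github.com/benbjohnson/litestream"
)

func main() {
	f, err := os.Create("/tmp/litestream-trace.log") // illustrative path
	if err != nil {
		log.Fatal(err)
	}
	defer f.Close()

	// Route trace output to its own file instead of STDOUT.
	litestream.Tracef = log.New(f, "", log.LstdFlags|log.LUTC|log.Lshortfile).Printf
	litestream.Tracef("tracing enabled")
}
```
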
Ben Johnson
2604052a9f Merge pull request #37 from benbjohnson/fix-shadow-write
Fix shadow wal corruption on stalled validation
2021-02-06 07:31:05 -07:00
Ben Johnson
7f81890bae Fix shadow wal corruption on stalled validation
This commit fixes a timing bug that occurs in a specific scenario
where the shadow WAL sync stalls because of an S3 validation and the
catch-up write to the shadow WAL is large enough to leave a window
between the WAL reads and the final copy.

The file copy has been replaced by direct writes of the frame
buffer to the shadow WAL to ensure that every validated byte is
exactly what is being written. The one downside to this change is
that the frame buffer grows with the transaction size, so it uses
additional heap. This could be replaced by a spill-to-disk
implementation, but it should work well in the short term.
2021-02-06 07:28:15 -07:00
Ben Johnson
2ff073c735 Merge pull request #36 from benbjohnson/max-index
Enforce max WAL index
2021-02-02 15:16:42 -07:00
Ben Johnson
6fd11ccab5 Enforce max WAL index.
This commit sets a hard upper limit of (1<<31)-1 for the WAL index.
The index is hex-encoded in file names as a 4-byte unsigned integer,
so this limit ensures every index value fits within that encoding and
is unaffected by any signed 32-bit integer limit.

A WAL file is typically at least 4MB, so you would need to write
roughly 8 petabytes to reach this upper limit.
2021-02-02 15:11:50 -07:00
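A quick illustration of the limit: the index is hex-encoded as a 4-byte unsigned integer, so the cap fits in eight hex digits. The `.wal.lz4` suffix below is only an assumed file-name shape, not necessarily the exact naming scheme.

```go
package main

import "fmt"

// MaxIndex mirrors the hard cap described above: (1<<31)-1.
const MaxIndex = (1 << 31) - 1

func main() {
	fmt.Printf("%08x.wal.lz4\n", 0)        // 00000000.wal.lz4
	fmt.Printf("%08x.wal.lz4\n", MaxIndex) // 7fffffff.wal.lz4

	// At a minimum WAL size of 4 MiB, reaching the cap would require about
	// (1<<31) * 4 MiB = 8 PiB of WAL data, matching the commit message.
}
```
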
Ben Johnson
6c49fba592 Check checkpoint result during restore 2021-02-02 15:04:20 -07:00
Ben Johnson
922fa0798e Merge pull request #34 from benbjohnson/windows 2021-01-31 10:09:51 -07:00
Ben Johnson
976df182c0 Fix Windows build 2021-01-31 10:08:28 -07:00
Ben Johnson
0e28a650e6 Merge pull request #33 from benbjohnson/log-wal-checksum-mismatch
Log WAL frame checksum mismatch
2021-01-31 08:57:51 -07:00
Ben Johnson
f17768e830 Log WAL frame checksum mismatch
Previously, the WAL copy function returned an error when it encountered
a checksum mismatch in a WAL frame. This can occur for partial writes
and is recovered from moments later, so this commit logs the mismatch
instead of returning an error.
2021-01-31 08:52:12 -07:00
Ben Johnson
2c142d3a0c Merge pull request #32 from benbjohnson/persist-mismatch-validation-data
Persist primary/replica copies after validation mismatch
2021-01-31 08:50:15 -07:00
Ben Johnson
4e469f8b02 Persist primary/replica copies after validation mismatch
This commit changes `ValidateReplica()` to persist copies of the
primary & replica databases for inspection if a validation mismatch
occurs.
2021-01-31 08:47:06 -07:00
Ben Johnson
3f268b70f8 Merge pull request #31 from benbjohnson/adjust-logging
Reduce logging output
2021-01-31 08:14:57 -07:00
Ben Johnson
ad7bf7f974 Reduce logging output
Previously, there were excessive log messages for checkpoints and
retention. These have been removed or combined into a single log
message where appropriate.
2021-01-31 08:12:18 -07:00
Ben Johnson
778451f09f CONTRIBUTING 2021-01-28 13:31:28 -07:00
Ben Johnson
8e9a15933b README 2021-01-27 08:01:48 -07:00
Ben Johnson
da1d7c3183 README 2021-01-27 07:59:12 -07:00
Ben Johnson
a178ef4714 Merge pull request #27 from benbjohnson/generations-replica-url
Allow replica URLs for generations command
2021-01-27 07:50:29 -07:00
Ben Johnson
7ca2e193b9 Allow replica URLs for generations command 2021-01-27 07:48:56 -07:00
Ben Johnson
39a6fabb9f Fix restore logging. 2021-01-26 17:01:00 -07:00
Ben Johnson
0249b4e4f5 Merge pull request #25 from benbjohnson/replica-url 2021-01-26 16:40:17 -07:00
Ben Johnson
67eeb49101 Allow replica URL to be used for commands
This commit refactors the commands to allow a replica URL when
restoring a database. If the first CLI arg is a URL with a scheme,
it is treated as a replica URL.
2021-01-26 16:33:16 -07:00
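A small sketch of the URL detection, mirroring the `isURL` helper shown in the diff further down: an argument is treated as a replica URL only when it carries a scheme.

```go
package main

import (
	"fmt"
	"regexp"
)

// urlRe matches arguments that carry a scheme, e.g. "s3://" or "file://".
var urlRe = regexp.MustCompile(`^\w+://`)

func main() {
	for _, arg := range []string{"/var/lib/app/db.sqlite", "s3://mybkt/db", "file:///tmp/replica"} {
		fmt.Printf("%-28s treated as replica URL: %v\n", arg, urlRe.MatchString(arg))
	}
}
```
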
Ben Johnson
f7213ed35c Allow replication without config file.
This commit changes `litestream replicate` to accept a database
path and a replica URL instead of using the config file. This allows
people to quickly try out the tool without first learning the config
file syntax.
2021-01-25 10:33:50 -07:00
Ben Johnson
a532a0198e README 2021-01-24 10:09:54 -07:00
Ben Johnson
16f79e5814 Merge pull request #24 from benbjohnson/document-retention-period
Document retention period configuration
2021-01-24 09:32:31 -07:00
Ben Johnson
39aefc2c02 Document retention period configuration 2021-01-24 09:28:57 -07:00
Ben Johnson
0b08669bca Merge pull request #23 from benbjohnson/disable-metrics-by-default
Disable prometheus metrics by default
2021-01-24 09:18:37 -07:00
Ben Johnson
8f5761ee13 Disable prometheus metrics by default
The HTTP server should only be enabled if a user explicitly sets a
port for it.
2021-01-24 09:16:23 -07:00
Ben Johnson
d2eb4fa5ba Remove PR action 2021-01-24 08:54:29 -07:00
Ben Johnson
ca489c5e73 Merge pull request #22 from benbjohnson/notorize
Add signed homebrew install
2021-01-24 08:50:01 -07:00
Ben Johnson
f0ae48af4c Add signed homebrew install 2021-01-24 08:47:16 -07:00
Ben Johnson
9eae39e2fa README 2021-01-21 15:01:30 -07:00
Ben Johnson
42ab293ffb README 2021-01-21 14:53:21 -07:00
Ben Johnson
c8b72bf16b Fix release action 2021-01-21 14:39:23 -07:00
Ben Johnson
9c4de6c520 Debian release action 2021-01-21 14:34:42 -07:00
Ben Johnson
94411923a7 Fix unit test 2021-01-21 13:52:35 -07:00
Ben Johnson
e92db9ef4b Enforce stricter validation on restart.
Previously, the sync would validate the last page written to ensure
that replication picked up from the last position. However, a large
WAL file followed by a series of shorter checkpointed WAL files means
that the last page could be the same even if multiple checkpoints
have occurred.

To fix this, the WAL header must match the shadow WAL header when
starting litestream since there are no guarantees about checkpoints.
2021-01-21 13:44:05 -07:00
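A self-contained sketch of the header comparison described here; the paths (and the shadow WAL layout they imply) are illustrative, not the actual Litestream file locations:

```go
package main

import (
	"bytes"
	"fmt"
	"io"
	"os"
)

// readWALHeader returns the 32-byte SQLite WAL header from a WAL file.
func readWALHeader(path string) ([]byte, error) {
	f, err := os.Open(path)
	if err != nil {
		return nil, err
	}
	defer f.Close()

	hdr := make([]byte, 32) // the SQLite WAL header is 32 bytes
	if _, err := io.ReadFull(f, hdr); err != nil {
		return nil, err
	}
	return hdr, nil
}

func main() {
	walHdr, err := readWALHeader("/path/to/db.sqlite-wal") // illustrative path
	if err != nil {
		fmt.Println("cannot read WAL header:", err)
		return
	}
	shadowHdr, err := readWALHeader("/path/to/shadow/00000000.wal") // illustrative path
	if err != nil {
		fmt.Println("cannot read shadow WAL header:", err)
		return
	}
	if !bytes.Equal(walHdr, shadowHdr) {
		fmt.Println("wal header mismatch: restarting generation")
		return
	}
	fmt.Println("headers match: resume replication from last position")
}
```
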
Ben Johnson
031a526b9a Only copy committed WAL pages 2021-01-21 12:44:11 -07:00
Ben Johnson
2244be885d README 2021-01-20 17:05:47 -07:00
Ben Johnson
95bcaa5927 Fix file replica compression 2021-01-19 09:25:38 -07:00
Ben Johnson
1935ebd6f0 Fix S3 GET bytes metric 2021-01-19 06:46:13 -07:00
Ben Johnson
7fb98df240 cleanup 2021-01-18 15:58:49 -07:00
Ben Johnson
f31c22af62 Remove s3 bucket lookup log 2021-01-18 15:27:16 -07:00
Ben Johnson
139d836d7a Fix file/dir mode 2021-01-18 15:23:28 -07:00
Ben Johnson
14dad1fd5a Switch from gzip to lz4 2021-01-18 14:45:12 -07:00
Ben Johnson
35d755e7f2 Remove debugging code 2021-01-18 10:33:30 -07:00
Ben Johnson
358dcd4650 Copy shadow WAL immediately after init 2021-01-18 10:01:16 -07:00
Ben Johnson
2ce4052300 Remove write lock during db checksum 2021-01-18 07:05:27 -07:00
Ben Johnson
44af75fa98 Fix missing WAL reader error 2021-01-18 07:05:17 -07:00
Ben Johnson
3c4fd152c9 Add more checksum logging 2021-01-18 06:38:03 -07:00
Ben Johnson
d259d9b9e3 Fix checksum logging 2021-01-17 10:19:39 -07:00
Ben Johnson
90a1d959d4 Remove size from s3 filenames 2021-01-17 10:02:06 -07:00
Ben Johnson
04d75507e3 Fix checksum hex padding 2021-01-17 09:52:09 -07:00
Ben Johnson
4b65e6a88f Log validation position 2021-01-17 07:38:13 -07:00
Ben Johnson
07a65cbac7 Fix crc64 unit test 2021-01-16 10:04:03 -07:00
Ben Johnson
6ac6a8536d Obtain write lock during validation. 2021-01-16 09:27:43 -07:00
Ben Johnson
71ab15e50a Fix S3 GET stats 2021-01-16 09:22:33 -07:00
Ben Johnson
b4e5079760 Add .deb packaging 2021-01-16 09:22:02 -07:00
Ben Johnson
78563f821d Do not require databases when starting replication 2021-01-16 09:15:16 -07:00
Ben Johnson
e65536f81d Stop waiting for replica if generation changes 2021-01-16 07:47:02 -07:00
Ben Johnson
25fec29e1a Clear last position on replica sync error 2021-01-16 07:45:08 -07:00
Ben Johnson
cbc2dce6dc Add busy timeout 2021-01-16 07:33:32 -07:00
Ben Johnson
1b8cfc8a41 Add validation interval 2021-01-15 16:37:04 -07:00
Ben Johnson
290e06e60d Reduce s3 LIST operations 2021-01-15 13:31:04 -07:00
Ben Johnson
b94ee366e5 Fix snapshot only restore 2021-01-15 13:12:15 -07:00
Ben Johnson
743aeb83e1 Revert gzip compression level, fix s3 wal upload 2021-01-15 13:04:21 -07:00
Ben Johnson
a7ec05ad7a Allow global AWS settings in config. 2021-01-15 12:27:41 -07:00
Ben Johnson
28dd7b564e Lookup s3 bucket region if not specified 2021-01-15 12:18:07 -07:00
Ben Johnson
43dda4315f Allow URLs for replica config path 2021-01-15 12:04:23 -07:00
Ben Johnson
0655bf420a Fix unit tests 2021-01-14 16:13:19 -07:00
Ben Johnson
8c113cf260 Add file & s3 replica metrics 2021-01-14 16:10:02 -07:00
Ben Johnson
daa74f87b4 Add file replica metrics 2021-01-14 15:47:58 -07:00
Ben Johnson
e1c9e09161 Update wal segment naming 2021-01-14 15:26:29 -07:00
Ben Johnson
1e4e9633cc Add s3 sync interval 2021-01-14 15:04:26 -07:00
Ben Johnson
294846cce2 Add context to s3 2021-01-13 16:38:00 -07:00
Ben Johnson
9eb7bd41c2 S3 reader & retention enforcement 2021-01-13 16:21:42 -07:00
Ben Johnson
1ac4adb272 Basic s3 replication working 2021-01-13 14:23:41 -07:00
Ben Johnson
a42f83f3cb Add LITESTREAM_CONFIG env var 2021-01-13 13:17:38 -07:00
Ben Johnson
57a02a8628 S3 replica 2021-01-13 10:14:54 -07:00
Ben Johnson
faa5765745 Add retention policy, remove WAL subdir 2021-01-12 15:22:37 -07:00
Ben Johnson
1fa1313b0b Add trace logging. 2021-01-11 11:04:29 -07:00
Ben Johnson
bcdb553267 Use database owner/group 2021-01-11 09:39:08 -07:00
Ben Johnson
9828b4c1dd Rename 'databases' to 'dbs' in config 2021-01-10 10:07:07 -07:00
Ben Johnson
dde9d1042d Update generation lag calc 2021-01-10 09:54:05 -07:00
Ben Johnson
8f30ff7d93 Fix negative duration truncation. 2021-01-10 09:52:04 -07:00
Ben Johnson
aa136a17ee Fix duration truncation. 2021-01-10 09:46:39 -07:00
Ben Johnson
60cb2c97ca Set default max checkpoint. 2021-01-10 09:46:27 -07:00
Ben Johnson
0abe09526d Fix local build 2021-01-09 08:59:08 -07:00
Ben Johnson
b0a3440356 make dist 2021-01-08 16:05:07 -07:00
Ben Johnson
a8d63b54aa README 2021-01-08 10:07:26 -07:00
Ben Johnson
b22f3f100d Add FileReplica.Sync() unit tests. 2021-01-05 15:48:50 -07:00
Ben Johnson
3075b2e92b Fix WAL mod time test 2021-01-05 15:15:28 -07:00
Ben Johnson
7c3272c96f Revert "Test 1.16beta1"
This reverts commit 4294fcf4b4.
2021-01-05 15:11:29 -07:00
Ben Johnson
4294fcf4b4 Test 1.16beta1 2021-01-05 15:10:09 -07:00
Ben Johnson
ae0f51eaa9 Fix setup-go 2021-01-05 14:34:14 -07:00
Ben Johnson
8871d75a8e Fix max checkpoint size check 2021-01-05 14:17:13 -07:00
Ben Johnson
c22eea13ad Add checkpoint tests 2021-01-05 14:07:17 -07:00
Ben Johnson
f4d055916a Add DB sync tests 2021-01-05 13:59:16 -07:00
Ben Johnson
979cabcdb9 Add some DB.Sync() tests 2021-01-01 10:02:03 -07:00
Ben Johnson
5134bc3328 Add test coverage for DB.CRC64 2021-01-01 09:26:23 -07:00
Ben Johnson
78d9de6512 Add DB path tests 2021-01-01 09:00:23 -07:00
Ben Johnson
065f641526 Change validation to use CRC-64 2021-01-01 08:24:11 -07:00
Ben Johnson
f4d0d87fa7 Add DB.UpdatedAt() tests 2021-01-01 08:20:40 -07:00
39 changed files with 4616 additions and 1173 deletions

17
.github/CONTRIBUTING.md vendored Normal file
View File

@@ -0,0 +1,17 @@
## Open-source, not open-contribution
[Similar to SQLite](https://www.sqlite.org/copyright.html), Litestream is open
source but closed to contributions. This keeps the code base free of proprietary
or licensed code but it also helps me continue to maintain and build Litestream.
As the author of [BoltDB](https://github.com/boltdb/bolt), I found that
accepting and maintaining third party patches contributed to my burn out and
I eventually archived the project. Writing databases & low-level replication
tools involves nuance and simple one line changes can have profound and
unexpected changes in correctness and performance. Small contributions
typically required hours of my time to properly test and validate them.
I am grateful for community involvement, bug reports, & feature requests. I do
not wish to come off as anything but welcoming, however, I've
made the decision to keep this project closed to contributions for my own
mental health and long term viability of the project.

View File

@@ -5,7 +5,7 @@ on:
name: release
jobs:
release:
linux:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
@@ -16,10 +16,21 @@ jobs:
env:
GITHUB_TOKEN: ${{ github.token }}
- name: Install nfpm
run: |
wget https://github.com/goreleaser/nfpm/releases/download/v2.2.3/nfpm_2.2.3_Linux_x86_64.tar.gz
tar zxvf nfpm_2.2.3_Linux_x86_64.tar.gz
- name: Build litestream
run: |
go build -ldflags "-X 'main.Version=${{ steps.release.outputs.tag_name }}'" -o litestream ./cmd/litestream
mkdir -p dist
cp etc/litestream.yml etc/litestream.service dist
cat etc/nfpm.yml | LITESTREAM_VERSION=${{ steps.release.outputs.tag_name }} envsubst > dist/nfpm.yml
go build -ldflags "-X 'main.Version=${{ steps.release.outputs.tag_name }}'" -o dist/litestream ./cmd/litestream
cd dist
tar -czvf litestream-${{ steps.release.outputs.tag_name }}-linux-amd64.tar.gz litestream
../nfpm pkg --config nfpm.yml --packager deb --target litestream-${{ steps.release.outputs.tag_name }}-linux-amd64.deb
- name: Upload release binary
uses: actions/upload-release-asset@v1.0.2
@@ -27,6 +38,16 @@ jobs:
GITHUB_TOKEN: ${{ github.token }}
with:
upload_url: ${{ steps.release.outputs.upload_url }}
asset_path: ./litestream-${{ steps.release.outputs.tag_name }}-linux-amd64.tar.gz
asset_path: ./dist/litestream-${{ steps.release.outputs.tag_name }}-linux-amd64.tar.gz
asset_name: litestream-${{ steps.release.outputs.tag_name }}-linux-amd64.tar.gz
asset_content_type: application/gzip
- name: Upload debian package
uses: actions/upload-release-asset@v1.0.2
env:
GITHUB_TOKEN: ${{ github.token }}
with:
upload_url: ${{ steps.release.outputs.upload_url }}
asset_path: ./dist/litestream-${{ steps.release.outputs.tag_name }}-linux-amd64.deb
asset_name: litestream-${{ steps.release.outputs.tag_name }}-linux-amd64.deb
asset_content_type: application/octet-stream

View File

@@ -1,10 +1,12 @@
on: [push, pull_request]
on: push
name: test
jobs:
test:
runs-on: ubuntu-latest
steps:
- uses: actions/setup-go@v2
with:
go-version: '1.15'
- uses: actions/checkout@v2

2
.gitignore vendored Normal file
View File

@@ -0,0 +1,2 @@
.DS_Store
/dist

22
Makefile Normal file
View File

@@ -0,0 +1,22 @@
default:
dist-linux:
mkdir -p dist
cp etc/litestream.yml dist/litestream.yml
docker run --rm -v "${PWD}":/usr/src/litestream -w /usr/src/litestream -e GOOS=linux -e GOARCH=amd64 golang:1.15 go build -v -o dist/litestream ./cmd/litestream
tar -cz -f dist/litestream-linux-amd64.tar.gz -C dist litestream
dist-macos:
ifndef LITESTREAM_VERSION
$(error LITESTREAM_VERSION is undefined)
endif
mkdir -p dist
go build -v -ldflags "-X 'main.Version=${LITESTREAM_VERSION}'" -o dist/litestream ./cmd/litestream
gon etc/gon.hcl
mv dist/litestream.zip dist/litestream-${LITESTREAM_VERSION}-darwin-amd64.zip
openssl dgst -sha256 dist/litestream-${LITESTREAM_VERSION}-darwin-amd64.zip
clean:
rm -rf dist
.PHONY: default dist-linux dist-macos clean

View File

@@ -1,17 +1,59 @@
litestream
Litestream
![GitHub release (latest by date)](https://img.shields.io/github/v/release/benbjohnson/litestream)
![Status](https://img.shields.io/badge/status-beta-blue)
![GitHub](https://img.shields.io/github/license/benbjohnson/litestream)
![test](https://github.com/benbjohnson/litestream/workflows/test/badge.svg)
==========
Streaming replication for SQLite.
Litestream is a standalone streaming replication tool for SQLite. It runs as a
background process and safely replicates changes incrementally to another file
or S3. Litestream only communicates with SQLite through the SQLite API so it
will not corrupt your database.
If you need support or have ideas for improving Litestream, please join the
[Litestream Slack][slack] or visit the [GitHub Discussions](https://github.com/benbjohnson/litestream/discussions).
Please visit the [Litestream web site](https://litestream.io) for installation
instructions and documentation.
If you find this project interesting, please consider starring the project on
GitHub.
[slack]: https://join.slack.com/t/litestream/shared_invite/zt-n0j4s3ci-lx1JziR3bV6L2NMF723H3Q
## Questions
## Acknowledgements
- How to avoid WAL checkpointing on close?
While the Litestream project does not accept external code patches, many
of the most valuable contributions are in the forms of testing, feedback, and
documentation. These help harden software and streamline usage for other users.
I want to give special thanks to individuals who invest much of their time and
energy into the project to help make it better. Shout out to [Michael
Lynch](https://github.com/mtlynch) for digging into issues and contributing to
the documentation.
## Notes
## Open-source, not open-contribution
[Similar to SQLite](https://www.sqlite.org/copyright.html), Litestream is open
source but closed to code contributions. This keeps the code base free of
proprietary or licensed code but it also helps me continue to maintain and build
Litestream.
As the author of [BoltDB](https://github.com/boltdb/bolt), I found that
accepting and maintaining third party patches contributed to my burn out and
I eventually archived the project. Writing databases & low-level replication
tools involves nuance and simple one line changes can have profound and
unexpected changes in correctness and performance. Small contributions
typically required hours of my time to properly test and validate them.
I am grateful for community involvement, bug reports, & feature requests. I do
not wish to come off as anything but welcoming, however, I've
made the decision to keep this project closed to contributions for my own
mental health and long term viability of the project.
The [documentation repository][docs] is MIT licensed and pull requests are welcome there.
[releases]: https://github.com/benbjohnson/litestream/releases
[docs]: https://github.com/benbjohnson/litestream.io
```sql
-- Disable autocheckpointing.
PRAGMA wal_autocheckpoint = 0
```

View File

@@ -2,7 +2,6 @@ package main
import (
"context"
"errors"
"flag"
"fmt"
"os"
@@ -10,33 +9,36 @@ import (
"text/tabwriter"
)
// DatabasesCommand is a command for listing managed databases.
type DatabasesCommand struct{}
// Run executes the command.
func (c *DatabasesCommand) Run(ctx context.Context, args []string) (err error) {
var configPath string
fs := flag.NewFlagSet("litestream-databases", flag.ContinueOnError)
registerConfigFlag(fs, &configPath)
configPath := registerConfigFlag(fs)
fs.Usage = c.Usage
if err := fs.Parse(args); err != nil {
return err
} else if fs.NArg() != 0 {
return fmt.Errorf("too many argument")
return fmt.Errorf("too many arguments")
}
// Load configuration.
if configPath == "" {
return errors.New("-config required")
if *configPath == "" {
*configPath = DefaultConfigPath()
}
config, err := ReadConfigFile(configPath)
config, err := ReadConfigFile(*configPath)
if err != nil {
return err
}
// List all databases.
w := tabwriter.NewWriter(os.Stdout, 0, 8, 1, '\t', 0)
w := tabwriter.NewWriter(os.Stdout, 0, 8, 2, ' ', 0)
defer w.Flush()
fmt.Fprintln(w, "path\treplicas")
for _, dbConfig := range config.DBs {
db, err := newDBFromConfig(dbConfig)
db, err := NewDBFromConfig(dbConfig)
if err != nil {
return err
}
@@ -51,11 +53,11 @@ func (c *DatabasesCommand) Run(ctx context.Context, args []string) (err error) {
strings.Join(replicaNames, ","),
)
}
w.Flush()
return nil
}
// Usage prints the help screen to STDOUT.
func (c *DatabasesCommand) Usage() {
fmt.Printf(`
The databases command lists all databases in the configuration file.
@@ -71,6 +73,6 @@ Arguments:
Defaults to %s
`[1:],
DefaultConfigPath,
DefaultConfigPath(),
)
}

View File

@@ -2,71 +2,89 @@ package main
import (
"context"
"errors"
"flag"
"fmt"
"log"
"os"
"path/filepath"
"text/tabwriter"
"time"
"github.com/benbjohnson/litestream"
)
// GenerationsCommand represents a command to list all generations for a database.
type GenerationsCommand struct{}
// Run executes the command.
func (c *GenerationsCommand) Run(ctx context.Context, args []string) (err error) {
var configPath string
fs := flag.NewFlagSet("litestream-generations", flag.ContinueOnError)
registerConfigFlag(fs, &configPath)
configPath := registerConfigFlag(fs)
replicaName := fs.String("replica", "", "replica name")
fs.Usage = c.Usage
if err := fs.Parse(args); err != nil {
return err
} else if fs.NArg() == 0 || fs.Arg(0) == "" {
return fmt.Errorf("database path required")
return fmt.Errorf("database path or replica URL required")
} else if fs.NArg() > 1 {
return fmt.Errorf("too many arguments")
}
// Load configuration.
if configPath == "" {
return errors.New("-config required")
}
config, err := ReadConfigFile(configPath)
if err != nil {
return err
var db *litestream.DB
var r litestream.Replica
updatedAt := time.Now()
if isURL(fs.Arg(0)) {
if *configPath != "" {
return fmt.Errorf("cannot specify a replica URL and the -config flag")
}
if r, err = NewReplicaFromConfig(&ReplicaConfig{URL: fs.Arg(0)}, nil); err != nil {
return err
}
} else {
if *configPath == "" {
*configPath = DefaultConfigPath()
}
// Load configuration.
config, err := ReadConfigFile(*configPath)
if err != nil {
return err
}
// Lookup database from configuration file by path.
if path, err := expand(fs.Arg(0)); err != nil {
return err
} else if dbc := config.DBConfig(path); dbc == nil {
return fmt.Errorf("database not found in config: %s", path)
} else if db, err = NewDBFromConfig(dbc); err != nil {
return err
}
// Filter by replica, if specified.
if *replicaName != "" {
if r = db.Replica(*replicaName); r == nil {
return fmt.Errorf("replica %q not found for database %q", *replicaName, db.Path())
}
}
// Determine last time database or WAL was updated.
if updatedAt, err = db.UpdatedAt(); err != nil {
return err
}
}
// Determine absolute path for database.
dbPath, err := filepath.Abs(fs.Arg(0))
if err != nil {
return err
}
// Instantiate DB from from configuration.
dbConfig := config.DBConfig(dbPath)
if dbConfig == nil {
return fmt.Errorf("database not found in config: %s", dbPath)
}
db, err := newDBFromConfig(dbConfig)
if err != nil {
return err
}
// Determine last time database or WAL was updated.
updatedAt, err := db.UpdatedAt()
if err != nil {
return err
var replicas []litestream.Replica
if r != nil {
replicas = []litestream.Replica{r}
} else {
replicas = db.Replicas
}
// List each generation.
w := tabwriter.NewWriter(os.Stdout, 0, 8, 1, '\t', 0)
fmt.Fprintln(w, "name\tgeneration\tlag\tstart\tend")
for _, r := range db.Replicas {
if *replicaName != "" && r.Name() != *replicaName {
continue
}
w := tabwriter.NewWriter(os.Stdout, 0, 8, 2, ' ', 0)
defer w.Flush()
fmt.Fprintln(w, "name\tgeneration\tlag\tstart\tend")
for _, r := range replicas {
generations, err := r.Generations(ctx)
if err != nil {
log.Printf("%s: cannot list generations: %s", r.Name(), err)
@@ -84,46 +102,61 @@ func (c *GenerationsCommand) Run(ctx context.Context, args []string) (err error)
fmt.Fprintf(w, "%s\t%s\t%s\t%s\t%s\n",
r.Name(),
generation,
truncateDuration(stats.UpdatedAt.Sub(updatedAt)).String(),
truncateDuration(updatedAt.Sub(stats.UpdatedAt)).String(),
stats.CreatedAt.Format(time.RFC3339),
stats.UpdatedAt.Format(time.RFC3339),
)
}
}
w.Flush()
return nil
}
// Usage prints the help message to STDOUT.
func (c *GenerationsCommand) Usage() {
fmt.Printf(`
The generations command lists all generations for a database. It also lists
stats about their lag behind the primary database and the time range they cover.
The generations command lists all generations for a database or replica. It also
lists stats about their lag behind the primary database and the time range they
cover.
Usage:
litestream generations [arguments] DB
litestream generations [arguments] DB_PATH
litestream generations [arguments] REPLICA_URL
Arguments:
-config PATH
Specifies the configuration file. Defaults to %s
Specifies the configuration file.
Defaults to %s
-replica NAME
Optional, filters by replica.
`[1:],
DefaultConfigPath,
DefaultConfigPath(),
)
}
func truncateDuration(d time.Duration) time.Duration {
if d > time.Hour {
return d.Truncate(time.Hour)
} else if d > time.Minute {
return d.Truncate(time.Minute)
} else if d > time.Second {
if d < 0 {
if d < -10*time.Second {
return d.Truncate(time.Second)
} else if d < -time.Second {
return d.Truncate(time.Second / 10)
} else if d < -time.Millisecond {
return d.Truncate(time.Millisecond)
} else if d < -time.Microsecond {
return d.Truncate(time.Microsecond)
}
return d
}
if d > 10*time.Second {
return d.Truncate(time.Second)
} else if d > time.Second {
return d.Truncate(time.Second / 10)
} else if d > time.Millisecond {
return d.Truncate(time.Millisecond)
} else if d > time.Microsecond {

View File

@@ -2,16 +2,24 @@ package main
import (
"context"
"errors"
"flag"
"fmt"
"io/ioutil"
"log"
"net/url"
"os"
"os/signal"
"os/user"
"path"
"path/filepath"
"regexp"
"strings"
"time"
"github.com/benbjohnson/litestream"
"github.com/benbjohnson/litestream/s3"
_ "github.com/mattn/go-sqlite3"
"gopkg.in/yaml.v2"
)
@@ -20,28 +28,39 @@ var (
Version = "(development build)"
)
// DefaultConfigPath is the default configuration path.
const DefaultConfigPath = "/etc/litestream.yml"
// errStop is a terminal error for indicating program should quit.
var errStop = errors.New("stop")
func main() {
log.SetFlags(0)
m := NewMain()
if err := m.Run(context.Background(), os.Args[1:]); err == flag.ErrHelp {
if err := m.Run(context.Background(), os.Args[1:]); err == flag.ErrHelp || err == errStop {
os.Exit(1)
} else if err != nil {
fmt.Fprintln(os.Stderr, err)
log.Println(err)
os.Exit(1)
}
}
// Main represents the main program execution.
type Main struct{}
// NewMain returns a new instance of Main.
func NewMain() *Main {
return &Main{}
}
// Run executes the program.
func (m *Main) Run(ctx context.Context, args []string) (err error) {
// Execute replication command if running as a Windows service.
if isService, err := isWindowsService(); err != nil {
return err
} else if isService {
return runWindowsService(ctx)
}
// Extract command name.
var cmd string
if len(args) > 0 {
cmd, args = args[0], args[1:]
@@ -53,13 +72,32 @@ func (m *Main) Run(ctx context.Context, args []string) (err error) {
case "generations":
return (&GenerationsCommand{}).Run(ctx, args)
case "replicate":
return (&ReplicateCommand{}).Run(ctx, args)
c := NewReplicateCommand()
if err := c.ParseFlags(ctx, args); err != nil {
return err
}
// Setup signal handler.
ctx, cancel := context.WithCancel(ctx)
ch := make(chan os.Signal, 1)
signal.Notify(ch, os.Interrupt)
go func() { <-ch; cancel() }()
if err := c.Run(ctx); err != nil {
return err
}
// Wait for signal to stop program.
<-ctx.Done()
signal.Reset()
// Gracefully close.
return c.Close()
case "restore":
return (&RestoreCommand{}).Run(ctx, args)
case "snapshots":
return (&SnapshotsCommand{}).Run(ctx, args)
case "validate":
return (&ValidateCommand{}).Run(ctx, args)
case "version":
return (&VersionCommand{}).Run(ctx, args)
case "wal":
@@ -73,6 +111,7 @@ func (m *Main) Run(ctx context.Context, args []string) (err error) {
}
}
// Usage prints the help screen to STDOUT.
func (m *Main) Usage() {
fmt.Println(`
litestream is a tool for replicating SQLite databases.
@@ -83,37 +122,49 @@ Usage:
The commands are:
databases list databases specified in config file
generations list available generations for a database
replicate runs a server to replicate databases
restore recovers database backup from a replica
snapshots list available snapshots for a database
validate checks replica to ensure a consistent state with primary
version prints the version
version prints the binary version
wal list available WAL files for a database
`[1:])
}
// Default configuration settings.
const (
DefaultAddr = ":9090"
)
// Config represents a configuration file for the litestream daemon.
type Config struct {
// Bind address for serving metrics.
Addr string `yaml:"addr"`
// List of databases to manage.
DBs []*DBConfig `yaml:"databases"`
DBs []*DBConfig `yaml:"dbs"`
// Global S3 settings
AccessKeyID string `yaml:"access-key-id"`
SecretAccessKey string `yaml:"secret-access-key"`
}
// propagateGlobalSettings copies global S3 settings to replica configs.
func (c *Config) propagateGlobalSettings() {
for _, dbc := range c.DBs {
for _, rc := range dbc.Replicas {
if rc.AccessKeyID == "" {
rc.AccessKeyID = c.AccessKeyID
}
if rc.SecretAccessKey == "" {
rc.SecretAccessKey = c.SecretAccessKey
}
}
}
}
// DefaultConfig returns a new instance of Config with defaults set.
func DefaultConfig() Config {
return Config{
Addr: DefaultAddr,
}
return Config{}
}
// DBConfig returns database configuration by path.
func (c *Config) DBConfig(path string) *DBConfig {
for _, dbConfig := range c.DBs {
if dbConfig.Path == path {
@@ -124,18 +175,13 @@ func (c *Config) DBConfig(path string) *DBConfig {
}
// ReadConfigFile unmarshals config from filename. Expands path if needed.
func ReadConfigFile(filename string) (Config, error) {
func ReadConfigFile(filename string) (_ Config, err error) {
config := DefaultConfig()
// Expand filename, if necessary.
if prefix := "~" + string(os.PathSeparator); strings.HasPrefix(filename, prefix) {
u, err := user.Current()
if err != nil {
return config, err
} else if u.HomeDir == "" {
return config, fmt.Errorf("home directory unset")
}
filename = filepath.Join(u.HomeDir, strings.TrimPrefix(filename, prefix))
filename, err = expand(filename)
if err != nil {
return config, err
}
// Read & deserialize configuration.
@@ -146,32 +192,58 @@ func ReadConfigFile(filename string) (Config, error) {
} else if err := yaml.Unmarshal(buf, &config); err != nil {
return config, err
}
// Normalize paths.
for _, dbConfig := range config.DBs {
if dbConfig.Path, err = expand(dbConfig.Path); err != nil {
return config, err
}
}
// Propage settings from global config to replica configs.
config.propagateGlobalSettings()
return config, nil
}
// DBConfig represents the configuration for a single database.
type DBConfig struct {
Path string `yaml:"path"`
Path string `yaml:"path"`
MonitorInterval *time.Duration `yaml:"monitor-interval"`
CheckpointInterval *time.Duration `yaml:"checkpoint-interval"`
MinCheckpointPageN *int `yaml:"min-checkpoint-page-count"`
MaxCheckpointPageN *int `yaml:"max-checkpoint-page-count"`
Replicas []*ReplicaConfig `yaml:"replicas"`
}
type ReplicaConfig struct {
Type string `yaml:"type"` // "file", "s3"
Name string `yaml:"name"` // name of replica, optional.
Path string `yaml:"path"` // used for file replicas
}
// NewDBFromConfig instantiates a DB based on a configuration.
func NewDBFromConfig(dbc *DBConfig) (*litestream.DB, error) {
path, err := expand(dbc.Path)
if err != nil {
return nil, err
}
func registerConfigFlag(fs *flag.FlagSet, p *string) {
fs.StringVar(p, "config", DefaultConfigPath, "config path")
}
// newDBFromConfig instantiates a DB based on a configuration.
func newDBFromConfig(config *DBConfig) (*litestream.DB, error) {
// Initialize database with given path.
db := litestream.NewDB(config.Path)
db := litestream.NewDB(path)
// Override default database settings if specified in configuration.
if dbc.MonitorInterval != nil {
db.MonitorInterval = *dbc.MonitorInterval
}
if dbc.CheckpointInterval != nil {
db.CheckpointInterval = *dbc.CheckpointInterval
}
if dbc.MinCheckpointPageN != nil {
db.MinCheckpointPageN = *dbc.MinCheckpointPageN
}
if dbc.MaxCheckpointPageN != nil {
db.MaxCheckpointPageN = *dbc.MaxCheckpointPageN
}
// Instantiate and attach replicas.
for _, rconfig := range config.Replicas {
r, err := newReplicaFromConfig(db, rconfig)
for _, rc := range dbc.Replicas {
r, err := NewReplicaFromConfig(rc, db)
if err != nil {
return nil, err
}
@@ -181,20 +253,231 @@ func newDBFromConfig(config *DBConfig) (*litestream.DB, error) {
return db, nil
}
// newReplicaFromConfig instantiates a replica for a DB based on a config.
func newReplicaFromConfig(db *litestream.DB, config *ReplicaConfig) (litestream.Replica, error) {
switch config.Type {
case "", "file":
return newFileReplicaFromConfig(db, config)
// ReplicaConfig represents the configuration for a single replica in a database.
type ReplicaConfig struct {
Type string `yaml:"type"` // "file", "s3"
Name string `yaml:"name"` // name of replica, optional.
Path string `yaml:"path"`
URL string `yaml:"url"`
Retention time.Duration `yaml:"retention"`
RetentionCheckInterval time.Duration `yaml:"retention-check-interval"`
SyncInterval time.Duration `yaml:"sync-interval"` // s3 only
SnapshotInterval time.Duration `yaml:"snapshot-interval"`
ValidationInterval time.Duration `yaml:"validation-interval"`
// S3 settings
AccessKeyID string `yaml:"access-key-id"`
SecretAccessKey string `yaml:"secret-access-key"`
Region string `yaml:"region"`
Bucket string `yaml:"bucket"`
Endpoint string `yaml:"endpoint"`
ForcePathStyle *bool `yaml:"force-path-style"`
}
// NewReplicaFromConfig instantiates a replica for a DB based on a config.
func NewReplicaFromConfig(c *ReplicaConfig, db *litestream.DB) (litestream.Replica, error) {
// Ensure user did not specify URL in path.
if isURL(c.Path) {
return nil, fmt.Errorf("replica path cannot be a url, please use the 'url' field instead: %s", c.Path)
}
switch c.ReplicaType() {
case "file":
return newFileReplicaFromConfig(c, db)
case "s3":
return newS3ReplicaFromConfig(c, db)
default:
return nil, fmt.Errorf("unknown replica type in config: %q", config.Type)
return nil, fmt.Errorf("unknown replica type in config: %q", c.Type)
}
}
// newFileReplicaFromConfig returns a new instance of FileReplica build from config.
func newFileReplicaFromConfig(db *litestream.DB, config *ReplicaConfig) (*litestream.FileReplica, error) {
if config.Path == "" {
return nil, fmt.Errorf("file replica path require for db %q", db.Path())
func newFileReplicaFromConfig(c *ReplicaConfig, db *litestream.DB) (_ *litestream.FileReplica, err error) {
// Ensure URL & path are not both specified.
if c.URL != "" && c.Path != "" {
return nil, fmt.Errorf("cannot specify url & path for file replica")
}
return litestream.NewFileReplica(db, config.Name, config.Path), nil
// Parse path from URL, if specified.
path := c.Path
if c.URL != "" {
if _, _, path, err = ParseReplicaURL(c.URL); err != nil {
return nil, err
}
}
// Ensure path is set explicitly or derived from URL field.
if path == "" {
return nil, fmt.Errorf("file replica path required")
}
// Expand home prefix and return absolute path.
if path, err = expand(path); err != nil {
return nil, err
}
// Instantiate replica and apply time fields, if set.
r := litestream.NewFileReplica(db, c.Name, path)
if v := c.Retention; v > 0 {
r.Retention = v
}
if v := c.RetentionCheckInterval; v > 0 {
r.RetentionCheckInterval = v
}
if v := c.SnapshotInterval; v > 0 {
r.SnapshotInterval = v
}
if v := c.ValidationInterval; v > 0 {
r.ValidationInterval = v
}
return r, nil
}
// newS3ReplicaFromConfig returns a new instance of S3Replica build from config.
func newS3ReplicaFromConfig(c *ReplicaConfig, db *litestream.DB) (_ *s3.Replica, err error) {
// Ensure URL & constituent parts are not both specified.
if c.URL != "" && c.Path != "" {
return nil, fmt.Errorf("cannot specify url & path for s3 replica")
} else if c.URL != "" && c.Bucket != "" {
return nil, fmt.Errorf("cannot specify url & bucket for s3 replica")
}
bucket, path := c.Bucket, c.Path
region, endpoint := c.Region, c.Endpoint
// Use path style if an endpoint is explicitly set. This works because the
// only service to not use path style is AWS which does not use an endpoint.
forcePathStyle := (endpoint != "")
if v := c.ForcePathStyle; v != nil {
forcePathStyle = *v
}
// Apply settings from URL, if specified.
if c.URL != "" {
_, host, upath, err := ParseReplicaURL(c.URL)
if err != nil {
return nil, err
}
ubucket, uregion, uendpoint, uforcePathStyle := s3.ParseHost(host)
// Only apply URL parts to field that have not been overridden.
if path == "" {
path = upath
}
if bucket == "" {
bucket = ubucket
}
if region == "" {
region = uregion
}
if endpoint == "" {
endpoint = uendpoint
}
if !forcePathStyle {
forcePathStyle = uforcePathStyle
}
}
// Ensure required settings are set.
if bucket == "" {
return nil, fmt.Errorf("bucket required for s3 replica")
}
// Build replica.
r := s3.NewReplica(db, c.Name)
r.AccessKeyID = c.AccessKeyID
r.SecretAccessKey = c.SecretAccessKey
r.Bucket = bucket
r.Path = path
r.Region = region
r.Endpoint = endpoint
r.ForcePathStyle = forcePathStyle
if v := c.Retention; v > 0 {
r.Retention = v
}
if v := c.RetentionCheckInterval; v > 0 {
r.RetentionCheckInterval = v
}
if v := c.SyncInterval; v > 0 {
r.SyncInterval = v
}
if v := c.SnapshotInterval; v > 0 {
r.SnapshotInterval = v
}
if v := c.ValidationInterval; v > 0 {
r.ValidationInterval = v
}
return r, nil
}
// ParseReplicaURL parses a replica URL.
func ParseReplicaURL(s string) (scheme, host, urlpath string, err error) {
u, err := url.Parse(s)
if err != nil {
return "", "", "", err
}
switch u.Scheme {
case "file":
scheme, u.Scheme = u.Scheme, ""
return scheme, "", path.Clean(u.String()), nil
case "":
return u.Scheme, u.Host, u.Path, fmt.Errorf("replica url scheme required: %s", s)
default:
return u.Scheme, u.Host, strings.TrimPrefix(path.Clean(u.Path), "/"), nil
}
}
// isURL returns true if s can be parsed and has a scheme.
func isURL(s string) bool {
return regexp.MustCompile(`^\w+:\/\/`).MatchString(s)
}
// ReplicaType returns the type based on the type field or extracted from the URL.
func (c *ReplicaConfig) ReplicaType() string {
scheme, _, _, _ := ParseReplicaURL(c.URL)
if scheme != "" {
return scheme
} else if c.Type != "" {
return c.Type
}
return "file"
}
// DefaultConfigPath returns the default config path.
func DefaultConfigPath() string {
if v := os.Getenv("LITESTREAM_CONFIG"); v != "" {
return v
}
return defaultConfigPath
}
func registerConfigFlag(fs *flag.FlagSet) *string {
return fs.String("config", "", "config path")
}
// expand returns an absolute path for s.
func expand(s string) (string, error) {
// Just expand to absolute path if there is no home directory prefix.
prefix := "~" + string(os.PathSeparator)
if s != "~" && !strings.HasPrefix(s, prefix) {
return filepath.Abs(s)
}
// Look up home directory.
u, err := user.Current()
if err != nil {
return "", err
} else if u.HomeDir == "" {
return "", fmt.Errorf("cannot expand path %s, no home directory available", s)
}
// Return path with tilde replaced by the home directory.
if s == "~" {
return u.HomeDir, nil
}
return filepath.Join(u.HomeDir, strings.TrimPrefix(s, prefix)), nil
}

View File

@@ -0,0 +1,17 @@
// +build !windows
package main
import (
"context"
)
const defaultConfigPath = "/etc/litestream.yml"
func isWindowsService() (bool, error) {
return false, nil
}
func runWindowsService(ctx context.Context) error {
panic("cannot run windows service as unix process")
}

131
cmd/litestream/main_test.go Normal file
View File

@@ -0,0 +1,131 @@
package main_test
import (
"io/ioutil"
"path/filepath"
"testing"
"github.com/benbjohnson/litestream"
main "github.com/benbjohnson/litestream/cmd/litestream"
"github.com/benbjohnson/litestream/s3"
)
func TestReadConfigFile(t *testing.T) {
// Ensure global AWS settings are propagated down to replica configurations.
t.Run("PropagateGlobalSettings", func(t *testing.T) {
filename := filepath.Join(t.TempDir(), "litestream.yml")
if err := ioutil.WriteFile(filename, []byte(`
access-key-id: XXX
secret-access-key: YYY
dbs:
- path: /path/to/db
replicas:
- url: s3://foo/bar
`[1:]), 0666); err != nil {
t.Fatal(err)
}
config, err := main.ReadConfigFile(filename)
if err != nil {
t.Fatal(err)
} else if got, want := config.AccessKeyID, `XXX`; got != want {
t.Fatalf("AccessKeyID=%v, want %v", got, want)
} else if got, want := config.SecretAccessKey, `YYY`; got != want {
t.Fatalf("SecretAccessKey=%v, want %v", got, want)
} else if got, want := config.DBs[0].Replicas[0].AccessKeyID, `XXX`; got != want {
t.Fatalf("Replica.AccessKeyID=%v, want %v", got, want)
} else if got, want := config.DBs[0].Replicas[0].SecretAccessKey, `YYY`; got != want {
t.Fatalf("Replica.SecretAccessKey=%v, want %v", got, want)
}
})
}
func TestNewFileReplicaFromConfig(t *testing.T) {
r, err := main.NewReplicaFromConfig(&main.ReplicaConfig{Path: "/foo"}, nil)
if err != nil {
t.Fatal(err)
} else if r, ok := r.(*litestream.FileReplica); !ok {
t.Fatal("unexpected replica type")
} else if got, want := r.Path(), "/foo"; got != want {
t.Fatalf("Path=%s, want %s", got, want)
}
}
func TestNewS3ReplicaFromConfig(t *testing.T) {
t.Run("URL", func(t *testing.T) {
r, err := main.NewReplicaFromConfig(&main.ReplicaConfig{URL: "s3://foo/bar"}, nil)
if err != nil {
t.Fatal(err)
} else if r, ok := r.(*s3.Replica); !ok {
t.Fatal("unexpected replica type")
} else if got, want := r.Bucket, "foo"; got != want {
t.Fatalf("Bucket=%s, want %s", got, want)
} else if got, want := r.Path, "bar"; got != want {
t.Fatalf("Path=%s, want %s", got, want)
} else if got, want := r.Region, ""; got != want {
t.Fatalf("Region=%s, want %s", got, want)
} else if got, want := r.Endpoint, ""; got != want {
t.Fatalf("Endpoint=%s, want %s", got, want)
} else if got, want := r.ForcePathStyle, false; got != want {
t.Fatalf("ForcePathStyle=%v, want %v", got, want)
}
})
t.Run("MinIO", func(t *testing.T) {
r, err := main.NewReplicaFromConfig(&main.ReplicaConfig{URL: "s3://foo.localhost:9000/bar"}, nil)
if err != nil {
t.Fatal(err)
} else if r, ok := r.(*s3.Replica); !ok {
t.Fatal("unexpected replica type")
} else if got, want := r.Bucket, "foo"; got != want {
t.Fatalf("Bucket=%s, want %s", got, want)
} else if got, want := r.Path, "bar"; got != want {
t.Fatalf("Path=%s, want %s", got, want)
} else if got, want := r.Region, "us-east-1"; got != want {
t.Fatalf("Region=%s, want %s", got, want)
} else if got, want := r.Endpoint, "http://localhost:9000"; got != want {
t.Fatalf("Endpoint=%s, want %s", got, want)
} else if got, want := r.ForcePathStyle, true; got != want {
t.Fatalf("ForcePathStyle=%v, want %v", got, want)
}
})
t.Run("Backblaze", func(t *testing.T) {
r, err := main.NewReplicaFromConfig(&main.ReplicaConfig{URL: "s3://foo.s3.us-west-000.backblazeb2.com/bar"}, nil)
if err != nil {
t.Fatal(err)
} else if r, ok := r.(*s3.Replica); !ok {
t.Fatal("unexpected replica type")
} else if got, want := r.Bucket, "foo"; got != want {
t.Fatalf("Bucket=%s, want %s", got, want)
} else if got, want := r.Path, "bar"; got != want {
t.Fatalf("Path=%s, want %s", got, want)
} else if got, want := r.Region, "us-west-000"; got != want {
t.Fatalf("Region=%s, want %s", got, want)
} else if got, want := r.Endpoint, "https://s3.us-west-000.backblazeb2.com"; got != want {
t.Fatalf("Endpoint=%s, want %s", got, want)
} else if got, want := r.ForcePathStyle, true; got != want {
t.Fatalf("ForcePathStyle=%v, want %v", got, want)
}
})
t.Run("GCS", func(t *testing.T) {
r, err := main.NewReplicaFromConfig(&main.ReplicaConfig{URL: "s3://foo.storage.googleapis.com/bar"}, nil)
if err != nil {
t.Fatal(err)
} else if r, ok := r.(*s3.Replica); !ok {
t.Fatal("unexpected replica type")
} else if got, want := r.Bucket, "foo"; got != want {
t.Fatalf("Bucket=%s, want %s", got, want)
} else if got, want := r.Path, "bar"; got != want {
t.Fatalf("Path=%s, want %s", got, want)
} else if got, want := r.Region, "us-east-1"; got != want {
t.Fatalf("Region=%s, want %s", got, want)
} else if got, want := r.Endpoint, "https://storage.googleapis.com"; got != want {
t.Fatalf("Endpoint=%s, want %s", got, want)
} else if got, want := r.ForcePathStyle, true; got != want {
t.Fatalf("ForcePathStyle=%v, want %v", got, want)
}
})
}

View File

@@ -0,0 +1,105 @@
// +build windows
package main
import (
"context"
"io"
"log"
"os"
"golang.org/x/sys/windows"
"golang.org/x/sys/windows/svc"
"golang.org/x/sys/windows/svc/eventlog"
)
const defaultConfigPath = `C:\Litestream\litestream.yml`
// serviceName is the Windows Service name.
const serviceName = "Litestream"
// isWindowsService returns true if currently executing within a Windows service.
func isWindowsService() (bool, error) {
return svc.IsWindowsService()
}
func runWindowsService(ctx context.Context) error {
// Attempt to install new log service. This will fail if already installed.
// We don't log the error because we don't have anywhere to log until we open the log.
_ = eventlog.InstallAsEventCreate(serviceName, eventlog.Error|eventlog.Warning|eventlog.Info)
elog, err := eventlog.Open(serviceName)
if err != nil {
return err
}
defer elog.Close()
// Set eventlog as log writer while running.
log.SetOutput((*eventlogWriter)(elog))
defer log.SetOutput(os.Stderr)
log.Print("Litestream service starting")
if err := svc.Run(serviceName, &windowsService{ctx: ctx}); err != nil {
return errStop
}
log.Print("Litestream service stopped")
return nil
}
// windowsService is an interface adapter for svc.Handler.
type windowsService struct {
ctx context.Context
}
func (s *windowsService) Execute(args []string, r <-chan svc.ChangeRequest, statusCh chan<- svc.Status) (svcSpecificEC bool, exitCode uint32) {
var err error
// Notify Windows that the service is starting up.
statusCh <- svc.Status{State: svc.StartPending}
// Instantiate replication command and load configuration.
c := NewReplicateCommand()
if c.Config, err = ReadConfigFile(DefaultConfigPath()); err != nil {
log.Printf("cannot load configuration: %s", err)
return true, 1
}
// Execute replication command.
if err := c.Run(s.ctx); err != nil {
log.Printf("cannot replicate: %s", err)
statusCh <- svc.Status{State: svc.StopPending}
return true, 2
}
// Notify Windows that the service is now running.
statusCh <- svc.Status{State: svc.Running, Accepts: svc.AcceptStop}
for {
select {
case req := <-r:
switch req.Cmd {
case svc.Stop:
c.Close()
statusCh <- svc.Status{State: svc.StopPending}
return false, windows.NO_ERROR
case svc.Interrogate:
statusCh <- req.CurrentStatus
default:
log.Printf("Litestream service received unexpected change request cmd: %d", req.Cmd)
}
}
}
}
// Ensure implementation implements io.Writer interface.
var _ io.Writer = (*eventlogWriter)(nil)
// eventlogWriter is an adapter for using eventlog.Log as an io.Writer.
type eventlogWriter eventlog.Log
func (w *eventlogWriter) Write(p []byte) (n int, err error) {
elog := (*eventlog.Log)(w)
return 0, elog.Info(1, string(p))
}

View File

@@ -2,59 +2,91 @@ package main
import (
"context"
"errors"
"flag"
"fmt"
"log"
"net"
"net/http"
_ "net/http/pprof"
"os"
"os/signal"
"time"
"github.com/benbjohnson/litestream"
"github.com/benbjohnson/litestream/s3"
"github.com/prometheus/client_golang/prometheus/promhttp"
)
// ReplicateCommand represents a command that continuously replicates SQLite databases.
type ReplicateCommand struct {
ConfigPath string
Config Config
Config Config
// List of managed databases specified in the config.
DBs []*litestream.DB
}
// Run loads all databases specified in the configuration.
func (c *ReplicateCommand) Run(ctx context.Context, args []string) (err error) {
func NewReplicateCommand() *ReplicateCommand {
return &ReplicateCommand{}
}
// ParseFlags parses the CLI flags and loads the configuration file.
func (c *ReplicateCommand) ParseFlags(ctx context.Context, args []string) (err error) {
fs := flag.NewFlagSet("litestream-replicate", flag.ContinueOnError)
registerConfigFlag(fs, &c.ConfigPath)
tracePath := fs.String("trace", "", "trace path")
configPath := registerConfigFlag(fs)
fs.Usage = c.Usage
if err := fs.Parse(args); err != nil {
return err
}
// Load configuration.
if c.ConfigPath == "" {
return errors.New("-config required")
}
config, err := ReadConfigFile(c.ConfigPath)
if err != nil {
return err
// Load configuration or use CLI args to build db/replica.
if fs.NArg() == 1 {
return fmt.Errorf("must specify at least one replica URL for %s", fs.Arg(0))
} else if fs.NArg() > 1 {
if *configPath != "" {
return fmt.Errorf("cannot specify a replica URL and the -config flag")
}
dbConfig := &DBConfig{Path: fs.Arg(0)}
for _, u := range fs.Args()[1:] {
dbConfig.Replicas = append(dbConfig.Replicas, &ReplicaConfig{
URL: u,
SyncInterval: 1 * time.Second,
})
}
c.Config.DBs = []*DBConfig{dbConfig}
} else {
if *configPath == "" {
*configPath = DefaultConfigPath()
}
if c.Config, err = ReadConfigFile(*configPath); err != nil {
return err
}
}
// Setup signal handler.
ctx, cancel := context.WithCancel(ctx)
ch := make(chan os.Signal, 1)
signal.Notify(ch, os.Interrupt)
go func() { <-ch; cancel() }()
// Enable trace logging.
if *tracePath != "" {
f, err := os.Create(*tracePath)
if err != nil {
return err
}
defer f.Close()
litestream.Tracef = log.New(f, "", log.LstdFlags|log.LUTC|log.Lshortfile).Printf
}
return nil
}
// Run loads all databases specified in the configuration.
func (c *ReplicateCommand) Run(ctx context.Context) (err error) {
// Display version information.
fmt.Printf("litestream %s\n", Version)
log.Printf("litestream %s", Version)
if len(config.DBs) == 0 {
return errors.New("configuration must specify at least one database")
if len(c.Config.DBs) == 0 {
log.Println("no databases specified in configuration")
}
for _, dbConfig := range config.DBs {
db, err := newDBFromConfig(dbConfig)
for _, dbConfig := range c.Config.DBs {
db, err := NewDBFromConfig(dbConfig)
if err != nil {
return err
}
@@ -67,26 +99,36 @@ func (c *ReplicateCommand) Run(ctx context.Context, args []string) (err error) {
}
// Notify user that initialization is done.
fmt.Printf("Initialized with %d databases.\n", len(c.DBs))
// Serve metrics over HTTP if enabled.
if config.Addr != "" {
_, port, _ := net.SplitHostPort(config.Addr)
fmt.Printf("Serving metrics on http://localhost:%s/metrics\n", port)
go func() {
http.Handle("/metrics", promhttp.Handler())
http.ListenAndServe(config.Addr, nil)
}()
for _, db := range c.DBs {
log.Printf("initialized db: %s", db.Path())
for _, r := range db.Replicas {
switch r := r.(type) {
case *litestream.FileReplica:
log.Printf("replicating to: name=%q type=%q path=%q", r.Name(), r.Type(), r.Path())
case *s3.Replica:
log.Printf("replicating to: name=%q type=%q bucket=%q path=%q region=%q endpoint=%q sync-interval=%s", r.Name(), r.Type(), r.Bucket, r.Path, r.Region, r.Endpoint, r.SyncInterval)
default:
log.Printf("replicating to: name=%q type=%q", r.Name(), r.Type())
}
}
}
// Wait for signal to stop program.
<-ctx.Done()
signal.Reset()
// Serve metrics over HTTP if enabled.
if c.Config.Addr != "" {
hostport := c.Config.Addr
if host, port, _ := net.SplitHostPort(c.Config.Addr); port == "" {
return fmt.Errorf("must specify port for bind address: %q", c.Config.Addr)
} else if host == "" {
hostport = net.JoinHostPort("localhost", port)
}
// Gracefully close
if err := c.Close(); err != nil {
fmt.Fprintln(os.Stderr, err)
os.Exit(1)
log.Printf("serving metrics on http://%s/metrics", hostport)
go func() {
http.Handle("/metrics", promhttp.Handler())
if err := http.ListenAndServe(c.Config.Addr, nil); err != nil {
log.Printf("cannot start metrics server: %s", err)
}
}()
}
return nil
@@ -96,28 +138,38 @@ func (c *ReplicateCommand) Run(ctx context.Context, args []string) (err error) {
func (c *ReplicateCommand) Close() (err error) {
for _, db := range c.DBs {
if e := db.SoftClose(); e != nil {
fmt.Printf("error closing db: path=%s err=%s\n", db.Path(), e)
log.Printf("error closing db: path=%s err=%s", db.Path(), e)
if err == nil {
err = e
}
}
}
// TODO(windows): Clear DBs
return err
}
// Usage prints the help screen to STDOUT.
func (c *ReplicateCommand) Usage() {
fmt.Printf(`
The replicate command starts a server to monitor & replicate databases
specified in your configuration file.
The replicate command starts a server to monitor & replicate databases.
You can specify your database & replicas in a configuration file or you can
replicate a single database file by specifying its path and its replicas in the
command line arguments.
Usage:
litestream replicate [arguments]
litestream replicate [arguments] DB_PATH REPLICA_URL [REPLICA_URL...]
Arguments:
-config PATH
Specifies the configuration file. Defaults to %s
Specifies the configuration file.
Defaults to %s
`[1:], DefaultConfigPath)
-trace PATH
Write verbose trace logging to PATH.
`[1:], DefaultConfigPath())
}
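A hedged example of the two invocation styles the updated flag parsing supports (paths and bucket names are placeholders): the first form builds a DBConfig from the positional arguments with a one-second sync interval per replica, while the second loads databases from the YAML configuration file.

$ litestream replicate /path/to/db s3://mybkt/db
$ litestream replicate -config /etc/litestream.yml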


@@ -7,20 +7,21 @@ import (
"fmt"
"log"
"os"
"path/filepath"
"time"
"github.com/benbjohnson/litestream"
)
type RestoreCommand struct {
}
// RestoreCommand represents a command to restore a database from a backup.
type RestoreCommand struct{}
// Run executes the command.
func (c *RestoreCommand) Run(ctx context.Context, args []string) (err error) {
var configPath string
opt := litestream.NewRestoreOptions()
opt.Verbose = true
fs := flag.NewFlagSet("litestream-restore", flag.ContinueOnError)
registerConfigFlag(fs, &configPath)
configPath := registerConfigFlag(fs)
fs.StringVar(&opt.OutputPath, "o", "", "output path")
fs.StringVar(&opt.ReplicaName, "replica", "", "replica name")
fs.StringVar(&opt.Generation, "generation", "", "generation name")
@@ -32,20 +33,11 @@ func (c *RestoreCommand) Run(ctx context.Context, args []string) (err error) {
if err := fs.Parse(args); err != nil {
return err
} else if fs.NArg() == 0 || fs.Arg(0) == "" {
return fmt.Errorf("database path required")
return fmt.Errorf("database path or replica URL required")
} else if fs.NArg() > 1 {
return fmt.Errorf("too many arguments")
}
// Load configuration.
if configPath == "" {
return errors.New("-config required")
}
config, err := ReadConfigFile(configPath)
if err != nil {
return err
}
// Parse timestamp, if specified.
if *timestampStr != "" {
if opt.Timestamp, err = time.Parse(time.RFC3339, *timestampStr); err != nil {
@@ -63,32 +55,88 @@ func (c *RestoreCommand) Run(ctx context.Context, args []string) (err error) {
opt.Logger = log.New(os.Stderr, "", log.LstdFlags)
}
// Determine absolute path for database.
dbPath, err := filepath.Abs(fs.Arg(0))
if err != nil {
return err
// Determine replica & generation to restore from.
var r litestream.Replica
if isURL(fs.Arg(0)) {
if *configPath != "" {
return fmt.Errorf("cannot specify a replica URL and the -config flag")
}
if r, err = c.loadFromURL(ctx, fs.Arg(0), &opt); err != nil {
return err
}
} else {
if *configPath == "" {
*configPath = DefaultConfigPath()
}
if r, err = c.loadFromConfig(ctx, fs.Arg(0), *configPath, &opt); err != nil {
return err
}
}
// Instantiate DB.
dbConfig := config.DBConfig(dbPath)
if dbConfig == nil {
return fmt.Errorf("database not found in config: %s", dbPath)
}
db, err := newDBFromConfig(dbConfig)
if err != nil {
return err
// Return an error if no matching targets found.
if opt.Generation == "" {
return fmt.Errorf("no matching backups found")
}
return db.Restore(ctx, opt)
return litestream.RestoreReplica(ctx, r, opt)
}
// loadFromURL creates a replica & updates the restore options from a replica URL.
func (c *RestoreCommand) loadFromURL(ctx context.Context, replicaURL string, opt *litestream.RestoreOptions) (litestream.Replica, error) {
r, err := NewReplicaFromConfig(&ReplicaConfig{URL: replicaURL}, nil)
if err != nil {
return nil, err
}
opt.Generation, _, err = litestream.CalcReplicaRestoreTarget(ctx, r, *opt)
return r, err
}
// loadFromConfig returns a replica & updates the restore options from a DB reference.
func (c *RestoreCommand) loadFromConfig(ctx context.Context, dbPath, configPath string, opt *litestream.RestoreOptions) (litestream.Replica, error) {
// Load configuration.
config, err := ReadConfigFile(configPath)
if err != nil {
return nil, err
}
// Lookup database from configuration file by path.
if dbPath, err = expand(dbPath); err != nil {
return nil, err
}
dbConfig := config.DBConfig(dbPath)
if dbConfig == nil {
return nil, fmt.Errorf("database not found in config: %s", dbPath)
}
db, err := NewDBFromConfig(dbConfig)
if err != nil {
return nil, err
}
// Restore into original database path if not specified.
if opt.OutputPath == "" {
opt.OutputPath = dbPath
}
// Determine the appropriate replica & generation to restore from.
r, generation, err := db.CalcRestoreTarget(ctx, *opt)
if err != nil {
return nil, err
}
opt.Generation = generation
return r, nil
}
// Usage prints the help screen to STDOUT.
func (c *RestoreCommand) Usage() {
fmt.Printf(`
The restore command recovers a database from a previous snapshot and WAL.
Usage:
litestream restore [arguments] DB
litestream restore [arguments] DB_PATH
litestream restore [arguments] REPLICA_URL
Arguments:
@@ -142,6 +190,6 @@ Examples:
$ litestream restore -replica s3 -generation xxxxxxxx /path/to/db
`[1:],
DefaultConfigPath,
DefaultConfigPath(),
)
}
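A hedged example of each restore form (paths and the URL are placeholders): a replica URL is used directly and bypasses the configuration file, while a plain database path is looked up in the config and restored to its original location unless -o is given.

$ litestream restore -o /tmp/restored.db s3://mybkt/db
$ litestream restore /path/to/db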


@@ -2,23 +2,22 @@ package main
import (
"context"
"errors"
"flag"
"fmt"
"os"
"path/filepath"
"text/tabwriter"
"time"
"github.com/benbjohnson/litestream"
)
// SnapshotsCommand represents a command to list snapshots for a command.
type SnapshotsCommand struct{}
// Run executes the command.
func (c *SnapshotsCommand) Run(ctx context.Context, args []string) (err error) {
var configPath string
fs := flag.NewFlagSet("litestream-snapshots", flag.ContinueOnError)
registerConfigFlag(fs, &configPath)
configPath := registerConfigFlag(fs)
replicaName := fs.String("replica", "", "replica name")
fs.Usage = c.Usage
if err := fs.Parse(args); err != nil {
@@ -29,37 +28,47 @@ func (c *SnapshotsCommand) Run(ctx context.Context, args []string) (err error) {
return fmt.Errorf("too many arguments")
}
// Load configuration.
if configPath == "" {
return errors.New("-config required")
}
config, err := ReadConfigFile(configPath)
if err != nil {
return err
}
var db *litestream.DB
var r litestream.Replica
if isURL(fs.Arg(0)) {
if *configPath != "" {
return fmt.Errorf("cannot specify a replica URL and the -config flag")
}
if r, err = NewReplicaFromConfig(&ReplicaConfig{URL: fs.Arg(0)}, nil); err != nil {
return err
}
} else {
if *configPath == "" {
*configPath = DefaultConfigPath()
}
// Determine absolute path for database.
dbPath, err := filepath.Abs(fs.Arg(0))
if err != nil {
return err
}
// Load configuration.
config, err := ReadConfigFile(*configPath)
if err != nil {
return err
}
// Instantiate DB.
dbConfig := config.DBConfig(dbPath)
if dbConfig == nil {
return fmt.Errorf("database not found in config: %s", dbPath)
}
db, err := newDBFromConfig(dbConfig)
if err != nil {
return err
// Lookup database from configuration file by path.
if path, err := expand(fs.Arg(0)); err != nil {
return err
} else if dbc := config.DBConfig(path); dbc == nil {
return fmt.Errorf("database not found in config: %s", path)
} else if db, err = NewDBFromConfig(dbc); err != nil {
return err
}
// Filter by replica, if specified.
if *replicaName != "" {
if r = db.Replica(*replicaName); r == nil {
return fmt.Errorf("replica %q not found for database %q", *replicaName, db.Path())
}
}
}
// Find snapshots by db or replica.
var infos []*litestream.SnapshotInfo
if *replicaName != "" {
if r := db.Replica(*replicaName); r == nil {
return fmt.Errorf("replica %q not found for database %q", *replicaName, dbPath)
} else if infos, err = r.Snapshots(ctx); err != nil {
if r != nil {
if infos, err = r.Snapshots(ctx); err != nil {
return err
}
} else {
@@ -69,7 +78,9 @@ func (c *SnapshotsCommand) Run(ctx context.Context, args []string) (err error) {
}
// List all snapshots.
w := tabwriter.NewWriter(os.Stdout, 0, 8, 1, '\t', 0)
w := tabwriter.NewWriter(os.Stdout, 0, 8, 2, ' ', 0)
defer w.Flush()
fmt.Fprintln(w, "replica\tgeneration\tindex\tsize\tcreated")
for _, info := range infos {
fmt.Fprintf(w, "%s\t%s\t%d\t%d\t%s\n",
@@ -80,18 +91,20 @@ func (c *SnapshotsCommand) Run(ctx context.Context, args []string) (err error) {
info.CreatedAt.Format(time.RFC3339),
)
}
w.Flush()
return nil
}
// Usage prints the help screen to STDOUT.
func (c *SnapshotsCommand) Usage() {
fmt.Printf(`
The snapshots command lists all snapshots available for a database.
The snapshots command lists all snapshots available for a database or replica.
Usage:
litestream snapshots [arguments] DB
litestream snapshots [arguments] DB_PATH
litestream snapshots [arguments] REPLICA_URL
Arguments:
@@ -102,7 +115,6 @@ Arguments:
-replica NAME
Optional, filter by a specific replica.
Examples:
# List all snapshots for a database.
@@ -111,7 +123,10 @@ Examples:
# List all snapshots on S3.
$ litestream snapshots -replica s3 /path/to/db
# List all snapshots by replica URL.
$ litestream snapshots s3://mybkt/db
`[1:],
DefaultConfigPath,
DefaultConfigPath(),
)
}


@@ -1,136 +0,0 @@
package main
import (
"context"
"errors"
"flag"
"fmt"
"log"
"os"
"path/filepath"
"github.com/benbjohnson/litestream"
)
type ValidateCommand struct{}
func (c *ValidateCommand) Run(ctx context.Context, args []string) (err error) {
var configPath string
opt := litestream.NewRestoreOptions()
fs := flag.NewFlagSet("litestream-validate", flag.ContinueOnError)
registerConfigFlag(fs, &configPath)
fs.StringVar(&opt.ReplicaName, "replica", "", "replica name")
fs.BoolVar(&opt.DryRun, "dry-run", false, "dry run")
verbose := fs.Bool("v", false, "verbose output")
fs.Usage = c.Usage
if err := fs.Parse(args); err != nil {
return err
} else if fs.NArg() == 0 || fs.Arg(0) == "" {
return fmt.Errorf("database path required")
} else if fs.NArg() > 1 {
return fmt.Errorf("too many arguments")
}
// Load configuration.
if configPath == "" {
return errors.New("-config required")
}
config, err := ReadConfigFile(configPath)
if err != nil {
return err
}
// Verbose output is automatically enabled if dry run is specified.
if opt.DryRun {
*verbose = true
}
// Instantiate logger if verbose output is enabled.
if *verbose {
opt.Logger = log.New(os.Stderr, "", log.LstdFlags)
}
// Determine absolute path for database.
dbPath, err := filepath.Abs(fs.Arg(0))
if err != nil {
return err
}
// Instantiate DB.
dbConfig := config.DBConfig(dbPath)
if dbConfig == nil {
return fmt.Errorf("database not found in config: %s", dbPath)
}
db, err := newDBFromConfig(dbConfig)
if err != nil {
return err
}
// Ensure replica exists, if specified.
if opt.ReplicaName != "" && db.Replica(opt.ReplicaName) == nil {
return fmt.Errorf("replica not found: %s", opt.ReplicaName)
}
// Validate all matching replicas.
var hasInvalidReplica bool
for _, r := range db.Replicas {
if opt.ReplicaName != "" && opt.ReplicaName != r.Name() {
continue
}
if err := db.Validate(ctx, r.Name(), opt); err != nil {
fmt.Printf("%s: replica invalid: %s\n", r.Name(), err)
}
}
if hasInvalidReplica {
return fmt.Errorf("one or more invalid replicas found")
}
fmt.Println("ok")
return nil
}
func (c *ValidateCommand) Usage() {
fmt.Printf(`
The validate command compares a checksum of the primary database with a
checksum of the replica at the same point in time. Returns an error if the
databases are not equal.
The restored database must be written to a temporary file so you must ensure
you have enough disk space before performing this operation.
Usage:
litestream validate [arguments] DB
Arguments:
-config PATH
Specifies the configuration file.
Defaults to %s
-replica NAME
Validate a specific replica.
Defaults to validating all replicas.
-dry-run
Prints all log output as if it were running but does
not perform actual validation.
-v
Verbose output.
Examples:
# Validate all replicas for the given database.
$ litestream validate /path/to/db
# Validate only the S3 replica.
$ litestream restore -replica s3 /path/to/db
`[1:],
DefaultConfigPath,
)
}


@@ -6,8 +6,10 @@ import (
"fmt"
)
// VersionCommand represents a command to print the current version.
type VersionCommand struct{}
// Run executes the command.
func (c *VersionCommand) Run(ctx context.Context, args []string) (err error) {
fs := flag.NewFlagSet("litestream-version", flag.ContinueOnError)
fs.Usage = c.Usage
@@ -20,6 +22,7 @@ func (c *VersionCommand) Run(ctx context.Context, args []string) (err error) {
return nil
}
// Usage prints the help screen to STDOUT.
func (c *VersionCommand) Usage() {
fmt.Println(`
Prints the version.


@@ -2,23 +2,22 @@ package main
import (
"context"
"errors"
"flag"
"fmt"
"os"
"path/filepath"
"text/tabwriter"
"time"
"github.com/benbjohnson/litestream"
)
// WALCommand represents a command to list WAL files for a database.
type WALCommand struct{}
// Run executes the command.
func (c *WALCommand) Run(ctx context.Context, args []string) (err error) {
var configPath string
fs := flag.NewFlagSet("litestream-wal", flag.ContinueOnError)
registerConfigFlag(fs, &configPath)
configPath := registerConfigFlag(fs)
replicaName := fs.String("replica", "", "replica name")
generation := fs.String("generation", "", "generation name")
fs.Usage = c.Usage
@@ -30,37 +29,47 @@ func (c *WALCommand) Run(ctx context.Context, args []string) (err error) {
return fmt.Errorf("too many arguments")
}
// Load configuration.
if configPath == "" {
return errors.New("-config required")
}
config, err := ReadConfigFile(configPath)
if err != nil {
return err
var db *litestream.DB
var r litestream.Replica
if isURL(fs.Arg(0)) {
if *configPath != "" {
return fmt.Errorf("cannot specify a replica URL and the -config flag")
}
if r, err = NewReplicaFromConfig(&ReplicaConfig{URL: fs.Arg(0)}, nil); err != nil {
return err
}
} else {
if *configPath == "" {
*configPath = DefaultConfigPath()
}
// Load configuration.
config, err := ReadConfigFile(*configPath)
if err != nil {
return err
}
// Lookup database from configuration file by path.
if path, err := expand(fs.Arg(0)); err != nil {
return err
} else if dbc := config.DBConfig(path); dbc == nil {
return fmt.Errorf("database not found in config: %s", path)
} else if db, err = NewDBFromConfig(dbc); err != nil {
return err
}
// Filter by replica, if specified.
if *replicaName != "" {
if r = db.Replica(*replicaName); r == nil {
return fmt.Errorf("replica %q not found for database %q", *replicaName, db.Path())
}
}
}
// Determine absolute path for database.
dbPath, err := filepath.Abs(fs.Arg(0))
if err != nil {
return err
}
// Instantiate DB.
dbConfig := config.DBConfig(dbPath)
if dbConfig == nil {
return fmt.Errorf("database not found in config: %s", dbPath)
}
db, err := newDBFromConfig(dbConfig)
if err != nil {
return err
}
// Find snapshots by db or replica.
// Find WAL files by db or replica.
var infos []*litestream.WALInfo
if *replicaName != "" {
if r := db.Replica(*replicaName); r == nil {
return fmt.Errorf("replica %q not found for database %q", *replicaName, dbPath)
} else if infos, err = r.WALs(ctx); err != nil {
if r != nil {
if infos, err = r.WALs(ctx); err != nil {
return err
}
} else {
@@ -70,7 +79,9 @@ func (c *WALCommand) Run(ctx context.Context, args []string) (err error) {
}
// List all WAL files.
w := tabwriter.NewWriter(os.Stdout, 0, 8, 1, '\t', 0)
w := tabwriter.NewWriter(os.Stdout, 0, 8, 2, ' ', 0)
defer w.Flush()
fmt.Fprintln(w, "replica\tgeneration\tindex\toffset\tsize\tcreated")
for _, info := range infos {
if *generation != "" && info.Generation != *generation {
@@ -86,18 +97,20 @@ func (c *WALCommand) Run(ctx context.Context, args []string) (err error) {
info.CreatedAt.Format(time.RFC3339),
)
}
w.Flush()
return nil
}
// Usage prints the help screen to STDOUT.
func (c *WALCommand) Usage() {
fmt.Printf(`
The wal command lists all wal files available for a database.
Usage:
litestream wal [arguments] DB
litestream wal [arguments] DB_PATH
litestream wal [arguments] REPLICA_URL
Arguments:
@@ -111,16 +124,18 @@ Arguments:
-generation NAME
Optional, filter by a specific generation.
Examples:
# List all WAL files for a database.
$ litestream wal /path/to/db
# List all WAL files on S3 for a specific generation.
$ litestream snapshots -replica s3 -generation xxxxxxxx /path/to/db
$ litestream wal -replica s3 -generation xxxxxxxx /path/to/db
# List all WAL files for replica URL.
$ litestream wal s3://mybkt/db
`[1:],
DefaultConfigPath,
DefaultConfigPath(),
)
}

db.go (807 lines changed; file diff suppressed because it is too large)

db_test.go (new file, 647 lines)

@@ -0,0 +1,647 @@
package litestream_test
import (
"database/sql"
"io/ioutil"
"os"
"path/filepath"
"testing"
"time"
"github.com/benbjohnson/litestream"
)
func TestDB_Path(t *testing.T) {
db := litestream.NewDB("/tmp/db")
if got, want := db.Path(), `/tmp/db`; got != want {
t.Fatalf("Path()=%v, want %v", got, want)
}
}
func TestDB_WALPath(t *testing.T) {
db := litestream.NewDB("/tmp/db")
if got, want := db.WALPath(), `/tmp/db-wal`; got != want {
t.Fatalf("WALPath()=%v, want %v", got, want)
}
}
func TestDB_MetaPath(t *testing.T) {
t.Run("Absolute", func(t *testing.T) {
db := litestream.NewDB("/tmp/db")
if got, want := db.MetaPath(), `/tmp/.db-litestream`; got != want {
t.Fatalf("MetaPath()=%v, want %v", got, want)
}
})
t.Run("Relative", func(t *testing.T) {
db := litestream.NewDB("db")
if got, want := db.MetaPath(), `.db-litestream`; got != want {
t.Fatalf("MetaPath()=%v, want %v", got, want)
}
})
}
func TestDB_GenerationNamePath(t *testing.T) {
db := litestream.NewDB("/tmp/db")
if got, want := db.GenerationNamePath(), `/tmp/.db-litestream/generation`; got != want {
t.Fatalf("GenerationNamePath()=%v, want %v", got, want)
}
}
func TestDB_GenerationPath(t *testing.T) {
db := litestream.NewDB("/tmp/db")
if got, want := db.GenerationPath("xxxx"), `/tmp/.db-litestream/generations/xxxx`; got != want {
t.Fatalf("GenerationPath()=%v, want %v", got, want)
}
}
func TestDB_ShadowWALDir(t *testing.T) {
db := litestream.NewDB("/tmp/db")
if got, want := db.ShadowWALDir("xxxx"), `/tmp/.db-litestream/generations/xxxx/wal`; got != want {
t.Fatalf("ShadowWALDir()=%v, want %v", got, want)
}
}
func TestDB_ShadowWALPath(t *testing.T) {
db := litestream.NewDB("/tmp/db")
if got, want := db.ShadowWALPath("xxxx", 1000), `/tmp/.db-litestream/generations/xxxx/wal/000003e8.wal`; got != want {
t.Fatalf("ShadowWALPath()=%v, want %v", got, want)
}
}
// Ensure we can check the last modified time of the real database and its WAL.
func TestDB_UpdatedAt(t *testing.T) {
t.Run("ErrNotExist", func(t *testing.T) {
db := MustOpenDB(t)
defer MustCloseDB(t, db)
if _, err := db.UpdatedAt(); !os.IsNotExist(err) {
t.Fatalf("unexpected error: %#v", err)
}
})
t.Run("DB", func(t *testing.T) {
db, sqldb := MustOpenDBs(t)
defer MustCloseDBs(t, db, sqldb)
if t0, err := db.UpdatedAt(); err != nil {
t.Fatal(err)
} else if time.Since(t0) > 10*time.Second {
t.Fatalf("unexpected updated at time: %s", t0)
}
})
t.Run("WAL", func(t *testing.T) {
db, sqldb := MustOpenDBs(t)
defer MustCloseDBs(t, db, sqldb)
t0, err := db.UpdatedAt()
if err != nil {
t.Fatal(err)
}
if os.Getenv("CI") != "" {
time.Sleep(1 * time.Second)
}
if _, err := sqldb.Exec(`CREATE TABLE t (id INT);`); err != nil {
t.Fatal(err)
}
if t1, err := db.UpdatedAt(); err != nil {
t.Fatal(err)
} else if !t1.After(t0) {
t.Fatalf("expected newer updated at time: %s > %s", t1, t0)
}
})
}
// Ensure we can compute a checksum on the real database.
func TestDB_CRC64(t *testing.T) {
t.Run("ErrNotExist", func(t *testing.T) {
db := MustOpenDB(t)
defer MustCloseDB(t, db)
if _, _, err := db.CRC64(); !os.IsNotExist(err) {
t.Fatalf("unexpected error: %#v", err)
}
})
t.Run("DB", func(t *testing.T) {
db, sqldb := MustOpenDBs(t)
defer MustCloseDBs(t, db, sqldb)
if err := db.Sync(); err != nil {
t.Fatal(err)
}
chksum0, _, err := db.CRC64()
if err != nil {
t.Fatal(err)
}
// Issue change that is applied to the WAL. Checksum should not change.
if _, err := sqldb.Exec(`CREATE TABLE t (id INT);`); err != nil {
t.Fatal(err)
} else if chksum1, _, err := db.CRC64(); err != nil {
t.Fatal(err)
} else if chksum0 == chksum1 {
t.Fatal("expected different checksum event after WAL change")
}
// Checkpoint change into database. Checksum should change.
if _, err := sqldb.Exec(`PRAGMA wal_checkpoint(TRUNCATE);`); err != nil {
t.Fatal(err)
}
if chksum2, _, err := db.CRC64(); err != nil {
t.Fatal(err)
} else if chksum0 == chksum2 {
t.Fatal("expected different checksums after checkpoint")
}
})
}
// Ensure we can sync the real WAL to the shadow WAL.
func TestDB_Sync(t *testing.T) {
// Ensure sync is skipped if no database exists.
t.Run("NoDB", func(t *testing.T) {
db := MustOpenDB(t)
defer MustCloseDB(t, db)
if err := db.Sync(); err != nil {
t.Fatal(err)
}
})
// Ensure sync can successfully run on the initial sync.
t.Run("Initial", func(t *testing.T) {
db, sqldb := MustOpenDBs(t)
defer MustCloseDBs(t, db, sqldb)
if err := db.Sync(); err != nil {
t.Fatal(err)
}
// Verify page size is now available.
if db.PageSize() == 0 {
t.Fatal("expected page size after initial sync")
}
// Obtain real WAL size.
fi, err := os.Stat(db.WALPath())
if err != nil {
t.Fatal(err)
}
// Ensure position now available.
if pos, err := db.Pos(); err != nil {
t.Fatal(err)
} else if pos.Generation == "" {
t.Fatal("expected generation")
} else if got, want := pos.Index, 0; got != want {
t.Fatalf("pos.Index=%v, want %v", got, want)
} else if got, want := pos.Offset, fi.Size(); got != want {
t.Fatalf("pos.Offset=%v, want %v", got, want)
}
})
// Ensure DB can keep in sync across multiple Sync() invocations.
t.Run("MultiSync", func(t *testing.T) {
db, sqldb := MustOpenDBs(t)
defer MustCloseDBs(t, db, sqldb)
// Execute a query to force a write to the WAL.
if _, err := sqldb.Exec(`CREATE TABLE foo (bar TEXT);`); err != nil {
t.Fatal(err)
}
// Perform initial sync & grab initial position.
if err := db.Sync(); err != nil {
t.Fatal(err)
}
pos0, err := db.Pos()
if err != nil {
t.Fatal(err)
}
// Insert into table.
if _, err := sqldb.Exec(`INSERT INTO foo (bar) VALUES ('baz');`); err != nil {
t.Fatal(err)
}
// Sync to ensure position moves forward one page.
if err := db.Sync(); err != nil {
t.Fatal(err)
} else if pos1, err := db.Pos(); err != nil {
t.Fatal(err)
} else if pos0.Generation != pos1.Generation {
t.Fatal("expected the same generation")
} else if got, want := pos1.Index, pos0.Index; got != want {
t.Fatalf("Index=%v, want %v", got, want)
} else if got, want := pos1.Offset, pos0.Offset+4096+litestream.WALFrameHeaderSize; got != want {
t.Fatalf("Offset=%v, want %v", got, want)
}
})
// Ensure a WAL file is created if one does not already exist.
t.Run("NoWAL", func(t *testing.T) {
db, sqldb := MustOpenDBs(t)
defer MustCloseDBs(t, db, sqldb)
// Issue initial sync and truncate WAL.
if err := db.Sync(); err != nil {
t.Fatal(err)
}
// Obtain initial position.
pos0, err := db.Pos()
if err != nil {
t.Fatal(err)
}
// Checkpoint & fully close which should close WAL file.
if err := db.Checkpoint(litestream.CheckpointModeTruncate); err != nil {
t.Fatal(err)
} else if err := db.Close(); err != nil {
t.Fatal(err)
} else if err := sqldb.Close(); err != nil {
t.Fatal(err)
}
// Verify WAL does not exist.
if _, err := os.Stat(db.WALPath()); !os.IsNotExist(err) {
t.Fatal(err)
}
// Reopen the managed database.
db = MustOpenDBAt(t, db.Path())
defer MustCloseDB(t, db)
// Re-sync and ensure new generation has been created.
if err := db.Sync(); err != nil {
t.Fatal(err)
}
// Obtain initial position.
if pos1, err := db.Pos(); err != nil {
t.Fatal(err)
} else if pos0.Generation == pos1.Generation {
t.Fatal("expected new generation after truncation")
}
})
// Ensure DB can start new generation if it detects it cannot verify last position.
t.Run("OverwritePrevPosition", func(t *testing.T) {
db, sqldb := MustOpenDBs(t)
defer MustCloseDBs(t, db, sqldb)
// Execute a query to force a write to the WAL.
if _, err := sqldb.Exec(`CREATE TABLE foo (bar TEXT);`); err != nil {
t.Fatal(err)
}
// Issue initial sync and truncate WAL.
if err := db.Sync(); err != nil {
t.Fatal(err)
}
// Obtain initial position.
pos0, err := db.Pos()
if err != nil {
t.Fatal(err)
}
// Fully close which should close WAL file.
if err := db.Close(); err != nil {
t.Fatal(err)
} else if err := sqldb.Close(); err != nil {
t.Fatal(err)
}
// Verify WAL does not exist.
if _, err := os.Stat(db.WALPath()); !os.IsNotExist(err) {
t.Fatal(err)
}
// Insert into table multiple times to move past old offset
sqldb = MustOpenSQLDB(t, db.Path())
defer MustCloseSQLDB(t, sqldb)
for i := 0; i < 100; i++ {
if _, err := sqldb.Exec(`INSERT INTO foo (bar) VALUES ('baz');`); err != nil {
t.Fatal(err)
}
}
// Reopen the managed database.
db = MustOpenDBAt(t, db.Path())
defer MustCloseDB(t, db)
// Re-sync and ensure new generation has been created.
if err := db.Sync(); err != nil {
t.Fatal(err)
}
// Obtain initial position.
if pos1, err := db.Pos(); err != nil {
t.Fatal(err)
} else if pos0.Generation == pos1.Generation {
t.Fatal("expected new generation after truncation")
}
})
// Ensure DB can handle a mismatched WAL header and start a new generation.
t.Run("WALHeaderMismatch", func(t *testing.T) {
db, sqldb := MustOpenDBs(t)
defer MustCloseDBs(t, db, sqldb)
// Execute a query to force a write to the WAL and then sync.
if _, err := sqldb.Exec(`CREATE TABLE foo (bar TEXT);`); err != nil {
t.Fatal(err)
} else if err := db.Sync(); err != nil {
t.Fatal(err)
}
// Grab initial position & close.
pos0, err := db.Pos()
if err != nil {
t.Fatal(err)
} else if err := db.Close(); err != nil {
t.Fatal(err)
}
// Read existing file, update header checksum, and write back only header
// to simulate a header with a mismatched checksum.
shadowWALPath := db.ShadowWALPath(pos0.Generation, pos0.Index)
if buf, err := ioutil.ReadFile(shadowWALPath); err != nil {
t.Fatal(err)
} else if err := ioutil.WriteFile(shadowWALPath, append(buf[:litestream.WALHeaderSize-8], 0, 0, 0, 0, 0, 0, 0, 0), 0600); err != nil {
t.Fatal(err)
}
// Reopen managed database & ensure sync will still work.
db = MustOpenDBAt(t, db.Path())
defer MustCloseDB(t, db)
if err := db.Sync(); err != nil {
t.Fatal(err)
}
// Verify a new generation was started.
if pos1, err := db.Pos(); err != nil {
t.Fatal(err)
} else if pos0.Generation == pos1.Generation {
t.Fatal("expected new generation")
}
})
// Ensure DB can handle partial shadow WAL header write.
t.Run("PartialShadowWALHeader", func(t *testing.T) {
db, sqldb := MustOpenDBs(t)
defer MustCloseDBs(t, db, sqldb)
// Execute a query to force a write to the WAL and then sync.
if _, err := sqldb.Exec(`CREATE TABLE foo (bar TEXT);`); err != nil {
t.Fatal(err)
} else if err := db.Sync(); err != nil {
t.Fatal(err)
}
pos0, err := db.Pos()
if err != nil {
t.Fatal(err)
}
// Close & truncate shadow WAL to simulate a partial header write.
if err := db.Close(); err != nil {
t.Fatal(err)
} else if err := os.Truncate(db.ShadowWALPath(pos0.Generation, pos0.Index), litestream.WALHeaderSize-1); err != nil {
t.Fatal(err)
}
// Reopen managed database & ensure sync will still work.
db = MustOpenDBAt(t, db.Path())
defer MustCloseDB(t, db)
if err := db.Sync(); err != nil {
t.Fatal(err)
}
// Verify a new generation was started.
if pos1, err := db.Pos(); err != nil {
t.Fatal(err)
} else if pos0.Generation == pos1.Generation {
t.Fatal("expected new generation")
}
})
// Ensure DB can handle partial shadow WAL writes.
t.Run("PartialShadowWALFrame", func(t *testing.T) {
db, sqldb := MustOpenDBs(t)
defer MustCloseDBs(t, db, sqldb)
// Execute a query to force a write to the WAL and then sync.
if _, err := sqldb.Exec(`CREATE TABLE foo (bar TEXT);`); err != nil {
t.Fatal(err)
} else if err := db.Sync(); err != nil {
t.Fatal(err)
}
pos0, err := db.Pos()
if err != nil {
t.Fatal(err)
}
// Obtain current shadow WAL size.
fi, err := os.Stat(db.ShadowWALPath(pos0.Generation, pos0.Index))
if err != nil {
t.Fatal(err)
}
// Close & truncate shadow WAL to simulate a partial frame write.
if err := db.Close(); err != nil {
t.Fatal(err)
} else if err := os.Truncate(db.ShadowWALPath(pos0.Generation, pos0.Index), fi.Size()-1); err != nil {
t.Fatal(err)
}
// Reopen managed database & ensure sync will still work.
db = MustOpenDBAt(t, db.Path())
defer MustCloseDB(t, db)
if err := db.Sync(); err != nil {
t.Fatal(err)
}
// Verify same generation is kept.
if pos1, err := db.Pos(); err != nil {
t.Fatal(err)
} else if got, want := pos1, pos0; got != want {
t.Fatalf("Pos()=%s want %s", got, want)
}
// Ensure shadow WAL has recovered.
if fi0, err := os.Stat(db.ShadowWALPath(pos0.Generation, pos0.Index)); err != nil {
t.Fatal(err)
} else if got, want := fi0.Size(), fi.Size(); got != want {
t.Fatalf("Size()=%v, want %v", got, want)
}
})
// Ensure DB can handle a generation directory with a missing shadow WAL.
t.Run("NoShadowWAL", func(t *testing.T) {
db, sqldb := MustOpenDBs(t)
defer MustCloseDBs(t, db, sqldb)
// Execute a query to force a write to the WAL and then sync.
if _, err := sqldb.Exec(`CREATE TABLE foo (bar TEXT);`); err != nil {
t.Fatal(err)
} else if err := db.Sync(); err != nil {
t.Fatal(err)
}
pos0, err := db.Pos()
if err != nil {
t.Fatal(err)
}
// Close & delete shadow WAL to simulate dir created but not WAL.
if err := db.Close(); err != nil {
t.Fatal(err)
} else if err := os.Remove(db.ShadowWALPath(pos0.Generation, pos0.Index)); err != nil {
t.Fatal(err)
}
// Reopen managed database & ensure sync will still work.
db = MustOpenDBAt(t, db.Path())
defer MustCloseDB(t, db)
if err := db.Sync(); err != nil {
t.Fatal(err)
}
// Verify new generation created but index/offset the same.
if pos1, err := db.Pos(); err != nil {
t.Fatal(err)
} else if pos0.Generation == pos1.Generation {
t.Fatal("expected new generation")
} else if got, want := pos1.Index, pos0.Index; got != want {
t.Fatalf("Index=%v want %v", got, want)
} else if got, want := pos1.Offset, pos0.Offset; got != want {
t.Fatalf("Offset=%v want %v", got, want)
}
})
// Ensure DB checkpoints after minimum number of pages.
t.Run("MinCheckpointPageN", func(t *testing.T) {
db, sqldb := MustOpenDBs(t)
defer MustCloseDBs(t, db, sqldb)
// Execute a query to force a write to the WAL and then sync.
if _, err := sqldb.Exec(`CREATE TABLE foo (bar TEXT);`); err != nil {
t.Fatal(err)
} else if err := db.Sync(); err != nil {
t.Fatal(err)
}
// Write at least minimum number of pages to trigger rollover.
for i := 0; i < db.MinCheckpointPageN; i++ {
if _, err := sqldb.Exec(`INSERT INTO foo (bar) VALUES ('baz');`); err != nil {
t.Fatal(err)
}
}
// Sync to shadow WAL.
if err := db.Sync(); err != nil {
t.Fatal(err)
}
// Ensure position is now on the second index.
if pos, err := db.Pos(); err != nil {
t.Fatal(err)
} else if got, want := pos.Index, 1; got != want {
t.Fatalf("Index=%v, want %v", got, want)
}
})
// Ensure DB checkpoints after interval.
t.Run("CheckpointInterval", func(t *testing.T) {
db, sqldb := MustOpenDBs(t)
defer MustCloseDBs(t, db, sqldb)
// Execute a query to force a write to the WAL and then sync.
if _, err := sqldb.Exec(`CREATE TABLE foo (bar TEXT);`); err != nil {
t.Fatal(err)
} else if err := db.Sync(); err != nil {
t.Fatal(err)
}
// Reduce checkpoint interval to ensure a rollover is triggered.
db.CheckpointInterval = 1 * time.Nanosecond
// Write to WAL & sync.
if _, err := sqldb.Exec(`INSERT INTO foo (bar) VALUES ('baz');`); err != nil {
t.Fatal(err)
} else if err := db.Sync(); err != nil {
t.Fatal(err)
}
// Ensure position is now on the second index.
if pos, err := db.Pos(); err != nil {
t.Fatal(err)
} else if got, want := pos.Index, 1; got != want {
t.Fatalf("Index=%v, want %v", got, want)
}
})
}
// MustOpenDBs returns a new instance of a DB & associated SQL DB.
func MustOpenDBs(tb testing.TB) (*litestream.DB, *sql.DB) {
db := MustOpenDB(tb)
return db, MustOpenSQLDB(tb, db.Path())
}
// MustCloseDBs closes db & sqldb and removes the parent directory.
func MustCloseDBs(tb testing.TB, db *litestream.DB, sqldb *sql.DB) {
MustCloseDB(tb, db)
MustCloseSQLDB(tb, sqldb)
}
// MustOpenDB returns a new instance of a DB.
func MustOpenDB(tb testing.TB) *litestream.DB {
dir := tb.TempDir()
return MustOpenDBAt(tb, filepath.Join(dir, "db"))
}
// MustOpenDBAt returns a new instance of a DB for a given path.
func MustOpenDBAt(tb testing.TB, path string) *litestream.DB {
tb.Helper()
db := litestream.NewDB(path)
db.MonitorInterval = 0 // disable background goroutine
if err := db.Open(); err != nil {
tb.Fatal(err)
}
return db
}
// MustCloseDB closes db and removes its parent directory.
func MustCloseDB(tb testing.TB, db *litestream.DB) {
tb.Helper()
if err := db.Close(); err != nil {
tb.Fatal(err)
} else if err := os.RemoveAll(filepath.Dir(db.Path())); err != nil {
tb.Fatal(err)
}
}
// MustOpenSQLDB returns a database/sql DB.
func MustOpenSQLDB(tb testing.TB, path string) *sql.DB {
tb.Helper()
d, err := sql.Open("sqlite3", path)
if err != nil {
tb.Fatal(err)
} else if _, err := d.Exec(`PRAGMA journal_mode = wal;`); err != nil {
tb.Fatal(err)
}
return d
}
// MustCloseSQLDB closes a database/sql DB.
func MustCloseSQLDB(tb testing.TB, d *sql.DB) {
tb.Helper()
if err := d.Close(); err != nil {
tb.Fatal(err)
}
}


@@ -1,88 +0,0 @@
DESIGN
======
Litestream is a sidecar process that replicates the write ahead log (WAL) for
a SQLite database. To ensure that it can replicate every page, litestream takes
control over the checkpointing process by issuing a long running read
transaction against the database to prevent checkpointing. It then releases
this transaction once it obtains a write lock and issues the checkpoint itself.
The daemon polls the database on an interval to briefly obtain a write
transaction lock and copy over new WAL pages. Once the WAL has reached a
threshold size, litestream will issue a checkpoint and a single page write
to a table called `_litestream` to start the new WAL.
## Workflow
When litestream first loads a database, it checks if there is an existing
sidecar directory which is named `.<DB>-litestream`. If not, it initializes
the directory and starts a new generation.
A generation is a snapshot of the database followed by a continuous stream of
WAL files. A new generation is started on initialization & whenever litestream
cannot verify that it has a continuous record of WAL files. This could happen
if litestream is stopped and another process checkpoints the WAL. In this case,
a new generation ID is randomly created and a snapshot is replicated to the
appropriate destinations.
Generations also prevent two servers from replicating to the same destination
and corrupting each other's data. In this case, each server would replicate
to a different generation directory. On recovery, there will be duplicate
databases and the end user can choose which generation to recover but each
database will be uncorrupted.
## File Layout
Litestream maintains a shadow WAL which is a historical record of all previous
WAL files. These files can be deleted after a time or size threshold but should
be replicated before being deleted.
### Local
Given a database file named `db`, SQLite will create a WAL file called `db-wal`.
Litestream will then create a hidden directory called `.db-litestream` that
contains the historical record of all WAL files for the current generation.
```
db # SQLite database
db-wal # SQLite WAL
.db-litestream/
generation # current generation number
generations/
xxxxxxxx/
wal/ # WAL files
000000000000001.wal
000000000000002.wal
000000000000003.wal # active WAL
```
### Remote (S3)
```
bkt/
db/ # database path
generations/
xxxxxxxx/
snapshots/ # snapshots w/ timestamp+offset
20000101T000000Z-000000000000023.snapshot
wal/ # compressed WAL files
000000000000001-0.wal.gz
000000000000001-<offset>.wal.gz
000000000000002-0.wal.gz
00000002/
snapshot/
000000000000000.snapshot
scheduled/
daily/
20000101T000000Z-000000000000023.snapshot
20000102T000000Z-000000000000036.snapshot
monthly/
20000101T000000Z-000000000000023.snapshot
wal/
000000000000001.wal.gz
```
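The local layout above maps onto the exported path helpers exercised by db_test.go; a minimal sketch (the database path is illustrative):
```
package main

import (
	"fmt"

	"github.com/benbjohnson/litestream"
)

func main() {
	db := litestream.NewDB("/tmp/db")
	fmt.Println(db.MetaPath())                  // /tmp/.db-litestream
	fmt.Println(db.GenerationPath("xxxx"))      // /tmp/.db-litestream/generations/xxxx
	fmt.Println(db.ShadowWALPath("xxxx", 1000)) // /tmp/.db-litestream/generations/xxxx/wal/000003e8.wal
}
```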

etc/build.ps1 (new file, 17 lines)

@@ -0,0 +1,17 @@
[CmdletBinding()]
Param (
[Parameter(Mandatory = $true)]
[String] $Version
)
$ErrorActionPreference = "Stop"
# Update working directory.
Push-Location $PSScriptRoot
Trap {
Pop-Location
}
Invoke-Expression "candle.exe -nologo -arch x64 -ext WixUtilExtension -out litestream.wixobj -dVersion=`"$Version`" litestream.wxs"
Invoke-Expression "light.exe -nologo -spdb -ext WixUtilExtension -out `"litestream-${Version}.msi`" litestream.wixobj"
Pop-Location
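A hypothetical invocation (the version string is illustrative); the script expects candle.exe and light.exe from the WiX toolset to be on the PATH:

PS> .\etc\build.ps1 -Version 0.3.0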

etc/gon.hcl (new file, 15 lines)

@@ -0,0 +1,15 @@
source = ["./dist/litestream"]
bundle_id = "com.middlemost.litestream"
apple_id {
username = "benbjohnson@yahoo.com"
password = "@env:AC_PASSWORD"
}
sign {
application_identity = "Developer ID Application: Middlemost Systems, LLC"
}
zip {
output_path = "dist/litestream.zip"
}

etc/litestream.service (new file, 9 lines)

@@ -0,0 +1,9 @@
[Unit]
Description=Litestream
[Service]
Restart=always
ExecStart=/usr/bin/litestream replicate
[Install]
WantedBy=multi-user.target
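Once the Linux packages install this unit (etc/nfpm.yml below places it at /usr/lib/systemd/system/litestream.service), it can be enabled and started with the usual systemd commands:

$ sudo systemctl enable litestream
$ sudo systemctl start litestream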

etc/litestream.wxs (new file, 89 lines)

@@ -0,0 +1,89 @@
<?xml version="1.0" encoding="utf-8"?>
<Wix
xmlns="http://schemas.microsoft.com/wix/2006/wi"
xmlns:util="http://schemas.microsoft.com/wix/UtilExtension"
>
<?if $(sys.BUILDARCH)=x64 ?>
<?define PlatformProgramFiles = "ProgramFiles64Folder" ?>
<?else ?>
<?define PlatformProgramFiles = "ProgramFilesFolder" ?>
<?endif ?>
<Product
Id="*"
UpgradeCode="5371367e-58b3-4e52-be0d-46945eb71ce6"
Name="Litestream"
Version="$(var.Version)"
Manufacturer="Litestream"
Language="1033"
Codepage="1252"
>
<Package
Id="*"
Manufacturer="Litestream"
InstallScope="perMachine"
InstallerVersion="500"
Description="Litestream $(var.Version) installer"
Compressed="yes"
/>
<Media Id="1" Cabinet="litestream.cab" EmbedCab="yes"/>
<MajorUpgrade
Schedule="afterInstallInitialize"
DowngradeErrorMessage="A later version of [ProductName] is already installed. Setup will now exit."
/>
<Directory Id="TARGETDIR" Name="SourceDir">
<Directory Id="$(var.PlatformProgramFiles)">
<Directory Id="APPLICATIONROOTDIRECTORY" Name="Litestream"/>
</Directory>
</Directory>
<ComponentGroup Id="Files">
<Component Directory="APPLICATIONROOTDIRECTORY">
<File
Id="litestream.exe"
Name="litestream.exe"
Source="litestream.exe"
KeyPath="yes"
/>
<ServiceInstall
Id="InstallService"
Name="Litestream"
DisplayName="Litestream"
Description="Replicates SQLite databases"
ErrorControl="normal"
Start="auto"
Type="ownProcess"
>
<util:ServiceConfig
FirstFailureActionType="restart"
SecondFailureActionType="restart"
ThirdFailureActionType="restart"
RestartServiceDelayInSeconds="60"
/>
<ServiceDependency Id="wmiApSrv" />
</ServiceInstall>
<ServiceControl
Id="ServiceStateControl"
Name="Litestream"
Remove="uninstall"
Start="install"
Stop="both"
/>
<util:EventSource
Log="Application"
Name="Litestream"
EventMessageFile="%SystemRoot%\System32\EventCreate.exe"
/>
</Component>
</ComponentGroup>
<Feature Id="DefaultFeature" Level="1">
<ComponentGroupRef Id="Files" />
</Feature>
</Product>
</Wix>

etc/litestream.yml (new file, 10 lines)

@@ -0,0 +1,10 @@
# AWS credentials
# access-key-id: AKIAxxxxxxxxxxxxxxxx
# secret-access-key: xxxxxxxxxxxxxxxxxxxxxxxxxxxxxx/xxxxxxxxx
# dbs:
# - path: /path/to/primary/db # Database to replicate from
# replicas:
# - path: /path/to/replica # File-based replication
# - url: s3://my.bucket.com/db # S3-based replication
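A hedged, filled-in variant of this configuration: the dbs/replicas shape follows the comments above, addr corresponds to the Config.Addr field used by the replicate command's metrics listener, and the sync-interval key name is an assumption based on the ReplicaConfig.SyncInterval field (all values are placeholders).

addr: ":9090"
dbs:
  - path: /var/lib/app/db
    replicas:
      - url: s3://mybkt/db
        sync-interval: 10s

With addr set as above, metrics are served at http://localhost:9090/metrics, matching the log line in the replicate command.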

etc/nfpm.yml (new file, 19 lines)

@@ -0,0 +1,19 @@
name: litestream
arch: amd64
platform: linux
version: "${LITESTREAM_VERSION}"
section: "default"
priority: "extra"
maintainer: "Ben Johnson <benbjohnson@yahoo.com>"
description: Litestream is a tool for real-time replication of SQLite databases.
homepage: "https://github.com/benbjohnson/litestream"
license: "GPLv3"
contents:
- src: ./litestream
dst: /usr/bin/litestream
- src: ./litestream.yml
dst: /etc/litestream.yml
type: config
- src: ./litestream.service
dst: /usr/lib/systemd/system/litestream.service
type: config

go.mod (4 lines changed)

@@ -3,7 +3,11 @@ module github.com/benbjohnson/litestream
go 1.15
require (
github.com/aws/aws-sdk-go v1.27.0
github.com/davecgh/go-spew v1.1.1
github.com/mattn/go-sqlite3 v1.14.5
github.com/pierrec/lz4/v4 v4.1.3
github.com/prometheus/client_golang v1.9.0
golang.org/x/sys v0.0.0-20201214210602-f9fddec55a1e
gopkg.in/yaml.v2 v2.4.0
)

go.sum (7 lines changed)

@@ -18,6 +18,7 @@ github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da/go.mod h1:Q73ZrmV
github.com/armon/go-radix v0.0.0-20180808171621-7fddfc383310/go.mod h1:ufUuZ+zHj4x4TnLV4JWEpy2hxWSpsRywHrMgIH9cCH8=
github.com/aryann/difflib v0.0.0-20170710044230-e206f873d14a/go.mod h1:DAHtR1m6lCRdSC2Tm3DSWRPvIPr6xNKyeHdqDQSQT+A=
github.com/aws/aws-lambda-go v1.13.3/go.mod h1:4UKl9IzQMoD+QF79YdCuzCwp8VbmG4VAQwij/eHl5CU=
github.com/aws/aws-sdk-go v1.27.0 h1:0xphMHGMLBrPMfxR2AmVjZKcMEESEgWF8Kru94BNByk=
github.com/aws/aws-sdk-go v1.27.0/go.mod h1:KmX6BPdI08NWTb3/sm4ZGu5ShLoqVDhKgpiN924inxo=
github.com/aws/aws-sdk-go-v2 v0.18.0/go.mod h1:JWVYvqSMppoMJC0x5wdwiImzgXTI9FuZwxzkQq9wy+g=
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
@@ -40,6 +41,7 @@ github.com/coreos/pkg v0.0.0-20160727233714-3ac0863d7acf/go.mod h1:E3G3o1h8I7cfc
github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU=
github.com/creack/pty v1.1.7/go.mod h1:lj5s0c3V2DBrqTV7llrYr5NG6My20zk30Fl46Y7DoTY=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ=
github.com/dustin/go-humanize v0.0.0-20171111073723-bb3d318650d4/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk=
@@ -124,6 +126,7 @@ github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpO
github.com/hudl/fargo v1.3.0/go.mod h1:y3CKSmjA+wD2gak7sUSXTAoopbhU08POFhmITJgmKTg=
github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8=
github.com/influxdata/influxdb1-client v0.0.0-20191209144304-8bf82d3c094d/go.mod h1:qj24IKcXYK6Iy9ceXlo3Tc+vtHo9lIhSX5JddghvEPo=
github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af h1:pmfjZENx5imkbgOkpRUYLnmbU7UEFbjtDA2hxJ1ichM=
github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af/go.mod h1:Nht3zPeWKUH0NzdCt2Blrr5ys8VGpn0CEB0cQHVjt7k=
github.com/jonboulle/clockwork v0.1.0/go.mod h1:Ii8DK3G1RaLaWxj9trq07+26W01tbo22gdxWY5EU2bo=
github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4=
@@ -194,7 +197,11 @@ github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c/go.mod h1:lzWF7FI
github.com/pborman/uuid v1.2.0/go.mod h1:X/NO0urCmaxf9VXbdlT7C2Yzkj2IKimNn4k+gtPdI/k=
github.com/performancecopilot/speed v3.0.0+incompatible/go.mod h1:/CLtqpZ5gBg1M9iaPbIdPPGyKcA8hKdoy6hAWba7Yac=
github.com/pierrec/lz4 v1.0.2-0.20190131084431-473cd7ce01a1/go.mod h1:3/3N9NVKO0jef7pBehbT1qWhCMrIgbYNnFAZCqQ5LRc=
github.com/pierrec/lz4 v2.0.5+incompatible h1:2xWsjqPFWcplujydGg4WmhC/6fZqK42wMM8aXeqhl0I=
github.com/pierrec/lz4 v2.0.5+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY=
github.com/pierrec/lz4 v2.6.0+incompatible h1:Ix9yFKn1nSPBLFl/yZknTp8TU5G4Ps0JDmguYK6iH1A=
github.com/pierrec/lz4/v4 v4.1.3 h1:/dvQpkb0o1pVlSgKNQqfkavlnXaIK+hJ0LXsKRUN9D4=
github.com/pierrec/lz4/v4 v4.1.3/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=

internal/internal.go (new file, 32 lines)

@@ -0,0 +1,32 @@
package internal
import (
"io"
)
// ReadCloser wraps a reader to also attach a separate closer.
type ReadCloser struct {
r io.Reader
c io.Closer
}
// NewReadCloser returns a new instance of ReadCloser.
func NewReadCloser(r io.Reader, c io.Closer) *ReadCloser {
return &ReadCloser{r, c}
}
// Read reads bytes into the underlying reader.
func (r *ReadCloser) Read(p []byte) (n int, err error) {
return r.r.Read(p)
}
// Close closes the reader (if implementing io.ReadCloser) and the Closer.
func (r *ReadCloser) Close() error {
if rc, ok := r.r.(io.Closer); ok {
if err := rc.Close(); err != nil {
r.c.Close()
return err
}
}
return r.c.Close()
}
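A minimal sketch of the intended use from inside the module (the helper name is hypothetical): an lz4 reader from github.com/pierrec/lz4/v4 has no Close method, so the underlying *os.File is attached as the closer and released by a single Close call.
```
package example // hypothetical package inside the litestream module

import (
	"io"
	"os"

	"github.com/benbjohnson/litestream/internal"
	"github.com/pierrec/lz4/v4"
)

// openCompressedWAL opens an lz4-compressed WAL file and returns a ReadCloser
// whose Close also closes the underlying *os.File.
func openCompressedWAL(path string) (io.ReadCloser, error) {
	f, err := os.Open(path)
	if err != nil {
		return nil, err
	}
	return internal.NewReadCloser(lz4.NewReader(f), f), nil
}
```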

internal/metrics.go (new file, 44 lines)

@@ -0,0 +1,44 @@
package internal
import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)
// Shared replica metrics.
var (
ReplicaSnapshotTotalGaugeVec = promauto.NewGaugeVec(prometheus.GaugeOpts{
Namespace: "litestream",
Subsystem: "replica",
Name: "snapshot_total",
Help: "The current number of snapshots",
}, []string{"db", "name"})
ReplicaWALBytesCounterVec = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: "litestream",
Subsystem: "replica",
Name: "wal_bytes",
Help: "The number of WAL bytes written",
}, []string{"db", "name"})
ReplicaWALIndexGaugeVec = promauto.NewGaugeVec(prometheus.GaugeOpts{
Namespace: "litestream",
Subsystem: "replica",
Name: "wal_index",
Help: "The current WAL index",
}, []string{"db", "name"})
ReplicaWALOffsetGaugeVec = promauto.NewGaugeVec(prometheus.GaugeOpts{
Namespace: "litestream",
Subsystem: "replica",
Name: "wal_offset",
Help: "The current WAL offset",
}, []string{"db", "name"})
ReplicaValidationTotalCounterVec = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: "litestream",
Subsystem: "replica",
Name: "validation_total",
Help: "The number of validations performed",
}, []string{"db", "name", "status"})
)
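These collectors register themselves via promauto and are exposed through the /metrics handler wired up in the replicate command, so the full metric names take the form litestream_replica_<name>. A sketch of scraping one of them (the port and label values are illustrative):

$ curl -s http://localhost:9090/metrics | grep litestream_replica_wal_bytes
litestream_replica_wal_bytes{db="/var/lib/app/db",name="s3"} 4096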


@@ -1,10 +1,8 @@
package litestream
import (
"compress/gzip"
"database/sql"
"encoding/binary"
"encoding/hex"
"errors"
"fmt"
"io"
@@ -13,11 +11,11 @@ import (
"regexp"
"strconv"
"strings"
"syscall"
"time"
_ "github.com/mattn/go-sqlite3"
)
// Naming constants.
const (
MetaDirSuffix = "-litestream"
@@ -38,6 +36,7 @@ const (
// Litestream errors.
var (
ErrNoGeneration = errors.New("no generation available")
ErrNoSnapshots = errors.New("no snapshots available")
ErrChecksumMismatch = errors.New("invalid replica, checksum mismatch")
)
@@ -52,6 +51,30 @@ type SnapshotInfo struct {
CreatedAt time.Time
}
// FilterSnapshotsAfter returns all snapshots that were created on or after t.
func FilterSnapshotsAfter(a []*SnapshotInfo, t time.Time) []*SnapshotInfo {
other := make([]*SnapshotInfo, 0, len(a))
for _, snapshot := range a {
if !snapshot.CreatedAt.Before(t) {
other = append(other, snapshot)
}
}
return other
}
// FindMinSnapshotByGeneration finds the snapshot with the lowest index in a generation.
func FindMinSnapshotByGeneration(a []*SnapshotInfo, generation string) *SnapshotInfo {
var min *SnapshotInfo
for _, snapshot := range a {
if snapshot.Generation != generation {
continue
} else if min == nil || snapshot.Index < min.Index {
min = snapshot
}
}
return min
}
// WALInfo represents file information about a WAL file.
type WALInfo struct {
Name string
@@ -73,9 +96,9 @@ type Pos struct {
// String returns a string representation.
func (p Pos) String() string {
if p.IsZero() {
return "<>"
return ""
}
return fmt.Sprintf("<%s,%d,%d>", p.Generation, p.Index, p.Offset)
return fmt.Sprintf("%s/%08x:%d", p.Generation, p.Index, p.Offset)
}
// IsZero returns true if p is the zero value.
@@ -129,12 +152,9 @@ func readWALHeader(filename string) ([]byte, error) {
return buf[:n], err
}
func readCheckpointSeqNo(hdr []byte) uint32 {
return binary.BigEndian.Uint32(hdr[12:])
}
// readFileAt reads a slice from a file.
func readFileAt(filename string, offset, n int64) ([]byte, error) {
// readWALFileAt reads a slice from a file. Do not use this with database files
// as it causes problems with non-OFD locks.
func readWALFileAt(filename string, offset, n int64) ([]byte, error) {
f, err := os.Open(filename)
if err != nil {
return nil, err
@@ -184,19 +204,19 @@ func IsSnapshotPath(s string) bool {
// ParseSnapshotPath returns the index for the snapshot.
// Returns an error if the path is not a valid snapshot path.
func ParseSnapshotPath(s string) (index int, typ, ext string, err error) {
func ParseSnapshotPath(s string) (index int, ext string, err error) {
s = filepath.Base(s)
a := snapshotPathRegex.FindStringSubmatch(s)
if a == nil {
return 0, "", "", fmt.Errorf("invalid snapshot path: %s", s)
return 0, "", fmt.Errorf("invalid snapshot path: %s", s)
}
i64, _ := strconv.ParseUint(a[1], 16, 64)
return int(i64), a[2], a[3], nil
return int(i64), a[2], nil
}
var snapshotPathRegex = regexp.MustCompile(`^([0-9a-f]{16})(?:-(\w+))?(.snapshot(?:.gz)?)$`)
var snapshotPathRegex = regexp.MustCompile(`^([0-9a-f]{8})(.snapshot(?:.lz4)?)$`)
// IsWALPath returns true if s is a path to a WAL file.
func IsWALPath(s string) bool {
@@ -221,75 +241,82 @@ func ParseWALPath(s string) (index int, offset int64, ext string, err error) {
// FormatWALPath formats a WAL filename with a given index.
func FormatWALPath(index int) string {
assert(index >= 0, "wal index must be non-negative")
return fmt.Sprintf("%016x%s", index, WALExt)
return fmt.Sprintf("%08x%s", index, WALExt)
}
// FormatWALPathWithOffset formats a WAL filename with a given index & offset.
func FormatWALPathWithOffset(index int, offset int64) string {
assert(index >= 0, "wal index must be non-negative")
assert(offset >= 0, "wal offset must be non-negative")
return fmt.Sprintf("%016x_%016x%s", index, offset, WALExt)
return fmt.Sprintf("%08x_%08x%s", index, offset, WALExt)
}
var walPathRegex = regexp.MustCompile(`^([0-9a-f]{16})(?:_([0-9a-f]{16}))?(.wal(?:.gz)?)$`)
var walPathRegex = regexp.MustCompile(`^([0-9a-f]{8})(?:_([0-9a-f]{8}))?(.wal(?:.lz4)?)$`)
// isHexChar returns true if ch is a lowercase hex character.
func isHexChar(ch rune) bool {
return (ch >= '0' && ch <= '9') || (ch >= 'a' && ch <= 'f')
}
// gzipReadCloser wraps gzip.Reader to also close the underlying reader on close.
type gzipReadCloser struct {
r *gzip.Reader
closer io.ReadCloser
// createFile creates the file and attempts to set the UID/GID.
func createFile(filename string, perm os.FileMode, uid, gid int) (*os.File, error) {
f, err := os.OpenFile(filename, os.O_RDWR|os.O_CREATE|os.O_TRUNC, perm)
if err != nil {
return nil, err
}
_ = f.Chown(uid, gid)
return f, nil
}
func (r *gzipReadCloser) Read(p []byte) (n int, err error) {
return r.r.Read(p)
}
// mkdirAll is a copy of os.MkdirAll() except that it attempts to set the
// uid/gid for each created directory.
func mkdirAll(path string, perm os.FileMode, uid, gid int) error {
// Fast path: if we can tell whether path is a directory or file, stop with success or error.
dir, err := os.Stat(path)
if err == nil {
if dir.IsDir() {
return nil
}
return &os.PathError{Op: "mkdir", Path: path, Err: syscall.ENOTDIR}
}
func (r *gzipReadCloser) Close() error {
if err := r.r.Close(); err != nil {
r.closer.Close()
// Slow path: make sure parent exists and then call Mkdir for path.
i := len(path)
for i > 0 && os.IsPathSeparator(path[i-1]) { // Skip trailing path separator.
i--
}
j := i
for j > 0 && !os.IsPathSeparator(path[j-1]) { // Scan backward over element.
j--
}
if j > 1 {
// Create parent.
err = mkdirAll(fixRootDirectory(path[:j-1]), perm, uid, gid)
if err != nil {
return err
}
}
// Parent now exists; invoke Mkdir and use its result.
err = os.Mkdir(path, perm)
if err != nil {
// Handle arguments like "foo/." by
// double-checking that directory doesn't exist.
dir, err1 := os.Lstat(path)
if err1 == nil && dir.IsDir() {
_ = os.Chown(path, uid, gid)
return nil
}
return err
}
return r.closer.Close()
_ = os.Chown(path, uid, gid)
return nil
}
// HexDump returns hexdump output but with duplicate lines removed.
func HexDump(b []byte) string {
const prefixN = len("00000000")
var output []string
var prev string
var ellipsis bool
lines := strings.Split(strings.TrimSpace(hex.Dump(b)), "\n")
for i, line := range lines {
// Add line to output if it is not repeating or the last line.
if i == 0 || i == len(lines)-1 || trimPrefixN(line, prefixN) != trimPrefixN(prev, prefixN) {
output = append(output, line)
prev, ellipsis = line, false
continue
}
// Add an ellipsis for the first duplicate line.
if !ellipsis {
output = append(output, "...")
ellipsis = true
continue
}
}
return strings.Join(output, "\n")
}
func trimPrefixN(s string, n int) string {
if len(s) < n {
return ""
}
return s[n:]
}
// Tracef is used for low-level tracing.
var Tracef = func(format string, a ...interface{}) {}
func assert(condition bool, message string) {
if !condition {

View File

@@ -6,6 +6,7 @@ import (
"testing"
"github.com/benbjohnson/litestream"
_ "github.com/mattn/go-sqlite3"
)
func TestChecksum(t *testing.T) {

litestream_unix.go (new file, 18 lines)

@@ -0,0 +1,18 @@
// +build aix darwin dragonfly freebsd linux netbsd openbsd solaris
package litestream
import (
"os"
"syscall"
)
// fileinfo returns syscall fields from a FileInfo object.
func fileinfo(fi os.FileInfo) (uid, gid int) {
stat := fi.Sys().(*syscall.Stat_t)
return int(stat.Uid), int(stat.Gid)
}
func fixRootDirectory(p string) string {
return p
}

litestream_windows.go (new file, 22 lines)

@@ -0,0 +1,22 @@
// +build windows
package litestream
import (
"os"
)
// fileinfo returns syscall fields from a FileInfo object.
func fileinfo(fi os.FileInfo) (uid, gid int) {
return -1, -1
}
// fixRootDirectory is copied from the standard library for use with mkdirAll()
func fixRootDirectory(p string) string {
if len(p) == len(`\\?\c:`) {
if os.IsPathSeparator(p[0]) && os.IsPathSeparator(p[1]) && p[2] == '?' && os.IsPathSeparator(p[3]) && p[5] == ':' {
return p + `\`
}
}
return p
}
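
For illustration only (not part of the diff): on the Windows build, fixRootDirectory appends a trailing separator to an extended-length volume root so that mkdirAll can stat and create it; every other path passes through unchanged. exampleFixRootDirectory is a hypothetical helper.

package litestream

import "fmt"

func exampleFixRootDirectory() {
	fmt.Println(fixRootDirectory(`\\?\c:`))     // prints `\\?\c:\` on Windows
	fmt.Println(fixRootDirectory(`C:\data\db`)) // unchanged
}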

File diff suppressed because it is too large

90
replica_test.go Normal file
View File

@@ -0,0 +1,90 @@
package litestream_test
import (
"context"
"testing"
"github.com/benbjohnson/litestream"
)
func TestFileReplica_Sync(t *testing.T) {
// Ensure replica can successfully sync after DB has sync'd.
t.Run("InitialSync", func(t *testing.T) {
db, sqldb := MustOpenDBs(t)
defer MustCloseDBs(t, db, sqldb)
r := NewTestFileReplica(t, db)
// Sync database & then sync replica.
if err := db.Sync(); err != nil {
t.Fatal(err)
} else if err := r.Sync(context.Background()); err != nil {
t.Fatal(err)
}
// Ensure positions match.
if pos, err := db.Pos(); err != nil {
t.Fatal(err)
} else if got, want := r.LastPos(), pos; got != want {
t.Fatalf("LastPos()=%v, want %v", got, want)
}
})
// Ensure replica can successfully sync multiple times.
t.Run("MultiSync", func(t *testing.T) {
db, sqldb := MustOpenDBs(t)
defer MustCloseDBs(t, db, sqldb)
r := NewTestFileReplica(t, db)
if _, err := sqldb.Exec(`CREATE TABLE foo (bar TEXT);`); err != nil {
t.Fatal(err)
}
// Write to the database multiple times and sync after each write.
for i, n := 0, db.MinCheckpointPageN*2; i < n; i++ {
if _, err := sqldb.Exec(`INSERT INTO foo (bar) VALUES ('baz')`); err != nil {
t.Fatal(err)
}
// Sync periodically.
if i%100 == 0 || i == n-1 {
if err := db.Sync(); err != nil {
t.Fatal(err)
} else if err := r.Sync(context.Background()); err != nil {
t.Fatal(err)
}
}
}
// Ensure positions match.
if pos, err := db.Pos(); err != nil {
t.Fatal(err)
} else if got, want := pos.Index, 2; got != want {
t.Fatalf("Index=%v, want %v", got, want)
} else if calcPos, err := r.CalcPos(context.Background(), pos.Generation); err != nil {
t.Fatal(err)
} else if got, want := calcPos, pos; got != want {
t.Fatalf("CalcPos()=%v, want %v", got, want)
} else if got, want := r.LastPos(), pos; got != want {
t.Fatalf("LastPos()=%v, want %v", got, want)
}
})
// Ensure replica returns an error if there is no generation available from the DB.
t.Run("ErrNoGeneration", func(t *testing.T) {
db, sqldb := MustOpenDBs(t)
defer MustCloseDBs(t, db, sqldb)
r := NewTestFileReplica(t, db)
if err := r.Sync(context.Background()); err == nil || err.Error() != `no generation, waiting for data` {
t.Fatal(err)
}
})
}
// NewTestFileReplica returns a new replica using a temp directory & with monitoring disabled.
func NewTestFileReplica(tb testing.TB, db *litestream.DB) *litestream.FileReplica {
r := litestream.NewFileReplica(db, "", tb.TempDir())
r.MonitorEnabled = false
db.Replicas = []litestream.Replica{r}
return r
}
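
The tests above also rely on MustOpenDBs and MustCloseDBs, whose definitions live in the suppressed db_test.go diff. A plausible sketch of those helpers follows, assuming litestream.NewDB/Open/Close and the go-sqlite3 driver; the actual implementations may differ.

package litestream_test

import (
	"database/sql"
	"path/filepath"
	"testing"

	"github.com/benbjohnson/litestream"
	_ "github.com/mattn/go-sqlite3"
)

// MustOpenDBs opens a litestream.DB and a SQLite connection against a temp path.
func MustOpenDBs(tb testing.TB) (*litestream.DB, *sql.DB) {
	tb.Helper()

	path := filepath.Join(tb.TempDir(), "db")
	db := litestream.NewDB(path)
	if err := db.Open(); err != nil {
		tb.Fatal(err)
	}

	sqldb, err := sql.Open("sqlite3", path+"?_journal_mode=WAL")
	if err != nil {
		tb.Fatal(err)
	}
	return db, sqldb
}

// MustCloseDBs closes both handles, failing the test on error.
func MustCloseDBs(tb testing.TB, db *litestream.DB, sqldb *sql.DB) {
	tb.Helper()
	if err := sqldb.Close(); err != nil {
		tb.Fatal(err)
	}
	if err := db.Close(); err != nil {
		tb.Fatal(err)
	}
}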

1195
s3/s3.go Normal file

File diff suppressed because it is too large

80
s3/s3_test.go Normal file
View File

@@ -0,0 +1,80 @@
package s3_test
import (
"testing"
"github.com/benbjohnson/litestream/s3"
)
func TestParseHost(t *testing.T) {
// Ensure non-specific hosts return as buckets.
t.Run("S3", func(t *testing.T) {
bucket, region, endpoint, forcePathStyle := s3.ParseHost(`test.litestream.io`)
if got, want := bucket, `test.litestream.io`; got != want {
t.Fatalf("bucket=%q, want %q", got, want)
} else if got, want := region, ``; got != want {
t.Fatalf("region=%q, want %q", got, want)
} else if got, want := endpoint, ``; got != want {
t.Fatalf("endpoint=%q, want %q", got, want)
} else if got, want := forcePathStyle, false; got != want {
t.Fatalf("forcePathStyle=%v, want %v", got, want)
}
})
// Ensure localhosts use an HTTP endpoint and extract the bucket name.
t.Run("Localhost", func(t *testing.T) {
t.Run("WithPort", func(t *testing.T) {
bucket, region, endpoint, forcePathStyle := s3.ParseHost(`test.localhost:9000`)
if got, want := bucket, `test`; got != want {
t.Fatalf("bucket=%q, want %q", got, want)
} else if got, want := region, `us-east-1`; got != want {
t.Fatalf("region=%q, want %q", got, want)
} else if got, want := endpoint, `http://localhost:9000`; got != want {
t.Fatalf("endpoint=%q, want %q", got, want)
} else if got, want := forcePathStyle, true; got != want {
t.Fatalf("forcePathStyle=%v, want %v", got, want)
}
})
t.Run("WithoutPort", func(t *testing.T) {
bucket, region, endpoint, forcePathStyle := s3.ParseHost(`test.localhost`)
if got, want := bucket, `test`; got != want {
t.Fatalf("bucket=%q, want %q", got, want)
} else if got, want := region, `us-east-1`; got != want {
t.Fatalf("region=%q, want %q", got, want)
} else if got, want := endpoint, `http://localhost`; got != want {
t.Fatalf("endpoint=%q, want %q", got, want)
} else if got, want := forcePathStyle, true; got != want {
t.Fatalf("forcePathStyle=%v, want %v", got, want)
}
})
})
// Ensure backblaze B2 URLs extract bucket, region, & endpoint from host.
t.Run("Backblaze", func(t *testing.T) {
bucket, region, endpoint, forcePathStyle := s3.ParseHost(`test-123.s3.us-west-000.backblazeb2.com`)
if got, want := bucket, `test-123`; got != want {
t.Fatalf("bucket=%q, want %q", got, want)
} else if got, want := region, `us-west-000`; got != want {
t.Fatalf("region=%q, want %q", got, want)
} else if got, want := endpoint, `https://s3.us-west-000.backblazeb2.com`; got != want {
t.Fatalf("endpoint=%q, want %q", got, want)
} else if got, want := forcePathStyle, true; got != want {
t.Fatalf("forcePathStyle=%v, want %v", got, want)
}
})
// Ensure GCS URLs extract bucket & endpoint from host.
t.Run("GCS", func(t *testing.T) {
bucket, region, endpoint, forcePathStyle := s3.ParseHost(`litestream.io.storage.googleapis.com`)
if got, want := bucket, `litestream.io`; got != want {
t.Fatalf("bucket=%q, want %q", got, want)
} else if got, want := region, `us-east-1`; got != want {
t.Fatalf("region=%q, want %q", got, want)
} else if got, want := endpoint, `https://storage.googleapis.com`; got != want {
t.Fatalf("endpoint=%q, want %q", got, want)
} else if got, want := forcePathStyle, true; got != want {
t.Fatalf("forcePathStyle=%v, want %v", got, want)
}
})
}
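
A short usage sketch grounded in the Backblaze case above: ParseHost splits a replica host into the bucket, region, endpoint, and path-style flag used to configure the S3 client.

package main

import (
	"fmt"

	"github.com/benbjohnson/litestream/s3"
)

func main() {
	bucket, region, endpoint, forcePathStyle := s3.ParseHost("test-123.s3.us-west-000.backblazeb2.com")
	fmt.Println(bucket)         // "test-123"
	fmt.Println(region)         // "us-west-000"
	fmt.Println(endpoint)       // "https://s3.us-west-000.backblazeb2.com"
	fmt.Println(forcePathStyle) // true
}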