Compare commits

...

31 Commits

Author SHA1 Message Date
Ben Johnson
80280fce54 Only update litestream_seq if size is below WAL header size 2024-03-05 15:09:07 -07:00
Jim Kalafut
94f69a0eb3 Print WAL index in hex (#567) 2024-02-14 13:00:47 -07:00
Toni Spets
e4254bbf69 Never directly dereference AWS SDK returned pointers (#557) 2024-01-07 06:18:32 +02:00
Toni Spets
e71e6856d0 Re-enable SFTP integration tests (#550) 2023-12-25 15:31:30 +02:00
Toni Spets
a47d955e3f Re-enable integration tests (#548) 2023-12-24 21:33:12 +02:00
Toni Spets
69a24afc04 Update dependencies (#546) 2023-12-24 19:59:19 +02:00
Toni Spets
adfec9a19d Enable S3 mock tests with moto (#545) 2023-12-24 19:59:05 +02:00
Toni Spets
dae4f6e481 Tolerate file deletion race on CurrentShadowWALIndex (#544) 2023-12-19 05:50:46 +02:00
Toni Spets
676810cc13 Use safe checkpointing before snapshots (#522) 2023-12-18 13:30:59 +02:00
Toni Spets
0a7f6e9345 Never expect the replica iterators to be sorted (#538) 2023-12-18 13:27:50 +02:00
Toni Spets
1af88c4052 Restore without SQLITE_FCNTL_PERSIST_WAL (#529) 2023-12-16 12:04:14 +02:00
Toni Spets
c633eb1fea Ignore WAL indices before snapshot that is being restored (#527) 2023-12-16 12:01:33 +02:00
Toni Spets
7badf0e549 Prevent deadlocks with replicas (#524) 2023-12-16 11:53:09 +02:00
Toni Spets
91ad34d709 Fix Windows builds and test they compile (#543) 2023-12-15 22:57:23 +02:00
Toni Spets
6824eb61a8 Enable staticcheck, fix issues (#542) 2023-12-15 22:07:22 +02:00
Toni Spets
1a96ad4389 Add pre-commit linting, test all PRs (#541) 2023-12-15 21:49:29 +02:00
Toni Spets
25ac72ae6c Remove verbose flag from restore doc (#540) 2023-12-15 21:20:49 +02:00
Toni Spets
85ddf32225 Always release long-running fd on db close (#519) 2023-11-14 10:31:28 -07:00
Toni Spets
ae4c9918d9 Allow giving context to database close (#520) 2023-11-14 10:28:58 -07:00
Ben Johnson
977d4a5ee4 Fix replica-without-db logger (#512) 2023-10-24 09:51:17 -05:00
Ben Johnson
c81010e7ab Fix darwin make target 2023-10-23 15:01:27 -06:00
guangwu
c1ae968188 fix: typo (#509) 2023-10-19 18:29:04 -06:00
Toni Spets
9f0e50ddf7 Add missing slog calls to commands (#508) 2023-10-19 18:28:49 -06:00
Toni Spets
fe9ab5c517 Force truncation checkpoint if WAL becomes runaway (#473) 2023-10-19 18:27:51 -06:00
Toni Spets
d02ba97453 Sync replica snapshots to previous (#480) 2023-10-19 18:27:15 -06:00
Toni Spets
b1abd6bd99 Use structured logging with slog (#475) 2023-10-16 15:05:22 -06:00
Ben Johnson
fd892eef6d Re-add arm/v7 build to Docker (#502) 2023-08-15 07:37:32 -06:00
Markus Schanz
1bfcaa4a17 Recognize Scaleway S3 replica URLs (#499) 2023-08-13 18:07:19 -06:00
Alex Garcia
a369b05ee4 Build darwin-arm64 build during make dist-macos (#500) 2023-08-09 13:00:50 -06:00
Toni Spets
e0493f979a Copy WAL frames through temp file to shadow (#474) 2023-08-08 11:40:43 -06:00
Ben Johnson
016546a3d5 Static release builds only (#497) 2023-08-08 11:33:08 -06:00
36 changed files with 832 additions and 493 deletions

View File

@@ -15,4 +15,3 @@ If you find mistakes in the documentation, please submit a fix to the
[new-issue]: https://github.com/benbjohnson/litestream/issues/new
[docs]: https://github.com/benbjohnson/litestream.io

View File

@@ -1,19 +1,63 @@
on: push
on:
push:
pull_request:
types:
- opened
- synchronize
- reopened
env:
GO_VERSION: "1.21"
name: Commit
jobs:
lint:
name: Lint
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- uses: actions/setup-go@v4
with:
go-version: ${{ env.GO_VERSION }}
- run: |
go install golang.org/x/tools/cmd/goimports@latest
go install honnef.co/go/tools/cmd/staticcheck@latest
export PATH="$HOME/go/bin:$PATH"
- uses: pre-commit/action@v3.0.0
build-windows:
name: Build Windows
runs-on: ubuntu-latest
steps:
- run: sudo apt-get install -y mingw-w64
- uses: actions/checkout@v4
- uses: actions/setup-go@v4
with:
go-version: ${{ env.GO_VERSION }}
- run: |
go build ./cmd/litestream/
file ./litestream.exe
env:
CGO_ENABLED: "1"
GOOS: windows
GOARCH: amd64
CC: x86_64-w64-mingw32-gcc
build:
name: Build & Unit Test
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- uses: actions/setup-go@v2
- uses: actions/checkout@v4
- uses: actions/setup-go@v4
with:
go-version: '1.20'
- uses: actions/cache@v2
with:
path: ~/go/pkg/mod
key: ${{ inputs.os }}-go-${{ hashFiles('**/go.sum') }}
restore-keys: ${{ inputs.os }}-go-
go-version: ${{ env.GO_VERSION }}
- run: go env
@@ -21,22 +65,13 @@ jobs:
- run: go test -v ./...
# - name: Build integration test
# run: go test -c ./integration
#
# - uses: actions/upload-artifact@v2
# with:
# name: integration.test
# path: integration.test
# if-no-files-found: error
# long-running-test:
# name: Run Long Running Unit Test
# runs-on: ubuntu-latest
# steps:
# - uses: actions/checkout@v2
# - uses: actions/setup-go@v2
# with:
# with:
# go-version: '1.20'
# - uses: actions/cache@v2
# with:
@@ -47,75 +82,148 @@ jobs:
# - run: go install ./cmd/litestream
# - run: go test -v -run=TestCmd_Replicate_LongRunning ./integration -long-running-duration 1m
# s3-integration-test:
# name: Run S3 Integration Tests
# runs-on: ubuntu-latest
# needs: build
# steps:
# - uses: actions/download-artifact@v2
# with:
# name: integration.test
# - run: chmod +x integration.test
#
# - run: ./integration.test -test.v -test.run=TestReplicaClient -replica-type s3
# env:
# LITESTREAM_S3_ACCESS_KEY_ID: ${{ secrets.LITESTREAM_S3_ACCESS_KEY_ID }}
# LITESTREAM_S3_SECRET_ACCESS_KEY: ${{ secrets.LITESTREAM_S3_SECRET_ACCESS_KEY }}
# LITESTREAM_S3_REGION: us-east-1
# LITESTREAM_S3_BUCKET: integration.litestream.io
s3-mock-test:
name: Run S3 Mock Tests
runs-on: ubuntu-latest
needs: build
steps:
- uses: actions/checkout@v4
# gcp-integration-test:
# name: Run GCP Integration Tests
# runs-on: ubuntu-latest
# needs: build
# steps:
# - name: Extract GCP credentials
# run: 'echo "$GOOGLE_APPLICATION_CREDENTIALS" > /opt/gcp.json'
# shell: bash
# env:
# GOOGLE_APPLICATION_CREDENTIALS: ${{secrets.GOOGLE_APPLICATION_CREDENTIALS}}
#
# - uses: actions/download-artifact@v2
# with:
# name: integration.test
# - run: chmod +x integration.test
#
# - run: ./integration.test -test.v -test.run=TestReplicaClient -replica-type gcs
# env:
# GOOGLE_APPLICATION_CREDENTIALS: /opt/gcp.json
# LITESTREAM_GCS_BUCKET: integration.litestream.io
- uses: actions/setup-python@v5
with:
python-version: '3.12'
# cache: 'pip'
- run: pip install moto[s3,server]
# abs-integration-test:
# name: Run Azure Blob Store Integration Tests
# runs-on: ubuntu-latest
# needs: build
# steps:
# - uses: actions/download-artifact@v2
# with:
# name: integration.test
# - run: chmod +x integration.test
#
# - run: ./integration.test -test.v -test.run=TestReplicaClient -replica-type abs
# env:
# LITESTREAM_ABS_ACCOUNT_NAME: ${{ secrets.LITESTREAM_ABS_ACCOUNT_NAME }}
# LITESTREAM_ABS_ACCOUNT_KEY: ${{ secrets.LITESTREAM_ABS_ACCOUNT_KEY }}
# LITESTREAM_ABS_BUCKET: integration
- uses: actions/setup-go@v4
with:
go-version: ${{ env.GO_VERSION }}
# sftp-integration-test:
# name: Run SFTP Integration Tests
# runs-on: ubuntu-latest
# needs: build
# steps:
# - name: Extract SSH key
# run: 'echo "$LITESTREAM_SFTP_KEY" > /opt/id_ed25519'
# shell: bash
# env:
# LITESTREAM_SFTP_KEY: ${{secrets.LITESTREAM_SFTP_KEY}}
#
# - name: Run sftp tests
# run: go test -v -run=TestReplicaClient ./integration -replica-type sftp
# env:
# LITESTREAM_SFTP_HOST: ${{ secrets.LITESTREAM_SFTP_HOST }}
# LITESTREAM_SFTP_USER: ${{ secrets.LITESTREAM_SFTP_USER }}
# LITESTREAM_SFTP_KEY_PATH: /opt/id_ed25519
# LITESTREAM_SFTP_PATH: ${{ secrets.LITESTREAM_SFTP_PATH }}
- run: go env
- run: go install ./cmd/litestream
- run: ./etc/s3_mock.py go test -v ./replica_client_test.go -integration s3
s3-integration-test:
name: Run S3 Integration Tests
runs-on: ubuntu-latest
needs: build
if: github.ref == 'refs/heads/main'
concurrency:
group: integration-test-s3
steps:
- uses: actions/checkout@v4
- uses: actions/setup-go@v4
with:
go-version: ${{ env.GO_VERSION }}
- run: go env
- run: go install ./cmd/litestream
- run: go test -v ./replica_client_test.go -integration s3
env:
LITESTREAM_S3_ACCESS_KEY_ID: ${{ secrets.LITESTREAM_S3_ACCESS_KEY_ID }}
LITESTREAM_S3_SECRET_ACCESS_KEY: ${{ secrets.LITESTREAM_S3_SECRET_ACCESS_KEY }}
LITESTREAM_S3_REGION: us-east-1
LITESTREAM_S3_BUCKET: integration.litestream.io
gcp-integration-test:
name: Run GCP Integration Tests
runs-on: ubuntu-latest
needs: build
if: github.ref == 'refs/heads/main'
concurrency:
group: integration-test-gcp
steps:
- name: Extract GCP credentials
run: 'echo "$GOOGLE_APPLICATION_CREDENTIALS" > /opt/gcp.json'
shell: bash
env:
GOOGLE_APPLICATION_CREDENTIALS: ${{secrets.GOOGLE_APPLICATION_CREDENTIALS}}
- uses: actions/checkout@v4
- uses: actions/setup-go@v4
with:
go-version: ${{ env.GO_VERSION }}
- run: go env
- run: go install ./cmd/litestream
- run: go test -v ./replica_client_test.go -integration gcs
env:
GOOGLE_APPLICATION_CREDENTIALS: /opt/gcp.json
LITESTREAM_GCS_BUCKET: integration.litestream.io
abs-integration-test:
name: Run Azure Blob Store Integration Tests
runs-on: ubuntu-latest
needs: build
if: github.ref == 'refs/heads/main'
concurrency:
group: integration-test-abs
steps:
- uses: actions/checkout@v4
- uses: actions/setup-go@v4
with:
go-version: ${{ env.GO_VERSION }}
- run: go env
- run: go install ./cmd/litestream
- run: go test -v ./replica_client_test.go -integration abs
env:
LITESTREAM_ABS_ACCOUNT_NAME: ${{ secrets.LITESTREAM_ABS_ACCOUNT_NAME }}
LITESTREAM_ABS_ACCOUNT_KEY: ${{ secrets.LITESTREAM_ABS_ACCOUNT_KEY }}
LITESTREAM_ABS_BUCKET: integration
sftp-integration-test:
name: Run SFTP Integration Tests
runs-on: ubuntu-latest
needs: build
steps:
- name: Prepare OpenSSH server
run: |-
sudo mkdir -p /test/etc/ssh /test/home /run/sshd /test/data/
sudo ssh-keygen -t ed25519 -f /test/etc/ssh/id_ed25519_host -N ""
sudo ssh-keygen -t ed25519 -f /test/etc/ssh/id_ed25519 -N ""
sudo chmod 0600 /test/etc/ssh/id_ed25519_host /test/etc/ssh/id_ed25519
sudo chmod 0644 /test/etc/ssh/id_ed25519_host.pub /test/etc/ssh/id_ed25519.pub
sudo cp /test/etc/ssh/id_ed25519 /test/id_ed25519
sudo chown $USER /test/id_ed25519
sudo tee /test/etc/ssh/sshd_config <<EOF
Port 2222
HostKey /test/etc/ssh/id_ed25519_host
AuthorizedKeysFile /test/etc/ssh/id_ed25519.pub
AuthenticationMethods publickey
Subsystem sftp internal-sftp
UsePAM no
LogLevel DEBUG
EOF
sudo /usr/sbin/sshd -e -f /test/etc/ssh/sshd_config -E /test/debug.log
- name: Test OpenSSH server works with pubkey auth
run: ssh -v -i /test/id_ed25519 -o StrictHostKeyChecking=accept-new -p 2222 root@localhost whoami || (sudo cat /test/debug.log && exit 1)
- uses: actions/checkout@v4
- uses: actions/setup-go@v4
with:
go-version: ${{ env.GO_VERSION }}
- run: go env
- run: go install ./cmd/litestream
- run: go test -v ./replica_client_test.go -integration sftp
env:
LITESTREAM_SFTP_HOST: "localhost:2222"
LITESTREAM_SFTP_USER: "root"
LITESTREAM_SFTP_KEY_PATH: /test/id_ed25519
LITESTREAM_SFTP_PATH: /test/data

View File

@@ -2,20 +2,20 @@ on:
release:
types:
- published
pull_request:
types:
- opened
- synchronize
- reopened
branches-ignore:
- "dependabot/**"
# pull_request:
# types:
# - opened
# - synchronize
# - reopened
# branches-ignore:
# - "dependabot/**"
name: Release (Docker)
jobs:
docker:
runs-on: ubuntu-latest
env:
PLATFORMS: "linux/amd64,linux/arm64"
PLATFORMS: "linux/amd64,linux/arm64,linux/arm/v7"
VERSION: "${{ github.event_name == 'release' && github.event.release.name || github.sha }}"
steps:
@@ -27,7 +27,7 @@ jobs:
with:
username: benbjohnson
password: ${{ secrets.DOCKERHUB_TOKEN }}
- id: meta
uses: docker/metadata-action@v3
with:
@@ -39,7 +39,7 @@ jobs:
type=sha,format=long
type=semver,pattern={{version}}
type=semver,pattern={{major}}.{{minor}}
- uses: docker/build-push-action@v2
with:
context: .
@@ -48,4 +48,4 @@ jobs:
tags: ${{ steps.meta.outputs.tags }}
labels: ${{ steps.meta.outputs.labels }}
build-args: |
LITESTREAM_VERSION=${{ env.VERSION }}
LITESTREAM_VERSION=${{ env.VERSION }}

View File

@@ -31,7 +31,7 @@ jobs:
- uses: actions/checkout@v2
- uses: actions/setup-go@v2
with:
go-version: '1.20'
go-version: '1.21'
- id: release
uses: bruceadams/get-release@v1.2.2
@@ -40,7 +40,7 @@ jobs:
- name: Install cross-compilers
run: |
sudo apt-get update
sudo apt-get update
sudo apt-get install -y gcc-aarch64-linux-gnu gcc-arm-linux-gnueabihf gcc-arm-linux-gnueabi
- name: Install nfpm
@@ -54,11 +54,11 @@ jobs:
mkdir -p dist
cp etc/litestream.yml etc/litestream.service dist
cat etc/nfpm.yml | LITESTREAM_VERSION=${{ steps.release.outputs.tag_name }} envsubst > dist/nfpm.yml
CGO_ENABLED=1 go build -ldflags "-s -w -X 'main.Version=${{ steps.release.outputs.tag_name }}'" -o dist/litestream ./cmd/litestream
CGO_ENABLED=1 go build -ldflags "-s -w -extldflags "-static" -X 'main.Version=${{ steps.release.outputs.tag_name }}'" -tags osusergo,netgo,sqlite_omit_load_extension -o dist/litestream ./cmd/litestream
cd dist
tar -czvf litestream-${{ steps.release.outputs.tag_name }}-${{ env.GOOS }}-${{ env.GOARCH }}${{ env.GOARM }}.tar.gz litestream
../nfpm pkg --config nfpm.yml --packager deb --target litestream-${{ steps.release.outputs.tag_name }}-${{ env.GOOS }}-${{ env.GOARCH }}${{ env.GOARM }}.deb
../nfpm pkg --config nfpm.yml --packager deb --target litestream-${{ steps.release.outputs.tag_name }}-${{ env.GOOS }}-${{ env.GOARCH }}${{ env.GOARM }}.deb
- name: Upload release tarball
uses: actions/upload-release-asset@v1.0.2

View File

@@ -1,62 +0,0 @@
on:
release:
types:
- created
name: release (linux/static)
jobs:
build:
runs-on: ubuntu-latest
strategy:
matrix:
include:
- arch: amd64
cc: gcc
- arch: arm64
cc: aarch64-linux-gnu-gcc
- arch: arm
arm: 6
cc: arm-linux-gnueabi-gcc
- arch: arm
arm: 7
cc: arm-linux-gnueabihf-gcc
env:
GOOS: linux
GOARCH: ${{ matrix.arch }}
GOARM: ${{ matrix.arm }}
CC: ${{ matrix.cc }}
steps:
- uses: actions/checkout@v2
- uses: actions/setup-go@v2
with:
go-version: '1.20'
- id: release
uses: bruceadams/get-release@v1.2.2
env:
GITHUB_TOKEN: ${{ github.token }}
- name: Install cross-compilers
run: |
sudo apt-get update
sudo apt-get install -y gcc-aarch64-linux-gnu gcc-arm-linux-gnueabihf gcc-arm-linux-gnueabi
- name: Build litestream
run: |
rm -rf dist
mkdir -p dist
CGO_ENABLED=1 go build -ldflags "-s -w -extldflags "-static" -X 'main.Version=${{ steps.release.outputs.tag_name }}'" -tags osusergo,netgo,sqlite_omit_load_extension -o dist/litestream ./cmd/litestream
cd dist
tar -czvf litestream-${{ steps.release.outputs.tag_name }}-${{ env.GOOS }}-${{ env.GOARCH }}${{ env.GOARM }}-static.tar.gz litestream
- name: Upload release tarball
uses: actions/upload-release-asset@v1.0.2
env:
GITHUB_TOKEN: ${{ github.token }}
with:
upload_url: ${{ steps.release.outputs.upload_url }}
asset_path: ./dist/litestream-${{ steps.release.outputs.tag_name }}-${{ env.GOOS }}-${{ env.GOARCH }}${{ env.GOARM }}-static.tar.gz
asset_name: litestream-${{ steps.release.outputs.tag_name }}-${{ env.GOOS }}-${{ env.GOARCH }}${{ env.GOARM }}-static.tar.gz
asset_content_type: application/gzip

20
.pre-commit-config.yaml Normal file
View File

@@ -0,0 +1,20 @@
repos:
- repo: https://github.com/pre-commit/pre-commit-hooks
rev: v4.1.0
hooks:
- id: trailing-whitespace
exclude_types: [markdown]
- id: end-of-file-fixer
- id: check-yaml
- id: check-added-large-files
- repo: https://github.com/tekwizely/pre-commit-golang
rev: v1.0.0-beta.5
hooks:
- id: go-imports-repo
args:
- "-local"
- "github.com/benbjohnson/litestrem"
- "-w"
- id: go-vet-repo-mod
- id: go-staticcheck-repo-mod

View File

@@ -1,4 +1,4 @@
FROM golang:1.20.1 as builder
FROM golang:1.21.3 as builder
WORKDIR /src/litestream
COPY . .

View File

@@ -199,4 +199,4 @@
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
limitations under the License.

View File

@@ -20,11 +20,17 @@ ifndef LITESTREAM_VERSION
$(error LITESTREAM_VERSION is undefined)
endif
mkdir -p dist
go build -v -ldflags "-s -w -X 'main.Version=${LITESTREAM_VERSION}'" -o dist/litestream ./cmd/litestream
GOOS=darwin GOARCH=amd64 CC="gcc -target amd64-apple-macos11" CGO_ENABLED=1 go build -v -ldflags "-s -w -X 'main.Version=${LITESTREAM_VERSION}'" -o dist/litestream ./cmd/litestream
gon etc/gon.hcl
mv dist/litestream.zip dist/litestream-${LITESTREAM_VERSION}-darwin-amd64.zip
openssl dgst -sha256 dist/litestream-${LITESTREAM_VERSION}-darwin-amd64.zip
GOOS=darwin GOARCH=arm64 CC="gcc -target arm64-apple-macos11" CGO_ENABLED=1 go build -v -ldflags "-s -w -X 'main.Version=${LITESTREAM_VERSION}'" -o dist/litestream ./cmd/litestream
gon etc/gon.hcl
mv dist/litestream.zip dist/litestream-${LITESTREAM_VERSION}-darwin-arm64.zip
openssl dgst -sha256 dist/litestream-${LITESTREAM_VERSION}-darwin-arm64.zip
clean:
rm -rf dist

View File

@@ -58,4 +58,3 @@ If you find mistakes in the documentation, please submit a fix to the
[new-issue]: https://github.com/benbjohnson/litestream/issues/new
[docs]: https://github.com/benbjohnson/litestream.io

View File

@@ -4,8 +4,8 @@ import (
"context"
"flag"
"fmt"
"log"
"os"
"sort"
"text/tabwriter"
"time"
@@ -87,15 +87,17 @@ func (c *GenerationsCommand) Run(ctx context.Context, args []string) (err error)
for _, r := range replicas {
generations, err := r.Client.Generations(ctx)
if err != nil {
log.Printf("%s: cannot list generations: %s", r.Name(), err)
r.Logger().Error("cannot list generations", "error", err)
continue
}
sort.Strings(generations)
// Iterate over each generation for the replica.
for _, generation := range generations {
createdAt, updatedAt, err := r.GenerationTimeBounds(ctx, generation)
if err != nil {
log.Printf("%s: cannot determine generation time bounds: %s", r.Name(), err)
r.Logger().Error("cannot determine generation time bounds", "error", err)
continue
}
@@ -122,7 +124,7 @@ cover.
Usage:
litestream generations [arguments] DB_PATH
litestream generations [arguments] REPLICA_URL
Arguments:

View File

@@ -5,8 +5,7 @@ import (
"errors"
"flag"
"fmt"
"io/ioutil"
"log"
"log/slog"
"net/url"
"os"
"os/user"
@@ -37,13 +36,11 @@ var (
var errStop = errors.New("stop")
func main() {
log.SetFlags(0)
m := NewMain()
if err := m.Run(context.Background(), os.Args[1:]); err == flag.ErrHelp || err == errStop {
os.Exit(1)
} else if err != nil {
log.Println(err)
slog.Error("failed to run", "error", err)
os.Exit(1)
}
}
@@ -95,17 +92,17 @@ func (m *Main) Run(ctx context.Context, args []string) (err error) {
// Wait for signal to stop program.
select {
case err = <-c.execCh:
fmt.Println("subprocess exited, litestream shutting down")
slog.Info("subprocess exited, litestream shutting down")
case sig := <-signalCh:
fmt.Println("signal received, litestream shutting down")
slog.Info("signal received, litestream shutting down")
if c.cmd != nil {
fmt.Println("sending signal to exec process")
slog.Info("sending signal to exec process")
if err := c.cmd.Process.Signal(sig); err != nil {
return fmt.Errorf("cannot signal exec process: %w", err)
}
fmt.Println("waiting for exec process to close")
slog.Info("waiting for exec process to close")
if err := <-c.execCh; err != nil && !strings.HasPrefix(err.Error(), "signal:") {
return fmt.Errorf("cannot wait for exec process: %w", err)
}
@@ -116,7 +113,7 @@ func (m *Main) Run(ctx context.Context, args []string) (err error) {
if e := c.Close(); e != nil && err == nil {
err = e
}
fmt.Println("litestream shut down")
slog.Info("litestream shut down")
return err
case "restore":
@@ -172,6 +169,16 @@ type Config struct {
// Global S3 settings
AccessKeyID string `yaml:"access-key-id"`
SecretAccessKey string `yaml:"secret-access-key"`
// Logging
Logging LoggingConfig `yaml:"logging"`
}
// LoggingConfig configures logging.
type LoggingConfig struct {
Level string `yaml:"level"`
Type string `yaml:"type"`
Stderr bool `yaml:"stderr"`
}
// propagateGlobalSettings copies global S3 settings to replica configs.
@@ -215,7 +222,7 @@ func ReadConfigFile(filename string, expandEnv bool) (_ Config, err error) {
}
// Read configuration.
buf, err := ioutil.ReadFile(filename)
buf, err := os.ReadFile(filename)
if os.IsNotExist(err) {
return config, fmt.Errorf("config file not found: %s", filename)
} else if err != nil {
@@ -241,6 +248,36 @@ func ReadConfigFile(filename string, expandEnv bool) (_ Config, err error) {
// Propage settings from global config to replica configs.
config.propagateGlobalSettings()
// Configure logging.
logOutput := os.Stdout
if config.Logging.Stderr {
logOutput = os.Stderr
}
logOptions := slog.HandlerOptions{
Level: slog.LevelInfo,
}
switch strings.ToUpper(config.Logging.Level) {
case "DEBUG":
logOptions.Level = slog.LevelDebug
case "WARN", "WARNING":
logOptions.Level = slog.LevelWarn
case "ERROR":
logOptions.Level = slog.LevelError
}
var logHandler slog.Handler
switch config.Logging.Type {
case "json":
logHandler = slog.NewJSONHandler(logOutput, &logOptions)
case "text", "":
logHandler = slog.NewTextHandler(logOutput, &logOptions)
}
// Set global default logger.
slog.SetDefault(slog.New(logHandler))
return config, nil
}

View File

@@ -1,4 +1,4 @@
// +build !windows
//go:build !windows
package main

View File

@@ -1,7 +1,6 @@
package main_test
import (
"io/ioutil"
"os"
"path/filepath"
"testing"
@@ -16,7 +15,7 @@ func TestReadConfigFile(t *testing.T) {
// Ensure global AWS settings are propagated down to replica configurations.
t.Run("PropagateGlobalSettings", func(t *testing.T) {
filename := filepath.Join(t.TempDir(), "litestream.yml")
if err := ioutil.WriteFile(filename, []byte(`
if err := os.WriteFile(filename, []byte(`
access-key-id: XXX
secret-access-key: YYY
@@ -48,7 +47,7 @@ dbs:
os.Setenv("LITESTREAM_TEST_1872363", "s3://foo/bar")
filename := filepath.Join(t.TempDir(), "litestream.yml")
if err := ioutil.WriteFile(filename, []byte(`
if err := os.WriteFile(filename, []byte(`
dbs:
- path: $LITESTREAM_TEST_0129380
replicas:
@@ -75,7 +74,7 @@ dbs:
os.Setenv("LITESTREAM_TEST_9847533", "s3://foo/bar")
filename := filepath.Join(t.TempDir(), "litestream.yml")
if err := ioutil.WriteFile(filename, []byte(`
if err := os.WriteFile(filename, []byte(`
dbs:
- path: /path/to/db
replicas:

View File

@@ -1,11 +1,11 @@
// +build windows
//go:build windows
package main
import (
"context"
"io"
"log"
"log/slog"
"os"
"os/signal"
@@ -36,16 +36,16 @@ func runWindowsService(ctx context.Context) error {
defer elog.Close()
// Set eventlog as log writer while running.
log.SetOutput((*eventlogWriter)(elog))
defer log.SetOutput(os.Stderr)
slog.SetDefault(slog.New(slog.NewTextHandler((*eventlogWriter)(elog), nil)))
defer slog.SetDefault(slog.New(slog.NewTextHandler(os.Stderr, nil)))
log.Print("Litestream service starting")
slog.Info("Litestream service starting")
if err := svc.Run(serviceName, &windowsService{ctx: ctx}); err != nil {
return errStop
}
log.Print("Litestream service stopped")
slog.Info("Litestream service stopped")
return nil
}
@@ -63,13 +63,13 @@ func (s *windowsService) Execute(args []string, r <-chan svc.ChangeRequest, stat
// Instantiate replication command and load configuration.
c := NewReplicateCommand()
if c.Config, err = ReadConfigFile(DefaultConfigPath(), true); err != nil {
log.Printf("cannot load configuration: %s", err)
slog.Error("cannot load configuration", "error", err)
return true, 1
}
// Execute replication command.
if err := c.Run(s.ctx); err != nil {
log.Printf("cannot replicate: %s", err)
if err := c.Run(); err != nil {
slog.Error("cannot replicate", "error", err)
statusCh <- svc.Status{State: svc.StopPending}
return true, 2
}
@@ -88,7 +88,7 @@ func (s *windowsService) Execute(args []string, r <-chan svc.ChangeRequest, stat
case svc.Interrogate:
statusCh <- req.CurrentStatus
default:
log.Printf("Litestream service received unexpected change request cmd: %d", req.Cmd)
slog.Error("Litestream service received unexpected change request", "cmd", req.Cmd)
}
}
}

View File

@@ -4,7 +4,7 @@ import (
"context"
"flag"
"fmt"
"log"
"log/slog"
"net"
"net/http"
_ "net/http/pprof"
@@ -42,7 +42,6 @@ func NewReplicateCommand() *ReplicateCommand {
func (c *ReplicateCommand) ParseFlags(ctx context.Context, args []string) (err error) {
fs := flag.NewFlagSet("litestream-replicate", flag.ContinueOnError)
execFlag := fs.String("exec", "", "execute subcommand")
tracePath := fs.String("trace", "", "trace path")
configPath, noExpandEnv := registerConfigFlag(fs)
fs.Usage = c.Usage
if err := fs.Parse(args); err != nil {
@@ -80,27 +79,17 @@ func (c *ReplicateCommand) ParseFlags(ctx context.Context, args []string) (err e
c.Config.Exec = *execFlag
}
// Enable trace logging.
if *tracePath != "" {
f, err := os.Create(*tracePath)
if err != nil {
return err
}
defer f.Close()
litestream.Tracef = log.New(f, "", log.LstdFlags|log.Lmicroseconds|log.LUTC|log.Lshortfile).Printf
}
return nil
}
// Run loads all databases specified in the configuration.
func (c *ReplicateCommand) Run() (err error) {
// Display version information.
log.Printf("litestream %s", Version)
slog.Info("litestream", "version", Version)
// Setup databases.
if len(c.Config.DBs) == 0 {
log.Println("no databases specified in configuration")
slog.Error("no databases specified in configuration")
}
for _, dbConfig := range c.Config.DBs {
@@ -118,21 +107,22 @@ func (c *ReplicateCommand) Run() (err error) {
// Notify user that initialization is done.
for _, db := range c.DBs {
log.Printf("initialized db: %s", db.Path())
slog.Info("initialized db", "path", db.Path())
for _, r := range db.Replicas {
slog := slog.With("name", r.Name(), "type", r.Client.Type(), "sync-interval", r.SyncInterval)
switch client := r.Client.(type) {
case *file.ReplicaClient:
log.Printf("replicating to: name=%q type=%q path=%q", r.Name(), client.Type(), client.Path())
slog.Info("replicating to", "path", client.Path())
case *s3.ReplicaClient:
log.Printf("replicating to: name=%q type=%q bucket=%q path=%q region=%q endpoint=%q sync-interval=%s", r.Name(), client.Type(), client.Bucket, client.Path, client.Region, client.Endpoint, r.SyncInterval)
slog.Info("replicating to", "bucket", client.Bucket, "path", client.Path, "region", client.Region, "endpoint", client.Endpoint)
case *gcs.ReplicaClient:
log.Printf("replicating to: name=%q type=%q bucket=%q path=%q sync-interval=%s", r.Name(), client.Type(), client.Bucket, client.Path, r.SyncInterval)
slog.Info("replicating to", "bucket", client.Bucket, "path", client.Path)
case *abs.ReplicaClient:
log.Printf("replicating to: name=%q type=%q bucket=%q path=%q endpoint=%q sync-interval=%s", r.Name(), client.Type(), client.Bucket, client.Path, client.Endpoint, r.SyncInterval)
slog.Info("replicating to", "bucket", client.Bucket, "path", client.Path, "endpoint", client.Endpoint)
case *sftp.ReplicaClient:
log.Printf("replicating to: name=%q type=%q host=%q user=%q path=%q sync-interval=%s", r.Name(), client.Type(), client.Host, client.User, client.Path, r.SyncInterval)
slog.Info("replicating to", "host", client.Host, "user", client.User, "path", client.Path)
default:
log.Printf("replicating to: name=%q type=%q", r.Name(), client.Type())
slog.Info("replicating to")
}
}
}
@@ -146,11 +136,11 @@ func (c *ReplicateCommand) Run() (err error) {
hostport = net.JoinHostPort("localhost", port)
}
log.Printf("serving metrics on http://%s/metrics", hostport)
slog.Info("serving metrics on", "url", fmt.Sprintf("http://%s/metrics", hostport))
go func() {
http.Handle("/metrics", promhttp.Handler())
if err := http.ListenAndServe(c.Config.Addr, nil); err != nil {
log.Printf("cannot start metrics server: %s", err)
slog.Error("cannot start metrics server", "error", err)
}
}()
}
@@ -178,8 +168,8 @@ func (c *ReplicateCommand) Run() (err error) {
// Close closes all open databases.
func (c *ReplicateCommand) Close() (err error) {
for _, db := range c.DBs {
if e := db.Close(); e != nil {
log.Printf("error closing db: path=%s err=%s", db.Path(), e)
if e := db.Close(context.Background()); e != nil {
db.Logger.Error("error closing db", "error", e)
if err == nil {
err = e
}
@@ -191,7 +181,7 @@ func (c *ReplicateCommand) Close() (err error) {
// Usage prints the help screen to STDOUT.
func (c *ReplicateCommand) Usage() {
fmt.Printf(`
The replicate command starts a server to monitor & replicate databases.
The replicate command starts a server to monitor & replicate databases.
You can specify your database & replicas in a configuration file or you can
replicate a single database file by specifying its path and its replicas in the
command line arguments.
@@ -215,8 +205,5 @@ Arguments:
-no-expand-env
Disables environment variable expansion in configuration file.
-trace PATH
Write verbose trace logging to PATH.
`[1:], DefaultConfigPath())
}

View File

@@ -5,7 +5,7 @@ import (
"errors"
"flag"
"fmt"
"log"
"log/slog"
"os"
"strconv"
"time"
@@ -19,7 +19,6 @@ type RestoreCommand struct{}
// Run executes the command.
func (c *RestoreCommand) Run(ctx context.Context, args []string) (err error) {
opt := litestream.NewRestoreOptions()
opt.Verbose = true
fs := flag.NewFlagSet("litestream-restore", flag.ContinueOnError)
configPath, noExpandEnv := registerConfigFlag(fs)
@@ -31,7 +30,6 @@ func (c *RestoreCommand) Run(ctx context.Context, args []string) (err error) {
ifDBNotExists := fs.Bool("if-db-not-exists", false, "")
ifReplicaExists := fs.Bool("if-replica-exists", false, "")
timestampStr := fs.String("timestamp", "", "timestamp")
verbose := fs.Bool("v", false, "verbose output")
fs.Usage = c.Usage
if err := fs.Parse(args); err != nil {
return err
@@ -48,11 +46,6 @@ func (c *RestoreCommand) Run(ctx context.Context, args []string) (err error) {
}
}
// Instantiate logger if verbose output is enabled.
if *verbose {
opt.Logger = log.New(os.Stderr, "", log.LstdFlags|log.Lmicroseconds)
}
// Determine replica & generation to restore from.
var r *litestream.Replica
if isURL(fs.Arg(0)) {
@@ -60,7 +53,7 @@ func (c *RestoreCommand) Run(ctx context.Context, args []string) (err error) {
return fmt.Errorf("cannot specify a replica URL and the -config flag")
}
if r, err = c.loadFromURL(ctx, fs.Arg(0), *ifDBNotExists, &opt); err == errSkipDBExists {
fmt.Println("database already exists, skipping")
slog.Info("database already exists, skipping")
return nil
} else if err != nil {
return err
@@ -70,7 +63,7 @@ func (c *RestoreCommand) Run(ctx context.Context, args []string) (err error) {
*configPath = DefaultConfigPath()
}
if r, err = c.loadFromConfig(ctx, fs.Arg(0), *configPath, !*noExpandEnv, *ifDBNotExists, &opt); err == errSkipDBExists {
fmt.Println("database already exists, skipping")
slog.Info("database already exists, skipping")
return nil
} else if err != nil {
return err
@@ -81,7 +74,7 @@ func (c *RestoreCommand) Run(ctx context.Context, args []string) (err error) {
// If optional flag set, return success. Useful for automated recovery.
if opt.Generation == "" {
if *ifReplicaExists {
fmt.Println("no matching backups found")
slog.Info("no matching backups found")
return nil
}
return fmt.Errorf("no matching backups found")
@@ -204,9 +197,6 @@ Arguments:
Determines the number of WAL files downloaded in parallel.
Defaults to `+strconv.Itoa(litestream.DefaultRestoreParallelism)+`.
-v
Verbose output.
Examples:

View File

@@ -4,7 +4,7 @@ import (
"context"
"flag"
"fmt"
"log"
"log/slog"
"os"
"text/tabwriter"
"time"
@@ -82,7 +82,7 @@ func (c *SnapshotsCommand) Run(ctx context.Context, args []string) (err error) {
for _, r := range replicas {
infos, err := r.Snapshots(ctx)
if err != nil {
log.Printf("cannot determine snapshots: %s", err)
slog.Error("cannot determine snapshots", "error", err)
continue
}
for _, info := range infos {

View File

@@ -4,7 +4,6 @@ import (
"context"
"flag"
"fmt"
"log"
"os"
"text/tabwriter"
"time"
@@ -86,7 +85,7 @@ func (c *WALCommand) Run(ctx context.Context, args []string) (err error) {
generations = []string{*generation}
} else {
if generations, err = r.Client.Generations(ctx); err != nil {
log.Printf("%s: cannot determine generations: %s", r.Name(), err)
r.Logger().Error("cannot determine generations", "error", err)
continue
}
}
@@ -102,7 +101,7 @@ func (c *WALCommand) Run(ctx context.Context, args []string) (err error) {
for itr.Next() {
info := itr.WALSegment()
fmt.Fprintf(w, "%s\t%s\t%d\t%d\t%d\t%s\n",
fmt.Fprintf(w, "%s\t%s\t%x\t%d\t%d\t%s\n",
r.Name(),
info.Generation,
info.Index,
@@ -113,7 +112,7 @@ func (c *WALCommand) Run(ctx context.Context, args []string) (err error) {
}
return itr.Close()
}(); err != nil {
log.Printf("%s: cannot fetch wal segments: %s", r.Name(), err)
r.Logger().Error("cannot fetch wal segments", "error", err)
continue
}
}

214
db.go
View File

@@ -10,8 +10,7 @@ import (
"fmt"
"hash/crc64"
"io"
"io/ioutil"
"log"
"log/slog"
"math"
"math/rand"
"os"
@@ -31,6 +30,7 @@ const (
DefaultCheckpointInterval = 1 * time.Minute
DefaultMinCheckpointPageN = 1000
DefaultMaxCheckpointPageN = 10000
DefaultTruncatePageN = 500000
)
// MaxIndex is the maximum possible WAL index.
@@ -85,6 +85,16 @@ type DB struct {
// unbounded if there are always read transactions occurring.
MaxCheckpointPageN int
// Threshold of WAL size, in pages, before a forced truncation checkpoint.
// A forced truncation checkpoint will block new transactions and wait for
// existing transactions to finish before issuing a checkpoint and
// truncating the WAL.
//
// If zero, no truncates are forced. This can cause the WAL to grow
// unbounded if there's a sudden spike of changes between other
// checkpoints.
TruncatePageN int
// Time between automatic checkpoints in the WAL. This is done to allow
// more fine-grained WAL files so that restores can be performed with
// better precision.
@@ -97,8 +107,8 @@ type DB struct {
// Must be set before calling Open().
Replicas []*Replica
// Where to send log messages, defaults to log.Default()
Logger *log.Logger
// Where to send log messages, defaults to global slog with databas epath.
Logger *slog.Logger
}
// NewDB returns a new instance of DB for a given path.
@@ -112,10 +122,10 @@ func NewDB(path string) *DB {
MinCheckpointPageN: DefaultMinCheckpointPageN,
MaxCheckpointPageN: DefaultMaxCheckpointPageN,
TruncatePageN: DefaultTruncatePageN,
CheckpointInterval: DefaultCheckpointInterval,
MonitorInterval: DefaultMonitorInterval,
Logger: log.Default(),
Logger: slog.With("db", path),
}
db.dbSizeGauge = dbSizeGaugeVec.WithLabelValues(db.path)
@@ -196,7 +206,7 @@ func (db *DB) CurrentShadowWALPath(generation string) (string, error) {
// CurrentShadowWALIndex returns the current WAL index & total size.
func (db *DB) CurrentShadowWALIndex(generation string) (index int, size int64, err error) {
fis, err := ioutil.ReadDir(filepath.Join(db.GenerationPath(generation), "wal"))
des, err := os.ReadDir(filepath.Join(db.GenerationPath(generation), "wal"))
if os.IsNotExist(err) {
return 0, 0, nil // no wal files written for generation
} else if err != nil {
@@ -204,7 +214,14 @@ func (db *DB) CurrentShadowWALIndex(generation string) (index int, size int64, e
}
// Find highest wal index.
for _, fi := range fis {
for _, de := range des {
fi, err := de.Info()
if os.IsNotExist(err) {
continue // file was deleted after os.ReadDir returned
} else if err != nil {
return 0, 0, err
}
if v, err := ParseWALPath(fi.Name()); err != nil {
continue // invalid wal filename
} else if v > index {
@@ -306,14 +323,11 @@ func (db *DB) Open() (err error) {
}
// Close flushes outstanding WAL writes to replicas, releases the read lock,
// and closes the database.
func (db *DB) Close() (err error) {
// and closes the database. Takes a context for final sync.
func (db *DB) Close(ctx context.Context) (err error) {
db.cancel()
db.wg.Wait()
// Start a new context for shutdown since we canceled the DB context.
ctx := context.Background()
// Perform a final db sync, if initialized.
if db.db != nil {
if e := db.Sync(ctx); e != nil && err == nil {
@@ -344,6 +358,12 @@ func (db *DB) Close() (err error) {
}
}
if db.f != nil {
if e := db.f.Close(); e != nil && err == nil {
err = e
}
}
return err
}
@@ -461,7 +481,7 @@ func (db *DB) init() (err error) {
// If we have an existing shadow WAL, ensure the headers match.
if err := db.verifyHeadersMatch(); err != nil {
db.Logger.Printf("%s: init: cannot determine last wal position, clearing generation; %s", db.path, err)
db.Logger.Warn("init: cannot determine last wal position, clearing generation", "error", err)
if err := os.Remove(db.GenerationNamePath()); err != nil && !os.IsNotExist(err) {
return fmt.Errorf("remove generation name: %w", err)
}
@@ -532,7 +552,7 @@ func (db *DB) cleanGenerations() error {
}
dir := filepath.Join(db.metaPath, "generations")
fis, err := ioutil.ReadDir(dir)
fis, err := os.ReadDir(dir)
if os.IsNotExist(err) {
return nil
} else if err != nil {
@@ -579,7 +599,7 @@ func (db *DB) cleanWAL() error {
// Remove all WAL files for the generation before the lowest index.
dir := db.ShadowWALDir(generation)
fis, err := ioutil.ReadDir(dir)
fis, err := os.ReadDir(dir)
if os.IsNotExist(err) {
return nil
} else if err != nil {
@@ -609,7 +629,7 @@ func (db *DB) acquireReadLock() error {
}
// Execute read query to obtain read lock.
if _, err := tx.ExecContext(db.ctx, `SELECT COUNT(1) FROM _litestream_seq;`); err != nil {
if _, err := tx.ExecContext(context.Background(), `SELECT COUNT(1) FROM _litestream_seq;`); err != nil {
_ = tx.Rollback()
return err
}
@@ -629,13 +649,17 @@ func (db *DB) releaseReadLock() error {
// Rollback & clear read transaction.
err := db.rtx.Rollback()
db.rtx = nil
if errors.Is(err, context.Canceled) {
err = nil
}
return err
}
// CurrentGeneration returns the name of the generation saved to the "generation"
// file in the meta data directory. Returns empty string if none exists.
func (db *DB) CurrentGeneration() (string, error) {
buf, err := ioutil.ReadFile(db.GenerationNamePath())
buf, err := os.ReadFile(db.GenerationNamePath())
if os.IsNotExist(err) {
return "", nil
} else if err != nil {
@@ -673,11 +697,11 @@ func (db *DB) createGeneration() (string, error) {
// Atomically write generation name as current generation.
generationNamePath := db.GenerationNamePath()
mode := os.FileMode(0600)
mode := os.FileMode(0o600)
if db.fileInfo != nil {
mode = db.fileInfo.Mode()
}
if err := ioutil.WriteFile(generationNamePath+".tmp", []byte(generation+"\n"), mode); err != nil {
if err := os.WriteFile(generationNamePath+".tmp", []byte(generation+"\n"), mode); err != nil {
return "", fmt.Errorf("write generation temp file: %w", err)
}
uid, gid := internal.Fileinfo(db.fileInfo)
@@ -703,7 +727,7 @@ func (db *DB) Sync(ctx context.Context) (err error) {
if err := db.init(); err != nil {
return err
} else if db.db == nil {
Tracef("%s: sync: no database found", db.path)
db.Logger.Debug("sync: no database found")
return nil
}
@@ -729,7 +753,7 @@ func (db *DB) Sync(ctx context.Context) (err error) {
if err != nil {
return fmt.Errorf("cannot verify wal state: %w", err)
}
Tracef("%s: sync: info=%#v", db.path, info)
db.Logger.Debug("sync", "info", &info)
// Track if anything in the shadow WAL changes and then notify at the end.
changed := info.walSize != info.shadowWALSize || info.restart || info.reason != ""
@@ -740,7 +764,7 @@ func (db *DB) Sync(ctx context.Context) (err error) {
if info.generation, err = db.createGeneration(); err != nil {
return fmt.Errorf("create generation: %w", err)
}
db.Logger.Printf("%s: sync: new generation %q, %s", db.path, info.generation, info.reason)
db.Logger.Info("sync: new generation", "generation", info.generation, "reason", info.reason)
// Clear shadow wal info.
info.shadowWALPath = db.ShadowWALPath(info.generation, 0)
@@ -751,7 +775,7 @@ func (db *DB) Sync(ctx context.Context) (err error) {
}
// Synchronize real WAL with current shadow WAL.
newWALSize, err := db.syncWAL(info)
origWALSize, newWALSize, err := db.syncWAL(info)
if err != nil {
return fmt.Errorf("sync wal: %w", err)
}
@@ -760,7 +784,9 @@ func (db *DB) Sync(ctx context.Context) (err error) {
// If WAL size is greater than min threshold, attempt checkpoint.
var checkpoint bool
checkpointMode := CheckpointModePassive
if db.MaxCheckpointPageN > 0 && newWALSize >= calcWALSize(db.pageSize, db.MaxCheckpointPageN) {
if db.TruncatePageN > 0 && origWALSize >= calcWALSize(db.pageSize, db.TruncatePageN) {
checkpoint, checkpointMode = true, CheckpointModeTruncate
} else if db.MaxCheckpointPageN > 0 && newWALSize >= calcWALSize(db.pageSize, db.MaxCheckpointPageN) {
checkpoint, checkpointMode = true, CheckpointModeRestart
} else if newWALSize >= calcWALSize(db.pageSize, db.MinCheckpointPageN) {
checkpoint = true
@@ -794,7 +820,7 @@ func (db *DB) Sync(ctx context.Context) (err error) {
db.notify = make(chan struct{})
}
Tracef("%s: sync: ok", db.path)
db.Logger.Debug("sync: ok")
return nil
}
@@ -919,29 +945,29 @@ type syncInfo struct {
}
// syncWAL copies pending bytes from the real WAL to the shadow WAL.
func (db *DB) syncWAL(info syncInfo) (newSize int64, err error) {
func (db *DB) syncWAL(info syncInfo) (origSize int64, newSize int64, err error) {
// Copy WAL starting from end of shadow WAL. Exit if no new shadow WAL needed.
newSize, err = db.copyToShadowWAL(info.shadowWALPath)
origSize, newSize, err = db.copyToShadowWAL(info.shadowWALPath)
if err != nil {
return newSize, fmt.Errorf("cannot copy to shadow wal: %w", err)
return origSize, newSize, fmt.Errorf("cannot copy to shadow wal: %w", err)
} else if !info.restart {
return newSize, nil // If no restart required, exit.
return origSize, newSize, nil // If no restart required, exit.
}
// Parse index of current shadow WAL file.
dir, base := filepath.Split(info.shadowWALPath)
index, err := ParseWALPath(base)
if err != nil {
return 0, fmt.Errorf("cannot parse shadow wal filename: %s", base)
return 0, 0, fmt.Errorf("cannot parse shadow wal filename: %s", base)
}
// Start a new shadow WAL file with next index.
newShadowWALPath := filepath.Join(dir, FormatWALPath(index+1))
newSize, err = db.initShadowWALFile(newShadowWALPath)
if err != nil {
return 0, fmt.Errorf("cannot init shadow wal file: name=%s err=%w", newShadowWALPath, err)
return 0, 0, fmt.Errorf("cannot init shadow wal file: name=%s err=%w", newShadowWALPath, err)
}
return newSize, nil
return origSize, newSize, nil
}
func (db *DB) initShadowWALFile(filename string) (int64, error) {
@@ -964,93 +990,109 @@ func (db *DB) initShadowWALFile(filename string) (int64, error) {
}
// Write header to new WAL shadow file.
mode := os.FileMode(0600)
mode := os.FileMode(0o600)
if fi := db.fileInfo; fi != nil {
mode = fi.Mode()
}
if err := internal.MkdirAll(filepath.Dir(filename), db.dirInfo); err != nil {
return 0, err
} else if err := ioutil.WriteFile(filename, hdr, mode); err != nil {
} else if err := os.WriteFile(filename, hdr, mode); err != nil {
return 0, err
}
uid, gid := internal.Fileinfo(db.fileInfo)
_ = os.Chown(filename, uid, gid)
// Copy as much shadow WAL as available.
newSize, err := db.copyToShadowWAL(filename)
_, newSize, err := db.copyToShadowWAL(filename)
if err != nil {
return 0, fmt.Errorf("cannot copy to new shadow wal: %w", err)
}
return newSize, nil
}
func (db *DB) copyToShadowWAL(filename string) (newSize int64, err error) {
Tracef("%s: copy-shadow: %s", db.path, filename)
func (db *DB) copyToShadowWAL(filename string) (origWalSize int64, newSize int64, err error) {
logger := db.Logger.With("filename", filename)
logger.Debug("copy-shadow")
r, err := os.Open(db.WALPath())
if err != nil {
return 0, err
return 0, 0, err
}
defer r.Close()
w, err := os.OpenFile(filename, os.O_RDWR, 0666)
fi, err := r.Stat()
if err != nil {
return 0, err
return 0, 0, err
}
origWalSize = frameAlign(fi.Size(), db.pageSize)
w, err := os.OpenFile(filename, os.O_RDWR, 0o666)
if err != nil {
return 0, 0, err
}
defer w.Close()
fi, err := w.Stat()
fi, err = w.Stat()
if err != nil {
return 0, err
return 0, 0, err
}
origSize := frameAlign(fi.Size(), db.pageSize)
// Read shadow WAL header to determine byte order for checksum & salt.
hdr := make([]byte, WALHeaderSize)
if _, err := io.ReadFull(w, hdr); err != nil {
return 0, fmt.Errorf("read header: %w", err)
return 0, 0, fmt.Errorf("read header: %w", err)
}
hsalt0 := binary.BigEndian.Uint32(hdr[16:])
hsalt1 := binary.BigEndian.Uint32(hdr[20:])
bo, err := headerByteOrder(hdr)
if err != nil {
return 0, err
return 0, 0, err
}
// Read previous checksum.
chksum0, chksum1, err := readLastChecksumFrom(w, db.pageSize)
if err != nil {
return 0, fmt.Errorf("last checksum: %w", err)
return 0, 0, fmt.Errorf("last checksum: %w", err)
}
// Write to a temporary shadow file.
tempFilename := filename + ".tmp"
defer os.Remove(tempFilename)
f, err := internal.CreateFile(tempFilename, db.fileInfo)
if err != nil {
return 0, 0, fmt.Errorf("create temp file: %w", err)
}
defer f.Close()
// Seek to correct position on real wal.
if _, err := r.Seek(origSize, io.SeekStart); err != nil {
return 0, fmt.Errorf("real wal seek: %w", err)
return 0, 0, fmt.Errorf("real wal seek: %w", err)
} else if _, err := w.Seek(origSize, io.SeekStart); err != nil {
return 0, fmt.Errorf("shadow wal seek: %w", err)
return 0, 0, fmt.Errorf("shadow wal seek: %w", err)
}
// Read through WAL from last position to find the page of the last
// committed transaction.
frame := make([]byte, db.pageSize+WALFrameHeaderSize)
var buf bytes.Buffer
offset := origSize
lastCommitSize := origSize
for {
// Read next page from WAL file.
if _, err := io.ReadFull(r, frame); err == io.EOF || err == io.ErrUnexpectedEOF {
Tracef("%s: copy-shadow: break %s @ %d; err=%s", db.path, filename, offset, err)
logger.Debug("copy-shadow: break", "offset", offset, "error", err)
break // end of file or partial page
} else if err != nil {
return 0, fmt.Errorf("read wal: %w", err)
return 0, 0, fmt.Errorf("read wal: %w", err)
}
// Read frame salt & compare to header salt. Stop reading on mismatch.
salt0 := binary.BigEndian.Uint32(frame[8:])
salt1 := binary.BigEndian.Uint32(frame[12:])
if salt0 != hsalt0 || salt1 != hsalt1 {
Tracef("%s: copy-shadow: break: salt mismatch", db.path)
logger.Debug("copy-shadow: break: salt mismatch")
break
}
@@ -1060,38 +1102,60 @@ func (db *DB) copyToShadowWAL(filename string) (newSize int64, err error) {
chksum0, chksum1 = Checksum(bo, chksum0, chksum1, frame[:8]) // frame header
chksum0, chksum1 = Checksum(bo, chksum0, chksum1, frame[24:]) // frame data
if chksum0 != fchksum0 || chksum1 != fchksum1 {
Tracef("%s: copy shadow: checksum mismatch, skipping: offset=%d (%x,%x) != (%x,%x)", db.path, offset, chksum0, chksum1, fchksum0, fchksum1)
logger.Debug("copy shadow: checksum mismatch, skipping", "offset", offset, "check", fmt.Sprintf("(%x,%x) != (%x,%x)", chksum0, chksum1, fchksum0, fchksum1))
break
}
// Add page to the new size of the shadow WAL.
buf.Write(frame)
// Write page to temporary WAL file.
if _, err := f.Write(frame); err != nil {
return 0, 0, fmt.Errorf("write temp shadow wal: %w", err)
}
Tracef("%s: copy-shadow: ok %s offset=%d salt=%x %x", db.path, filename, offset, salt0, salt1)
logger.Debug("copy-shadow: ok", "offset", offset, "salt", fmt.Sprintf("%x %x", salt0, salt1))
offset += int64(len(frame))
// Flush to shadow WAL if commit record.
// Update new size if written frame was a commit record.
newDBSize := binary.BigEndian.Uint32(frame[4:])
if newDBSize != 0 {
if _, err := buf.WriteTo(w); err != nil {
return 0, fmt.Errorf("write shadow wal: %w", err)
}
buf.Reset()
lastCommitSize = offset
}
}
// Sync & close.
// If no WAL writes found, exit.
if origSize == lastCommitSize {
return origSize, lastCommitSize, nil
}
walByteN := lastCommitSize - origSize
// Move to beginning of temporary file.
if _, err := f.Seek(0, io.SeekStart); err != nil {
return 0, 0, fmt.Errorf("temp file seek: %w", err)
}
// Copy from temporary file to shadow WAL.
if _, err := io.Copy(w, &io.LimitedReader{R: f, N: walByteN}); err != nil {
return 0, 0, fmt.Errorf("write shadow file: %w", err)
}
// Close & remove temporary file.
if err := f.Close(); err != nil {
return 0, 0, err
} else if err := os.Remove(tempFilename); err != nil {
return 0, 0, err
}
// Sync & close shadow WAL.
if err := w.Sync(); err != nil {
return 0, err
return 0, 0, err
} else if err := w.Close(); err != nil {
return 0, err
return 0, 0, err
}
// Track total number of bytes written to WAL.
db.totalWALBytesCounter.Add(float64(lastCommitSize - origSize))
db.totalWALBytesCounter.Add(float64(walByteN))
return lastCommitSize, nil
return origWalSize, lastCommitSize, nil
}
// ShadowWALReader opens a reader for a shadow WAL file at a given position.
@@ -1266,7 +1330,7 @@ func (db *DB) checkpoint(ctx context.Context, generation, mode string) error {
}
// Copy shadow WAL before checkpoint to copy as much as possible.
if _, err := db.copyToShadowWAL(shadowWALPath); err != nil {
if _, _, err := db.copyToShadowWAL(shadowWALPath); err != nil {
return fmt.Errorf("cannot copy to end of shadow wal before checkpoint: %w", err)
}
@@ -1274,7 +1338,7 @@ func (db *DB) checkpoint(ctx context.Context, generation, mode string) error {
// a new page is written.
if err := db.execCheckpoint(mode); err != nil {
return err
} else if _, err = db.db.Exec(`INSERT INTO _litestream_seq (id, seq) VALUES (1, 1) ON CONFLICT (id) DO UPDATE SET seq = seq + 1`); err != nil {
} else if err := db.ensureWALExists(); err != nil {
return err
}
@@ -1301,7 +1365,7 @@ func (db *DB) checkpoint(ctx context.Context, generation, mode string) error {
}
// Copy the end of the previous WAL before starting a new shadow WAL.
if _, err := db.copyToShadowWAL(shadowWALPath); err != nil {
if _, _, err := db.copyToShadowWAL(shadowWALPath); err != nil {
return fmt.Errorf("cannot copy to end of shadow wal: %w", err)
}
@@ -1360,11 +1424,11 @@ func (db *DB) execCheckpoint(mode string) (err error) {
if err := db.db.QueryRow(rawsql).Scan(&row[0], &row[1], &row[2]); err != nil {
return err
}
Tracef("%s: checkpoint: mode=%v (%d,%d,%d)", db.path, mode, row[0], row[1], row[2])
db.Logger.Debug("checkpoint", "mode", mode, "result", fmt.Sprintf("%d,%d,%d", row[0], row[1], row[2]))
// Reacquire the read lock immediately after the checkpoint.
if err := db.acquireReadLock(); err != nil {
return fmt.Errorf("release read lock: %w", err)
return fmt.Errorf("acquire read lock: %w", err)
}
return nil
@@ -1385,7 +1449,7 @@ func (db *DB) monitor() {
// Sync the database to the shadow WAL.
if err := db.Sync(db.ctx); err != nil && !errors.Is(err, context.Canceled) {
db.Logger.Printf("%s: sync error: %s", db.path, err)
db.Logger.Error("sync error", "error", err)
}
}
}
@@ -1427,7 +1491,7 @@ func applyWAL(ctx context.Context, index int, dbPath string) error {
}
// Open SQLite database and force a truncating checkpoint.
d, err := sql.Open("litestream-sqlite3", dbPath)
d, err := sql.Open("sqlite3", dbPath)
if err != nil {
return err
}
@@ -1529,10 +1593,6 @@ type RestoreOptions struct {
// Specifies how many WAL files are downloaded in parallel during restore.
Parallelism int
// Logging settings.
Logger *log.Logger
Verbose bool
}
// NewRestoreOptions returns a new instance of RestoreOptions with defaults.

View File

@@ -3,7 +3,6 @@ package litestream_test
import (
"context"
"database/sql"
"io/ioutil"
"os"
"path/filepath"
"strings"
@@ -264,7 +263,7 @@ func TestDB_Sync(t *testing.T) {
// Checkpoint & fully close which should close WAL file.
if err := db.Checkpoint(context.Background(), litestream.CheckpointModeTruncate); err != nil {
t.Fatal(err)
} else if err := db.Close(); err != nil {
} else if err := db.Close(context.Background()); err != nil {
t.Fatal(err)
} else if err := sqldb.Close(); err != nil {
t.Fatal(err)
@@ -314,7 +313,7 @@ func TestDB_Sync(t *testing.T) {
}
// Fully close which should close WAL file.
if err := db.Close(); err != nil {
if err := db.Close(context.Background()); err != nil {
t.Fatal(err)
} else if err := sqldb.Close(); err != nil {
t.Fatal(err)
@@ -367,16 +366,16 @@ func TestDB_Sync(t *testing.T) {
pos0, err := db.Pos()
if err != nil {
t.Fatal(err)
} else if err := db.Close(); err != nil {
} else if err := db.Close(context.Background()); err != nil {
t.Fatal(err)
}
// Read existing file, update header checksum, and write back only header
// to simulate a header with a mismatched checksum.
shadowWALPath := db.ShadowWALPath(pos0.Generation, pos0.Index)
if buf, err := ioutil.ReadFile(shadowWALPath); err != nil {
if buf, err := os.ReadFile(shadowWALPath); err != nil {
t.Fatal(err)
} else if err := ioutil.WriteFile(shadowWALPath, append(buf[:litestream.WALHeaderSize-8], 0, 0, 0, 0, 0, 0, 0, 0), 0600); err != nil {
} else if err := os.WriteFile(shadowWALPath, append(buf[:litestream.WALHeaderSize-8], 0, 0, 0, 0, 0, 0, 0, 0), 0o600); err != nil {
t.Fatal(err)
}
@@ -413,7 +412,7 @@ func TestDB_Sync(t *testing.T) {
}
// Close & truncate shadow WAL to simulate a partial header write.
if err := db.Close(); err != nil {
if err := db.Close(context.Background()); err != nil {
t.Fatal(err)
} else if err := os.Truncate(db.ShadowWALPath(pos0.Generation, pos0.Index), litestream.WALHeaderSize-1); err != nil {
t.Fatal(err)
@@ -458,7 +457,7 @@ func TestDB_Sync(t *testing.T) {
}
// Close & truncate shadow WAL to simulate a partial frame write.
if err := db.Close(); err != nil {
if err := db.Close(context.Background()); err != nil {
t.Fatal(err)
} else if err := os.Truncate(db.ShadowWALPath(pos0.Generation, pos0.Index), fi.Size()-1); err != nil {
t.Fatal(err)
@@ -504,7 +503,7 @@ func TestDB_Sync(t *testing.T) {
}
// Close & delete shadow WAL to simulate dir created but not WAL.
if err := db.Close(); err != nil {
if err := db.Close(context.Background()); err != nil {
t.Fatal(err)
} else if err := os.Remove(db.ShadowWALPath(pos0.Generation, pos0.Index)); err != nil {
t.Fatal(err)
@@ -553,12 +552,7 @@ func TestDB_Sync(t *testing.T) {
t.Fatal(err)
}
// Ensure position is now on the second index.
if pos, err := db.Pos(); err != nil {
t.Fatal(err)
} else if got, want := pos.Index, 1; got != want {
t.Fatalf("Index=%v, want %v", got, want)
}
// NOTE: The minimum checkpoint may only do a PASSIVE checkpoint so we can't guarantee a rollover.
})
// Ensure DB checkpoints after interval.
@@ -582,13 +576,6 @@ func TestDB_Sync(t *testing.T) {
} else if err := db.Sync(context.Background()); err != nil {
t.Fatal(err)
}
// Ensure position is now on the second index.
if pos, err := db.Pos(); err != nil {
t.Fatal(err)
} else if got, want := pos.Index, 1; got != want {
t.Fatalf("Index=%v, want %v", got, want)
}
})
}
@@ -626,7 +613,7 @@ func MustOpenDBAt(tb testing.TB, path string) *litestream.DB {
// MustCloseDB closes db and removes its parent directory.
func MustCloseDB(tb testing.TB, db *litestream.DB) {
tb.Helper()
if err := db.Close(); err != nil && !strings.Contains(err.Error(), `database is closed`) {
if err := db.Close(context.Background()); err != nil && !strings.Contains(err.Error(), `database is closed`) {
tb.Fatal(err)
} else if err := os.RemoveAll(filepath.Dir(db.Path())); err != nil {
tb.Fatal(err)

View File

@@ -26,9 +26,9 @@
Description="Litestream $(var.Version) installer"
Compressed="yes"
/>
<Media Id="1" Cabinet="litestream.cab" EmbedCab="yes"/>
<MajorUpgrade
Schedule="afterInstallInitialize"
DowngradeErrorMessage="A later version of [ProductName] is already installed. Setup will now exit."

View File

@@ -7,4 +7,3 @@
# replicas:
# - path: /path/to/replica # File-based replication
# - url: s3://my.bucket.com/db # S3-based replication

35
etc/s3_mock.py Executable file
View File

@@ -0,0 +1,35 @@
#!/usr/bin/env python3
import sys
import os
import time
from moto.server import ThreadedMotoServer
import boto3
import subprocess
cmd = sys.argv[1:]
if len(cmd) == 0:
print(f"usage: {sys.argv[0]} <command> [arguments]", file=sys.stderr)
sys.exit(1)
env = os.environ.copy() | {
"LITESTREAM_S3_ACCESS_KEY_ID": "lite",
"LITESTREAM_S3_SECRET_ACCESS_KEY": "stream",
"LITESTREAM_S3_BUCKET": f"test{int(time.time())}",
"LITESTREAM_S3_ENDPOINT": "http://127.0.0.1:5000",
"LITESTREAM_S3_FORCE_PATH_STYLE": "true",
}
server = ThreadedMotoServer()
server.start()
s3 = boto3.client(
"s3",
aws_access_key_id=env["LITESTREAM_S3_ACCESS_KEY_ID"],
aws_secret_access_key=["LITESTREAM_S3_SECRET_ACCESS_KEY"],
endpoint_url=env["LITESTREAM_S3_ENDPOINT"]
).create_bucket(Bucket=env["LITESTREAM_S3_BUCKET"])
proc = subprocess.run(cmd, env=env)
server.stop()
sys.exit(proc.returncode)

View File

@@ -4,10 +4,8 @@ import (
"context"
"fmt"
"io"
"io/ioutil"
"os"
"path/filepath"
"sort"
"github.com/benbjohnson/litestream"
"github.com/benbjohnson/litestream/internal"
@@ -112,7 +110,7 @@ func (c *ReplicaClient) Generations(ctx context.Context) ([]string, error) {
return nil, fmt.Errorf("cannot determine generations path: %w", err)
}
fis, err := ioutil.ReadDir(root)
fis, err := os.ReadDir(root)
if os.IsNotExist(err) {
return nil, nil
} else if err != nil {
@@ -181,8 +179,6 @@ func (c *ReplicaClient) Snapshots(ctx context.Context, generation string) (lites
})
}
sort.Sort(litestream.SnapshotInfoSlice(infos))
return litestream.NewSnapshotInfoSliceIterator(infos), nil
}
@@ -298,8 +294,6 @@ func (c *ReplicaClient) WALSegments(ctx context.Context, generation string) (lit
})
}
sort.Sort(litestream.WALSegmentInfoSlice(infos))
return litestream.NewWALSegmentInfoSliceIterator(infos), nil
}

70
go.mod
View File

@@ -1,55 +1,65 @@
module github.com/benbjohnson/litestream
go 1.19
go 1.21
require (
cloud.google.com/go/storage v1.31.0
cloud.google.com/go/storage v1.36.0
filippo.io/age v1.1.1
github.com/Azure/azure-storage-blob-go v0.15.0
github.com/aws/aws-sdk-go v1.44.318
github.com/aws/aws-sdk-go v1.49.5
github.com/mattn/go-shellwords v1.0.12
github.com/mattn/go-sqlite3 v1.14.17
github.com/pierrec/lz4/v4 v4.1.18
github.com/pkg/sftp v1.13.5
github.com/prometheus/client_golang v1.16.0
golang.org/x/crypto v0.12.0
golang.org/x/sync v0.3.0
golang.org/x/sys v0.11.0
google.golang.org/api v0.135.0
github.com/mattn/go-sqlite3 v1.14.19
github.com/pierrec/lz4/v4 v4.1.19
github.com/pkg/sftp v1.13.6
github.com/prometheus/client_golang v1.17.0
golang.org/x/crypto v0.17.0
golang.org/x/sync v0.5.0
golang.org/x/sys v0.15.0
google.golang.org/api v0.154.0
gopkg.in/yaml.v2 v2.4.0
)
require (
cloud.google.com/go v0.110.7 // indirect
cloud.google.com/go/compute v1.23.0 // indirect
cloud.google.com/go v0.111.0 // indirect
cloud.google.com/go/compute v1.23.3 // indirect
cloud.google.com/go/compute/metadata v0.2.3 // indirect
cloud.google.com/go/iam v1.1.1 // indirect
cloud.google.com/go/iam v1.1.5 // indirect
github.com/Azure/azure-pipeline-go v0.2.3 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/felixge/httpsnoop v1.0.4 // indirect
github.com/go-logr/logr v1.3.0 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/google/go-cmp v0.5.9 // indirect
github.com/google/s2a-go v0.1.4 // indirect
github.com/google/uuid v1.3.0 // indirect
github.com/googleapis/enterprise-certificate-proxy v0.2.5 // indirect
github.com/google/go-cmp v0.6.0 // indirect
github.com/google/s2a-go v0.1.7 // indirect
github.com/google/uuid v1.5.0 // indirect
github.com/googleapis/enterprise-certificate-proxy v0.3.2 // indirect
github.com/googleapis/gax-go/v2 v2.12.0 // indirect
github.com/jmespath/go-jmespath v0.4.0 // indirect
github.com/kr/fs v0.1.0 // indirect
github.com/mattn/go-ieproxy v0.0.11 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect
github.com/prometheus/client_model v0.4.0 // indirect
github.com/prometheus/common v0.44.0 // indirect
github.com/prometheus/procfs v0.11.1 // indirect
github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0 // indirect
github.com/prometheus/client_model v0.5.0 // indirect
github.com/prometheus/common v0.45.0 // indirect
github.com/prometheus/procfs v0.12.0 // indirect
go.opencensus.io v0.24.0 // indirect
golang.org/x/net v0.14.0 // indirect
golang.org/x/oauth2 v0.11.0 // indirect
golang.org/x/text v0.12.0 // indirect
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect
google.golang.org/appengine v1.6.7 // indirect
google.golang.org/genproto v0.0.0-20230807174057-1744710a1577 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20230807174057-1744710a1577 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20230807174057-1744710a1577 // indirect
google.golang.org/grpc v1.57.0 // indirect
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.46.1 // indirect
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.46.1 // indirect
go.opentelemetry.io/otel v1.21.0 // indirect
go.opentelemetry.io/otel/metric v1.21.0 // indirect
go.opentelemetry.io/otel/trace v1.21.0 // indirect
golang.org/x/net v0.19.0 // indirect
golang.org/x/oauth2 v0.15.0 // indirect
golang.org/x/text v0.14.0 // indirect
golang.org/x/time v0.5.0 // indirect
golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028 // indirect
google.golang.org/appengine v1.6.8 // indirect
google.golang.org/genproto v0.0.0-20231212172506-995d672761c0 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20231212172506-995d672761c0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20231212172506-995d672761c0 // indirect
google.golang.org/grpc v1.60.1 // indirect
google.golang.org/protobuf v1.31.0 // indirect
)

85
go.sum
View File

@@ -2,14 +2,22 @@ cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMT
cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
cloud.google.com/go v0.110.7 h1:rJyC7nWRg2jWGZ4wSJ5nY65GTdYJkg0cd/uXb+ACI6o=
cloud.google.com/go v0.110.7/go.mod h1:+EYjdK8e5RME/VY/qLCAtuyALQ9q67dvuum8i+H5xsI=
cloud.google.com/go v0.111.0 h1:YHLKNupSD1KqjDbQ3+LVdQ81h/UJbJyZG203cEfnQgM=
cloud.google.com/go v0.111.0/go.mod h1:0mibmpKP1TyOOFYQY5izo0LnT+ecvOQ0Sg3OdmMiNRU=
cloud.google.com/go/compute v1.23.0 h1:tP41Zoavr8ptEqaW6j+LQOnyBBhO7OkOMAGrgLopTwY=
cloud.google.com/go/compute v1.23.0/go.mod h1:4tCnrn48xsqlwSAiLf1HXMQk8CONslYbdiEZc9FEIbM=
cloud.google.com/go/compute v1.23.3 h1:6sVlXXBmbd7jNX0Ipq0trII3e4n1/MsADLK6a+aiVlk=
cloud.google.com/go/compute v1.23.3/go.mod h1:VCgBUoMnIVIR0CscqQiPJLAG25E3ZRZMzcFZeQ+h8CI=
cloud.google.com/go/compute/metadata v0.2.3 h1:mg4jlk7mCAj6xXp9UJ4fjI9VUI5rubuGBW5aJ7UnBMY=
cloud.google.com/go/compute/metadata v0.2.3/go.mod h1:VAV5nSsACxMJvgaAuX6Pk2AawlZn8kiOGuCv6gTkwuA=
cloud.google.com/go/iam v1.1.1 h1:lW7fzj15aVIXYHREOqjRBV9PsH0Z6u8Y46a1YGvQP4Y=
cloud.google.com/go/iam v1.1.1/go.mod h1:A5avdyVL2tCppe4unb0951eI9jreack+RJ0/d+KUZOU=
cloud.google.com/go/iam v1.1.5 h1:1jTsCu4bcsNsE4iiqNT5SHwrDRCfRmIaaaVFhRveTJI=
cloud.google.com/go/iam v1.1.5/go.mod h1:rB6P/Ic3mykPbFio+vo7403drjlgvoWfYpJhMXEbzv8=
cloud.google.com/go/storage v1.31.0 h1:+S3LjjEN2zZ+L5hOwj4+1OkGCsLVe0NzpXKQ1pSdTCI=
cloud.google.com/go/storage v1.31.0/go.mod h1:81ams1PrhW16L4kF7qg+4mTq7SRs5HsbDTM0bWvrwJ0=
cloud.google.com/go/storage v1.36.0 h1:P0mOkAcaJxhCTvAkMhxMfrTKiNcub4YmmPBtlhAyTr8=
cloud.google.com/go/storage v1.36.0/go.mod h1:M6M/3V/D3KpzMTJyPOR/HU6n2Si5QdaXYEsng2xgOs8=
filippo.io/age v1.1.1 h1:pIpO7l151hCnQ4BdyBujnGP2YlUo0uj6sAVNHGBvXHg=
filippo.io/age v1.1.1/go.mod h1:l03SrzDUrBkdBx8+IILdnn2KZysqQdbEBUQ4p3sqEQE=
github.com/Azure/azure-pipeline-go v0.2.3 h1:7U9HBg1JFK3jHl5qmo4CTZKFTVgMwdFHMVtCdfBE21U=
@@ -31,6 +39,8 @@ github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03
github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY=
github.com/aws/aws-sdk-go v1.44.318 h1:Yl66rpbQHFUbxe9JBKLcvOvRivhVgP6+zH0b9KzARX8=
github.com/aws/aws-sdk-go v1.44.318/go.mod h1:aVsgQcEevwlmQ7qHE9I3h+dtQgpqhFB+i8Phjh7fkwI=
github.com/aws/aws-sdk-go v1.49.5 h1:y2yfBlwjPDi3/sBVKeznYEdDy6wIhjA2L5NCBMLUIYA=
github.com/aws/aws-sdk-go v1.49.5/go.mod h1:LF8svs817+Nz+DmiMQKTO3ubZ/6IaTpq3TjupRn3Eqk=
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
@@ -53,9 +63,16 @@ github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1m
github.com/envoyproxy/go-control-plane v0.9.9-0.20201210154907-fd9021fe5dad/go.mod h1:cXg6YxExXjJnVBQHBLXeUAgxn2UodCpnH306RInaBQk=
github.com/envoyproxy/go-control-plane v0.9.10-0.20210907150352-cf90f659a021/go.mod h1:AFq3mo9L8Lqqiid3OhADV3RfLJnjiw63cSpi+fDTRC0=
github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
github.com/felixge/httpsnoop v1.0.4 h1:NFTV2Zj1bL4mc9sqWACXbQFVBBg2W3GPvqp8/ESS2Wg=
github.com/felixge/httpsnoop v1.0.4/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U=
github.com/form3tech-oss/jwt-go v3.2.2+incompatible h1:TcekIExNqud5crz4xD2pavyTgWiPvpYe4Xau31I0PRk=
github.com/form3tech-oss/jwt-go v3.2.2+incompatible/go.mod h1:pbq4aXjuKjdthFRnoDwaVPLA+WlJuPGy+QneDUgJi2k=
github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04=
github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
github.com/go-logr/logr v1.3.0 h1:2y3SDp0ZXuc6/cjLSZ+Q3ir+QB9T/iG5yYRXqsagWSY=
github.com/go-logr/logr v1.3.0/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag=
github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE=
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da h1:oI5xCqsCo564l8iNU+DwB5epxmsaqB+rhGL0m5jtYqE=
@@ -86,15 +103,24 @@ github.com/google/go-cmp v0.5.3/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/martian/v3 v3.3.2 h1:IqNFLAmvJOgVlpdEBiQbDc2EwKW77amAycfTuWKdfvw=
github.com/google/martian/v3 v3.3.2/go.mod h1:oBOf6HBosgwRXnUGWUB05QECsc6uvmMiJ3+6W4l/CUk=
github.com/google/s2a-go v0.1.4 h1:1kZ/sQM3srePvKs3tXAvQzo66XfcReoqFpIpIccE7Oc=
github.com/google/s2a-go v0.1.4/go.mod h1:Ej+mSEMGRnqRzjc7VtF+jdBwYG5fuJfiZ8ELkjEwM0A=
github.com/google/s2a-go v0.1.7 h1:60BLSyTrOV4/haCDW4zb1guZItoSq8foHCXrAnjBo/o=
github.com/google/s2a-go v0.1.7/go.mod h1:50CgR4k1jNlWBu4UfS4AcfhVe1r6pdZPygJ3R8F0Qdw=
github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/google/uuid v1.2.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I=
github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/google/uuid v1.5.0 h1:1p67kYwdtXjb0gL0BPiP1Av9wiZPo5A8z2cWkTZ+eyU=
github.com/google/uuid v1.5.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/googleapis/enterprise-certificate-proxy v0.2.5 h1:UR4rDjcgpgEnqpIEvkiqTYKBCKLNmlge2eVjoZfySzM=
github.com/googleapis/enterprise-certificate-proxy v0.2.5/go.mod h1:RxW0N9901Cko1VOCW3SXCpWP+mlIEkk2tP7jnHy9a3w=
github.com/googleapis/enterprise-certificate-proxy v0.3.2 h1:Vie5ybvEvT75RniqhfFxPRy3Bf7vr3h0cechB90XaQs=
github.com/googleapis/enterprise-certificate-proxy v0.3.2/go.mod h1:VLSiSSBs/ksPL8kq3OBOQ6WRI2QnaFynd1DCjZ62+V0=
github.com/googleapis/gax-go/v2 v2.12.0 h1:A+gCJKdRfqXkr+BIRGtZLibNXf0m1f9E4HG56etFpas=
github.com/googleapis/gax-go/v2 v2.12.0/go.mod h1:y+aIqrI5eb1YGMVJfuV3185Ts/D7qKpsEkdD5+I6QGU=
github.com/grpc-ecosystem/grpc-gateway v1.16.0/go.mod h1:BDjrQk3hbvj6Nolgz8mAMFbcEtjT1g+wF4CSlocrBnw=
@@ -106,6 +132,7 @@ github.com/kr/fs v0.1.0 h1:Jskdu9ieNAYnjxsi0LbQp1ulIKZV1LAFgK1tWhpZgl8=
github.com/kr/fs v0.1.0/go.mod h1:FFnZGqtBN9Gxj7eW1uZ42v5BccTP0vu6NEaFoC2HwRg=
github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI=
github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
@@ -116,27 +143,44 @@ github.com/mattn/go-shellwords v1.0.12 h1:M2zGm7EW6UQJvDeQxo4T51eKPurbeFbe8WtebG
github.com/mattn/go-shellwords v1.0.12/go.mod h1:EZzvwXDESEeg03EKmM+RmDnNOPKG4lLtQsUlTZDWQ8Y=
github.com/mattn/go-sqlite3 v1.14.17 h1:mCRHCLDUBXgpKAqIKsaAaAsrAlbkeomtRFKXh2L6YIM=
github.com/mattn/go-sqlite3 v1.14.17/go.mod h1:2eHXhiwb8IkHr+BDWZGa96P6+rkvnG63S2DGjv9HUNg=
github.com/mattn/go-sqlite3 v1.14.19 h1:fhGleo2h1p8tVChob4I9HpmVFIAkKGpiukdrgQbWfGI=
github.com/mattn/go-sqlite3 v1.14.19/go.mod h1:2eHXhiwb8IkHr+BDWZGa96P6+rkvnG63S2DGjv9HUNg=
github.com/matttproud/golang_protobuf_extensions v1.0.4 h1:mmDVorXM7PCGKw94cs5zkfA9PSy5pEvNWRP0ET0TIVo=
github.com/matttproud/golang_protobuf_extensions v1.0.4/go.mod h1:BSXmuO+STAnVfrANrmjBb36TMTDstsz7MSK+HVaYKv4=
github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0 h1:jWpvCLoY8Z/e3VKvlsiIGKtc+UG6U5vzxaoagmhXfyg=
github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0/go.mod h1:QUyp042oQthUoa9bqDv0ER0wrtXnBruoNd7aNjkbP+k=
github.com/pierrec/lz4/v4 v4.1.18 h1:xaKrnTkyoqfh1YItXl56+6KJNVYWlEEPuAQW9xsplYQ=
github.com/pierrec/lz4/v4 v4.1.18/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
github.com/pierrec/lz4/v4 v4.1.19 h1:tYLzDnjDXh9qIxSTKHwXwOYmm9d887Y7Y1ZkyXYHAN4=
github.com/pierrec/lz4/v4 v4.1.19/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/sftp v1.13.5 h1:a3RLUqkyjYRtBTZJZ1VRrKbN3zhuPLlUc3sphVz81go=
github.com/pkg/sftp v1.13.5/go.mod h1:wHDZ0IZX6JcBYRK1TH9bcVq8G7TLpVHYIGJRFnmPfxg=
github.com/pkg/sftp v1.13.6 h1:JFZT4XbOU7l77xGSpOdW+pwIMqP044IyjXX6FGyEKFo=
github.com/pkg/sftp v1.13.6/go.mod h1:tz1ryNURKu77RL+GuCzmoJYxQczL3wLNNpPWagdg4Qk=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/prometheus/client_golang v1.16.0 h1:yk/hx9hDbrGHovbci4BY+pRMfSuuat626eFsHb7tmT8=
github.com/prometheus/client_golang v1.16.0/go.mod h1:Zsulrv/L9oM40tJ7T815tM89lFEugiJ9HzIqaAx4LKc=
github.com/prometheus/client_golang v1.17.0 h1:rl2sfwZMtSthVU752MqfjQozy7blglC+1SOtjMAMh+Q=
github.com/prometheus/client_golang v1.17.0/go.mod h1:VeL+gMmOAxkS2IqfCq0ZmHSL+LjWfWDUmp1mBz9JgUY=
github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
github.com/prometheus/client_model v0.4.0 h1:5lQXD3cAg1OXBf4Wq03gTrXHeaV0TQvGfUooCfx1yqY=
github.com/prometheus/client_model v0.4.0/go.mod h1:oMQmHW1/JoDwqLtg57MGgP/Fb1CJEYF2imWWhWtMkYU=
github.com/prometheus/client_model v0.5.0 h1:VQw1hfvPvk3Uv6Qf29VrPF32JB6rtbgI6cYPYQjL0Qw=
github.com/prometheus/client_model v0.5.0/go.mod h1:dTiFglRmd66nLR9Pv9f0mZi7B7fk5Pm3gvsjB5tr+kI=
github.com/prometheus/common v0.44.0 h1:+5BrQJwiBB9xsMygAB3TNvpQKOwlkc25LbISbrdOOfY=
github.com/prometheus/common v0.44.0/go.mod h1:ofAIvZbQ1e/nugmZGz4/qCb9Ap1VoSTIO7x0VV9VvuY=
github.com/prometheus/common v0.45.0 h1:2BGz0eBc2hdMDLnO/8n0jeB3oPrt2D08CekT0lneoxM=
github.com/prometheus/common v0.45.0/go.mod h1:YJmSTw9BoKxJplESWWxlbyttQR4uaEcGyv9MZjVOJsY=
github.com/prometheus/procfs v0.11.1 h1:xRC8Iq1yyca5ypa9n1EZnWZkt7dwcoRPQwX/5gwaUuI=
github.com/prometheus/procfs v0.11.1/go.mod h1:eesXgaPo1q7lBpVMoMy0ZOFTth9hBn4W/y0/p/ScXhY=
github.com/prometheus/procfs v0.12.0 h1:jluTpSng7V9hY0O2R9DzzJHYb2xULk9VTR1V1R/k6Bo=
github.com/prometheus/procfs v0.12.0/go.mod h1:pcuDEFsWDnvcgNzo4EEweacyhjeA9Zk3cnaOZAZEfOo=
github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ=
github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ=
github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
@@ -149,6 +193,16 @@ github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o
github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY=
go.opencensus.io v0.24.0 h1:y73uSU6J157QMP2kn2r30vwW1A2W2WFwSCGnAVxeaD0=
go.opencensus.io v0.24.0/go.mod h1:vNK8G9p7aAivkbmorf4v+7Hgx+Zs0yY+0fOtgBfjQKo=
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.46.1 h1:SpGay3w+nEwMpfVnbqOLH5gY52/foP8RE8UzTZ1pdSE=
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.46.1/go.mod h1:4UoMYEZOC0yN/sPGH76KPkkU7zgiEWYWL9vwmbnTJPE=
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.46.1 h1:aFJWCqJMNjENlcleuuOkGAPH82y0yULBScfXcIEdS24=
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.46.1/go.mod h1:sEGXWArGqc3tVa+ekntsN65DmVbVeW+7lTKTjZF3/Fo=
go.opentelemetry.io/otel v1.21.0 h1:hzLeKBZEL7Okw2mGzZ0cc4k/A7Fta0uoPgaJCr8fsFc=
go.opentelemetry.io/otel v1.21.0/go.mod h1:QZzNPQPm1zLX4gZK4cMi+71eaorMSGT3A4znnUvNNEo=
go.opentelemetry.io/otel/metric v1.21.0 h1:tlYWfeo+Bocx5kLEloTjbcDwBuELRrIFxwdQ36PlJu4=
go.opentelemetry.io/otel/metric v1.21.0/go.mod h1:o1p3CA8nNHW8j5yuQLdc1eeqEaPfzug24uvsyIEJRWM=
go.opentelemetry.io/otel/trace v1.21.0 h1:WD9i5gzvoUPuXIXH24ZNBudiarZDKuekPqi/E8fpfLc=
go.opentelemetry.io/otel/trace v1.21.0/go.mod h1:LGbsEB0f9LGjN+OZaQQ26sohbOmiMR+BaslueVtS/qQ=
go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
@@ -157,8 +211,11 @@ golang.org/x/crypto v0.0.0-20201016220609-9e8e0b390897/go.mod h1:LzIPMQfyMNhhGPh
golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
golang.org/x/crypto v0.0.0-20211215153901-e495a2d5b3d3/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
golang.org/x/crypto v0.0.0-20220314234659-1baeb1ce4c0b/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
golang.org/x/crypto v0.1.0/go.mod h1:RecgLatLF4+eUMCP1PoPZQb+cVrJcOPbHkTkbkB9sbw=
golang.org/x/crypto v0.12.0 h1:tFM/ta59kqch6LlvYnPa0yx5a83cL2nHflFhYKvv9Yk=
golang.org/x/crypto v0.12.0/go.mod h1:NF0Gs7EO5K4qLn+Ylc+fih8BSTeIjAP05siRnAh98yw=
golang.org/x/crypto v0.17.0 h1:r8bRNjWL3GshPW3gkd+RpvzWrZAwPS49OmTGZ/uhM4k=
golang.org/x/crypto v0.17.0/go.mod h1:gCAAfMLgwOJRpTjQ2zCCt2OcSfYMTeZVSRtQlPC7Nq4=
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU=
@@ -182,10 +239,14 @@ golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug
golang.org/x/net v0.1.0/go.mod h1:Cx3nUiGt4eDBEyega/BKRp+/AlGL8hYe7U9odMt2Cco=
golang.org/x/net v0.14.0 h1:BONx9s002vGdD9umnlX1Po8vOZmrgH34qlHcD1MfK14=
golang.org/x/net v0.14.0/go.mod h1:PpSgVXXLK0OxS0F31C1/tv6XNguvCrnXIDrFMspZIUI=
golang.org/x/net v0.19.0 h1:zTwKpTd2XuCqf8huc7Fo2iSy+4RHPd10s4KzeTnVr1c=
golang.org/x/net v0.19.0/go.mod h1:CfAk/cbD4CthTvqiEl8NpboMuiuOYsAr/7NOjZJtv1U=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
golang.org/x/oauth2 v0.11.0 h1:vPL4xzxBM4niKCW6g9whtaWVXTJf1U5e4aZxxFx/gbU=
golang.org/x/oauth2 v0.11.0/go.mod h1:LdF7O/8bLR/qWK9DrpXmbHLTouvRHK0SgJl0GmDBchk=
golang.org/x/oauth2 v0.15.0 h1:s8pnnxNVzjWyrvYdFUQq5llS1PX2zhPXmccZv99h7uQ=
golang.org/x/oauth2 v0.15.0/go.mod h1:q48ptWNTY5XWf+JNten23lcvHpLJ0ZSxF5ttTHKVCAM=
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
@@ -193,6 +254,8 @@ golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.3.0 h1:ftCYgMx6zT/asHUrPw8BLLscYtGznsLAnjq5RH9P66E=
golang.org/x/sync v0.3.0/go.mod h1:FU7BRWz2tNW+3quACPkgCx/L+uEAv1htQ0V83Z9Rj+Y=
golang.org/x/sync v0.5.0 h1:60k92dhOjHxJkrqnwsfl8KuaHbn/5dl0lUPUklKo3qE=
golang.org/x/sync v0.5.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
@@ -208,10 +271,14 @@ golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBc
golang.org/x/sys v0.1.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.11.0 h1:eG7RXZHdqOJ1i+0lgLgCpSXAp6M3LYlAo6osgSi0xOM=
golang.org/x/sys v0.11.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.15.0 h1:h48lPFYpsTvQJZF4EKyI4aLHaev3CxivZmv7yZig9pc=
golang.org/x/sys v0.15.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
golang.org/x/term v0.1.0/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
golang.org/x/term v0.11.0 h1:F9tnn/DA/Im8nCwm+fX+1/eBwi4qFjRT++MhtVC4ZX0=
golang.org/x/term v0.11.0/go.mod h1:zC9APTIj3jG3FdV/Ons+XE1riIZXG4aZ4GTHiPZJPIU=
golang.org/x/term v0.15.0 h1:y/Oo/a/q3IXu26lQgl04j/gjuBDOBlx7X6Om1j2CPW4=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
@@ -221,6 +288,10 @@ golang.org/x/text v0.3.8/go.mod h1:E6s5w1FMmriuDzIBO73fBruAKo1PCIq6d2Q6DHfQ8WQ=
golang.org/x/text v0.4.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=
golang.org/x/text v0.12.0 h1:k+n5B8goJNdU7hSvEtMUz3d1Q6D/XW4COJSJR6fN0mc=
golang.org/x/text v0.12.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE=
golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ=
golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
golang.org/x/time v0.5.0 h1:o7cqy6amK/52YcAKIPlM3a+Fpj35zvRj2TP+e1xFSfk=
golang.org/x/time v0.5.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY=
@@ -233,22 +304,34 @@ golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8T
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 h1:H2TDz8ibqkAF6YGhCdN3jS9O0/s90v0rJh3X/OLHEUk=
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2/go.mod h1:K8+ghG5WaK9qNqU5K3HdILfMLy1f3aNYFI/wnl100a8=
golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028 h1:+cNy6SZtPcJQH3LJVLOSmiC7MMxXNOb3PU/VUEz+EhU=
golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028/go.mod h1:NDW/Ps6MPRej6fsCIbMTohpP40sJ/P/vI1MoTEGwX90=
google.golang.org/api v0.135.0 h1:6Vgfj6uPMXcyy66waYWBwmkeNB+9GmUlJDOzkukPQYQ=
google.golang.org/api v0.135.0/go.mod h1:Bp77uRFgwsSKI0BWH573F5Q6wSlznwI2NFayLOp/7mQ=
google.golang.org/api v0.154.0 h1:X7QkVKZBskztmpPKWQXgjJRPA2dJYrL6r+sYPRLj050=
google.golang.org/api v0.154.0/go.mod h1:qhSMkM85hgqiokIYsrRyKxrjfBeIhgl4Z2JmeRkYylc=
google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM=
google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4=
google.golang.org/appengine v1.6.7 h1:FZR1q0exgwxzPzp/aF+VccGrSfxfPpkBqjIIEq3ru6c=
google.golang.org/appengine v1.6.7/go.mod h1:8WjMMxjGQR8xUklV/ARdw2HLXBOI7O7uCIDZVag1xfc=
google.golang.org/appengine v1.6.8 h1:IhEN5q69dyKagZPYMSdIjS2HqprW324FRQZJcGqPAsM=
google.golang.org/appengine v1.6.8/go.mod h1:1jJ3jBArFh5pcgW8gCtRJnepW8FzD1V44FJffLiz/Ds=
google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc=
google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc=
google.golang.org/genproto v0.0.0-20200513103714-09dca8ec2884/go.mod h1:55QSHmfGQM9UVYDPBsyGGes0y52j32PQ3BqQfXhyH3c=
google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013/go.mod h1:NbSheEEYHJ7i3ixzK3sjbqSGDJWnxyFXZblF3eUsNvo=
google.golang.org/genproto v0.0.0-20230807174057-1744710a1577 h1:Tyk/35yqszRCvaragTn5NnkY6IiKk/XvHzEWepo71N0=
google.golang.org/genproto v0.0.0-20230807174057-1744710a1577/go.mod h1:yZTlhN0tQnXo3h00fuXNCxJdLdIdnVFVBaRJ5LWBbw4=
google.golang.org/genproto v0.0.0-20231212172506-995d672761c0 h1:YJ5pD9rF8o9Qtta0Cmy9rdBwkSjrTCT6XTiUQVOtIos=
google.golang.org/genproto v0.0.0-20231212172506-995d672761c0/go.mod h1:l/k7rMz0vFTBPy+tFSGvXEd3z+BcoG1k7EHbqm+YBsY=
google.golang.org/genproto/googleapis/api v0.0.0-20230807174057-1744710a1577 h1:xv8KoglAClYGkprUSmDTKaILtzfD8XzG9NYVXMprjKo=
google.golang.org/genproto/googleapis/api v0.0.0-20230807174057-1744710a1577/go.mod h1:KjSP20unUpOx5kyQUFa7k4OJg0qeJ7DEZflGDu2p6Bk=
google.golang.org/genproto/googleapis/api v0.0.0-20231212172506-995d672761c0 h1:s1w3X6gQxwrLEpxnLd/qXTVLgQE2yXwaOaoa6IlY/+o=
google.golang.org/genproto/googleapis/api v0.0.0-20231212172506-995d672761c0/go.mod h1:CAny0tYF+0/9rmDB9fahA9YLzX3+AEVl1qXbv5hhj6c=
google.golang.org/genproto/googleapis/rpc v0.0.0-20230807174057-1744710a1577 h1:wukfNtZmZUurLN/atp2hiIeTKn7QJWIQdHzqmsOnAOk=
google.golang.org/genproto/googleapis/rpc v0.0.0-20230807174057-1744710a1577/go.mod h1:+Bk1OCOj40wS2hwAMA+aCW9ypzm63QTBBHp6lQ3p+9M=
google.golang.org/genproto/googleapis/rpc v0.0.0-20231212172506-995d672761c0 h1:/jFB8jK5R3Sq3i/lmeZO0cATSzFfZaJq1J2Euan3XKU=
google.golang.org/genproto/googleapis/rpc v0.0.0-20231212172506-995d672761c0/go.mod h1:FUoWkonphQm3RhTS+kOEhF8h0iDpm4tdXolVCeZ9KKA=
google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c=
google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg=
google.golang.org/grpc v1.25.1/go.mod h1:c3i+UQWmh7LiEpx4sFZnkU36qjEYZ0imhYfXVyQciAY=
@@ -259,6 +342,8 @@ google.golang.org/grpc v1.36.0/go.mod h1:qjiiYl8FncCW8feJPdyg3v6XW24KsRHe+dy9BAG
google.golang.org/grpc v1.45.0/go.mod h1:lN7owxKUQEqMfSyQikvvk5tf/6zMPsrK+ONuO11+0rQ=
google.golang.org/grpc v1.57.0 h1:kfzNeI/klCGD2YPMUlaGNT3pxvYfga7smW3Vth8Zsiw=
google.golang.org/grpc v1.57.0/go.mod h1:Sd+9RMTACXwmub0zcNY2c4arhtrbBYD1AUHI/dt16Mo=
google.golang.org/grpc v1.60.1 h1:26+wFr+cNqSGFcOXcabYC0lUVJVRa2Sb2ortSK7VrEU=
google.golang.org/grpc v1.60.1/go.mod h1:OlCHIeLYqSSsLi6i49B5QGdzaMZK9+M7LXN2FKz4eGM=
google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0=
google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM=

View File

@@ -1,3 +1,4 @@
//go:build aix || darwin || dragonfly || freebsd || linux || netbsd || openbsd || solaris
// +build aix darwin dragonfly freebsd linux netbsd openbsd solaris
package internal

View File

@@ -1,3 +1,4 @@
//go:build windows
// +build windows
package internal

View File

@@ -540,9 +540,6 @@ func isHexChar(ch rune) bool {
return (ch >= '0' && ch <= '9') || (ch >= 'a' && ch <= 'f')
}
// Tracef is used for low-level tracing.
var Tracef = func(format string, a ...interface{}) {}
func assert(condition bool, message string) {
if !condition {
panic("assertion failed: " + message)

View File

@@ -6,8 +6,7 @@ import (
"fmt"
"hash/crc64"
"io"
"io/ioutil"
"log"
"log/slog"
"math"
"os"
"path/filepath"
@@ -72,9 +71,6 @@ type Replica struct {
// Encryption identities and recipients
AgeIdentities []age.Identity
AgeRecipients []age.Recipient
// The logger to send logging messages to. Defaults to log.Default()
Logger *log.Logger
}
func NewReplica(db *DB, name string) *Replica {
@@ -87,7 +83,6 @@ func NewReplica(db *DB, name string) *Replica {
Retention: DefaultRetention,
RetentionCheckInterval: DefaultRetentionCheckInterval,
MonitorEnabled: true,
Logger: log.Default(),
}
return r
@@ -101,6 +96,15 @@ func (r *Replica) Name() string {
return r.name
}
// Logger returns the DB sub-logger for this replica.
func (r *Replica) Logger() *slog.Logger {
logger := slog.Default()
if r.db != nil {
logger = r.db.Logger
}
return logger.With("replica", r.Name())
}
// DB returns a reference to the database the replica is attached to, if any.
func (r *Replica) DB() *DB { return r.db }
@@ -166,13 +170,13 @@ func (r *Replica) Sync(ctx context.Context) (err error) {
}
generation := dpos.Generation
Tracef("%s(%s): replica sync: db.pos=%s", r.db.Path(), r.Name(), dpos)
r.Logger().Debug("replica sync", "position", dpos.String())
// Create a new snapshot and update the current replica position if
// the generation on the database has changed.
if r.Pos().Generation != generation {
// Create snapshot if no snapshots exist for generation.
snapshotN, err := r.snapshotN(generation)
snapshotN, err := r.snapshotN(ctx, generation)
if err != nil {
return err
} else if snapshotN == 0 {
@@ -188,7 +192,7 @@ func (r *Replica) Sync(ctx context.Context) (err error) {
return fmt.Errorf("cannot determine replica position: %s", err)
}
Tracef("%s(%s): replica sync: calc new pos: %s", r.db.Path(), r.Name(), pos)
r.Logger().Debug("replica sync: calc new pos", "position", pos.String())
r.mu.Lock()
r.pos = pos
r.mu.Unlock()
@@ -222,11 +226,23 @@ func (r *Replica) syncWAL(ctx context.Context) (err error) {
// Obtain initial position from shadow reader.
// It may have moved to the next index if previous position was at the end.
pos := rd.Pos()
initialPos := pos
startTime := time.Now()
var bytesWritten int
logger := r.Logger()
logger.Info("write wal segment", "position", initialPos.String())
// Copy through pipe into client from the starting position.
var g errgroup.Group
g.Go(func() error {
_, err := r.Client.WriteWALSegment(ctx, pos, pr)
// Always close pipe reader to signal writers.
if e := pr.CloseWithError(err); err == nil {
return e
}
return err
})
@@ -263,6 +279,7 @@ func (r *Replica) syncWAL(ctx context.Context) (err error) {
return err
}
walBytesCounter.Add(float64(n))
bytesWritten += n
}
// Copy frames.
@@ -289,6 +306,7 @@ func (r *Replica) syncWAL(ctx context.Context) (err error) {
return err
}
walBytesCounter.Add(float64(n))
bytesWritten += n
}
// Flush LZ4 writer, encryption writer and close pipe.
@@ -314,12 +332,13 @@ func (r *Replica) syncWAL(ctx context.Context) (err error) {
replicaWALIndexGaugeVec.WithLabelValues(r.db.Path(), r.Name()).Set(float64(rd.Pos().Index))
replicaWALOffsetGaugeVec.WithLabelValues(r.db.Path(), r.Name()).Set(float64(rd.Pos().Offset))
logger.Info("wal segment written", "position", initialPos.String(), "elapsed", time.Since(startTime).String(), "sz", bytesWritten)
return nil
}
// snapshotN returns the number of snapshots for a generation.
func (r *Replica) snapshotN(generation string) (int, error) {
itr, err := r.Client.Snapshots(context.Background(), generation)
func (r *Replica) snapshotN(ctx context.Context, generation string) (int, error) {
itr, err := r.Client.Snapshots(ctx, generation)
if err != nil {
return 0, err
}
@@ -366,7 +385,7 @@ func (r *Replica) calcPos(ctx context.Context, generation string) (pos Pos, err
rd = io.NopCloser(drd)
}
n, err := io.Copy(ioutil.Discard, lz4.NewReader(rd))
n, err := io.Copy(io.Discard, lz4.NewReader(rd))
if err != nil {
return pos, err
}
@@ -463,16 +482,16 @@ func (r *Replica) Snapshot(ctx context.Context) (info SnapshotInfo, err error) {
r.muf.Lock()
defer r.muf.Unlock()
// Prevent checkpoints during snapshot.
r.db.BeginSnapshot()
defer r.db.EndSnapshot()
// Issue a passive checkpoint to flush any pages to disk before snapshotting.
if _, err := r.db.db.ExecContext(ctx, `PRAGMA wal_checkpoint(PASSIVE);`); err != nil {
if err := r.db.Checkpoint(ctx, CheckpointModePassive); err != nil {
return info, fmt.Errorf("pre-snapshot checkpoint: %w", err)
}
// Acquire a read lock on the database during snapshot to prevent checkpoints.
// Prevent internal checkpoints during snapshot.
r.db.BeginSnapshot()
defer r.db.EndSnapshot()
// Acquire a read lock on the database during snapshot to prevent external checkpoints.
tx, err := r.db.db.Begin()
if err != nil {
return info, err
@@ -535,6 +554,10 @@ func (r *Replica) Snapshot(ctx context.Context) (info SnapshotInfo, err error) {
return wc.Close()
})
logger := r.Logger()
logger.Info("write snapshot", "position", pos.String())
startTime := time.Now()
// Delegate write to client & wait for writer goroutine to finish.
if info, err = r.Client.WriteSnapshot(ctx, pos.Generation, pos.Index, pr); err != nil {
return info, err
@@ -542,8 +565,7 @@ func (r *Replica) Snapshot(ctx context.Context) (info SnapshotInfo, err error) {
return info, err
}
r.Logger.Printf("%s(%s): snapshot written %s/%08x", r.db.Path(), r.Name(), pos.Generation, pos.Index)
logger.Info("snapshot written", "position", pos.String(), "elapsed", time.Since(startTime).String(), "sz", info.Size)
return info, nil
}
@@ -610,7 +632,7 @@ func (r *Replica) deleteSnapshotsBeforeIndex(ctx context.Context, generation str
if err := r.Client.DeleteSnapshot(ctx, info.Generation, info.Index); err != nil {
return fmt.Errorf("delete snapshot %s/%08x: %w", info.Generation, info.Index, err)
}
r.Logger.Printf("%s(%s): snapshot deleted %s/%08x", r.db.Path(), r.Name(), generation, index)
r.Logger().Info("snapshot deleted", "generation", generation, "index", index)
}
return itr.Close()
@@ -642,8 +664,8 @@ func (r *Replica) deleteWALSegmentsBeforeIndex(ctx context.Context, generation s
if err := r.Client.DeleteWALSegments(ctx, a); err != nil {
return fmt.Errorf("delete wal segments: %w", err)
}
r.Logger.Printf("%s(%s): wal segmented deleted before %s/%08x: n=%d", r.db.Path(), r.Name(), generation, index, len(a))
r.Logger().Info("wal segmented deleted before", "generation", generation, "index", index, "n", len(a))
return nil
}
@@ -679,7 +701,7 @@ func (r *Replica) monitor(ctx context.Context) {
// Synchronize the shadow wal into the replication directory.
if err := r.Sync(ctx); err != nil {
r.Logger.Printf("%s(%s): monitor error: %s", r.db.Path(), r.Name(), err)
r.Logger().Error("monitor error", "error", err)
continue
}
}
@@ -707,7 +729,7 @@ func (r *Replica) retainer(ctx context.Context) {
return
case <-ticker.C:
if err := r.EnforceRetention(ctx); err != nil {
r.Logger.Printf("%s(%s): retainer error: %s", r.db.Path(), r.Name(), err)
r.Logger().Error("retainer error", "error", err)
continue
}
}
@@ -720,6 +742,31 @@ func (r *Replica) snapshotter(ctx context.Context) {
return
}
logger := r.Logger()
if pos, err := r.db.Pos(); err != nil {
logger.Error("snapshotter cannot determine generation", "error", err)
} else if !pos.IsZero() {
if snapshot, err := r.maxSnapshot(ctx, pos.Generation); err != nil {
logger.Error("snapshotter cannot determine latest snapshot", "error", err)
} else if snapshot != nil {
nextSnapshot := r.SnapshotInterval - time.Since(snapshot.CreatedAt)
if nextSnapshot < 0 {
nextSnapshot = 0
}
logger.Info("snapshot interval adjusted", "previous", snapshot.CreatedAt.Format(time.RFC3339), "next", nextSnapshot.String())
select {
case <-ctx.Done():
return
case <-time.After(nextSnapshot):
if _, err := r.Snapshot(ctx); err != nil && err != ErrNoGeneration {
logger.Error("snapshotter error", "error", err)
}
}
}
}
ticker := time.NewTicker(r.SnapshotInterval)
defer ticker.Stop()
@@ -729,7 +776,7 @@ func (r *Replica) snapshotter(ctx context.Context) {
return
case <-ticker.C:
if _, err := r.Snapshot(ctx); err != nil && err != ErrNoGeneration {
r.Logger.Printf("%s(%s): snapshotter error: %s", r.db.Path(), r.Name(), err)
r.Logger().Error("snapshotter error", "error", err)
continue
}
}
@@ -757,7 +804,7 @@ func (r *Replica) validator(ctx context.Context) {
return
case <-ticker.C:
if err := r.Validate(ctx); err != nil {
r.Logger.Printf("%s(%s): validation error: %s", r.db.Path(), r.Name(), err)
r.Logger().Error("validation error", "error", err)
continue
}
}
@@ -770,7 +817,7 @@ func (r *Replica) Validate(ctx context.Context) error {
db := r.DB()
// Restore replica to a temporary directory.
tmpdir, err := ioutil.TempDir("", "*-litestream")
tmpdir, err := os.MkdirTemp("", "*-litestream")
if err != nil {
return err
}
@@ -794,7 +841,6 @@ func (r *Replica) Validate(ctx context.Context) error {
ReplicaName: r.Name(),
Generation: pos.Generation,
Index: pos.Index - 1,
Logger: r.Logger,
}); err != nil {
return fmt.Errorf("cannot restore: %w", err)
}
@@ -819,7 +865,7 @@ func (r *Replica) Validate(ctx context.Context) error {
if mismatch {
status = "mismatch"
}
r.Logger.Printf("%s(%s): validator: status=%s db=%016x replica=%016x pos=%s", db.Path(), r.Name(), status, chksum0, chksum1, pos)
r.Logger().Info("validator", "status", status, "db", fmt.Sprintf("%016x", chksum0), "replica", fmt.Sprintf("%016x", chksum1), "position", pos.String())
// Validate checksums match.
if mismatch {
@@ -837,8 +883,6 @@ func (r *Replica) Validate(ctx context.Context) error {
// waitForReplica blocks until replica reaches at least the given position.
func (r *Replica) waitForReplica(ctx context.Context, pos Pos) error {
db := r.DB()
ticker := time.NewTicker(500 * time.Millisecond)
defer ticker.Stop()
@@ -861,7 +905,7 @@ func (r *Replica) waitForReplica(ctx context.Context, pos Pos) error {
// Obtain current position of replica, check if past target position.
curr := r.Pos()
if curr.IsZero() {
r.Logger.Printf("%s(%s): validator: no replica position available", db.Path(), r.Name())
r.Logger().Info("validator: no replica position available")
continue
}
@@ -917,6 +961,7 @@ func (r *Replica) GenerationTimeBounds(ctx context.Context, generation string) (
}
defer sitr.Close()
minIndex, maxIndex := -1, -1
for sitr.Next() {
info := sitr.Snapshot()
if createdAt.IsZero() || info.CreatedAt.Before(createdAt) {
@@ -925,6 +970,12 @@ func (r *Replica) GenerationTimeBounds(ctx context.Context, generation string) (
if updatedAt.IsZero() || info.CreatedAt.After(updatedAt) {
updatedAt = info.CreatedAt
}
if minIndex == -1 || info.Index < minIndex {
minIndex = info.Index
}
if info.Index > maxIndex {
maxIndex = info.Index
}
}
if err := sitr.Close(); err != nil {
return createdAt, updatedAt, err
@@ -939,6 +990,9 @@ func (r *Replica) GenerationTimeBounds(ctx context.Context, generation string) (
for witr.Next() {
info := witr.WALSegment()
if info.Index < minIndex || info.Index > maxIndex {
continue
}
if createdAt.IsZero() || info.CreatedAt.Before(createdAt) {
createdAt = info.CreatedAt
}
@@ -1013,17 +1067,6 @@ func (r *Replica) Restore(ctx context.Context, opt RestoreOptions) (err error) {
return fmt.Errorf("cannot specify index & timestamp to restore")
}
// Ensure logger exists.
logger := opt.Logger
if logger == nil {
logger = r.Logger
}
logPrefix := r.Name()
if db := r.DB(); db != nil {
logPrefix = fmt.Sprintf("%s(%s)", db.Path(), r.Name())
}
// Ensure output path does not already exist.
if _, err := os.Stat(opt.OutputPath); err == nil {
return fmt.Errorf("cannot restore, output path already exists: %s", opt.OutputPath)
@@ -1044,7 +1087,7 @@ func (r *Replica) Restore(ctx context.Context, opt RestoreOptions) (err error) {
}
// Compute list of offsets for each WAL index.
walSegmentMap, err := r.walSegmentMap(ctx, opt.Generation, opt.Index, opt.Timestamp)
walSegmentMap, err := r.walSegmentMap(ctx, opt.Generation, minWALIndex, opt.Index, opt.Timestamp)
if err != nil {
return fmt.Errorf("cannot find max wal index for restore: %w", err)
}
@@ -1058,7 +1101,7 @@ func (r *Replica) Restore(ctx context.Context, opt RestoreOptions) (err error) {
}
// Ensure that we found the specific index, if one was specified.
if opt.Index != math.MaxInt32 && opt.Index != opt.Index {
if opt.Index != math.MaxInt32 && opt.Index != maxWALIndex {
return fmt.Errorf("unable to locate index %d in generation %q, highest index was %d", opt.Index, opt.Generation, maxWALIndex)
}
@@ -1070,19 +1113,19 @@ func (r *Replica) Restore(ctx context.Context, opt RestoreOptions) (err error) {
tmpPath := opt.OutputPath + ".tmp"
// Copy snapshot to output path.
logger.Printf("%s: restoring snapshot %s/%08x to %s", logPrefix, opt.Generation, minWALIndex, tmpPath)
r.Logger().Info("restoring snapshot", "generation", opt.Generation, "index", minWALIndex, "path", tmpPath)
if err := r.restoreSnapshot(ctx, pos.Generation, pos.Index, tmpPath); err != nil {
return fmt.Errorf("cannot restore snapshot: %w", err)
}
// If no WAL files available, move snapshot to final path & exit early.
if snapshotOnly {
logger.Printf("%s: snapshot only, finalizing database", logPrefix)
r.Logger().Info("snapshot only, finalizing database")
return os.Rename(tmpPath, opt.OutputPath)
}
// Begin processing WAL files.
logger.Printf("%s: restoring wal files: generation=%s index=[%08x,%08x]", logPrefix, opt.Generation, minWALIndex, maxWALIndex)
r.Logger().Info("restoring wal files", "generation", opt.Generation, "index_min", minWALIndex, "index_max", maxWALIndex)
// Fill input channel with all WAL indexes to be loaded in order.
// Verify every index has at least one offset.
@@ -1138,9 +1181,9 @@ func (r *Replica) Restore(ctx context.Context, opt RestoreOptions) (err error) {
return err
}
logger.Printf("%s: downloaded wal %s/%08x elapsed=%s",
logPrefix, opt.Generation, index,
time.Since(startTime).String(),
r.Logger().Info("downloaded wal",
"generation", opt.Generation, "index", index,
"elapsed", time.Since(startTime).String(),
)
}
}
@@ -1167,10 +1210,7 @@ func (r *Replica) Restore(ctx context.Context, opt RestoreOptions) (err error) {
if err = applyWAL(ctx, index, tmpPath); err != nil {
return fmt.Errorf("cannot apply wal: %w", err)
}
logger.Printf("%s: applied wal %s/%08x elapsed=%s",
logPrefix, opt.Generation, index,
time.Since(startTime).String(),
)
r.Logger().Info("applied wal", "generation", opt.Generation, "index", index, "elapsed", time.Since(startTime).String())
}
// Ensure all goroutines finish. All errors should have been handled during
@@ -1180,7 +1220,7 @@ func (r *Replica) Restore(ctx context.Context, opt RestoreOptions) (err error) {
}
// Copy file to final location.
logger.Printf("%s: renaming database from temporary location", logPrefix)
r.Logger().Info("renaming database from temporary location")
if err := os.Rename(tmpPath, opt.OutputPath); err != nil {
return err
}
@@ -1241,7 +1281,7 @@ func (r *Replica) SnapshotIndexByIndex(ctx context.Context, generation string, i
}
// Use snapshot if it newer.
if snapshotIndex == -1 || snapshotIndex >= snapshotIndex {
if snapshotIndex == -1 || snapshot.Index >= snapshotIndex {
snapshotIndex = snapshot.Index
}
}
@@ -1255,22 +1295,29 @@ func (r *Replica) SnapshotIndexByIndex(ctx context.Context, generation string, i
// walSegmentMap returns a map of WAL indices to their segments.
// Filters by a max timestamp or a max index.
func (r *Replica) walSegmentMap(ctx context.Context, generation string, maxIndex int, maxTimestamp time.Time) (map[int][]int64, error) {
func (r *Replica) walSegmentMap(ctx context.Context, generation string, minIndex, maxIndex int, maxTimestamp time.Time) (map[int][]int64, error) {
itr, err := r.Client.WALSegments(ctx, generation)
if err != nil {
return nil, err
}
defer itr.Close()
m := make(map[int][]int64)
a := []WALSegmentInfo{}
for itr.Next() {
info := itr.WALSegment()
a = append(a, itr.WALSegment())
}
sort.Sort(WALSegmentInfoSlice(a))
m := make(map[int][]int64)
for _, info := range a {
// Exit if we go past the max timestamp or index.
if !maxTimestamp.IsZero() && info.CreatedAt.After(maxTimestamp) {
break // after max timestamp, skip
} else if info.Index > maxIndex {
break // after max index, skip
} else if info.Index < minIndex {
continue // before min index, continue
}
// Verify offsets are added in order.

View File

@@ -10,13 +10,13 @@ type ReplicaClient interface {
// Returns the type of client.
Type() string
// Returns a list of available generations.
// Returns a list of available generations. Order is undefined.
Generations(ctx context.Context) ([]string, error)
// Deletes all snapshots & WAL segments within a generation.
DeleteGeneration(ctx context.Context, generation string) error
// Returns an iterator of all snapshots within a generation on the replica.
// Returns an iterator of all snapshots within a generation on the replica. Order is undefined.
Snapshots(ctx context.Context, generation string) (SnapshotIterator, error)
// Writes LZ4 compressed snapshot data to the replica at a given index
@@ -31,7 +31,7 @@ type ReplicaClient interface {
// the snapshot does not exist.
SnapshotReader(ctx context.Context, generation string, index int) (io.ReadCloser, error)
// Returns an iterator of all WAL segments within a generation on the replica.
// Returns an iterator of all WAL segments within a generation on the replica. Order is undefined.
WALSegments(ctx context.Context, generation string) (WALSegmentIterator, error)
// Writes an LZ4 compressed WAL segment at a given position.

View File

@@ -4,7 +4,7 @@ import (
"context"
"flag"
"fmt"
"io/ioutil"
"io"
"math/rand"
"os"
"path"
@@ -12,7 +12,6 @@ import (
"sort"
"strings"
"testing"
"time"
"github.com/benbjohnson/litestream"
"github.com/benbjohnson/litestream/abs"
@@ -22,10 +21,6 @@ import (
"github.com/benbjohnson/litestream/sftp"
)
func init() {
rand.Seed(time.Now().UnixNano())
}
var (
// Enables integration tests.
integration = flag.String("integration", "file", "")
@@ -80,10 +75,14 @@ func TestReplicaClient_Generations(t *testing.T) {
t.Fatal(err)
}
// Verify returned generations.
if got, err := c.Generations(context.Background()); err != nil {
// Fetch and sort generations.
got, err := c.Generations(context.Background())
if err != nil {
t.Fatal(err)
} else if want := []string{"155fe292f8333c72", "5efbd8d042012dca", "b16ddcf5c697540f"}; !reflect.DeepEqual(got, want) {
}
sort.Strings(got)
if want := []string{"155fe292f8333c72", "5efbd8d042012dca", "b16ddcf5c697540f"}; !reflect.DeepEqual(got, want) {
t.Fatalf("Generations()=%v, want %v", got, want)
}
})
@@ -193,7 +192,7 @@ func TestReplicaClient_WriteSnapshot(t *testing.T) {
if r, err := c.SnapshotReader(context.Background(), "b16ddcf5c697540f", 1000); err != nil {
t.Fatal(err)
} else if buf, err := ioutil.ReadAll(r); err != nil {
} else if buf, err := io.ReadAll(r); err != nil {
t.Fatal(err)
} else if err := r.Close(); err != nil {
t.Fatal(err)
@@ -224,7 +223,7 @@ func TestReplicaClient_SnapshotReader(t *testing.T) {
}
defer r.Close()
if buf, err := ioutil.ReadAll(r); err != nil {
if buf, err := io.ReadAll(r); err != nil {
t.Fatal(err)
} else if got, want := string(buf), "foo"; got != want {
t.Fatalf("ReadAll=%v, want %v", got, want)
@@ -378,7 +377,7 @@ func TestReplicaClient_WriteWALSegment(t *testing.T) {
if r, err := c.WALSegmentReader(context.Background(), litestream.Pos{Generation: "b16ddcf5c697540f", Index: 1000, Offset: 2000}); err != nil {
t.Fatal(err)
} else if buf, err := ioutil.ReadAll(r); err != nil {
} else if buf, err := io.ReadAll(r); err != nil {
t.Fatal(err)
} else if err := r.Close(); err != nil {
t.Fatal(err)
@@ -409,7 +408,7 @@ func TestReplicaClient_WALReader(t *testing.T) {
}
defer r.Close()
if buf, err := ioutil.ReadAll(r); err != nil {
if buf, err := io.ReadAll(r); err != nil {
t.Fatal(err)
} else if got, want := string(buf), "foobar"; got != want {
t.Fatalf("ReadAll=%v, want %v", got, want)

View File

@@ -13,6 +13,13 @@ import (
"github.com/pierrec/lz4/v4"
)
func nextIndex(pos litestream.Pos) litestream.Pos {
return litestream.Pos{
Generation: pos.Generation,
Index: pos.Index + 1,
}
}
func TestReplica_Name(t *testing.T) {
t.Run("WithName", func(t *testing.T) {
if got, want := litestream.NewReplica(nil, "NAME").Name(), "NAME"; got != want {
@@ -32,11 +39,6 @@ func TestReplica_Sync(t *testing.T) {
db, sqldb := MustOpenDBs(t)
defer MustCloseDBs(t, db, sqldb)
// Execute a query to force a write to the WAL.
if _, err := sqldb.Exec(`CREATE TABLE foo (bar TEXT);`); err != nil {
t.Fatal(err)
}
// Issue initial database sync to setup generation.
if err := db.Sync(context.Background()); err != nil {
t.Fatal(err)
@@ -48,6 +50,10 @@ func TestReplica_Sync(t *testing.T) {
t.Fatal(err)
}
if err := db.Checkpoint(context.Background(), litestream.CheckpointModeTruncate); err != nil {
t.Fatal(err)
}
c := file.NewReplicaClient(t.TempDir())
r := litestream.NewReplica(db, "")
c.Replica, r.Client = r, c
@@ -66,10 +72,47 @@ func TestReplica_Sync(t *testing.T) {
t.Fatalf("generations[0]=%v, want %v", got, want)
}
// Verify we synced checkpoint page to WAL.
if r, err := c.WALSegmentReader(context.Background(), nextIndex(dpos)); err != nil {
t.Fatal(err)
} else if b, err := io.ReadAll(lz4.NewReader(r)); err != nil {
t.Fatal(err)
} else if err := r.Close(); err != nil {
t.Fatal(err)
} else if len(b) == db.PageSize() {
t.Fatalf("wal mismatch: len(%d), len(%d)", len(b), db.PageSize())
}
// Reset WAL so the next write will only write out the segment we are checking.
if err := db.Checkpoint(context.Background(), litestream.CheckpointModeTruncate); err != nil {
t.Fatal(err)
}
// Execute a query to write something into the truncated WAL.
if _, err := sqldb.Exec(`CREATE TABLE foo (bar TEXT);`); err != nil {
t.Fatal(err)
}
// Sync database to catch up the shadow WAL.
if err := db.Sync(context.Background()); err != nil {
t.Fatal(err)
}
// Save position after sync, it should be after our write.
dpos, err = db.Pos()
if err != nil {
t.Fatal(err)
}
// Sync WAL segment out to replica.
if err := r.Sync(context.Background()); err != nil {
t.Fatal(err)
}
// Verify WAL matches replica WAL.
if b0, err := os.ReadFile(db.Path() + "-wal"); err != nil {
t.Fatal(err)
} else if r, err := c.WALSegmentReader(context.Background(), litestream.Pos{Generation: generations[0], Index: 0, Offset: 0}); err != nil {
} else if r, err := c.WALSegmentReader(context.Background(), dpos.Truncate()); err != nil {
t.Fatal(err)
} else if b1, err := io.ReadAll(lz4.NewReader(r)); err != nil {
t.Fatal(err)
@@ -131,7 +174,7 @@ func TestReplica_Snapshot(t *testing.T) {
t.Fatalf("pos=%v, want %v", got, want)
}
// Verify two snapshots exist.
// Verify snapshots exist.
if infos, err := r.Snapshots(context.Background()); err != nil {
t.Fatal(err)
} else if got, want := len(infos), 2; got != want {

View File

@@ -161,7 +161,7 @@ func (c *ReplicaClient) Generations(ctx context.Context) ([]string, error) {
internal.OperationTotalCounterVec.WithLabelValues(ReplicaClientType, "LIST").Inc()
for _, prefix := range page.CommonPrefixes {
name := path.Base(*prefix.Prefix)
name := path.Base(aws.StringValue(prefix.Prefix))
if !litestream.IsGenerationName(name) {
continue
}
@@ -292,7 +292,7 @@ func (c *ReplicaClient) SnapshotReader(ctx context.Context, generation string, i
return nil, err
}
internal.OperationTotalCounterVec.WithLabelValues(ReplicaClientType, "GET").Inc()
internal.OperationBytesCounterVec.WithLabelValues(ReplicaClientType, "GET").Add(float64(*out.ContentLength))
internal.OperationBytesCounterVec.WithLabelValues(ReplicaClientType, "GET").Add(float64(aws.Int64Value(out.ContentLength)))
return out.Body, nil
}
@@ -386,7 +386,7 @@ func (c *ReplicaClient) WALSegmentReader(ctx context.Context, pos litestream.Pos
return nil, err
}
internal.OperationTotalCounterVec.WithLabelValues(ReplicaClientType, "GET").Inc()
internal.OperationBytesCounterVec.WithLabelValues(ReplicaClientType, "GET").Add(float64(*out.ContentLength))
internal.OperationBytesCounterVec.WithLabelValues(ReplicaClientType, "GET").Add(float64(aws.Int64Value(out.ContentLength)))
return out.Body, nil
}
@@ -527,7 +527,7 @@ func (itr *snapshotIterator) fetch() error {
internal.OperationTotalCounterVec.WithLabelValues(ReplicaClientType, "LIST").Inc()
for _, obj := range page.Contents {
key := path.Base(*obj.Key)
key := path.Base(aws.StringValue(obj.Key))
index, err := litestream.ParseSnapshotPath(key)
if err != nil {
continue
@@ -536,7 +536,7 @@ func (itr *snapshotIterator) fetch() error {
info := litestream.SnapshotInfo{
Generation: itr.generation,
Index: index,
Size: *obj.Size,
Size: aws.Int64Value(obj.Size),
CreatedAt: obj.LastModified.UTC(),
}
@@ -630,7 +630,7 @@ func (itr *walSegmentIterator) fetch() error {
internal.OperationTotalCounterVec.WithLabelValues(ReplicaClientType, "LIST").Inc()
for _, obj := range page.Contents {
key := path.Base(*obj.Key)
key := path.Base(aws.StringValue(obj.Key))
index, offset, err := litestream.ParseWALSegmentPath(key)
if err != nil {
continue
@@ -640,7 +640,7 @@ func (itr *walSegmentIterator) fetch() error {
Generation: itr.generation,
Index: index,
Offset: offset,
Size: *obj.Size,
Size: aws.Int64Value(obj.Size),
CreatedAt: obj.LastModified.UTC(),
}
@@ -716,6 +716,9 @@ func ParseHost(s string) (bucket, region, endpoint string, forcePathStyle bool)
} else if a := digitalOceanRegex.FindStringSubmatch(host); a != nil {
bucket, region = a[1], a[2]
endpoint = fmt.Sprintf("%s.digitaloceanspaces.com", region)
} else if a := scalewayRegex.FindStringSubmatch(host); a != nil {
bucket, region = a[1], a[2]
endpoint = fmt.Sprintf("s3.%s.scw.cloud", region)
} else if a := linodeRegex.FindStringSubmatch(host); a != nil {
bucket, region = a[1], a[2]
endpoint = fmt.Sprintf("%s.linodeobjects.com", region)
@@ -742,6 +745,7 @@ var (
backblazeRegex = regexp.MustCompile(`^(?:(.+)\.)?s3.([^.]+)\.backblazeb2.com$`)
filebaseRegex = regexp.MustCompile(`^(?:(.+)\.)?s3.filebase.com$`)
digitalOceanRegex = regexp.MustCompile(`^(?:(.+)\.)?([^.]+)\.digitaloceanspaces.com$`)
scalewayRegex = regexp.MustCompile(`^(?:(.+)\.)?s3.([^.]+)\.scw\.cloud$`)
linodeRegex = regexp.MustCompile(`^(?:(.+)\.)?([^.]+)\.linodeobjects.com$`)
)
@@ -759,9 +763,9 @@ func deleteOutputError(out *s3.DeleteObjectsOutput) error {
case 0:
return nil
case 1:
return fmt.Errorf("deleting object %s: %s - %s", *out.Errors[0].Key, *out.Errors[0].Code, *out.Errors[0].Message)
return fmt.Errorf("deleting object %s: %s - %s", aws.StringValue(out.Errors[0].Key), aws.StringValue(out.Errors[0].Code), aws.StringValue(out.Errors[0].Message))
default:
return fmt.Errorf("%d errors occured deleting objects, %s: %s - (%s (and %d others)",
len(out.Errors), *out.Errors[0].Key, *out.Errors[0].Code, *out.Errors[0].Message, len(out.Errors)-1)
return fmt.Errorf("%d errors occurred deleting objects, %s: %s - (%s (and %d others)",
len(out.Errors), aws.StringValue(out.Errors[0].Key), aws.StringValue(out.Errors[0].Code), aws.StringValue(out.Errors[0].Message), len(out.Errors)-1)
}
}

View File

@@ -8,7 +8,6 @@ import (
"net"
"os"
"path"
"sort"
"sync"
"time"
@@ -141,8 +140,6 @@ func (c *ReplicaClient) Generations(ctx context.Context) (_ []string, err error)
generations = append(generations, name)
}
sort.Strings(generations)
return generations, nil
}
@@ -229,8 +226,6 @@ func (c *ReplicaClient) Snapshots(ctx context.Context, generation string) (_ lit
})
}
sort.Sort(litestream.SnapshotInfoSlice(infos))
return litestream.NewSnapshotInfoSliceIterator(infos), nil
}
@@ -363,8 +358,6 @@ func (c *ReplicaClient) WALSegments(ctx context.Context, generation string) (_ l
})
}
sort.Sort(litestream.WALSegmentInfoSlice(infos))
return litestream.NewWALSegmentInfoSliceIterator(infos), nil
}