Compare commits

..

1 Commits

Author SHA1 Message Date
Toni Spets
94f0082abd Fix restore -if-replica-exists with multiple replicas 2022-08-29 14:00:20 -06:00
15 changed files with 75 additions and 96 deletions

View File

@@ -1,4 +1,4 @@
FROM golang:1.20.1 as builder FROM golang:1.17 as builder
WORKDIR /src/litestream WORKDIR /src/litestream
COPY . . COPY . .
@@ -10,7 +10,7 @@ RUN --mount=type=cache,target=/root/.cache/go-build \
go build -ldflags "-s -w -X 'main.Version=${LITESTREAM_VERSION}' -extldflags '-static'" -tags osusergo,netgo,sqlite_omit_load_extension -o /usr/local/bin/litestream ./cmd/litestream go build -ldflags "-s -w -X 'main.Version=${LITESTREAM_VERSION}' -extldflags '-static'" -tags osusergo,netgo,sqlite_omit_load_extension -o /usr/local/bin/litestream ./cmd/litestream
FROM alpine:3.17.2 FROM alpine
COPY --from=builder /usr/local/bin/litestream /usr/local/bin/litestream COPY --from=builder /usr/local/bin/litestream /usr/local/bin/litestream
ENTRYPOINT ["/usr/local/bin/litestream"] ENTRYPOINT ["/usr/local/bin/litestream"]
CMD [] CMD []

View File

@@ -6,7 +6,7 @@ Litestream
![test](https://github.com/benbjohnson/litestream/workflows/test/badge.svg) ![test](https://github.com/benbjohnson/litestream/workflows/test/badge.svg)
========== ==========
Litestream is a standalone disaster recovery tool for SQLite. It runs as a Litestream is a standalone streaming replication tool for SQLite. It runs as a
background process and safely replicates changes incrementally to another file background process and safely replicates changes incrementally to another file
or S3. Litestream only communicates with SQLite through the SQLite API so it or S3. Litestream only communicates with SQLite through the SQLite API so it
will not corrupt your database. will not corrupt your database.

View File

@@ -6,6 +6,7 @@ import (
"flag" "flag"
"fmt" "fmt"
"io" "io"
"io/ioutil"
"log" "log"
"net/url" "net/url"
"os" "os"
@@ -252,7 +253,7 @@ func readConfigFile(filename string, expandEnv bool) (_ Config, err error) {
// Read configuration. // Read configuration.
// Do not return an error if using default path and file is missing. // Do not return an error if using default path and file is missing.
buf, err := os.ReadFile(filename) buf, err := ioutil.ReadFile(filename)
if err != nil { if err != nil {
return config, err return config, err
} }

View File

@@ -3,6 +3,7 @@ package main_test
import ( import (
"bytes" "bytes"
"io" "io"
"io/ioutil"
"log" "log"
"os" "os"
"path/filepath" "path/filepath"
@@ -22,7 +23,7 @@ func TestReadConfigFile(t *testing.T) {
// Ensure global AWS settings are propagated down to replica configurations. // Ensure global AWS settings are propagated down to replica configurations.
t.Run("PropagateGlobalSettings", func(t *testing.T) { t.Run("PropagateGlobalSettings", func(t *testing.T) {
filename := filepath.Join(t.TempDir(), "litestream.yml") filename := filepath.Join(t.TempDir(), "litestream.yml")
if err := os.WriteFile(filename, []byte(` if err := ioutil.WriteFile(filename, []byte(`
access-key-id: XXX access-key-id: XXX
secret-access-key: YYY secret-access-key: YYY
@@ -54,7 +55,7 @@ dbs:
os.Setenv("LITESTREAM_TEST_1872363", "s3://foo/bar") os.Setenv("LITESTREAM_TEST_1872363", "s3://foo/bar")
filename := filepath.Join(t.TempDir(), "litestream.yml") filename := filepath.Join(t.TempDir(), "litestream.yml")
if err := os.WriteFile(filename, []byte(` if err := ioutil.WriteFile(filename, []byte(`
dbs: dbs:
- path: $LITESTREAM_TEST_0129380 - path: $LITESTREAM_TEST_0129380
replicas: replicas:
@@ -81,7 +82,7 @@ dbs:
os.Setenv("LITESTREAM_TEST_9847533", "s3://foo/bar") os.Setenv("LITESTREAM_TEST_9847533", "s3://foo/bar")
filename := filepath.Join(t.TempDir(), "litestream.yml") filename := filepath.Join(t.TempDir(), "litestream.yml")
if err := os.WriteFile(filename, []byte(` if err := ioutil.WriteFile(filename, []byte(`
dbs: dbs:
- path: /path/to/db - path: /path/to/db
replicas: replicas:

View File

@@ -111,8 +111,16 @@ func (c *RestoreCommand) Run(ctx context.Context, args []string) (err error) {
// Build replica from either a URL or config. // Build replica from either a URL or config.
r, err := c.loadReplica(ctx, config, pathOrURL) r, err := c.loadReplica(ctx, config, pathOrURL)
if err != nil { if err == litestream.ErrNoGeneration {
return err // Return an error if no replicas can be loaded to restore from.
// If optional flag set, return success. Useful for automated recovery.
if c.ifReplicaExists {
fmt.Fprintln(c.stdout, "no replicas have generations to restore from, skipping")
return nil
}
return fmt.Errorf("no replicas have generations to restore from")
} else if err != nil {
return fmt.Errorf("cannot determine latest replica: %w", err)
} }
// Determine latest generation if one is not specified. // Determine latest generation if one is not specified.
@@ -218,11 +226,7 @@ func (c *RestoreCommand) loadReplicaFromConfig(ctx context.Context, config Confi
} }
// Determine latest replica to restore from. // Determine latest replica to restore from.
r, err := litestream.LatestReplica(ctx, db.Replicas) return litestream.LatestReplica(ctx, db.Replicas)
if err != nil {
return nil, fmt.Errorf("cannot determine latest replica: %w", err)
}
return r, nil
} }
// Usage prints the help screen to STDOUT. // Usage prints the help screen to STDOUT.

View File

@@ -138,6 +138,19 @@ func TestRestoreCommand(t *testing.T) {
} }
}) })
t.Run("IfReplicaExists/Multiple", func(t *testing.T) {
testDir := filepath.Join("testdata", "restore", "if-replica-exists-flag-multiple")
defer testingutil.Setenv(t, "LITESTREAM_TESTDIR", testDir)()
m, _, stdout, _ := newMain()
err := m.Run(context.Background(), []string{"restore", "-config", filepath.Join(testDir, "litestream.yml"), "-if-replica-exists", filepath.Join(testDir, "db")})
if err != nil {
t.Fatal(err)
} else if got, want := stdout.String(), string(testingutil.ReadFile(t, filepath.Join(testDir, "stdout"))); got != want {
t.Fatalf("stdout=%q, want %q", got, want)
}
})
t.Run("ErrNoBackups", func(t *testing.T) { t.Run("ErrNoBackups", func(t *testing.T) {
testDir := filepath.Join("testdata", "restore", "no-backups") testDir := filepath.Join("testdata", "restore", "no-backups")
defer testingutil.Setenv(t, "LITESTREAM_TESTDIR", testDir)() defer testingutil.Setenv(t, "LITESTREAM_TESTDIR", testDir)()

View File

@@ -0,0 +1,5 @@
dbs:
- path: $LITESTREAM_TESTDIR/db
replicas:
- path: $LITESTREAM_TESTDIR/replica0
- path: $LITESTREAM_TESTDIR/replica1

View File

@@ -0,0 +1 @@
no replicas have generations to restore from, skipping

5
db.go
View File

@@ -9,6 +9,7 @@ import (
"errors" "errors"
"fmt" "fmt"
"io" "io"
"io/ioutil"
"log" "log"
"math/rand" "math/rand"
"os" "os"
@@ -291,7 +292,7 @@ func (db *DB) invalidatePos(ctx context.Context) error {
} }
defer rd.Close() defer rd.Close()
n, err := io.Copy(io.Discard, lz4.NewReader(rd)) n, err := io.Copy(ioutil.Discard, lz4.NewReader(rd))
if err != nil { if err != nil {
return err return err
} }
@@ -670,7 +671,7 @@ func (db *DB) cleanGenerations(ctx context.Context) error {
} }
dir := filepath.Join(db.MetaPath(), "generations") dir := filepath.Join(db.MetaPath(), "generations")
fis, err := os.ReadDir(dir) fis, err := ioutil.ReadDir(dir)
if os.IsNotExist(err) { if os.IsNotExist(err) {
return nil return nil
} else if err != nil { } else if err != nil {

View File

@@ -4,6 +4,7 @@ import (
"context" "context"
"fmt" "fmt"
"io" "io"
"io/ioutil"
"os" "os"
"path/filepath" "path/filepath"
"sort" "sort"
@@ -110,7 +111,7 @@ func (c *FileReplicaClient) Generations(ctx context.Context) ([]string, error) {
return nil, fmt.Errorf("cannot determine generations path: %w", err) return nil, fmt.Errorf("cannot determine generations path: %w", err)
} }
fis, err := os.ReadDir(root) fis, err := ioutil.ReadDir(root)
if os.IsNotExist(err) { if os.IsNotExist(err) {
return nil, nil return nil, nil
} else if err != nil { } else if err != nil {

40
go.mod
View File

@@ -1,51 +1,27 @@
module github.com/benbjohnson/litestream module github.com/benbjohnson/litestream
go 1.19 go 1.16
require ( require (
cloud.google.com/go v0.103.0 // indirect
cloud.google.com/go/storage v1.24.0 cloud.google.com/go/storage v1.24.0
github.com/Azure/azure-storage-blob-go v0.15.0 github.com/Azure/azure-storage-blob-go v0.15.0
github.com/aws/aws-sdk-go v1.44.71 github.com/aws/aws-sdk-go v1.44.71
github.com/fsnotify/fsnotify v1.5.4 github.com/fsnotify/fsnotify v1.5.4
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/googleapis/gax-go/v2 v2.5.1 // indirect
github.com/mattn/go-ieproxy v0.0.7 // indirect
github.com/mattn/go-shellwords v1.0.12 github.com/mattn/go-shellwords v1.0.12
github.com/mattn/go-sqlite3 v1.14.14 github.com/mattn/go-sqlite3 v1.14.14
github.com/pierrec/lz4/v4 v4.1.15 github.com/pierrec/lz4/v4 v4.1.15
github.com/pkg/sftp v1.13.5 github.com/pkg/sftp v1.13.5
github.com/prometheus/client_golang v1.13.0 github.com/prometheus/client_golang v1.13.0
golang.org/x/crypto v0.0.0-20220722155217-630584e8d5aa golang.org/x/crypto v0.0.0-20220722155217-630584e8d5aa
golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4
google.golang.org/api v0.91.0
gopkg.in/yaml.v2 v2.4.0
)
require (
cloud.google.com/go v0.103.0 // indirect
cloud.google.com/go/compute v1.7.0 // indirect
cloud.google.com/go/iam v0.3.0 // indirect
github.com/Azure/azure-pipeline-go v0.2.3 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.1.2 // indirect
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/google/go-cmp v0.5.8 // indirect
github.com/google/uuid v1.3.0 // indirect
github.com/googleapis/enterprise-certificate-proxy v0.1.0 // indirect
github.com/googleapis/gax-go/v2 v2.5.1 // indirect
github.com/jmespath/go-jmespath v0.4.0 // indirect
github.com/kr/fs v0.1.0 // indirect
github.com/mattn/go-ieproxy v0.0.7 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect
github.com/prometheus/client_model v0.2.0 // indirect
github.com/prometheus/common v0.37.0 // indirect
github.com/prometheus/procfs v0.8.0 // indirect
go.opencensus.io v0.23.0 // indirect
golang.org/x/net v0.0.0-20220805013720-a33c5aa5df48 // indirect golang.org/x/net v0.0.0-20220805013720-a33c5aa5df48 // indirect
golang.org/x/oauth2 v0.0.0-20220808172628-8227340efae7 // indirect golang.org/x/oauth2 v0.0.0-20220808172628-8227340efae7 // indirect
golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4
golang.org/x/sys v0.0.0-20220808155132-1c4a2a72c664 // indirect golang.org/x/sys v0.0.0-20220808155132-1c4a2a72c664 // indirect
golang.org/x/text v0.3.7 // indirect google.golang.org/api v0.91.0
golang.org/x/xerrors v0.0.0-20220609144429-65e65417b02f // indirect
google.golang.org/appengine v1.6.7 // indirect
google.golang.org/genproto v0.0.0-20220808204814-fd01256a5276 // indirect google.golang.org/genproto v0.0.0-20220808204814-fd01256a5276 // indirect
google.golang.org/grpc v1.48.0 // indirect gopkg.in/yaml.v2 v2.4.0
google.golang.org/protobuf v1.28.1 // indirect
) )

4
go.sum
View File

@@ -93,6 +93,7 @@ github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+Ce
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/cespare/xxhash v1.1.0 h1:a6HrQnmkObjyL+Gs60czilIUGqrzKutQD6XZog3p+ko=
github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc= github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc=
github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/cespare/xxhash/v2 v2.1.2 h1:YRXhKfTDauu4ajMg1TPgFO5jnlC2HCbmLXMcTG5cbYE= github.com/cespare/xxhash/v2 v2.1.2 h1:YRXhKfTDauu4ajMg1TPgFO5jnlC2HCbmLXMcTG5cbYE=
@@ -547,6 +548,7 @@ golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBc
golang.org/x/sys v0.0.0-20220610221304-9f5ed59c137d/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220610221304-9f5ed59c137d/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220615213510-4f61da869c0c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220615213510-4f61da869c0c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220624220833-87e55d714810/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220624220833-87e55d714810/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220728004956-3c1f35247d10/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220808155132-1c4a2a72c664 h1:v1W7bwXHsnLLloWYTVEdvGvA7BHMeBYsPcF0GLDxIRs= golang.org/x/sys v0.0.0-20220808155132-1c4a2a72c664 h1:v1W7bwXHsnLLloWYTVEdvGvA7BHMeBYsPcF0GLDxIRs=
golang.org/x/sys v0.0.0-20220808155132-1c4a2a72c664/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220808155132-1c4a2a72c664/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
@@ -665,6 +667,7 @@ google.golang.org/api v0.80.0/go.mod h1:xY3nI94gbvBrE0J6NHXhxOmW97HG7Khjkku6AFB3
google.golang.org/api v0.84.0/go.mod h1:NTsGnUFJMYROtiquksZHBWtHfeMC7iYthki7Eq3pa8o= google.golang.org/api v0.84.0/go.mod h1:NTsGnUFJMYROtiquksZHBWtHfeMC7iYthki7Eq3pa8o=
google.golang.org/api v0.85.0/go.mod h1:AqZf8Ep9uZ2pyTvgL+x0D3Zt0eoT9b5E8fmzfu6FO2g= google.golang.org/api v0.85.0/go.mod h1:AqZf8Ep9uZ2pyTvgL+x0D3Zt0eoT9b5E8fmzfu6FO2g=
google.golang.org/api v0.86.0/go.mod h1:+Sem1dnrKlrXMR/X0bPnMWyluQe4RsNoYfmNLhOIkzw= google.golang.org/api v0.86.0/go.mod h1:+Sem1dnrKlrXMR/X0bPnMWyluQe4RsNoYfmNLhOIkzw=
google.golang.org/api v0.90.0/go.mod h1:+Sem1dnrKlrXMR/X0bPnMWyluQe4RsNoYfmNLhOIkzw=
google.golang.org/api v0.91.0 h1:731+JzuwaJoZXRQGmPoBiV+SrsAfUaIkdMCWTcQNPyA= google.golang.org/api v0.91.0 h1:731+JzuwaJoZXRQGmPoBiV+SrsAfUaIkdMCWTcQNPyA=
google.golang.org/api v0.91.0/go.mod h1:+Sem1dnrKlrXMR/X0bPnMWyluQe4RsNoYfmNLhOIkzw= google.golang.org/api v0.91.0/go.mod h1:+Sem1dnrKlrXMR/X0bPnMWyluQe4RsNoYfmNLhOIkzw=
google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM= google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM=
@@ -756,6 +759,7 @@ google.golang.org/genproto v0.0.0-20220616135557-88e70c0c3a90/go.mod h1:KEWEmljW
google.golang.org/genproto v0.0.0-20220617124728-180714bec0ad/go.mod h1:KEWEmljWE5zPzLBa/oHl6DaEt9LmfH6WtH1OHIvleBA= google.golang.org/genproto v0.0.0-20220617124728-180714bec0ad/go.mod h1:KEWEmljWE5zPzLBa/oHl6DaEt9LmfH6WtH1OHIvleBA=
google.golang.org/genproto v0.0.0-20220624142145-8cd45d7dbd1f/go.mod h1:KEWEmljWE5zPzLBa/oHl6DaEt9LmfH6WtH1OHIvleBA= google.golang.org/genproto v0.0.0-20220624142145-8cd45d7dbd1f/go.mod h1:KEWEmljWE5zPzLBa/oHl6DaEt9LmfH6WtH1OHIvleBA=
google.golang.org/genproto v0.0.0-20220628213854-d9e0b6570c03/go.mod h1:KEWEmljWE5zPzLBa/oHl6DaEt9LmfH6WtH1OHIvleBA= google.golang.org/genproto v0.0.0-20220628213854-d9e0b6570c03/go.mod h1:KEWEmljWE5zPzLBa/oHl6DaEt9LmfH6WtH1OHIvleBA=
google.golang.org/genproto v0.0.0-20220722212130-b98a9ff5e252/go.mod h1:GkXuJDJ6aQ7lnJcRF+SJVgFdQhypqgl3LB1C9vabdRE=
google.golang.org/genproto v0.0.0-20220808204814-fd01256a5276 h1:7PEE9xCtufpGJzrqweakEEnTh7YFELmnKm/ee+5jmfQ= google.golang.org/genproto v0.0.0-20220808204814-fd01256a5276 h1:7PEE9xCtufpGJzrqweakEEnTh7YFELmnKm/ee+5jmfQ=
google.golang.org/genproto v0.0.0-20220808204814-fd01256a5276/go.mod h1:dbqgFATTzChvnt+ujMdZwITVAJHFtfyN1qUhDqEiIlk= google.golang.org/genproto v0.0.0-20220808204814-fd01256a5276/go.mod h1:dbqgFATTzChvnt+ujMdZwITVAJHFtfyN1qUhDqEiIlk=
google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c=

View File

@@ -4,7 +4,7 @@ import (
"context" "context"
"flag" "flag"
"fmt" "fmt"
"io" "io/ioutil"
"math/rand" "math/rand"
"os" "os"
"path" "path"
@@ -22,13 +22,12 @@ import (
) )
func init() { func init() {
localRand = rand.New(rand.NewSource(time.Now().UnixNano())) rand.Seed(time.Now().UnixNano())
} }
var ( var (
// Enables integration tests. // Enables integration tests.
replicaType = flag.String("replica-type", "file", "") replicaType = flag.String("replica-type", "file", "")
localRand *rand.Rand
) )
// S3 settings // S3 settings
@@ -194,7 +193,7 @@ func TestReplicaClient_WriteSnapshot(t *testing.T) {
if r, err := c.SnapshotReader(context.Background(), "b16ddcf5c697540f", 1000); err != nil { if r, err := c.SnapshotReader(context.Background(), "b16ddcf5c697540f", 1000); err != nil {
t.Fatal(err) t.Fatal(err)
} else if buf, err := io.ReadAll(r); err != nil { } else if buf, err := ioutil.ReadAll(r); err != nil {
t.Fatal(err) t.Fatal(err)
} else if err := r.Close(); err != nil { } else if err := r.Close(); err != nil {
t.Fatal(err) t.Fatal(err)
@@ -225,7 +224,7 @@ func TestReplicaClient_SnapshotReader(t *testing.T) {
} }
defer r.Close() defer r.Close()
if buf, err := io.ReadAll(r); err != nil { if buf, err := ioutil.ReadAll(r); err != nil {
t.Fatal(err) t.Fatal(err)
} else if got, want := string(buf), "foo"; got != want { } else if got, want := string(buf), "foo"; got != want {
t.Fatalf("ReadAll=%v, want %v", got, want) t.Fatalf("ReadAll=%v, want %v", got, want)
@@ -379,7 +378,7 @@ func TestReplicaClient_WriteWALSegment(t *testing.T) {
if r, err := c.WALSegmentReader(context.Background(), litestream.Pos{Generation: "b16ddcf5c697540f", Index: 1000, Offset: 2000}); err != nil { if r, err := c.WALSegmentReader(context.Background(), litestream.Pos{Generation: "b16ddcf5c697540f", Index: 1000, Offset: 2000}); err != nil {
t.Fatal(err) t.Fatal(err)
} else if buf, err := io.ReadAll(r); err != nil { } else if buf, err := ioutil.ReadAll(r); err != nil {
t.Fatal(err) t.Fatal(err)
} else if err := r.Close(); err != nil { } else if err := r.Close(); err != nil {
t.Fatal(err) t.Fatal(err)
@@ -410,7 +409,7 @@ func TestReplicaClient_WALSegmentReader(t *testing.T) {
} }
defer r.Close() defer r.Close()
if buf, err := io.ReadAll(r); err != nil { if buf, err := ioutil.ReadAll(r); err != nil {
t.Fatal(err) t.Fatal(err)
} else if got, want := string(buf), "foobar"; got != want { } else if got, want := string(buf), "foobar"; got != want {
t.Fatalf("ReadAll=%v, want %v", got, want) t.Fatalf("ReadAll=%v, want %v", got, want)

View File

@@ -4,6 +4,7 @@ import (
"context" "context"
"fmt" "fmt"
"io" "io"
"io/ioutil"
"log" "log"
"os" "os"
"sort" "sort"
@@ -374,7 +375,7 @@ func (r *Replica) calcPos(ctx context.Context, generation string) (pos Pos, err
} }
defer rd.Close() defer rd.Close()
n, err := io.Copy(io.Discard, lz4.NewReader(rd)) n, err := io.Copy(ioutil.Discard, lz4.NewReader(rd))
if err != nil { if err != nil {
return pos, err return pos, err
} }

View File

@@ -18,6 +18,7 @@ import (
"github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr" "github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/aws/credentials" "github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/defaults"
"github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/s3" "github.com/aws/aws-sdk-go/service/s3"
"github.com/aws/aws-sdk-go/service/s3/s3manager" "github.com/aws/aws-sdk-go/service/s3/s3manager"
@@ -94,7 +95,6 @@ func (c *ReplicaClient) Init(ctx context.Context) (err error) {
if region != "" { if region != "" {
config.Region = aws.String(region) config.Region = aws.String(region)
} }
sess, err := session.NewSession(config) sess, err := session.NewSession(config)
if err != nil { if err != nil {
return fmt.Errorf("cannot create aws session: %w", err) return fmt.Errorf("cannot create aws session: %w", err)
@@ -107,8 +107,7 @@ func (c *ReplicaClient) Init(ctx context.Context) (err error) {
// config returns the AWS configuration. Uses the default credential chain // config returns the AWS configuration. Uses the default credential chain
// unless a key/secret are explicitly set. // unless a key/secret are explicitly set.
func (c *ReplicaClient) config() *aws.Config { func (c *ReplicaClient) config() *aws.Config {
config := &aws.Config{} config := defaults.Get().Config
if c.AccessKeyID != "" || c.SecretAccessKey != "" { if c.AccessKeyID != "" || c.SecretAccessKey != "" {
config.Credentials = credentials.NewStaticCredentials(c.AccessKeyID, c.SecretAccessKey, "") config.Credentials = credentials.NewStaticCredentials(c.AccessKeyID, c.SecretAccessKey, "")
} }
@@ -208,14 +207,10 @@ func (c *ReplicaClient) DeleteGeneration(ctx context.Context, generation string)
n = len(objIDs) n = len(objIDs)
} }
out, err := c.s3.DeleteObjectsWithContext(ctx, &s3.DeleteObjectsInput{ if _, err := c.s3.DeleteObjectsWithContext(ctx, &s3.DeleteObjectsInput{
Bucket: aws.String(c.Bucket), Bucket: aws.String(c.Bucket),
Delete: &s3.Delete{Objects: objIDs[:n], Quiet: aws.Bool(true)}, Delete: &s3.Delete{Objects: objIDs[:n], Quiet: aws.Bool(true)},
}) }); err != nil {
if err != nil {
return err
}
if err := deleteOutputError(out); err != nil {
return err return err
} }
internal.OperationTotalCounterVec.WithLabelValues(ReplicaClientType, "DELETE").Inc() internal.OperationTotalCounterVec.WithLabelValues(ReplicaClientType, "DELETE").Inc()
@@ -302,14 +297,10 @@ func (c *ReplicaClient) DeleteSnapshot(ctx context.Context, generation string, i
key := path.Join(c.Path, "generations", generation, "snapshots", litestream.FormatIndex(index)+".snapshot.lz4") key := path.Join(c.Path, "generations", generation, "snapshots", litestream.FormatIndex(index)+".snapshot.lz4")
out, err := c.s3.DeleteObjectsWithContext(ctx, &s3.DeleteObjectsInput{ if _, err := c.s3.DeleteObjectsWithContext(ctx, &s3.DeleteObjectsInput{
Bucket: aws.String(c.Bucket), Bucket: aws.String(c.Bucket),
Delete: &s3.Delete{Objects: []*s3.ObjectIdentifier{{Key: &key}}, Quiet: aws.Bool(true)}, Delete: &s3.Delete{Objects: []*s3.ObjectIdentifier{{Key: &key}}, Quiet: aws.Bool(true)},
}) }); err != nil {
if err != nil {
return err
}
if err := deleteOutputError(out); err != nil {
return err return err
} }
@@ -406,16 +397,13 @@ func (c *ReplicaClient) DeleteWALSegments(ctx context.Context, a []litestream.Po
} }
// Delete S3 objects in bulk. // Delete S3 objects in bulk.
out, err := c.s3.DeleteObjectsWithContext(ctx, &s3.DeleteObjectsInput{ if _, err := c.s3.DeleteObjectsWithContext(ctx, &s3.DeleteObjectsInput{
Bucket: aws.String(c.Bucket), Bucket: aws.String(c.Bucket),
Delete: &s3.Delete{Objects: objIDs[:n], Quiet: aws.Bool(true)}, Delete: &s3.Delete{Objects: objIDs[:n], Quiet: aws.Bool(true)},
}) }); err != nil {
if err != nil {
return err
}
if err := deleteOutputError(out); err != nil {
return err return err
} }
internal.OperationTotalCounterVec.WithLabelValues(ReplicaClientType, "DELETE").Inc() internal.OperationTotalCounterVec.WithLabelValues(ReplicaClientType, "DELETE").Inc()
a = a[n:] a = a[n:]
@@ -458,14 +446,10 @@ func (c *ReplicaClient) DeleteAll(ctx context.Context) error {
n = len(objIDs) n = len(objIDs)
} }
out, err := c.s3.DeleteObjectsWithContext(ctx, &s3.DeleteObjectsInput{ if _, err := c.s3.DeleteObjectsWithContext(ctx, &s3.DeleteObjectsInput{
Bucket: aws.String(c.Bucket), Bucket: aws.String(c.Bucket),
Delete: &s3.Delete{Objects: objIDs[:n], Quiet: aws.Bool(true)}, Delete: &s3.Delete{Objects: objIDs[:n], Quiet: aws.Bool(true)},
}) }); err != nil {
if err != nil {
return err
}
if err := deleteOutputError(out); err != nil {
return err return err
} }
internal.OperationTotalCounterVec.WithLabelValues(ReplicaClientType, "DELETE").Inc() internal.OperationTotalCounterVec.WithLabelValues(ReplicaClientType, "DELETE").Inc()
@@ -765,15 +749,3 @@ func isNotExists(err error) bool {
return false return false
} }
} }
func deleteOutputError(out *s3.DeleteObjectsOutput) error {
switch len(out.Errors) {
case 0:
return nil
case 1:
return fmt.Errorf("deleting object %s: %s - %s", *out.Errors[0].Key, *out.Errors[0].Code, *out.Errors[0].Message)
default:
return fmt.Errorf("%d errors occured deleting objects, %s: %s - (%s (and %d others)",
len(out.Errors), *out.Errors[0].Key, *out.Errors[0].Code, *out.Errors[0].Message, len(out.Errors)-1)
}
}