Compare commits

..

6 Commits

Author SHA1 Message Date
Tyler Davis
91e57a6156 fix: remove deprecated ioutil and rand calls
- Change ioutil to io and os calls per Go 1.16 deprecation
- Move global random to localized random source
2023-03-11 19:16:12 -07:00
Tyler Davis
6bbced3d46 fix: update go versions in mod and docker
- Update Go module to v1.19 format
- Docker builder pinned to Go v1.20.1
- Alpine image pinned to 3.17.2 (rather than `latest`)
2023-03-11 19:16:12 -07:00
Erik Kristensen
8dbdf2b91c fix: remove debug code 2023-01-17 18:47:29 -07:00
Erik Kristensen
530930465d fix: aws credential chain by using aws.Config 2023-01-17 18:47:29 -07:00
Lincoln Stoll
64e535e23b Handle errors when deleting objects from S3
I recently noticed that the cost for ListBucket calls was increasing for an
application that was using Litestream. After investigating it seemed that the
bucket had retained the entire history of data, while Litestream was
continually logging that it was deleting the same data:

```
2022-10-30T12:00:27Z (s3): wal segmented deleted before 0792d3393bf79ced/00000233: n=1428
<snip>
2022-10-30T13:00:24Z (s3): wal segmented deleted before 0792d3393bf79ced/00000233: n=1428
```

This is occuring because the DeleteObjects call is a batch item, that returns
the individual object deletion errors in the response[1]. The S3 replica client
discards the response, and only handles errors in the original API call. I had
a misconfigured IAM policy that meant all deletes were failing, but this never
actually bubbled up as a real error.

To fix this, I added a check for the response body to handle any errors the
operation might have encountered. Because this may include a large number of
errors (in this case 1428 of them), the output is summarized to avoid an overly
large error message. When items are not found, they will not return an error[2]
- they will still be marked as deleted, so this change should be in-line with
the original intentions of this code.

1: https://docs.aws.amazon.com/AmazonS3/latest/API/API_DeleteObjects.html#API_DeleteObjects_Example_2
2: https://docs.aws.amazon.com/AmazonS3/latest/API/API_DeleteObjects.html
2022-11-03 10:02:42 -06:00
Jose Diaz-Gonzalez
f50f03d8fc Update readme to note that this tool is for disaster recovery, not streaming replication
Refs #411
2022-10-14 15:12:58 -06:00
15 changed files with 97 additions and 76 deletions

View File

@@ -1,4 +1,4 @@
FROM golang:1.17 as builder FROM golang:1.20.1 as builder
WORKDIR /src/litestream WORKDIR /src/litestream
COPY . . COPY . .
@@ -10,7 +10,7 @@ RUN --mount=type=cache,target=/root/.cache/go-build \
go build -ldflags "-s -w -X 'main.Version=${LITESTREAM_VERSION}' -extldflags '-static'" -tags osusergo,netgo,sqlite_omit_load_extension -o /usr/local/bin/litestream ./cmd/litestream go build -ldflags "-s -w -X 'main.Version=${LITESTREAM_VERSION}' -extldflags '-static'" -tags osusergo,netgo,sqlite_omit_load_extension -o /usr/local/bin/litestream ./cmd/litestream
FROM alpine FROM alpine:3.17.2
COPY --from=builder /usr/local/bin/litestream /usr/local/bin/litestream COPY --from=builder /usr/local/bin/litestream /usr/local/bin/litestream
ENTRYPOINT ["/usr/local/bin/litestream"] ENTRYPOINT ["/usr/local/bin/litestream"]
CMD [] CMD []

View File

@@ -6,7 +6,7 @@ Litestream
![test](https://github.com/benbjohnson/litestream/workflows/test/badge.svg) ![test](https://github.com/benbjohnson/litestream/workflows/test/badge.svg)
========== ==========
Litestream is a standalone streaming replication tool for SQLite. It runs as a Litestream is a standalone disaster recovery tool for SQLite. It runs as a
background process and safely replicates changes incrementally to another file background process and safely replicates changes incrementally to another file
or S3. Litestream only communicates with SQLite through the SQLite API so it or S3. Litestream only communicates with SQLite through the SQLite API so it
will not corrupt your database. will not corrupt your database.

View File

@@ -6,7 +6,6 @@ import (
"flag" "flag"
"fmt" "fmt"
"io" "io"
"io/ioutil"
"log" "log"
"net/url" "net/url"
"os" "os"
@@ -253,7 +252,7 @@ func readConfigFile(filename string, expandEnv bool) (_ Config, err error) {
// Read configuration. // Read configuration.
// Do not return an error if using default path and file is missing. // Do not return an error if using default path and file is missing.
buf, err := ioutil.ReadFile(filename) buf, err := os.ReadFile(filename)
if err != nil { if err != nil {
return config, err return config, err
} }

View File

@@ -3,7 +3,6 @@ package main_test
import ( import (
"bytes" "bytes"
"io" "io"
"io/ioutil"
"log" "log"
"os" "os"
"path/filepath" "path/filepath"
@@ -23,7 +22,7 @@ func TestReadConfigFile(t *testing.T) {
// Ensure global AWS settings are propagated down to replica configurations. // Ensure global AWS settings are propagated down to replica configurations.
t.Run("PropagateGlobalSettings", func(t *testing.T) { t.Run("PropagateGlobalSettings", func(t *testing.T) {
filename := filepath.Join(t.TempDir(), "litestream.yml") filename := filepath.Join(t.TempDir(), "litestream.yml")
if err := ioutil.WriteFile(filename, []byte(` if err := os.WriteFile(filename, []byte(`
access-key-id: XXX access-key-id: XXX
secret-access-key: YYY secret-access-key: YYY
@@ -55,7 +54,7 @@ dbs:
os.Setenv("LITESTREAM_TEST_1872363", "s3://foo/bar") os.Setenv("LITESTREAM_TEST_1872363", "s3://foo/bar")
filename := filepath.Join(t.TempDir(), "litestream.yml") filename := filepath.Join(t.TempDir(), "litestream.yml")
if err := ioutil.WriteFile(filename, []byte(` if err := os.WriteFile(filename, []byte(`
dbs: dbs:
- path: $LITESTREAM_TEST_0129380 - path: $LITESTREAM_TEST_0129380
replicas: replicas:
@@ -82,7 +81,7 @@ dbs:
os.Setenv("LITESTREAM_TEST_9847533", "s3://foo/bar") os.Setenv("LITESTREAM_TEST_9847533", "s3://foo/bar")
filename := filepath.Join(t.TempDir(), "litestream.yml") filename := filepath.Join(t.TempDir(), "litestream.yml")
if err := ioutil.WriteFile(filename, []byte(` if err := os.WriteFile(filename, []byte(`
dbs: dbs:
- path: /path/to/db - path: /path/to/db
replicas: replicas:

View File

@@ -111,16 +111,8 @@ func (c *RestoreCommand) Run(ctx context.Context, args []string) (err error) {
// Build replica from either a URL or config. // Build replica from either a URL or config.
r, err := c.loadReplica(ctx, config, pathOrURL) r, err := c.loadReplica(ctx, config, pathOrURL)
if err == litestream.ErrNoGeneration { if err != nil {
// Return an error if no replicas can be loaded to restore from. return err
// If optional flag set, return success. Useful for automated recovery.
if c.ifReplicaExists {
fmt.Fprintln(c.stdout, "no replicas have generations to restore from, skipping")
return nil
}
return fmt.Errorf("no replicas have generations to restore from")
} else if err != nil {
return fmt.Errorf("cannot determine latest replica: %w", err)
} }
// Determine latest generation if one is not specified. // Determine latest generation if one is not specified.
@@ -226,7 +218,11 @@ func (c *RestoreCommand) loadReplicaFromConfig(ctx context.Context, config Confi
} }
// Determine latest replica to restore from. // Determine latest replica to restore from.
return litestream.LatestReplica(ctx, db.Replicas) r, err := litestream.LatestReplica(ctx, db.Replicas)
if err != nil {
return nil, fmt.Errorf("cannot determine latest replica: %w", err)
}
return r, nil
} }
// Usage prints the help screen to STDOUT. // Usage prints the help screen to STDOUT.

View File

@@ -138,19 +138,6 @@ func TestRestoreCommand(t *testing.T) {
} }
}) })
t.Run("IfReplicaExists/Multiple", func(t *testing.T) {
testDir := filepath.Join("testdata", "restore", "if-replica-exists-flag-multiple")
defer testingutil.Setenv(t, "LITESTREAM_TESTDIR", testDir)()
m, _, stdout, _ := newMain()
err := m.Run(context.Background(), []string{"restore", "-config", filepath.Join(testDir, "litestream.yml"), "-if-replica-exists", filepath.Join(testDir, "db")})
if err != nil {
t.Fatal(err)
} else if got, want := stdout.String(), string(testingutil.ReadFile(t, filepath.Join(testDir, "stdout"))); got != want {
t.Fatalf("stdout=%q, want %q", got, want)
}
})
t.Run("ErrNoBackups", func(t *testing.T) { t.Run("ErrNoBackups", func(t *testing.T) {
testDir := filepath.Join("testdata", "restore", "no-backups") testDir := filepath.Join("testdata", "restore", "no-backups")
defer testingutil.Setenv(t, "LITESTREAM_TESTDIR", testDir)() defer testingutil.Setenv(t, "LITESTREAM_TESTDIR", testDir)()

View File

@@ -1,5 +0,0 @@
dbs:
- path: $LITESTREAM_TESTDIR/db
replicas:
- path: $LITESTREAM_TESTDIR/replica0
- path: $LITESTREAM_TESTDIR/replica1

View File

@@ -1 +0,0 @@
no replicas have generations to restore from, skipping

5
db.go
View File

@@ -9,7 +9,6 @@ import (
"errors" "errors"
"fmt" "fmt"
"io" "io"
"io/ioutil"
"log" "log"
"math/rand" "math/rand"
"os" "os"
@@ -292,7 +291,7 @@ func (db *DB) invalidatePos(ctx context.Context) error {
} }
defer rd.Close() defer rd.Close()
n, err := io.Copy(ioutil.Discard, lz4.NewReader(rd)) n, err := io.Copy(io.Discard, lz4.NewReader(rd))
if err != nil { if err != nil {
return err return err
} }
@@ -671,7 +670,7 @@ func (db *DB) cleanGenerations(ctx context.Context) error {
} }
dir := filepath.Join(db.MetaPath(), "generations") dir := filepath.Join(db.MetaPath(), "generations")
fis, err := ioutil.ReadDir(dir) fis, err := os.ReadDir(dir)
if os.IsNotExist(err) { if os.IsNotExist(err) {
return nil return nil
} else if err != nil { } else if err != nil {

View File

@@ -4,7 +4,6 @@ import (
"context" "context"
"fmt" "fmt"
"io" "io"
"io/ioutil"
"os" "os"
"path/filepath" "path/filepath"
"sort" "sort"
@@ -111,7 +110,7 @@ func (c *FileReplicaClient) Generations(ctx context.Context) ([]string, error) {
return nil, fmt.Errorf("cannot determine generations path: %w", err) return nil, fmt.Errorf("cannot determine generations path: %w", err)
} }
fis, err := ioutil.ReadDir(root) fis, err := os.ReadDir(root)
if os.IsNotExist(err) { if os.IsNotExist(err) {
return nil, nil return nil, nil
} else if err != nil { } else if err != nil {

42
go.mod
View File

@@ -1,27 +1,51 @@
module github.com/benbjohnson/litestream module github.com/benbjohnson/litestream
go 1.16 go 1.19
require ( require (
cloud.google.com/go v0.103.0 // indirect
cloud.google.com/go/storage v1.24.0 cloud.google.com/go/storage v1.24.0
github.com/Azure/azure-storage-blob-go v0.15.0 github.com/Azure/azure-storage-blob-go v0.15.0
github.com/aws/aws-sdk-go v1.44.71 github.com/aws/aws-sdk-go v1.44.71
github.com/fsnotify/fsnotify v1.5.4 github.com/fsnotify/fsnotify v1.5.4
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/googleapis/gax-go/v2 v2.5.1 // indirect
github.com/mattn/go-ieproxy v0.0.7 // indirect
github.com/mattn/go-shellwords v1.0.12 github.com/mattn/go-shellwords v1.0.12
github.com/mattn/go-sqlite3 v1.14.14 github.com/mattn/go-sqlite3 v1.14.14
github.com/pierrec/lz4/v4 v4.1.15 github.com/pierrec/lz4/v4 v4.1.15
github.com/pkg/sftp v1.13.5 github.com/pkg/sftp v1.13.5
github.com/prometheus/client_golang v1.13.0 github.com/prometheus/client_golang v1.13.0
golang.org/x/crypto v0.0.0-20220722155217-630584e8d5aa golang.org/x/crypto v0.0.0-20220722155217-630584e8d5aa
golang.org/x/net v0.0.0-20220805013720-a33c5aa5df48 // indirect
golang.org/x/oauth2 v0.0.0-20220808172628-8227340efae7 // indirect
golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4 golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4
golang.org/x/sys v0.0.0-20220808155132-1c4a2a72c664 // indirect
google.golang.org/api v0.91.0 google.golang.org/api v0.91.0
google.golang.org/genproto v0.0.0-20220808204814-fd01256a5276 // indirect
gopkg.in/yaml.v2 v2.4.0 gopkg.in/yaml.v2 v2.4.0
) )
require (
cloud.google.com/go v0.103.0 // indirect
cloud.google.com/go/compute v1.7.0 // indirect
cloud.google.com/go/iam v0.3.0 // indirect
github.com/Azure/azure-pipeline-go v0.2.3 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.1.2 // indirect
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/google/go-cmp v0.5.8 // indirect
github.com/google/uuid v1.3.0 // indirect
github.com/googleapis/enterprise-certificate-proxy v0.1.0 // indirect
github.com/googleapis/gax-go/v2 v2.5.1 // indirect
github.com/jmespath/go-jmespath v0.4.0 // indirect
github.com/kr/fs v0.1.0 // indirect
github.com/mattn/go-ieproxy v0.0.7 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect
github.com/prometheus/client_model v0.2.0 // indirect
github.com/prometheus/common v0.37.0 // indirect
github.com/prometheus/procfs v0.8.0 // indirect
go.opencensus.io v0.23.0 // indirect
golang.org/x/net v0.0.0-20220805013720-a33c5aa5df48 // indirect
golang.org/x/oauth2 v0.0.0-20220808172628-8227340efae7 // indirect
golang.org/x/sys v0.0.0-20220808155132-1c4a2a72c664 // indirect
golang.org/x/text v0.3.7 // indirect
golang.org/x/xerrors v0.0.0-20220609144429-65e65417b02f // indirect
google.golang.org/appengine v1.6.7 // indirect
google.golang.org/genproto v0.0.0-20220808204814-fd01256a5276 // indirect
google.golang.org/grpc v1.48.0 // indirect
google.golang.org/protobuf v1.28.1 // indirect
)

4
go.sum
View File

@@ -93,7 +93,6 @@ github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+Ce
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/cespare/xxhash v1.1.0 h1:a6HrQnmkObjyL+Gs60czilIUGqrzKutQD6XZog3p+ko=
github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc= github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc=
github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/cespare/xxhash/v2 v2.1.2 h1:YRXhKfTDauu4ajMg1TPgFO5jnlC2HCbmLXMcTG5cbYE= github.com/cespare/xxhash/v2 v2.1.2 h1:YRXhKfTDauu4ajMg1TPgFO5jnlC2HCbmLXMcTG5cbYE=
@@ -548,7 +547,6 @@ golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBc
golang.org/x/sys v0.0.0-20220610221304-9f5ed59c137d/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220610221304-9f5ed59c137d/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220615213510-4f61da869c0c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220615213510-4f61da869c0c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220624220833-87e55d714810/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220624220833-87e55d714810/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220728004956-3c1f35247d10/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220808155132-1c4a2a72c664 h1:v1W7bwXHsnLLloWYTVEdvGvA7BHMeBYsPcF0GLDxIRs= golang.org/x/sys v0.0.0-20220808155132-1c4a2a72c664 h1:v1W7bwXHsnLLloWYTVEdvGvA7BHMeBYsPcF0GLDxIRs=
golang.org/x/sys v0.0.0-20220808155132-1c4a2a72c664/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220808155132-1c4a2a72c664/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
@@ -667,7 +665,6 @@ google.golang.org/api v0.80.0/go.mod h1:xY3nI94gbvBrE0J6NHXhxOmW97HG7Khjkku6AFB3
google.golang.org/api v0.84.0/go.mod h1:NTsGnUFJMYROtiquksZHBWtHfeMC7iYthki7Eq3pa8o= google.golang.org/api v0.84.0/go.mod h1:NTsGnUFJMYROtiquksZHBWtHfeMC7iYthki7Eq3pa8o=
google.golang.org/api v0.85.0/go.mod h1:AqZf8Ep9uZ2pyTvgL+x0D3Zt0eoT9b5E8fmzfu6FO2g= google.golang.org/api v0.85.0/go.mod h1:AqZf8Ep9uZ2pyTvgL+x0D3Zt0eoT9b5E8fmzfu6FO2g=
google.golang.org/api v0.86.0/go.mod h1:+Sem1dnrKlrXMR/X0bPnMWyluQe4RsNoYfmNLhOIkzw= google.golang.org/api v0.86.0/go.mod h1:+Sem1dnrKlrXMR/X0bPnMWyluQe4RsNoYfmNLhOIkzw=
google.golang.org/api v0.90.0/go.mod h1:+Sem1dnrKlrXMR/X0bPnMWyluQe4RsNoYfmNLhOIkzw=
google.golang.org/api v0.91.0 h1:731+JzuwaJoZXRQGmPoBiV+SrsAfUaIkdMCWTcQNPyA= google.golang.org/api v0.91.0 h1:731+JzuwaJoZXRQGmPoBiV+SrsAfUaIkdMCWTcQNPyA=
google.golang.org/api v0.91.0/go.mod h1:+Sem1dnrKlrXMR/X0bPnMWyluQe4RsNoYfmNLhOIkzw= google.golang.org/api v0.91.0/go.mod h1:+Sem1dnrKlrXMR/X0bPnMWyluQe4RsNoYfmNLhOIkzw=
google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM= google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM=
@@ -759,7 +756,6 @@ google.golang.org/genproto v0.0.0-20220616135557-88e70c0c3a90/go.mod h1:KEWEmljW
google.golang.org/genproto v0.0.0-20220617124728-180714bec0ad/go.mod h1:KEWEmljWE5zPzLBa/oHl6DaEt9LmfH6WtH1OHIvleBA= google.golang.org/genproto v0.0.0-20220617124728-180714bec0ad/go.mod h1:KEWEmljWE5zPzLBa/oHl6DaEt9LmfH6WtH1OHIvleBA=
google.golang.org/genproto v0.0.0-20220624142145-8cd45d7dbd1f/go.mod h1:KEWEmljWE5zPzLBa/oHl6DaEt9LmfH6WtH1OHIvleBA= google.golang.org/genproto v0.0.0-20220624142145-8cd45d7dbd1f/go.mod h1:KEWEmljWE5zPzLBa/oHl6DaEt9LmfH6WtH1OHIvleBA=
google.golang.org/genproto v0.0.0-20220628213854-d9e0b6570c03/go.mod h1:KEWEmljWE5zPzLBa/oHl6DaEt9LmfH6WtH1OHIvleBA= google.golang.org/genproto v0.0.0-20220628213854-d9e0b6570c03/go.mod h1:KEWEmljWE5zPzLBa/oHl6DaEt9LmfH6WtH1OHIvleBA=
google.golang.org/genproto v0.0.0-20220722212130-b98a9ff5e252/go.mod h1:GkXuJDJ6aQ7lnJcRF+SJVgFdQhypqgl3LB1C9vabdRE=
google.golang.org/genproto v0.0.0-20220808204814-fd01256a5276 h1:7PEE9xCtufpGJzrqweakEEnTh7YFELmnKm/ee+5jmfQ= google.golang.org/genproto v0.0.0-20220808204814-fd01256a5276 h1:7PEE9xCtufpGJzrqweakEEnTh7YFELmnKm/ee+5jmfQ=
google.golang.org/genproto v0.0.0-20220808204814-fd01256a5276/go.mod h1:dbqgFATTzChvnt+ujMdZwITVAJHFtfyN1qUhDqEiIlk= google.golang.org/genproto v0.0.0-20220808204814-fd01256a5276/go.mod h1:dbqgFATTzChvnt+ujMdZwITVAJHFtfyN1qUhDqEiIlk=
google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c=

View File

@@ -4,7 +4,7 @@ import (
"context" "context"
"flag" "flag"
"fmt" "fmt"
"io/ioutil" "io"
"math/rand" "math/rand"
"os" "os"
"path" "path"
@@ -22,12 +22,13 @@ import (
) )
func init() { func init() {
rand.Seed(time.Now().UnixNano()) localRand = rand.New(rand.NewSource(time.Now().UnixNano()))
} }
var ( var (
// Enables integration tests. // Enables integration tests.
replicaType = flag.String("replica-type", "file", "") replicaType = flag.String("replica-type", "file", "")
localRand *rand.Rand
) )
// S3 settings // S3 settings
@@ -193,7 +194,7 @@ func TestReplicaClient_WriteSnapshot(t *testing.T) {
if r, err := c.SnapshotReader(context.Background(), "b16ddcf5c697540f", 1000); err != nil { if r, err := c.SnapshotReader(context.Background(), "b16ddcf5c697540f", 1000); err != nil {
t.Fatal(err) t.Fatal(err)
} else if buf, err := ioutil.ReadAll(r); err != nil { } else if buf, err := io.ReadAll(r); err != nil {
t.Fatal(err) t.Fatal(err)
} else if err := r.Close(); err != nil { } else if err := r.Close(); err != nil {
t.Fatal(err) t.Fatal(err)
@@ -224,7 +225,7 @@ func TestReplicaClient_SnapshotReader(t *testing.T) {
} }
defer r.Close() defer r.Close()
if buf, err := ioutil.ReadAll(r); err != nil { if buf, err := io.ReadAll(r); err != nil {
t.Fatal(err) t.Fatal(err)
} else if got, want := string(buf), "foo"; got != want { } else if got, want := string(buf), "foo"; got != want {
t.Fatalf("ReadAll=%v, want %v", got, want) t.Fatalf("ReadAll=%v, want %v", got, want)
@@ -378,7 +379,7 @@ func TestReplicaClient_WriteWALSegment(t *testing.T) {
if r, err := c.WALSegmentReader(context.Background(), litestream.Pos{Generation: "b16ddcf5c697540f", Index: 1000, Offset: 2000}); err != nil { if r, err := c.WALSegmentReader(context.Background(), litestream.Pos{Generation: "b16ddcf5c697540f", Index: 1000, Offset: 2000}); err != nil {
t.Fatal(err) t.Fatal(err)
} else if buf, err := ioutil.ReadAll(r); err != nil { } else if buf, err := io.ReadAll(r); err != nil {
t.Fatal(err) t.Fatal(err)
} else if err := r.Close(); err != nil { } else if err := r.Close(); err != nil {
t.Fatal(err) t.Fatal(err)
@@ -409,7 +410,7 @@ func TestReplicaClient_WALSegmentReader(t *testing.T) {
} }
defer r.Close() defer r.Close()
if buf, err := ioutil.ReadAll(r); err != nil { if buf, err := io.ReadAll(r); err != nil {
t.Fatal(err) t.Fatal(err)
} else if got, want := string(buf), "foobar"; got != want { } else if got, want := string(buf), "foobar"; got != want {
t.Fatalf("ReadAll=%v, want %v", got, want) t.Fatalf("ReadAll=%v, want %v", got, want)

View File

@@ -4,7 +4,6 @@ import (
"context" "context"
"fmt" "fmt"
"io" "io"
"io/ioutil"
"log" "log"
"os" "os"
"sort" "sort"
@@ -375,7 +374,7 @@ func (r *Replica) calcPos(ctx context.Context, generation string) (pos Pos, err
} }
defer rd.Close() defer rd.Close()
n, err := io.Copy(ioutil.Discard, lz4.NewReader(rd)) n, err := io.Copy(io.Discard, lz4.NewReader(rd))
if err != nil { if err != nil {
return pos, err return pos, err
} }

View File

@@ -18,7 +18,6 @@ import (
"github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr" "github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/aws/credentials" "github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/defaults"
"github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/s3" "github.com/aws/aws-sdk-go/service/s3"
"github.com/aws/aws-sdk-go/service/s3/s3manager" "github.com/aws/aws-sdk-go/service/s3/s3manager"
@@ -95,6 +94,7 @@ func (c *ReplicaClient) Init(ctx context.Context) (err error) {
if region != "" { if region != "" {
config.Region = aws.String(region) config.Region = aws.String(region)
} }
sess, err := session.NewSession(config) sess, err := session.NewSession(config)
if err != nil { if err != nil {
return fmt.Errorf("cannot create aws session: %w", err) return fmt.Errorf("cannot create aws session: %w", err)
@@ -107,7 +107,8 @@ func (c *ReplicaClient) Init(ctx context.Context) (err error) {
// config returns the AWS configuration. Uses the default credential chain // config returns the AWS configuration. Uses the default credential chain
// unless a key/secret are explicitly set. // unless a key/secret are explicitly set.
func (c *ReplicaClient) config() *aws.Config { func (c *ReplicaClient) config() *aws.Config {
config := defaults.Get().Config config := &aws.Config{}
if c.AccessKeyID != "" || c.SecretAccessKey != "" { if c.AccessKeyID != "" || c.SecretAccessKey != "" {
config.Credentials = credentials.NewStaticCredentials(c.AccessKeyID, c.SecretAccessKey, "") config.Credentials = credentials.NewStaticCredentials(c.AccessKeyID, c.SecretAccessKey, "")
} }
@@ -207,10 +208,14 @@ func (c *ReplicaClient) DeleteGeneration(ctx context.Context, generation string)
n = len(objIDs) n = len(objIDs)
} }
if _, err := c.s3.DeleteObjectsWithContext(ctx, &s3.DeleteObjectsInput{ out, err := c.s3.DeleteObjectsWithContext(ctx, &s3.DeleteObjectsInput{
Bucket: aws.String(c.Bucket), Bucket: aws.String(c.Bucket),
Delete: &s3.Delete{Objects: objIDs[:n], Quiet: aws.Bool(true)}, Delete: &s3.Delete{Objects: objIDs[:n], Quiet: aws.Bool(true)},
}); err != nil { })
if err != nil {
return err
}
if err := deleteOutputError(out); err != nil {
return err return err
} }
internal.OperationTotalCounterVec.WithLabelValues(ReplicaClientType, "DELETE").Inc() internal.OperationTotalCounterVec.WithLabelValues(ReplicaClientType, "DELETE").Inc()
@@ -297,10 +302,14 @@ func (c *ReplicaClient) DeleteSnapshot(ctx context.Context, generation string, i
key := path.Join(c.Path, "generations", generation, "snapshots", litestream.FormatIndex(index)+".snapshot.lz4") key := path.Join(c.Path, "generations", generation, "snapshots", litestream.FormatIndex(index)+".snapshot.lz4")
if _, err := c.s3.DeleteObjectsWithContext(ctx, &s3.DeleteObjectsInput{ out, err := c.s3.DeleteObjectsWithContext(ctx, &s3.DeleteObjectsInput{
Bucket: aws.String(c.Bucket), Bucket: aws.String(c.Bucket),
Delete: &s3.Delete{Objects: []*s3.ObjectIdentifier{{Key: &key}}, Quiet: aws.Bool(true)}, Delete: &s3.Delete{Objects: []*s3.ObjectIdentifier{{Key: &key}}, Quiet: aws.Bool(true)},
}); err != nil { })
if err != nil {
return err
}
if err := deleteOutputError(out); err != nil {
return err return err
} }
@@ -397,13 +406,16 @@ func (c *ReplicaClient) DeleteWALSegments(ctx context.Context, a []litestream.Po
} }
// Delete S3 objects in bulk. // Delete S3 objects in bulk.
if _, err := c.s3.DeleteObjectsWithContext(ctx, &s3.DeleteObjectsInput{ out, err := c.s3.DeleteObjectsWithContext(ctx, &s3.DeleteObjectsInput{
Bucket: aws.String(c.Bucket), Bucket: aws.String(c.Bucket),
Delete: &s3.Delete{Objects: objIDs[:n], Quiet: aws.Bool(true)}, Delete: &s3.Delete{Objects: objIDs[:n], Quiet: aws.Bool(true)},
}); err != nil { })
if err != nil {
return err
}
if err := deleteOutputError(out); err != nil {
return err return err
} }
internal.OperationTotalCounterVec.WithLabelValues(ReplicaClientType, "DELETE").Inc() internal.OperationTotalCounterVec.WithLabelValues(ReplicaClientType, "DELETE").Inc()
a = a[n:] a = a[n:]
@@ -446,10 +458,14 @@ func (c *ReplicaClient) DeleteAll(ctx context.Context) error {
n = len(objIDs) n = len(objIDs)
} }
if _, err := c.s3.DeleteObjectsWithContext(ctx, &s3.DeleteObjectsInput{ out, err := c.s3.DeleteObjectsWithContext(ctx, &s3.DeleteObjectsInput{
Bucket: aws.String(c.Bucket), Bucket: aws.String(c.Bucket),
Delete: &s3.Delete{Objects: objIDs[:n], Quiet: aws.Bool(true)}, Delete: &s3.Delete{Objects: objIDs[:n], Quiet: aws.Bool(true)},
}); err != nil { })
if err != nil {
return err
}
if err := deleteOutputError(out); err != nil {
return err return err
} }
internal.OperationTotalCounterVec.WithLabelValues(ReplicaClientType, "DELETE").Inc() internal.OperationTotalCounterVec.WithLabelValues(ReplicaClientType, "DELETE").Inc()
@@ -749,3 +765,15 @@ func isNotExists(err error) bool {
return false return false
} }
} }
func deleteOutputError(out *s3.DeleteObjectsOutput) error {
switch len(out.Errors) {
case 0:
return nil
case 1:
return fmt.Errorf("deleting object %s: %s - %s", *out.Errors[0].Key, *out.Errors[0].Code, *out.Errors[0].Message)
default:
return fmt.Errorf("%d errors occured deleting objects, %s: %s - (%s (and %d others)",
len(out.Errors), *out.Errors[0].Key, *out.Errors[0].Code, *out.Errors[0].Message, len(out.Errors)-1)
}
}