Compare commits
1 Commits
legacy
...
if-replica
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
94f0082abd |
@@ -1,4 +1,4 @@
|
|||||||
FROM golang:1.20.1 as builder
|
FROM golang:1.17 as builder
|
||||||
|
|
||||||
WORKDIR /src/litestream
|
WORKDIR /src/litestream
|
||||||
COPY . .
|
COPY . .
|
||||||
@@ -10,7 +10,7 @@ RUN --mount=type=cache,target=/root/.cache/go-build \
|
|||||||
go build -ldflags "-s -w -X 'main.Version=${LITESTREAM_VERSION}' -extldflags '-static'" -tags osusergo,netgo,sqlite_omit_load_extension -o /usr/local/bin/litestream ./cmd/litestream
|
go build -ldflags "-s -w -X 'main.Version=${LITESTREAM_VERSION}' -extldflags '-static'" -tags osusergo,netgo,sqlite_omit_load_extension -o /usr/local/bin/litestream ./cmd/litestream
|
||||||
|
|
||||||
|
|
||||||
FROM alpine:3.17.2
|
FROM alpine
|
||||||
COPY --from=builder /usr/local/bin/litestream /usr/local/bin/litestream
|
COPY --from=builder /usr/local/bin/litestream /usr/local/bin/litestream
|
||||||
ENTRYPOINT ["/usr/local/bin/litestream"]
|
ENTRYPOINT ["/usr/local/bin/litestream"]
|
||||||
CMD []
|
CMD []
|
||||||
|
|||||||
@@ -6,7 +6,7 @@ Litestream
|
|||||||

|

|
||||||
==========
|
==========
|
||||||
|
|
||||||
Litestream is a standalone disaster recovery tool for SQLite. It runs as a
|
Litestream is a standalone streaming replication tool for SQLite. It runs as a
|
||||||
background process and safely replicates changes incrementally to another file
|
background process and safely replicates changes incrementally to another file
|
||||||
or S3. Litestream only communicates with SQLite through the SQLite API so it
|
or S3. Litestream only communicates with SQLite through the SQLite API so it
|
||||||
will not corrupt your database.
|
will not corrupt your database.
|
||||||
|
|||||||
@@ -6,6 +6,7 @@ import (
|
|||||||
"flag"
|
"flag"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
|
"io/ioutil"
|
||||||
"log"
|
"log"
|
||||||
"net/url"
|
"net/url"
|
||||||
"os"
|
"os"
|
||||||
@@ -252,7 +253,7 @@ func readConfigFile(filename string, expandEnv bool) (_ Config, err error) {
|
|||||||
|
|
||||||
// Read configuration.
|
// Read configuration.
|
||||||
// Do not return an error if using default path and file is missing.
|
// Do not return an error if using default path and file is missing.
|
||||||
buf, err := os.ReadFile(filename)
|
buf, err := ioutil.ReadFile(filename)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return config, err
|
return config, err
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -3,6 +3,7 @@ package main_test
|
|||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
"io"
|
"io"
|
||||||
|
"io/ioutil"
|
||||||
"log"
|
"log"
|
||||||
"os"
|
"os"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
@@ -22,7 +23,7 @@ func TestReadConfigFile(t *testing.T) {
|
|||||||
// Ensure global AWS settings are propagated down to replica configurations.
|
// Ensure global AWS settings are propagated down to replica configurations.
|
||||||
t.Run("PropagateGlobalSettings", func(t *testing.T) {
|
t.Run("PropagateGlobalSettings", func(t *testing.T) {
|
||||||
filename := filepath.Join(t.TempDir(), "litestream.yml")
|
filename := filepath.Join(t.TempDir(), "litestream.yml")
|
||||||
if err := os.WriteFile(filename, []byte(`
|
if err := ioutil.WriteFile(filename, []byte(`
|
||||||
access-key-id: XXX
|
access-key-id: XXX
|
||||||
secret-access-key: YYY
|
secret-access-key: YYY
|
||||||
|
|
||||||
@@ -54,7 +55,7 @@ dbs:
|
|||||||
os.Setenv("LITESTREAM_TEST_1872363", "s3://foo/bar")
|
os.Setenv("LITESTREAM_TEST_1872363", "s3://foo/bar")
|
||||||
|
|
||||||
filename := filepath.Join(t.TempDir(), "litestream.yml")
|
filename := filepath.Join(t.TempDir(), "litestream.yml")
|
||||||
if err := os.WriteFile(filename, []byte(`
|
if err := ioutil.WriteFile(filename, []byte(`
|
||||||
dbs:
|
dbs:
|
||||||
- path: $LITESTREAM_TEST_0129380
|
- path: $LITESTREAM_TEST_0129380
|
||||||
replicas:
|
replicas:
|
||||||
@@ -81,7 +82,7 @@ dbs:
|
|||||||
os.Setenv("LITESTREAM_TEST_9847533", "s3://foo/bar")
|
os.Setenv("LITESTREAM_TEST_9847533", "s3://foo/bar")
|
||||||
|
|
||||||
filename := filepath.Join(t.TempDir(), "litestream.yml")
|
filename := filepath.Join(t.TempDir(), "litestream.yml")
|
||||||
if err := os.WriteFile(filename, []byte(`
|
if err := ioutil.WriteFile(filename, []byte(`
|
||||||
dbs:
|
dbs:
|
||||||
- path: /path/to/db
|
- path: /path/to/db
|
||||||
replicas:
|
replicas:
|
||||||
|
|||||||
@@ -111,8 +111,16 @@ func (c *RestoreCommand) Run(ctx context.Context, args []string) (err error) {
|
|||||||
|
|
||||||
// Build replica from either a URL or config.
|
// Build replica from either a URL or config.
|
||||||
r, err := c.loadReplica(ctx, config, pathOrURL)
|
r, err := c.loadReplica(ctx, config, pathOrURL)
|
||||||
if err != nil {
|
if err == litestream.ErrNoGeneration {
|
||||||
return err
|
// Return an error if no replicas can be loaded to restore from.
|
||||||
|
// If optional flag set, return success. Useful for automated recovery.
|
||||||
|
if c.ifReplicaExists {
|
||||||
|
fmt.Fprintln(c.stdout, "no replicas have generations to restore from, skipping")
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return fmt.Errorf("no replicas have generations to restore from")
|
||||||
|
} else if err != nil {
|
||||||
|
return fmt.Errorf("cannot determine latest replica: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Determine latest generation if one is not specified.
|
// Determine latest generation if one is not specified.
|
||||||
@@ -218,11 +226,7 @@ func (c *RestoreCommand) loadReplicaFromConfig(ctx context.Context, config Confi
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Determine latest replica to restore from.
|
// Determine latest replica to restore from.
|
||||||
r, err := litestream.LatestReplica(ctx, db.Replicas)
|
return litestream.LatestReplica(ctx, db.Replicas)
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("cannot determine latest replica: %w", err)
|
|
||||||
}
|
|
||||||
return r, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Usage prints the help screen to STDOUT.
|
// Usage prints the help screen to STDOUT.
|
||||||
|
|||||||
@@ -138,6 +138,19 @@ func TestRestoreCommand(t *testing.T) {
|
|||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
|
||||||
|
t.Run("IfReplicaExists/Multiple", func(t *testing.T) {
|
||||||
|
testDir := filepath.Join("testdata", "restore", "if-replica-exists-flag-multiple")
|
||||||
|
defer testingutil.Setenv(t, "LITESTREAM_TESTDIR", testDir)()
|
||||||
|
|
||||||
|
m, _, stdout, _ := newMain()
|
||||||
|
err := m.Run(context.Background(), []string{"restore", "-config", filepath.Join(testDir, "litestream.yml"), "-if-replica-exists", filepath.Join(testDir, "db")})
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
} else if got, want := stdout.String(), string(testingutil.ReadFile(t, filepath.Join(testDir, "stdout"))); got != want {
|
||||||
|
t.Fatalf("stdout=%q, want %q", got, want)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
t.Run("ErrNoBackups", func(t *testing.T) {
|
t.Run("ErrNoBackups", func(t *testing.T) {
|
||||||
testDir := filepath.Join("testdata", "restore", "no-backups")
|
testDir := filepath.Join("testdata", "restore", "no-backups")
|
||||||
defer testingutil.Setenv(t, "LITESTREAM_TESTDIR", testDir)()
|
defer testingutil.Setenv(t, "LITESTREAM_TESTDIR", testDir)()
|
||||||
|
|||||||
5
cmd/litestream/testdata/restore/if-replica-exists-flag-multiple/litestream.yml
vendored
Normal file
5
cmd/litestream/testdata/restore/if-replica-exists-flag-multiple/litestream.yml
vendored
Normal file
@@ -0,0 +1,5 @@
|
|||||||
|
dbs:
|
||||||
|
- path: $LITESTREAM_TESTDIR/db
|
||||||
|
replicas:
|
||||||
|
- path: $LITESTREAM_TESTDIR/replica0
|
||||||
|
- path: $LITESTREAM_TESTDIR/replica1
|
||||||
1
cmd/litestream/testdata/restore/if-replica-exists-flag-multiple/stdout
vendored
Normal file
1
cmd/litestream/testdata/restore/if-replica-exists-flag-multiple/stdout
vendored
Normal file
@@ -0,0 +1 @@
|
|||||||
|
no replicas have generations to restore from, skipping
|
||||||
5
db.go
5
db.go
@@ -9,6 +9,7 @@ import (
|
|||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
|
"io/ioutil"
|
||||||
"log"
|
"log"
|
||||||
"math/rand"
|
"math/rand"
|
||||||
"os"
|
"os"
|
||||||
@@ -291,7 +292,7 @@ func (db *DB) invalidatePos(ctx context.Context) error {
|
|||||||
}
|
}
|
||||||
defer rd.Close()
|
defer rd.Close()
|
||||||
|
|
||||||
n, err := io.Copy(io.Discard, lz4.NewReader(rd))
|
n, err := io.Copy(ioutil.Discard, lz4.NewReader(rd))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@@ -670,7 +671,7 @@ func (db *DB) cleanGenerations(ctx context.Context) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
dir := filepath.Join(db.MetaPath(), "generations")
|
dir := filepath.Join(db.MetaPath(), "generations")
|
||||||
fis, err := os.ReadDir(dir)
|
fis, err := ioutil.ReadDir(dir)
|
||||||
if os.IsNotExist(err) {
|
if os.IsNotExist(err) {
|
||||||
return nil
|
return nil
|
||||||
} else if err != nil {
|
} else if err != nil {
|
||||||
|
|||||||
@@ -4,6 +4,7 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
|
"io/ioutil"
|
||||||
"os"
|
"os"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
"sort"
|
"sort"
|
||||||
@@ -110,7 +111,7 @@ func (c *FileReplicaClient) Generations(ctx context.Context) ([]string, error) {
|
|||||||
return nil, fmt.Errorf("cannot determine generations path: %w", err)
|
return nil, fmt.Errorf("cannot determine generations path: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
fis, err := os.ReadDir(root)
|
fis, err := ioutil.ReadDir(root)
|
||||||
if os.IsNotExist(err) {
|
if os.IsNotExist(err) {
|
||||||
return nil, nil
|
return nil, nil
|
||||||
} else if err != nil {
|
} else if err != nil {
|
||||||
|
|||||||
40
go.mod
40
go.mod
@@ -1,51 +1,27 @@
|
|||||||
module github.com/benbjohnson/litestream
|
module github.com/benbjohnson/litestream
|
||||||
|
|
||||||
go 1.19
|
go 1.16
|
||||||
|
|
||||||
require (
|
require (
|
||||||
|
cloud.google.com/go v0.103.0 // indirect
|
||||||
cloud.google.com/go/storage v1.24.0
|
cloud.google.com/go/storage v1.24.0
|
||||||
github.com/Azure/azure-storage-blob-go v0.15.0
|
github.com/Azure/azure-storage-blob-go v0.15.0
|
||||||
github.com/aws/aws-sdk-go v1.44.71
|
github.com/aws/aws-sdk-go v1.44.71
|
||||||
github.com/fsnotify/fsnotify v1.5.4
|
github.com/fsnotify/fsnotify v1.5.4
|
||||||
|
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
|
||||||
|
github.com/googleapis/gax-go/v2 v2.5.1 // indirect
|
||||||
|
github.com/mattn/go-ieproxy v0.0.7 // indirect
|
||||||
github.com/mattn/go-shellwords v1.0.12
|
github.com/mattn/go-shellwords v1.0.12
|
||||||
github.com/mattn/go-sqlite3 v1.14.14
|
github.com/mattn/go-sqlite3 v1.14.14
|
||||||
github.com/pierrec/lz4/v4 v4.1.15
|
github.com/pierrec/lz4/v4 v4.1.15
|
||||||
github.com/pkg/sftp v1.13.5
|
github.com/pkg/sftp v1.13.5
|
||||||
github.com/prometheus/client_golang v1.13.0
|
github.com/prometheus/client_golang v1.13.0
|
||||||
golang.org/x/crypto v0.0.0-20220722155217-630584e8d5aa
|
golang.org/x/crypto v0.0.0-20220722155217-630584e8d5aa
|
||||||
golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4
|
|
||||||
google.golang.org/api v0.91.0
|
|
||||||
gopkg.in/yaml.v2 v2.4.0
|
|
||||||
)
|
|
||||||
|
|
||||||
require (
|
|
||||||
cloud.google.com/go v0.103.0 // indirect
|
|
||||||
cloud.google.com/go/compute v1.7.0 // indirect
|
|
||||||
cloud.google.com/go/iam v0.3.0 // indirect
|
|
||||||
github.com/Azure/azure-pipeline-go v0.2.3 // indirect
|
|
||||||
github.com/beorn7/perks v1.0.1 // indirect
|
|
||||||
github.com/cespare/xxhash/v2 v2.1.2 // indirect
|
|
||||||
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
|
|
||||||
github.com/golang/protobuf v1.5.2 // indirect
|
|
||||||
github.com/google/go-cmp v0.5.8 // indirect
|
|
||||||
github.com/google/uuid v1.3.0 // indirect
|
|
||||||
github.com/googleapis/enterprise-certificate-proxy v0.1.0 // indirect
|
|
||||||
github.com/googleapis/gax-go/v2 v2.5.1 // indirect
|
|
||||||
github.com/jmespath/go-jmespath v0.4.0 // indirect
|
|
||||||
github.com/kr/fs v0.1.0 // indirect
|
|
||||||
github.com/mattn/go-ieproxy v0.0.7 // indirect
|
|
||||||
github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect
|
|
||||||
github.com/prometheus/client_model v0.2.0 // indirect
|
|
||||||
github.com/prometheus/common v0.37.0 // indirect
|
|
||||||
github.com/prometheus/procfs v0.8.0 // indirect
|
|
||||||
go.opencensus.io v0.23.0 // indirect
|
|
||||||
golang.org/x/net v0.0.0-20220805013720-a33c5aa5df48 // indirect
|
golang.org/x/net v0.0.0-20220805013720-a33c5aa5df48 // indirect
|
||||||
golang.org/x/oauth2 v0.0.0-20220808172628-8227340efae7 // indirect
|
golang.org/x/oauth2 v0.0.0-20220808172628-8227340efae7 // indirect
|
||||||
|
golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4
|
||||||
golang.org/x/sys v0.0.0-20220808155132-1c4a2a72c664 // indirect
|
golang.org/x/sys v0.0.0-20220808155132-1c4a2a72c664 // indirect
|
||||||
golang.org/x/text v0.3.7 // indirect
|
google.golang.org/api v0.91.0
|
||||||
golang.org/x/xerrors v0.0.0-20220609144429-65e65417b02f // indirect
|
|
||||||
google.golang.org/appengine v1.6.7 // indirect
|
|
||||||
google.golang.org/genproto v0.0.0-20220808204814-fd01256a5276 // indirect
|
google.golang.org/genproto v0.0.0-20220808204814-fd01256a5276 // indirect
|
||||||
google.golang.org/grpc v1.48.0 // indirect
|
gopkg.in/yaml.v2 v2.4.0
|
||||||
google.golang.org/protobuf v1.28.1 // indirect
|
|
||||||
)
|
)
|
||||||
|
|||||||
4
go.sum
4
go.sum
@@ -93,6 +93,7 @@ github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+Ce
|
|||||||
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
|
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
|
||||||
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
|
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
|
||||||
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
|
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
|
||||||
|
github.com/cespare/xxhash v1.1.0 h1:a6HrQnmkObjyL+Gs60czilIUGqrzKutQD6XZog3p+ko=
|
||||||
github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc=
|
github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc=
|
||||||
github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
|
github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
|
||||||
github.com/cespare/xxhash/v2 v2.1.2 h1:YRXhKfTDauu4ajMg1TPgFO5jnlC2HCbmLXMcTG5cbYE=
|
github.com/cespare/xxhash/v2 v2.1.2 h1:YRXhKfTDauu4ajMg1TPgFO5jnlC2HCbmLXMcTG5cbYE=
|
||||||
@@ -547,6 +548,7 @@ golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBc
|
|||||||
golang.org/x/sys v0.0.0-20220610221304-9f5ed59c137d/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
golang.org/x/sys v0.0.0-20220610221304-9f5ed59c137d/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||||
golang.org/x/sys v0.0.0-20220615213510-4f61da869c0c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
golang.org/x/sys v0.0.0-20220615213510-4f61da869c0c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||||
golang.org/x/sys v0.0.0-20220624220833-87e55d714810/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
golang.org/x/sys v0.0.0-20220624220833-87e55d714810/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||||
|
golang.org/x/sys v0.0.0-20220728004956-3c1f35247d10/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||||
golang.org/x/sys v0.0.0-20220808155132-1c4a2a72c664 h1:v1W7bwXHsnLLloWYTVEdvGvA7BHMeBYsPcF0GLDxIRs=
|
golang.org/x/sys v0.0.0-20220808155132-1c4a2a72c664 h1:v1W7bwXHsnLLloWYTVEdvGvA7BHMeBYsPcF0GLDxIRs=
|
||||||
golang.org/x/sys v0.0.0-20220808155132-1c4a2a72c664/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
golang.org/x/sys v0.0.0-20220808155132-1c4a2a72c664/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||||
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
|
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
|
||||||
@@ -665,6 +667,7 @@ google.golang.org/api v0.80.0/go.mod h1:xY3nI94gbvBrE0J6NHXhxOmW97HG7Khjkku6AFB3
|
|||||||
google.golang.org/api v0.84.0/go.mod h1:NTsGnUFJMYROtiquksZHBWtHfeMC7iYthki7Eq3pa8o=
|
google.golang.org/api v0.84.0/go.mod h1:NTsGnUFJMYROtiquksZHBWtHfeMC7iYthki7Eq3pa8o=
|
||||||
google.golang.org/api v0.85.0/go.mod h1:AqZf8Ep9uZ2pyTvgL+x0D3Zt0eoT9b5E8fmzfu6FO2g=
|
google.golang.org/api v0.85.0/go.mod h1:AqZf8Ep9uZ2pyTvgL+x0D3Zt0eoT9b5E8fmzfu6FO2g=
|
||||||
google.golang.org/api v0.86.0/go.mod h1:+Sem1dnrKlrXMR/X0bPnMWyluQe4RsNoYfmNLhOIkzw=
|
google.golang.org/api v0.86.0/go.mod h1:+Sem1dnrKlrXMR/X0bPnMWyluQe4RsNoYfmNLhOIkzw=
|
||||||
|
google.golang.org/api v0.90.0/go.mod h1:+Sem1dnrKlrXMR/X0bPnMWyluQe4RsNoYfmNLhOIkzw=
|
||||||
google.golang.org/api v0.91.0 h1:731+JzuwaJoZXRQGmPoBiV+SrsAfUaIkdMCWTcQNPyA=
|
google.golang.org/api v0.91.0 h1:731+JzuwaJoZXRQGmPoBiV+SrsAfUaIkdMCWTcQNPyA=
|
||||||
google.golang.org/api v0.91.0/go.mod h1:+Sem1dnrKlrXMR/X0bPnMWyluQe4RsNoYfmNLhOIkzw=
|
google.golang.org/api v0.91.0/go.mod h1:+Sem1dnrKlrXMR/X0bPnMWyluQe4RsNoYfmNLhOIkzw=
|
||||||
google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM=
|
google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM=
|
||||||
@@ -756,6 +759,7 @@ google.golang.org/genproto v0.0.0-20220616135557-88e70c0c3a90/go.mod h1:KEWEmljW
|
|||||||
google.golang.org/genproto v0.0.0-20220617124728-180714bec0ad/go.mod h1:KEWEmljWE5zPzLBa/oHl6DaEt9LmfH6WtH1OHIvleBA=
|
google.golang.org/genproto v0.0.0-20220617124728-180714bec0ad/go.mod h1:KEWEmljWE5zPzLBa/oHl6DaEt9LmfH6WtH1OHIvleBA=
|
||||||
google.golang.org/genproto v0.0.0-20220624142145-8cd45d7dbd1f/go.mod h1:KEWEmljWE5zPzLBa/oHl6DaEt9LmfH6WtH1OHIvleBA=
|
google.golang.org/genproto v0.0.0-20220624142145-8cd45d7dbd1f/go.mod h1:KEWEmljWE5zPzLBa/oHl6DaEt9LmfH6WtH1OHIvleBA=
|
||||||
google.golang.org/genproto v0.0.0-20220628213854-d9e0b6570c03/go.mod h1:KEWEmljWE5zPzLBa/oHl6DaEt9LmfH6WtH1OHIvleBA=
|
google.golang.org/genproto v0.0.0-20220628213854-d9e0b6570c03/go.mod h1:KEWEmljWE5zPzLBa/oHl6DaEt9LmfH6WtH1OHIvleBA=
|
||||||
|
google.golang.org/genproto v0.0.0-20220722212130-b98a9ff5e252/go.mod h1:GkXuJDJ6aQ7lnJcRF+SJVgFdQhypqgl3LB1C9vabdRE=
|
||||||
google.golang.org/genproto v0.0.0-20220808204814-fd01256a5276 h1:7PEE9xCtufpGJzrqweakEEnTh7YFELmnKm/ee+5jmfQ=
|
google.golang.org/genproto v0.0.0-20220808204814-fd01256a5276 h1:7PEE9xCtufpGJzrqweakEEnTh7YFELmnKm/ee+5jmfQ=
|
||||||
google.golang.org/genproto v0.0.0-20220808204814-fd01256a5276/go.mod h1:dbqgFATTzChvnt+ujMdZwITVAJHFtfyN1qUhDqEiIlk=
|
google.golang.org/genproto v0.0.0-20220808204814-fd01256a5276/go.mod h1:dbqgFATTzChvnt+ujMdZwITVAJHFtfyN1qUhDqEiIlk=
|
||||||
google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c=
|
google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c=
|
||||||
|
|||||||
@@ -4,7 +4,7 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
"flag"
|
"flag"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io/ioutil"
|
||||||
"math/rand"
|
"math/rand"
|
||||||
"os"
|
"os"
|
||||||
"path"
|
"path"
|
||||||
@@ -22,13 +22,12 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
localRand = rand.New(rand.NewSource(time.Now().UnixNano()))
|
rand.Seed(time.Now().UnixNano())
|
||||||
}
|
}
|
||||||
|
|
||||||
var (
|
var (
|
||||||
// Enables integration tests.
|
// Enables integration tests.
|
||||||
replicaType = flag.String("replica-type", "file", "")
|
replicaType = flag.String("replica-type", "file", "")
|
||||||
localRand *rand.Rand
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// S3 settings
|
// S3 settings
|
||||||
@@ -194,7 +193,7 @@ func TestReplicaClient_WriteSnapshot(t *testing.T) {
|
|||||||
|
|
||||||
if r, err := c.SnapshotReader(context.Background(), "b16ddcf5c697540f", 1000); err != nil {
|
if r, err := c.SnapshotReader(context.Background(), "b16ddcf5c697540f", 1000); err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
} else if buf, err := io.ReadAll(r); err != nil {
|
} else if buf, err := ioutil.ReadAll(r); err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
} else if err := r.Close(); err != nil {
|
} else if err := r.Close(); err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
@@ -225,7 +224,7 @@ func TestReplicaClient_SnapshotReader(t *testing.T) {
|
|||||||
}
|
}
|
||||||
defer r.Close()
|
defer r.Close()
|
||||||
|
|
||||||
if buf, err := io.ReadAll(r); err != nil {
|
if buf, err := ioutil.ReadAll(r); err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
} else if got, want := string(buf), "foo"; got != want {
|
} else if got, want := string(buf), "foo"; got != want {
|
||||||
t.Fatalf("ReadAll=%v, want %v", got, want)
|
t.Fatalf("ReadAll=%v, want %v", got, want)
|
||||||
@@ -379,7 +378,7 @@ func TestReplicaClient_WriteWALSegment(t *testing.T) {
|
|||||||
|
|
||||||
if r, err := c.WALSegmentReader(context.Background(), litestream.Pos{Generation: "b16ddcf5c697540f", Index: 1000, Offset: 2000}); err != nil {
|
if r, err := c.WALSegmentReader(context.Background(), litestream.Pos{Generation: "b16ddcf5c697540f", Index: 1000, Offset: 2000}); err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
} else if buf, err := io.ReadAll(r); err != nil {
|
} else if buf, err := ioutil.ReadAll(r); err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
} else if err := r.Close(); err != nil {
|
} else if err := r.Close(); err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
@@ -410,7 +409,7 @@ func TestReplicaClient_WALSegmentReader(t *testing.T) {
|
|||||||
}
|
}
|
||||||
defer r.Close()
|
defer r.Close()
|
||||||
|
|
||||||
if buf, err := io.ReadAll(r); err != nil {
|
if buf, err := ioutil.ReadAll(r); err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
} else if got, want := string(buf), "foobar"; got != want {
|
} else if got, want := string(buf), "foobar"; got != want {
|
||||||
t.Fatalf("ReadAll=%v, want %v", got, want)
|
t.Fatalf("ReadAll=%v, want %v", got, want)
|
||||||
|
|||||||
@@ -4,6 +4,7 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
|
"io/ioutil"
|
||||||
"log"
|
"log"
|
||||||
"os"
|
"os"
|
||||||
"sort"
|
"sort"
|
||||||
@@ -374,7 +375,7 @@ func (r *Replica) calcPos(ctx context.Context, generation string) (pos Pos, err
|
|||||||
}
|
}
|
||||||
defer rd.Close()
|
defer rd.Close()
|
||||||
|
|
||||||
n, err := io.Copy(io.Discard, lz4.NewReader(rd))
|
n, err := io.Copy(ioutil.Discard, lz4.NewReader(rd))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return pos, err
|
return pos, err
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -18,6 +18,7 @@ import (
|
|||||||
"github.com/aws/aws-sdk-go/aws"
|
"github.com/aws/aws-sdk-go/aws"
|
||||||
"github.com/aws/aws-sdk-go/aws/awserr"
|
"github.com/aws/aws-sdk-go/aws/awserr"
|
||||||
"github.com/aws/aws-sdk-go/aws/credentials"
|
"github.com/aws/aws-sdk-go/aws/credentials"
|
||||||
|
"github.com/aws/aws-sdk-go/aws/defaults"
|
||||||
"github.com/aws/aws-sdk-go/aws/session"
|
"github.com/aws/aws-sdk-go/aws/session"
|
||||||
"github.com/aws/aws-sdk-go/service/s3"
|
"github.com/aws/aws-sdk-go/service/s3"
|
||||||
"github.com/aws/aws-sdk-go/service/s3/s3manager"
|
"github.com/aws/aws-sdk-go/service/s3/s3manager"
|
||||||
@@ -94,7 +95,6 @@ func (c *ReplicaClient) Init(ctx context.Context) (err error) {
|
|||||||
if region != "" {
|
if region != "" {
|
||||||
config.Region = aws.String(region)
|
config.Region = aws.String(region)
|
||||||
}
|
}
|
||||||
|
|
||||||
sess, err := session.NewSession(config)
|
sess, err := session.NewSession(config)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("cannot create aws session: %w", err)
|
return fmt.Errorf("cannot create aws session: %w", err)
|
||||||
@@ -107,8 +107,7 @@ func (c *ReplicaClient) Init(ctx context.Context) (err error) {
|
|||||||
// config returns the AWS configuration. Uses the default credential chain
|
// config returns the AWS configuration. Uses the default credential chain
|
||||||
// unless a key/secret are explicitly set.
|
// unless a key/secret are explicitly set.
|
||||||
func (c *ReplicaClient) config() *aws.Config {
|
func (c *ReplicaClient) config() *aws.Config {
|
||||||
config := &aws.Config{}
|
config := defaults.Get().Config
|
||||||
|
|
||||||
if c.AccessKeyID != "" || c.SecretAccessKey != "" {
|
if c.AccessKeyID != "" || c.SecretAccessKey != "" {
|
||||||
config.Credentials = credentials.NewStaticCredentials(c.AccessKeyID, c.SecretAccessKey, "")
|
config.Credentials = credentials.NewStaticCredentials(c.AccessKeyID, c.SecretAccessKey, "")
|
||||||
}
|
}
|
||||||
@@ -208,14 +207,10 @@ func (c *ReplicaClient) DeleteGeneration(ctx context.Context, generation string)
|
|||||||
n = len(objIDs)
|
n = len(objIDs)
|
||||||
}
|
}
|
||||||
|
|
||||||
out, err := c.s3.DeleteObjectsWithContext(ctx, &s3.DeleteObjectsInput{
|
if _, err := c.s3.DeleteObjectsWithContext(ctx, &s3.DeleteObjectsInput{
|
||||||
Bucket: aws.String(c.Bucket),
|
Bucket: aws.String(c.Bucket),
|
||||||
Delete: &s3.Delete{Objects: objIDs[:n], Quiet: aws.Bool(true)},
|
Delete: &s3.Delete{Objects: objIDs[:n], Quiet: aws.Bool(true)},
|
||||||
})
|
}); err != nil {
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
if err := deleteOutputError(out); err != nil {
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
internal.OperationTotalCounterVec.WithLabelValues(ReplicaClientType, "DELETE").Inc()
|
internal.OperationTotalCounterVec.WithLabelValues(ReplicaClientType, "DELETE").Inc()
|
||||||
@@ -302,14 +297,10 @@ func (c *ReplicaClient) DeleteSnapshot(ctx context.Context, generation string, i
|
|||||||
|
|
||||||
key := path.Join(c.Path, "generations", generation, "snapshots", litestream.FormatIndex(index)+".snapshot.lz4")
|
key := path.Join(c.Path, "generations", generation, "snapshots", litestream.FormatIndex(index)+".snapshot.lz4")
|
||||||
|
|
||||||
out, err := c.s3.DeleteObjectsWithContext(ctx, &s3.DeleteObjectsInput{
|
if _, err := c.s3.DeleteObjectsWithContext(ctx, &s3.DeleteObjectsInput{
|
||||||
Bucket: aws.String(c.Bucket),
|
Bucket: aws.String(c.Bucket),
|
||||||
Delete: &s3.Delete{Objects: []*s3.ObjectIdentifier{{Key: &key}}, Quiet: aws.Bool(true)},
|
Delete: &s3.Delete{Objects: []*s3.ObjectIdentifier{{Key: &key}}, Quiet: aws.Bool(true)},
|
||||||
})
|
}); err != nil {
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
if err := deleteOutputError(out); err != nil {
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -406,16 +397,13 @@ func (c *ReplicaClient) DeleteWALSegments(ctx context.Context, a []litestream.Po
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Delete S3 objects in bulk.
|
// Delete S3 objects in bulk.
|
||||||
out, err := c.s3.DeleteObjectsWithContext(ctx, &s3.DeleteObjectsInput{
|
if _, err := c.s3.DeleteObjectsWithContext(ctx, &s3.DeleteObjectsInput{
|
||||||
Bucket: aws.String(c.Bucket),
|
Bucket: aws.String(c.Bucket),
|
||||||
Delete: &s3.Delete{Objects: objIDs[:n], Quiet: aws.Bool(true)},
|
Delete: &s3.Delete{Objects: objIDs[:n], Quiet: aws.Bool(true)},
|
||||||
})
|
}); err != nil {
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
if err := deleteOutputError(out); err != nil {
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
internal.OperationTotalCounterVec.WithLabelValues(ReplicaClientType, "DELETE").Inc()
|
internal.OperationTotalCounterVec.WithLabelValues(ReplicaClientType, "DELETE").Inc()
|
||||||
|
|
||||||
a = a[n:]
|
a = a[n:]
|
||||||
@@ -458,14 +446,10 @@ func (c *ReplicaClient) DeleteAll(ctx context.Context) error {
|
|||||||
n = len(objIDs)
|
n = len(objIDs)
|
||||||
}
|
}
|
||||||
|
|
||||||
out, err := c.s3.DeleteObjectsWithContext(ctx, &s3.DeleteObjectsInput{
|
if _, err := c.s3.DeleteObjectsWithContext(ctx, &s3.DeleteObjectsInput{
|
||||||
Bucket: aws.String(c.Bucket),
|
Bucket: aws.String(c.Bucket),
|
||||||
Delete: &s3.Delete{Objects: objIDs[:n], Quiet: aws.Bool(true)},
|
Delete: &s3.Delete{Objects: objIDs[:n], Quiet: aws.Bool(true)},
|
||||||
})
|
}); err != nil {
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
if err := deleteOutputError(out); err != nil {
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
internal.OperationTotalCounterVec.WithLabelValues(ReplicaClientType, "DELETE").Inc()
|
internal.OperationTotalCounterVec.WithLabelValues(ReplicaClientType, "DELETE").Inc()
|
||||||
@@ -765,15 +749,3 @@ func isNotExists(err error) bool {
|
|||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func deleteOutputError(out *s3.DeleteObjectsOutput) error {
|
|
||||||
switch len(out.Errors) {
|
|
||||||
case 0:
|
|
||||||
return nil
|
|
||||||
case 1:
|
|
||||||
return fmt.Errorf("deleting object %s: %s - %s", *out.Errors[0].Key, *out.Errors[0].Code, *out.Errors[0].Message)
|
|
||||||
default:
|
|
||||||
return fmt.Errorf("%d errors occured deleting objects, %s: %s - (%s (and %d others)",
|
|
||||||
len(out.Errors), *out.Errors[0].Key, *out.Errors[0].Code, *out.Errors[0].Message, len(out.Errors)-1)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|||||||
Reference in New Issue
Block a user