Compare commits

..

1 Commits

Author SHA1 Message Date
Toni Spets
94f0082abd Fix restore -if-replica-exists with multiple replicas 2022-08-29 14:00:20 -06:00
15 changed files with 75 additions and 96 deletions

View File

@@ -1,4 +1,4 @@
FROM golang:1.20.1 as builder
FROM golang:1.17 as builder
WORKDIR /src/litestream
COPY . .
@@ -10,7 +10,7 @@ RUN --mount=type=cache,target=/root/.cache/go-build \
go build -ldflags "-s -w -X 'main.Version=${LITESTREAM_VERSION}' -extldflags '-static'" -tags osusergo,netgo,sqlite_omit_load_extension -o /usr/local/bin/litestream ./cmd/litestream
FROM alpine:3.17.2
FROM alpine
COPY --from=builder /usr/local/bin/litestream /usr/local/bin/litestream
ENTRYPOINT ["/usr/local/bin/litestream"]
CMD []

View File

@@ -6,7 +6,7 @@ Litestream
![test](https://github.com/benbjohnson/litestream/workflows/test/badge.svg)
==========
Litestream is a standalone disaster recovery tool for SQLite. It runs as a
Litestream is a standalone streaming replication tool for SQLite. It runs as a
background process and safely replicates changes incrementally to another file
or S3. Litestream only communicates with SQLite through the SQLite API so it
will not corrupt your database.

View File

@@ -6,6 +6,7 @@ import (
"flag"
"fmt"
"io"
"io/ioutil"
"log"
"net/url"
"os"
@@ -252,7 +253,7 @@ func readConfigFile(filename string, expandEnv bool) (_ Config, err error) {
// Read configuration.
// Do not return an error if using default path and file is missing.
buf, err := os.ReadFile(filename)
buf, err := ioutil.ReadFile(filename)
if err != nil {
return config, err
}

View File

@@ -3,6 +3,7 @@ package main_test
import (
"bytes"
"io"
"io/ioutil"
"log"
"os"
"path/filepath"
@@ -22,7 +23,7 @@ func TestReadConfigFile(t *testing.T) {
// Ensure global AWS settings are propagated down to replica configurations.
t.Run("PropagateGlobalSettings", func(t *testing.T) {
filename := filepath.Join(t.TempDir(), "litestream.yml")
if err := os.WriteFile(filename, []byte(`
if err := ioutil.WriteFile(filename, []byte(`
access-key-id: XXX
secret-access-key: YYY
@@ -54,7 +55,7 @@ dbs:
os.Setenv("LITESTREAM_TEST_1872363", "s3://foo/bar")
filename := filepath.Join(t.TempDir(), "litestream.yml")
if err := os.WriteFile(filename, []byte(`
if err := ioutil.WriteFile(filename, []byte(`
dbs:
- path: $LITESTREAM_TEST_0129380
replicas:
@@ -81,7 +82,7 @@ dbs:
os.Setenv("LITESTREAM_TEST_9847533", "s3://foo/bar")
filename := filepath.Join(t.TempDir(), "litestream.yml")
if err := os.WriteFile(filename, []byte(`
if err := ioutil.WriteFile(filename, []byte(`
dbs:
- path: /path/to/db
replicas:

View File

@@ -111,8 +111,16 @@ func (c *RestoreCommand) Run(ctx context.Context, args []string) (err error) {
// Build replica from either a URL or config.
r, err := c.loadReplica(ctx, config, pathOrURL)
if err != nil {
return err
if err == litestream.ErrNoGeneration {
// Return an error if no replicas can be loaded to restore from.
// If optional flag set, return success. Useful for automated recovery.
if c.ifReplicaExists {
fmt.Fprintln(c.stdout, "no replicas have generations to restore from, skipping")
return nil
}
return fmt.Errorf("no replicas have generations to restore from")
} else if err != nil {
return fmt.Errorf("cannot determine latest replica: %w", err)
}
// Determine latest generation if one is not specified.
@@ -218,11 +226,7 @@ func (c *RestoreCommand) loadReplicaFromConfig(ctx context.Context, config Confi
}
// Determine latest replica to restore from.
r, err := litestream.LatestReplica(ctx, db.Replicas)
if err != nil {
return nil, fmt.Errorf("cannot determine latest replica: %w", err)
}
return r, nil
return litestream.LatestReplica(ctx, db.Replicas)
}
// Usage prints the help screen to STDOUT.

View File

@@ -138,6 +138,19 @@ func TestRestoreCommand(t *testing.T) {
}
})
t.Run("IfReplicaExists/Multiple", func(t *testing.T) {
testDir := filepath.Join("testdata", "restore", "if-replica-exists-flag-multiple")
defer testingutil.Setenv(t, "LITESTREAM_TESTDIR", testDir)()
m, _, stdout, _ := newMain()
err := m.Run(context.Background(), []string{"restore", "-config", filepath.Join(testDir, "litestream.yml"), "-if-replica-exists", filepath.Join(testDir, "db")})
if err != nil {
t.Fatal(err)
} else if got, want := stdout.String(), string(testingutil.ReadFile(t, filepath.Join(testDir, "stdout"))); got != want {
t.Fatalf("stdout=%q, want %q", got, want)
}
})
t.Run("ErrNoBackups", func(t *testing.T) {
testDir := filepath.Join("testdata", "restore", "no-backups")
defer testingutil.Setenv(t, "LITESTREAM_TESTDIR", testDir)()

View File

@@ -0,0 +1,5 @@
dbs:
- path: $LITESTREAM_TESTDIR/db
replicas:
- path: $LITESTREAM_TESTDIR/replica0
- path: $LITESTREAM_TESTDIR/replica1

View File

@@ -0,0 +1 @@
no replicas have generations to restore from, skipping

5
db.go
View File

@@ -9,6 +9,7 @@ import (
"errors"
"fmt"
"io"
"io/ioutil"
"log"
"math/rand"
"os"
@@ -291,7 +292,7 @@ func (db *DB) invalidatePos(ctx context.Context) error {
}
defer rd.Close()
n, err := io.Copy(io.Discard, lz4.NewReader(rd))
n, err := io.Copy(ioutil.Discard, lz4.NewReader(rd))
if err != nil {
return err
}
@@ -670,7 +671,7 @@ func (db *DB) cleanGenerations(ctx context.Context) error {
}
dir := filepath.Join(db.MetaPath(), "generations")
fis, err := os.ReadDir(dir)
fis, err := ioutil.ReadDir(dir)
if os.IsNotExist(err) {
return nil
} else if err != nil {

View File

@@ -4,6 +4,7 @@ import (
"context"
"fmt"
"io"
"io/ioutil"
"os"
"path/filepath"
"sort"
@@ -110,7 +111,7 @@ func (c *FileReplicaClient) Generations(ctx context.Context) ([]string, error) {
return nil, fmt.Errorf("cannot determine generations path: %w", err)
}
fis, err := os.ReadDir(root)
fis, err := ioutil.ReadDir(root)
if os.IsNotExist(err) {
return nil, nil
} else if err != nil {

40
go.mod
View File

@@ -1,51 +1,27 @@
module github.com/benbjohnson/litestream
go 1.19
go 1.16
require (
cloud.google.com/go v0.103.0 // indirect
cloud.google.com/go/storage v1.24.0
github.com/Azure/azure-storage-blob-go v0.15.0
github.com/aws/aws-sdk-go v1.44.71
github.com/fsnotify/fsnotify v1.5.4
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/googleapis/gax-go/v2 v2.5.1 // indirect
github.com/mattn/go-ieproxy v0.0.7 // indirect
github.com/mattn/go-shellwords v1.0.12
github.com/mattn/go-sqlite3 v1.14.14
github.com/pierrec/lz4/v4 v4.1.15
github.com/pkg/sftp v1.13.5
github.com/prometheus/client_golang v1.13.0
golang.org/x/crypto v0.0.0-20220722155217-630584e8d5aa
golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4
google.golang.org/api v0.91.0
gopkg.in/yaml.v2 v2.4.0
)
require (
cloud.google.com/go v0.103.0 // indirect
cloud.google.com/go/compute v1.7.0 // indirect
cloud.google.com/go/iam v0.3.0 // indirect
github.com/Azure/azure-pipeline-go v0.2.3 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.1.2 // indirect
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/google/go-cmp v0.5.8 // indirect
github.com/google/uuid v1.3.0 // indirect
github.com/googleapis/enterprise-certificate-proxy v0.1.0 // indirect
github.com/googleapis/gax-go/v2 v2.5.1 // indirect
github.com/jmespath/go-jmespath v0.4.0 // indirect
github.com/kr/fs v0.1.0 // indirect
github.com/mattn/go-ieproxy v0.0.7 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect
github.com/prometheus/client_model v0.2.0 // indirect
github.com/prometheus/common v0.37.0 // indirect
github.com/prometheus/procfs v0.8.0 // indirect
go.opencensus.io v0.23.0 // indirect
golang.org/x/net v0.0.0-20220805013720-a33c5aa5df48 // indirect
golang.org/x/oauth2 v0.0.0-20220808172628-8227340efae7 // indirect
golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4
golang.org/x/sys v0.0.0-20220808155132-1c4a2a72c664 // indirect
golang.org/x/text v0.3.7 // indirect
golang.org/x/xerrors v0.0.0-20220609144429-65e65417b02f // indirect
google.golang.org/appengine v1.6.7 // indirect
google.golang.org/api v0.91.0
google.golang.org/genproto v0.0.0-20220808204814-fd01256a5276 // indirect
google.golang.org/grpc v1.48.0 // indirect
google.golang.org/protobuf v1.28.1 // indirect
gopkg.in/yaml.v2 v2.4.0
)

4
go.sum
View File

@@ -93,6 +93,7 @@ github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+Ce
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/cespare/xxhash v1.1.0 h1:a6HrQnmkObjyL+Gs60czilIUGqrzKutQD6XZog3p+ko=
github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc=
github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/cespare/xxhash/v2 v2.1.2 h1:YRXhKfTDauu4ajMg1TPgFO5jnlC2HCbmLXMcTG5cbYE=
@@ -547,6 +548,7 @@ golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBc
golang.org/x/sys v0.0.0-20220610221304-9f5ed59c137d/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220615213510-4f61da869c0c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220624220833-87e55d714810/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220728004956-3c1f35247d10/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220808155132-1c4a2a72c664 h1:v1W7bwXHsnLLloWYTVEdvGvA7BHMeBYsPcF0GLDxIRs=
golang.org/x/sys v0.0.0-20220808155132-1c4a2a72c664/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
@@ -665,6 +667,7 @@ google.golang.org/api v0.80.0/go.mod h1:xY3nI94gbvBrE0J6NHXhxOmW97HG7Khjkku6AFB3
google.golang.org/api v0.84.0/go.mod h1:NTsGnUFJMYROtiquksZHBWtHfeMC7iYthki7Eq3pa8o=
google.golang.org/api v0.85.0/go.mod h1:AqZf8Ep9uZ2pyTvgL+x0D3Zt0eoT9b5E8fmzfu6FO2g=
google.golang.org/api v0.86.0/go.mod h1:+Sem1dnrKlrXMR/X0bPnMWyluQe4RsNoYfmNLhOIkzw=
google.golang.org/api v0.90.0/go.mod h1:+Sem1dnrKlrXMR/X0bPnMWyluQe4RsNoYfmNLhOIkzw=
google.golang.org/api v0.91.0 h1:731+JzuwaJoZXRQGmPoBiV+SrsAfUaIkdMCWTcQNPyA=
google.golang.org/api v0.91.0/go.mod h1:+Sem1dnrKlrXMR/X0bPnMWyluQe4RsNoYfmNLhOIkzw=
google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM=
@@ -756,6 +759,7 @@ google.golang.org/genproto v0.0.0-20220616135557-88e70c0c3a90/go.mod h1:KEWEmljW
google.golang.org/genproto v0.0.0-20220617124728-180714bec0ad/go.mod h1:KEWEmljWE5zPzLBa/oHl6DaEt9LmfH6WtH1OHIvleBA=
google.golang.org/genproto v0.0.0-20220624142145-8cd45d7dbd1f/go.mod h1:KEWEmljWE5zPzLBa/oHl6DaEt9LmfH6WtH1OHIvleBA=
google.golang.org/genproto v0.0.0-20220628213854-d9e0b6570c03/go.mod h1:KEWEmljWE5zPzLBa/oHl6DaEt9LmfH6WtH1OHIvleBA=
google.golang.org/genproto v0.0.0-20220722212130-b98a9ff5e252/go.mod h1:GkXuJDJ6aQ7lnJcRF+SJVgFdQhypqgl3LB1C9vabdRE=
google.golang.org/genproto v0.0.0-20220808204814-fd01256a5276 h1:7PEE9xCtufpGJzrqweakEEnTh7YFELmnKm/ee+5jmfQ=
google.golang.org/genproto v0.0.0-20220808204814-fd01256a5276/go.mod h1:dbqgFATTzChvnt+ujMdZwITVAJHFtfyN1qUhDqEiIlk=
google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c=

View File

@@ -4,7 +4,7 @@ import (
"context"
"flag"
"fmt"
"io"
"io/ioutil"
"math/rand"
"os"
"path"
@@ -22,13 +22,12 @@ import (
)
func init() {
localRand = rand.New(rand.NewSource(time.Now().UnixNano()))
rand.Seed(time.Now().UnixNano())
}
var (
// Enables integration tests.
replicaType = flag.String("replica-type", "file", "")
localRand *rand.Rand
)
// S3 settings
@@ -194,7 +193,7 @@ func TestReplicaClient_WriteSnapshot(t *testing.T) {
if r, err := c.SnapshotReader(context.Background(), "b16ddcf5c697540f", 1000); err != nil {
t.Fatal(err)
} else if buf, err := io.ReadAll(r); err != nil {
} else if buf, err := ioutil.ReadAll(r); err != nil {
t.Fatal(err)
} else if err := r.Close(); err != nil {
t.Fatal(err)
@@ -225,7 +224,7 @@ func TestReplicaClient_SnapshotReader(t *testing.T) {
}
defer r.Close()
if buf, err := io.ReadAll(r); err != nil {
if buf, err := ioutil.ReadAll(r); err != nil {
t.Fatal(err)
} else if got, want := string(buf), "foo"; got != want {
t.Fatalf("ReadAll=%v, want %v", got, want)
@@ -379,7 +378,7 @@ func TestReplicaClient_WriteWALSegment(t *testing.T) {
if r, err := c.WALSegmentReader(context.Background(), litestream.Pos{Generation: "b16ddcf5c697540f", Index: 1000, Offset: 2000}); err != nil {
t.Fatal(err)
} else if buf, err := io.ReadAll(r); err != nil {
} else if buf, err := ioutil.ReadAll(r); err != nil {
t.Fatal(err)
} else if err := r.Close(); err != nil {
t.Fatal(err)
@@ -410,7 +409,7 @@ func TestReplicaClient_WALSegmentReader(t *testing.T) {
}
defer r.Close()
if buf, err := io.ReadAll(r); err != nil {
if buf, err := ioutil.ReadAll(r); err != nil {
t.Fatal(err)
} else if got, want := string(buf), "foobar"; got != want {
t.Fatalf("ReadAll=%v, want %v", got, want)

View File

@@ -4,6 +4,7 @@ import (
"context"
"fmt"
"io"
"io/ioutil"
"log"
"os"
"sort"
@@ -374,7 +375,7 @@ func (r *Replica) calcPos(ctx context.Context, generation string) (pos Pos, err
}
defer rd.Close()
n, err := io.Copy(io.Discard, lz4.NewReader(rd))
n, err := io.Copy(ioutil.Discard, lz4.NewReader(rd))
if err != nil {
return pos, err
}

View File

@@ -18,6 +18,7 @@ import (
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/defaults"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/aws/aws-sdk-go/service/s3/s3manager"
@@ -94,7 +95,6 @@ func (c *ReplicaClient) Init(ctx context.Context) (err error) {
if region != "" {
config.Region = aws.String(region)
}
sess, err := session.NewSession(config)
if err != nil {
return fmt.Errorf("cannot create aws session: %w", err)
@@ -107,8 +107,7 @@ func (c *ReplicaClient) Init(ctx context.Context) (err error) {
// config returns the AWS configuration. Uses the default credential chain
// unless a key/secret are explicitly set.
func (c *ReplicaClient) config() *aws.Config {
config := &aws.Config{}
config := defaults.Get().Config
if c.AccessKeyID != "" || c.SecretAccessKey != "" {
config.Credentials = credentials.NewStaticCredentials(c.AccessKeyID, c.SecretAccessKey, "")
}
@@ -208,14 +207,10 @@ func (c *ReplicaClient) DeleteGeneration(ctx context.Context, generation string)
n = len(objIDs)
}
out, err := c.s3.DeleteObjectsWithContext(ctx, &s3.DeleteObjectsInput{
if _, err := c.s3.DeleteObjectsWithContext(ctx, &s3.DeleteObjectsInput{
Bucket: aws.String(c.Bucket),
Delete: &s3.Delete{Objects: objIDs[:n], Quiet: aws.Bool(true)},
})
if err != nil {
return err
}
if err := deleteOutputError(out); err != nil {
}); err != nil {
return err
}
internal.OperationTotalCounterVec.WithLabelValues(ReplicaClientType, "DELETE").Inc()
@@ -302,14 +297,10 @@ func (c *ReplicaClient) DeleteSnapshot(ctx context.Context, generation string, i
key := path.Join(c.Path, "generations", generation, "snapshots", litestream.FormatIndex(index)+".snapshot.lz4")
out, err := c.s3.DeleteObjectsWithContext(ctx, &s3.DeleteObjectsInput{
if _, err := c.s3.DeleteObjectsWithContext(ctx, &s3.DeleteObjectsInput{
Bucket: aws.String(c.Bucket),
Delete: &s3.Delete{Objects: []*s3.ObjectIdentifier{{Key: &key}}, Quiet: aws.Bool(true)},
})
if err != nil {
return err
}
if err := deleteOutputError(out); err != nil {
}); err != nil {
return err
}
@@ -406,16 +397,13 @@ func (c *ReplicaClient) DeleteWALSegments(ctx context.Context, a []litestream.Po
}
// Delete S3 objects in bulk.
out, err := c.s3.DeleteObjectsWithContext(ctx, &s3.DeleteObjectsInput{
if _, err := c.s3.DeleteObjectsWithContext(ctx, &s3.DeleteObjectsInput{
Bucket: aws.String(c.Bucket),
Delete: &s3.Delete{Objects: objIDs[:n], Quiet: aws.Bool(true)},
})
if err != nil {
return err
}
if err := deleteOutputError(out); err != nil {
}); err != nil {
return err
}
internal.OperationTotalCounterVec.WithLabelValues(ReplicaClientType, "DELETE").Inc()
a = a[n:]
@@ -458,14 +446,10 @@ func (c *ReplicaClient) DeleteAll(ctx context.Context) error {
n = len(objIDs)
}
out, err := c.s3.DeleteObjectsWithContext(ctx, &s3.DeleteObjectsInput{
if _, err := c.s3.DeleteObjectsWithContext(ctx, &s3.DeleteObjectsInput{
Bucket: aws.String(c.Bucket),
Delete: &s3.Delete{Objects: objIDs[:n], Quiet: aws.Bool(true)},
})
if err != nil {
return err
}
if err := deleteOutputError(out); err != nil {
}); err != nil {
return err
}
internal.OperationTotalCounterVec.WithLabelValues(ReplicaClientType, "DELETE").Inc()
@@ -765,15 +749,3 @@ func isNotExists(err error) bool {
return false
}
}
func deleteOutputError(out *s3.DeleteObjectsOutput) error {
switch len(out.Errors) {
case 0:
return nil
case 1:
return fmt.Errorf("deleting object %s: %s - %s", *out.Errors[0].Key, *out.Errors[0].Code, *out.Errors[0].Message)
default:
return fmt.Errorf("%d errors occured deleting objects, %s: %s - (%s (and %d others)",
len(out.Errors), *out.Errors[0].Key, *out.Errors[0].Code, *out.Errors[0].Message, len(out.Errors)-1)
}
}