From a42f83f3cbee1d2ae4b2fb5e969fbbc7a026bf7d Mon Sep 17 00:00:00 2001 From: Ben Johnson Date: Wed, 13 Jan 2021 13:17:38 -0700 Subject: [PATCH] Add LITESTREAM_CONFIG env var --- cmd/litestream/databases.go | 2 +- cmd/litestream/generations.go | 2 +- cmd/litestream/main.go | 16 ++-- cmd/litestream/replicate.go | 2 +- cmd/litestream/restore.go | 2 +- cmd/litestream/snapshots.go | 2 +- cmd/litestream/validate.go | 2 +- cmd/litestream/wal.go | 2 +- db.go | 4 +- internal/internal.go | 32 ++++++++ litestream.go | 19 ----- replica.go | 139 +++++++++++++++------------------- s3/s3.go | 78 +------------------ 13 files changed, 117 insertions(+), 185 deletions(-) create mode 100644 internal/internal.go diff --git a/cmd/litestream/databases.go b/cmd/litestream/databases.go index b43d174..e80bb7f 100644 --- a/cmd/litestream/databases.go +++ b/cmd/litestream/databases.go @@ -71,6 +71,6 @@ Arguments: Defaults to %s `[1:], - DefaultConfigPath, + DefaultConfigPath(), ) } diff --git a/cmd/litestream/generations.go b/cmd/litestream/generations.go index c3c04d3..d90f78f 100644 --- a/cmd/litestream/generations.go +++ b/cmd/litestream/generations.go @@ -113,7 +113,7 @@ Arguments: Optional, filters by replica. `[1:], - DefaultConfigPath, + DefaultConfigPath(), ) } diff --git a/cmd/litestream/main.go b/cmd/litestream/main.go index 6a6f6a8..839d76c 100644 --- a/cmd/litestream/main.go +++ b/cmd/litestream/main.go @@ -22,9 +22,6 @@ var ( Version = "(development build)" ) -// DefaultConfigPath is the default configuration path. -const DefaultConfigPath = "/etc/litestream.yml" - func main() { log.SetFlags(0) @@ -165,11 +162,20 @@ type ReplicaConfig struct { // S3 settings AccessKeyID string `yaml:"access-key-id"` SecretAccessKey string `yaml:"secret-access-key"` + Region string `yaml:"region"` Bucket string `yaml:"bucket"` } +// DefaultConfigPath returns the default config path. +func DefaultConfigPath() string { + if v := os.Getenv("LITESTREAM_CONFIG"); v != "" { + return v + } + return "/etc/litestream.yml" +} + func registerConfigFlag(fs *flag.FlagSet, p *string) { - fs.StringVar(p, "config", DefaultConfigPath, "config path") + fs.StringVar(p, "config", DefaultConfigPath(), "config path") } // newDBFromConfig instantiates a DB based on a configuration. @@ -226,7 +232,7 @@ func newS3ReplicaFromConfig(db *litestream.DB, config *ReplicaConfig) (*s3.Repli return nil, fmt.Errorf("%s: s3 bucket required", db.Path()) } - r := aws.NewS3Replica(db, config.Name) + r := s3.NewReplica(db, config.Name) r.AccessKeyID = config.AccessKeyID r.SecretAccessKey = config.SecretAccessKey r.Region = config.Region diff --git a/cmd/litestream/replicate.go b/cmd/litestream/replicate.go index 6dfb9a1..f47c2d7 100644 --- a/cmd/litestream/replicate.go +++ b/cmd/litestream/replicate.go @@ -130,5 +130,5 @@ Arguments: -v Enable verbose logging output. -`[1:], DefaultConfigPath) +`[1:], DefaultConfigPath()) } diff --git a/cmd/litestream/restore.go b/cmd/litestream/restore.go index 1387e93..c7df93f 100644 --- a/cmd/litestream/restore.go +++ b/cmd/litestream/restore.go @@ -142,6 +142,6 @@ Examples: $ litestream restore -replica s3 -generation xxxxxxxx /path/to/db `[1:], - DefaultConfigPath, + DefaultConfigPath(), ) } diff --git a/cmd/litestream/snapshots.go b/cmd/litestream/snapshots.go index 7a3f4d6..b6d8c17 100644 --- a/cmd/litestream/snapshots.go +++ b/cmd/litestream/snapshots.go @@ -112,6 +112,6 @@ Examples: $ litestream snapshots -replica s3 /path/to/db `[1:], - DefaultConfigPath, + DefaultConfigPath(), ) } diff --git a/cmd/litestream/validate.go b/cmd/litestream/validate.go index 9dde971..2812711 100644 --- a/cmd/litestream/validate.go +++ b/cmd/litestream/validate.go @@ -131,6 +131,6 @@ Examples: $ litestream restore -replica s3 /path/to/db `[1:], - DefaultConfigPath, + DefaultConfigPath(), ) } diff --git a/cmd/litestream/wal.go b/cmd/litestream/wal.go index 16dd23f..b93e540 100644 --- a/cmd/litestream/wal.go +++ b/cmd/litestream/wal.go @@ -121,6 +121,6 @@ Examples: $ litestream snapshots -replica s3 -generation xxxxxxxx /path/to/db `[1:], - DefaultConfigPath, + DefaultConfigPath(), ) } diff --git a/db.go b/db.go index 20ab26b..81da17a 100644 --- a/db.go +++ b/db.go @@ -1298,13 +1298,13 @@ func (db *DB) Restore(ctx context.Context, opt RestoreOptions) error { } // Find lastest snapshot that occurs before timestamp. - minWALIndex, err := r.SnapshotIndexAt(ctx, generation, opt.Timestamp) + minWALIndex, err := SnapshotIndexAt(ctx, r, generation, opt.Timestamp) if err != nil { return fmt.Errorf("cannot find snapshot index for restore: %w", err) } // Find the maximum WAL index that occurs before timestamp. - maxWALIndex, err := r.WALIndexAt(ctx, generation, opt.Index, opt.Timestamp) + maxWALIndex, err := WALIndexAt(ctx, r, generation, opt.Index, opt.Timestamp) if err != nil { return fmt.Errorf("cannot find max wal index for restore: %w", err) } diff --git a/internal/internal.go b/internal/internal.go new file mode 100644 index 0000000..45b7d8c --- /dev/null +++ b/internal/internal.go @@ -0,0 +1,32 @@ +package internal + +import ( + "io" +) + +// ReadCloser wraps a reader to also attach a separate closer. +type ReadCloser struct { + r io.Reader + c io.Closer +} + +// NewReadCloser returns a new instance of ReadCloser. +func NewReadCloser(r io.Reader, c io.Closer) *ReadCloser { + return &ReadCloser{r, c} +} + +// Read reads bytes into the underlying reader. +func (r *ReadCloser) Read(p []byte) (n int, err error) { + return r.r.Read(p) +} + +// Close closes the reader (if implementing io.ReadCloser) and the Closer. +func (r *ReadCloser) Close() error { + if rc, ok := r.r.(io.Closer); ok { + if err := rc.Close(); err != nil { + r.c.Close() + return err + } + } + return r.c.Close() +} diff --git a/litestream.go b/litestream.go index 7e4ef25..4b76696 100644 --- a/litestream.go +++ b/litestream.go @@ -1,7 +1,6 @@ package litestream import ( - "compress/gzip" "database/sql" "encoding/binary" "errors" @@ -264,24 +263,6 @@ func isHexChar(ch rune) bool { return (ch >= '0' && ch <= '9') || (ch >= 'a' && ch <= 'f') } -// gzipReadCloser wraps gzip.Reader to also close the underlying reader on close. -type gzipReadCloser struct { - r *gzip.Reader - closer io.ReadCloser -} - -func (r *gzipReadCloser) Read(p []byte) (n int, err error) { - return r.r.Read(p) -} - -func (r *gzipReadCloser) Close() error { - if err := r.r.Close(); err != nil { - r.closer.Close() - return err - } - return r.closer.Close() -} - // createFile creates the file and attempts to set the UID/GID. func createFile(filename string, uid, gid int) (*os.File, error) { f, err := os.Create(filename) diff --git a/replica.go b/replica.go index b85f1b5..6448890 100644 --- a/replica.go +++ b/replica.go @@ -13,6 +13,8 @@ import ( "sort" "sync" "time" + + "github.com/benbjohnson/litestream/internal" ) // Replica represents a remote destination to replicate the database & WAL. @@ -48,14 +50,6 @@ type Replica interface { // Returns a list of available WAL files in the replica. WALs(ctx context.Context) ([]*WALInfo, error) - // Returns the highest index for a snapshot within a generation that occurs - // before timestamp. If timestamp is zero, returns the latest snapshot. - SnapshotIndexAt(ctx context.Context, generation string, timestamp time.Time) (int, error) - - // Returns the highest index for a WAL file that occurs before timestamp. - // If timestamp is zero, returns the highest WAL index. - WALIndexAt(ctx context.Context, generation string, maxIndex int, timestamp time.Time) (int, error) - // Returns a reader for snapshot data at the given generation/index. SnapshotReader(ctx context.Context, generation string, index int) (io.ReadCloser, error) @@ -633,74 +627,6 @@ func (r *FileReplica) compress(ctx context.Context, generation string) error { return nil } -// SnapsotIndexAt returns the highest index for a snapshot within a generation -// that occurs before timestamp. If timestamp is zero, returns the latest snapshot. -func (r *FileReplica) SnapshotIndexAt(ctx context.Context, generation string, timestamp time.Time) (int, error) { - fis, err := ioutil.ReadDir(r.SnapshotDir(generation)) - if os.IsNotExist(err) { - return 0, ErrNoSnapshots - } else if err != nil { - return 0, err - } - - index := -1 - var max time.Time - for _, fi := range fis { - // Read index from snapshot filename. - idx, _, err := ParseSnapshotPath(fi.Name()) - if err != nil { - continue // not a snapshot, skip - } else if !timestamp.IsZero() && fi.ModTime().After(timestamp) { - continue // after timestamp, skip - } - - // Use snapshot if it newer. - if max.IsZero() || fi.ModTime().After(max) { - index, max = idx, fi.ModTime() - } - } - - if index == -1 { - return 0, ErrNoSnapshots - } - return index, nil -} - -// Returns the highest index for a WAL file that occurs before maxIndex & timestamp. -// If timestamp is zero, returns the highest WAL index. -func (r *FileReplica) WALIndexAt(ctx context.Context, generation string, maxIndex int, timestamp time.Time) (int, error) { - var index int - fis, err := ioutil.ReadDir(r.WALDir(generation)) - if os.IsNotExist(err) { - return 0, nil - } else if err != nil { - return 0, err - } - - for _, fi := range fis { - // Read index from snapshot filename. - idx, _, _, _, err := ParseWALPath(fi.Name()) - if err != nil { - continue // not a snapshot, skip - } else if !timestamp.IsZero() && fi.ModTime().After(timestamp) { - continue // after timestamp, skip - } else if idx > maxIndex { - continue // after timestamp, skip - } else if idx < index { - continue // earlier index, skip - } - - index = idx - } - - // If max index is specified but not found, return an error. - if maxIndex != math.MaxInt64 && index != maxIndex { - return index, fmt.Errorf("unable to locate index %d in generation %q, highest index was %d", maxIndex, generation, index) - } - - return index, nil -} - // SnapshotReader returns a reader for snapshot data at the given generation/index. // Returns os.ErrNotExist if no matching index is found. func (r *FileReplica) SnapshotReader(ctx context.Context, generation string, index int) (io.ReadCloser, error) { @@ -733,7 +659,7 @@ func (r *FileReplica) SnapshotReader(ctx context.Context, generation string, ind f.Close() return nil, err } - return &gzipReadCloser{r: r, closer: f}, nil + return internal.NewReadCloser(r, f), nil } return nil, os.ErrNotExist } @@ -764,7 +690,7 @@ func (r *FileReplica) WALReader(ctx context.Context, generation string, index in f.Close() return nil, err } - return &gzipReadCloser{r: rd, closer: f}, nil + return internal.NewReadCloser(rd, f), nil } // EnforceRetention forces a new snapshot once the retention interval has passed. @@ -879,6 +805,63 @@ func (r *FileReplica) deleteGenerationWALBefore(ctx context.Context, generation return nil } +// SnapsotIndexAt returns the highest index for a snapshot within a generation +// that occurs before timestamp. If timestamp is zero, returns the latest snapshot. +func SnapshotIndexAt(ctx context.Context, r Replica, generation string, timestamp time.Time) (int, error) { + snapshots, err := r.Snapshots(ctx) + if err != nil { + return 0, err + } else if len(snapshots) == 0 { + return 0, ErrNoSnapshots + } + + index := -1 + var max time.Time + for _, snapshot := range snapshots { + if !timestamp.IsZero() && snapshot.CreatedAt.After(timestamp) { + continue // after timestamp, skip + } + + // Use snapshot if it newer. + if max.IsZero() || snapshot.CreatedAt.After(max) { + index, max = snapshot.Index, snapshot.CreatedAt + } + } + + if index == -1 { + return 0, ErrNoSnapshots + } + return index, nil +} + +// WALIndexAt returns the highest index for a WAL file that occurs before maxIndex & timestamp. +// If timestamp is zero, returns the highest WAL index. +func WALIndexAt(ctx context.Context, r Replica, generation string, maxIndex int, timestamp time.Time) (int, error) { + wals, err := r.WALs(ctx) + if err != nil { + return 0, err + } + + var index int + for _, wal := range wals { + if !timestamp.IsZero() && wal.CreatedAt.After(timestamp) { + continue // after timestamp, skip + } else if wal.Index > maxIndex { + continue // after max index, skip + } else if wal.Index < index { + continue // earlier index, skip + } + + index = wal.Index + } + + // If max index is specified but not found, return an error. + if maxIndex != math.MaxInt64 && index != maxIndex { + return index, fmt.Errorf("unable to locate index %d in generation %q, highest index was %d", maxIndex, generation, index) + } + return index, nil +} + // compressFile compresses a file and replaces it with a new file with a .gz extension. func compressFile(src, dst string, uid, gid int) error { r, err := os.Open(src) diff --git a/s3/s3.go b/s3/s3.go index d9e8abb..f9aee20 100644 --- a/s3/s3.go +++ b/s3/s3.go @@ -1,4 +1,4 @@ -package aws +package s3 import ( "compress/gzip" @@ -7,7 +7,6 @@ import ( "io" "io/ioutil" "log" - "math" "os" "path" "sync" @@ -18,6 +17,7 @@ import ( "github.com/aws/aws-sdk-go/service/s3" "github.com/aws/aws-sdk-go/service/s3/s3manager" "github.com/benbjohnson/litestream" + "github.com/benbjohnson/litestream/internal" ) const ( @@ -196,7 +196,6 @@ func (r *Replica) GenerationStats(ctx context.Context, generation string) (stats } func (r *Replica) snapshotStats(generation string) (n int, min, max time.Time, err error) { - var generations []string if err := r.s3.ListObjectsPages(&s3.ListObjectsInput{ Bucket: aws.String(r.Bucket), Prefix: aws.String(r.SnapshotDir(generation)), @@ -224,7 +223,6 @@ func (r *Replica) snapshotStats(generation string) (n int, min, max time.Time, e } func (r *Replica) walStats(generation string) (n int, min, max time.Time, err error) { - var generations []string if err := r.s3.ListObjectsPages(&s3.ListObjectsInput{ Bucket: aws.String(r.Bucket), Prefix: aws.String(r.WALDir(generation)), @@ -623,74 +621,6 @@ func (r *Replica) syncWAL(ctx context.Context) (err error) { return nil } -// SnapsotIndexAt returns the highest index for a snapshot within a generation -// that occurs before timestamp. If timestamp is zero, returns the latest snapshot. -func (r *Replica) SnapshotIndexAt(ctx context.Context, generation string, timestamp time.Time) (int, error) { - fis, err := ioutil.ReadDir(r.SnapshotDir(generation)) - if os.IsNotExist(err) { - return 0, litestream.ErrNoSnapshots - } else if err != nil { - return 0, err - } - - index := -1 - var max time.Time - for _, fi := range fis { - // Read index from snapshot filename. - idx, _, err := litestream.ParseSnapshotPath(fi.Name()) - if err != nil { - continue // not a snapshot, skip - } else if !timestamp.IsZero() && fi.ModTime().After(timestamp) { - continue // after timestamp, skip - } - - // Use snapshot if it newer. - if max.IsZero() || fi.ModTime().After(max) { - index, max = idx, fi.ModTime() - } - } - - if index == -1 { - return 0, litestream.ErrNoSnapshots - } - return index, nil -} - -// Returns the highest index for a WAL file that occurs before maxIndex & timestamp. -// If timestamp is zero, returns the highest WAL index. -func (r *Replica) WALIndexAt(ctx context.Context, generation string, maxIndex int, timestamp time.Time) (int, error) { - var index int - fis, err := ioutil.ReadDir(r.WALDir(generation)) - if os.IsNotExist(err) { - return 0, nil - } else if err != nil { - return 0, err - } - - for _, fi := range fis { - // Read index from snapshot filename. - idx, _, _, _, err := litestream.ParseWALPath(fi.Name()) - if err != nil { - continue // not a snapshot, skip - } else if !timestamp.IsZero() && fi.ModTime().After(timestamp) { - continue // after timestamp, skip - } else if idx > maxIndex { - continue // after timestamp, skip - } else if idx < index { - continue // earlier index, skip - } - - index = idx - } - - // If max index is specified but not found, return an error. - if maxIndex != math.MaxInt64 && index != maxIndex { - return index, fmt.Errorf("unable to locate index %d in generation %q, highest index was %d", maxIndex, generation, index) - } - - return index, nil -} - // SnapshotReader returns a reader for snapshot data at the given generation/index. // Returns os.ErrNotExist if no matching index is found. func (r *Replica) SnapshotReader(ctx context.Context, generation string, index int) (io.ReadCloser, error) { @@ -723,7 +653,7 @@ func (r *Replica) SnapshotReader(ctx context.Context, generation string, index i f.Close() return nil, err } - return &gzipReadCloser{r: r, closer: f}, nil + return internal.NewReadCloser(r, f), nil } return nil, os.ErrNotExist } @@ -754,7 +684,7 @@ func (r *Replica) WALReader(ctx context.Context, generation string, index int) ( f.Close() return nil, err } - return &gzipReadCloser{r: rd, closer: f}, nil + return internal.NewReadCloser(rd, f), nil } // EnforceRetention forces a new snapshot once the retention interval has passed.