Add LITESTREAM_CONFIG env var

This commit is contained in:
Ben Johnson
2021-01-13 13:17:38 -07:00
parent 57a02a8628
commit a42f83f3cb
13 changed files with 117 additions and 185 deletions

View File

@@ -71,6 +71,6 @@ Arguments:
Defaults to %s Defaults to %s
`[1:], `[1:],
DefaultConfigPath, DefaultConfigPath(),
) )
} }

View File

@@ -113,7 +113,7 @@ Arguments:
Optional, filters by replica. Optional, filters by replica.
`[1:], `[1:],
DefaultConfigPath, DefaultConfigPath(),
) )
} }

View File

@@ -22,9 +22,6 @@ var (
Version = "(development build)" Version = "(development build)"
) )
// DefaultConfigPath is the default configuration path.
const DefaultConfigPath = "/etc/litestream.yml"
func main() { func main() {
log.SetFlags(0) log.SetFlags(0)
@@ -165,11 +162,20 @@ type ReplicaConfig struct {
// S3 settings // S3 settings
AccessKeyID string `yaml:"access-key-id"` AccessKeyID string `yaml:"access-key-id"`
SecretAccessKey string `yaml:"secret-access-key"` SecretAccessKey string `yaml:"secret-access-key"`
Region string `yaml:"region"`
Bucket string `yaml:"bucket"` Bucket string `yaml:"bucket"`
} }
// DefaultConfigPath returns the default config path.
func DefaultConfigPath() string {
if v := os.Getenv("LITESTREAM_CONFIG"); v != "" {
return v
}
return "/etc/litestream.yml"
}
func registerConfigFlag(fs *flag.FlagSet, p *string) { func registerConfigFlag(fs *flag.FlagSet, p *string) {
fs.StringVar(p, "config", DefaultConfigPath, "config path") fs.StringVar(p, "config", DefaultConfigPath(), "config path")
} }
// newDBFromConfig instantiates a DB based on a configuration. // newDBFromConfig instantiates a DB based on a configuration.
@@ -226,7 +232,7 @@ func newS3ReplicaFromConfig(db *litestream.DB, config *ReplicaConfig) (*s3.Repli
return nil, fmt.Errorf("%s: s3 bucket required", db.Path()) return nil, fmt.Errorf("%s: s3 bucket required", db.Path())
} }
r := aws.NewS3Replica(db, config.Name) r := s3.NewReplica(db, config.Name)
r.AccessKeyID = config.AccessKeyID r.AccessKeyID = config.AccessKeyID
r.SecretAccessKey = config.SecretAccessKey r.SecretAccessKey = config.SecretAccessKey
r.Region = config.Region r.Region = config.Region

View File

@@ -130,5 +130,5 @@ Arguments:
-v -v
Enable verbose logging output. Enable verbose logging output.
`[1:], DefaultConfigPath) `[1:], DefaultConfigPath())
} }

View File

@@ -142,6 +142,6 @@ Examples:
$ litestream restore -replica s3 -generation xxxxxxxx /path/to/db $ litestream restore -replica s3 -generation xxxxxxxx /path/to/db
`[1:], `[1:],
DefaultConfigPath, DefaultConfigPath(),
) )
} }

View File

@@ -112,6 +112,6 @@ Examples:
$ litestream snapshots -replica s3 /path/to/db $ litestream snapshots -replica s3 /path/to/db
`[1:], `[1:],
DefaultConfigPath, DefaultConfigPath(),
) )
} }

View File

@@ -131,6 +131,6 @@ Examples:
$ litestream restore -replica s3 /path/to/db $ litestream restore -replica s3 /path/to/db
`[1:], `[1:],
DefaultConfigPath, DefaultConfigPath(),
) )
} }

View File

@@ -121,6 +121,6 @@ Examples:
$ litestream snapshots -replica s3 -generation xxxxxxxx /path/to/db $ litestream snapshots -replica s3 -generation xxxxxxxx /path/to/db
`[1:], `[1:],
DefaultConfigPath, DefaultConfigPath(),
) )
} }

4
db.go
View File

@@ -1298,13 +1298,13 @@ func (db *DB) Restore(ctx context.Context, opt RestoreOptions) error {
} }
// Find lastest snapshot that occurs before timestamp. // Find lastest snapshot that occurs before timestamp.
minWALIndex, err := r.SnapshotIndexAt(ctx, generation, opt.Timestamp) minWALIndex, err := SnapshotIndexAt(ctx, r, generation, opt.Timestamp)
if err != nil { if err != nil {
return fmt.Errorf("cannot find snapshot index for restore: %w", err) return fmt.Errorf("cannot find snapshot index for restore: %w", err)
} }
// Find the maximum WAL index that occurs before timestamp. // Find the maximum WAL index that occurs before timestamp.
maxWALIndex, err := r.WALIndexAt(ctx, generation, opt.Index, opt.Timestamp) maxWALIndex, err := WALIndexAt(ctx, r, generation, opt.Index, opt.Timestamp)
if err != nil { if err != nil {
return fmt.Errorf("cannot find max wal index for restore: %w", err) return fmt.Errorf("cannot find max wal index for restore: %w", err)
} }

32
internal/internal.go Normal file
View File

@@ -0,0 +1,32 @@
package internal
import (
"io"
)
// ReadCloser wraps a reader to also attach a separate closer.
type ReadCloser struct {
r io.Reader
c io.Closer
}
// NewReadCloser returns a new instance of ReadCloser.
func NewReadCloser(r io.Reader, c io.Closer) *ReadCloser {
return &ReadCloser{r, c}
}
// Read reads bytes into the underlying reader.
func (r *ReadCloser) Read(p []byte) (n int, err error) {
return r.r.Read(p)
}
// Close closes the reader (if implementing io.ReadCloser) and the Closer.
func (r *ReadCloser) Close() error {
if rc, ok := r.r.(io.Closer); ok {
if err := rc.Close(); err != nil {
r.c.Close()
return err
}
}
return r.c.Close()
}

View File

@@ -1,7 +1,6 @@
package litestream package litestream
import ( import (
"compress/gzip"
"database/sql" "database/sql"
"encoding/binary" "encoding/binary"
"errors" "errors"
@@ -264,24 +263,6 @@ func isHexChar(ch rune) bool {
return (ch >= '0' && ch <= '9') || (ch >= 'a' && ch <= 'f') return (ch >= '0' && ch <= '9') || (ch >= 'a' && ch <= 'f')
} }
// gzipReadCloser wraps gzip.Reader to also close the underlying reader on close.
type gzipReadCloser struct {
r *gzip.Reader
closer io.ReadCloser
}
func (r *gzipReadCloser) Read(p []byte) (n int, err error) {
return r.r.Read(p)
}
func (r *gzipReadCloser) Close() error {
if err := r.r.Close(); err != nil {
r.closer.Close()
return err
}
return r.closer.Close()
}
// createFile creates the file and attempts to set the UID/GID. // createFile creates the file and attempts to set the UID/GID.
func createFile(filename string, uid, gid int) (*os.File, error) { func createFile(filename string, uid, gid int) (*os.File, error) {
f, err := os.Create(filename) f, err := os.Create(filename)

View File

@@ -13,6 +13,8 @@ import (
"sort" "sort"
"sync" "sync"
"time" "time"
"github.com/benbjohnson/litestream/internal"
) )
// Replica represents a remote destination to replicate the database & WAL. // Replica represents a remote destination to replicate the database & WAL.
@@ -48,14 +50,6 @@ type Replica interface {
// Returns a list of available WAL files in the replica. // Returns a list of available WAL files in the replica.
WALs(ctx context.Context) ([]*WALInfo, error) WALs(ctx context.Context) ([]*WALInfo, error)
// Returns the highest index for a snapshot within a generation that occurs
// before timestamp. If timestamp is zero, returns the latest snapshot.
SnapshotIndexAt(ctx context.Context, generation string, timestamp time.Time) (int, error)
// Returns the highest index for a WAL file that occurs before timestamp.
// If timestamp is zero, returns the highest WAL index.
WALIndexAt(ctx context.Context, generation string, maxIndex int, timestamp time.Time) (int, error)
// Returns a reader for snapshot data at the given generation/index. // Returns a reader for snapshot data at the given generation/index.
SnapshotReader(ctx context.Context, generation string, index int) (io.ReadCloser, error) SnapshotReader(ctx context.Context, generation string, index int) (io.ReadCloser, error)
@@ -633,74 +627,6 @@ func (r *FileReplica) compress(ctx context.Context, generation string) error {
return nil return nil
} }
// SnapsotIndexAt returns the highest index for a snapshot within a generation
// that occurs before timestamp. If timestamp is zero, returns the latest snapshot.
func (r *FileReplica) SnapshotIndexAt(ctx context.Context, generation string, timestamp time.Time) (int, error) {
fis, err := ioutil.ReadDir(r.SnapshotDir(generation))
if os.IsNotExist(err) {
return 0, ErrNoSnapshots
} else if err != nil {
return 0, err
}
index := -1
var max time.Time
for _, fi := range fis {
// Read index from snapshot filename.
idx, _, err := ParseSnapshotPath(fi.Name())
if err != nil {
continue // not a snapshot, skip
} else if !timestamp.IsZero() && fi.ModTime().After(timestamp) {
continue // after timestamp, skip
}
// Use snapshot if it newer.
if max.IsZero() || fi.ModTime().After(max) {
index, max = idx, fi.ModTime()
}
}
if index == -1 {
return 0, ErrNoSnapshots
}
return index, nil
}
// Returns the highest index for a WAL file that occurs before maxIndex & timestamp.
// If timestamp is zero, returns the highest WAL index.
func (r *FileReplica) WALIndexAt(ctx context.Context, generation string, maxIndex int, timestamp time.Time) (int, error) {
var index int
fis, err := ioutil.ReadDir(r.WALDir(generation))
if os.IsNotExist(err) {
return 0, nil
} else if err != nil {
return 0, err
}
for _, fi := range fis {
// Read index from snapshot filename.
idx, _, _, _, err := ParseWALPath(fi.Name())
if err != nil {
continue // not a snapshot, skip
} else if !timestamp.IsZero() && fi.ModTime().After(timestamp) {
continue // after timestamp, skip
} else if idx > maxIndex {
continue // after timestamp, skip
} else if idx < index {
continue // earlier index, skip
}
index = idx
}
// If max index is specified but not found, return an error.
if maxIndex != math.MaxInt64 && index != maxIndex {
return index, fmt.Errorf("unable to locate index %d in generation %q, highest index was %d", maxIndex, generation, index)
}
return index, nil
}
// SnapshotReader returns a reader for snapshot data at the given generation/index. // SnapshotReader returns a reader for snapshot data at the given generation/index.
// Returns os.ErrNotExist if no matching index is found. // Returns os.ErrNotExist if no matching index is found.
func (r *FileReplica) SnapshotReader(ctx context.Context, generation string, index int) (io.ReadCloser, error) { func (r *FileReplica) SnapshotReader(ctx context.Context, generation string, index int) (io.ReadCloser, error) {
@@ -733,7 +659,7 @@ func (r *FileReplica) SnapshotReader(ctx context.Context, generation string, ind
f.Close() f.Close()
return nil, err return nil, err
} }
return &gzipReadCloser{r: r, closer: f}, nil return internal.NewReadCloser(r, f), nil
} }
return nil, os.ErrNotExist return nil, os.ErrNotExist
} }
@@ -764,7 +690,7 @@ func (r *FileReplica) WALReader(ctx context.Context, generation string, index in
f.Close() f.Close()
return nil, err return nil, err
} }
return &gzipReadCloser{r: rd, closer: f}, nil return internal.NewReadCloser(rd, f), nil
} }
// EnforceRetention forces a new snapshot once the retention interval has passed. // EnforceRetention forces a new snapshot once the retention interval has passed.
@@ -879,6 +805,63 @@ func (r *FileReplica) deleteGenerationWALBefore(ctx context.Context, generation
return nil return nil
} }
// SnapsotIndexAt returns the highest index for a snapshot within a generation
// that occurs before timestamp. If timestamp is zero, returns the latest snapshot.
func SnapshotIndexAt(ctx context.Context, r Replica, generation string, timestamp time.Time) (int, error) {
snapshots, err := r.Snapshots(ctx)
if err != nil {
return 0, err
} else if len(snapshots) == 0 {
return 0, ErrNoSnapshots
}
index := -1
var max time.Time
for _, snapshot := range snapshots {
if !timestamp.IsZero() && snapshot.CreatedAt.After(timestamp) {
continue // after timestamp, skip
}
// Use snapshot if it newer.
if max.IsZero() || snapshot.CreatedAt.After(max) {
index, max = snapshot.Index, snapshot.CreatedAt
}
}
if index == -1 {
return 0, ErrNoSnapshots
}
return index, nil
}
// WALIndexAt returns the highest index for a WAL file that occurs before maxIndex & timestamp.
// If timestamp is zero, returns the highest WAL index.
func WALIndexAt(ctx context.Context, r Replica, generation string, maxIndex int, timestamp time.Time) (int, error) {
wals, err := r.WALs(ctx)
if err != nil {
return 0, err
}
var index int
for _, wal := range wals {
if !timestamp.IsZero() && wal.CreatedAt.After(timestamp) {
continue // after timestamp, skip
} else if wal.Index > maxIndex {
continue // after max index, skip
} else if wal.Index < index {
continue // earlier index, skip
}
index = wal.Index
}
// If max index is specified but not found, return an error.
if maxIndex != math.MaxInt64 && index != maxIndex {
return index, fmt.Errorf("unable to locate index %d in generation %q, highest index was %d", maxIndex, generation, index)
}
return index, nil
}
// compressFile compresses a file and replaces it with a new file with a .gz extension. // compressFile compresses a file and replaces it with a new file with a .gz extension.
func compressFile(src, dst string, uid, gid int) error { func compressFile(src, dst string, uid, gid int) error {
r, err := os.Open(src) r, err := os.Open(src)

View File

@@ -1,4 +1,4 @@
package aws package s3
import ( import (
"compress/gzip" "compress/gzip"
@@ -7,7 +7,6 @@ import (
"io" "io"
"io/ioutil" "io/ioutil"
"log" "log"
"math"
"os" "os"
"path" "path"
"sync" "sync"
@@ -18,6 +17,7 @@ import (
"github.com/aws/aws-sdk-go/service/s3" "github.com/aws/aws-sdk-go/service/s3"
"github.com/aws/aws-sdk-go/service/s3/s3manager" "github.com/aws/aws-sdk-go/service/s3/s3manager"
"github.com/benbjohnson/litestream" "github.com/benbjohnson/litestream"
"github.com/benbjohnson/litestream/internal"
) )
const ( const (
@@ -196,7 +196,6 @@ func (r *Replica) GenerationStats(ctx context.Context, generation string) (stats
} }
func (r *Replica) snapshotStats(generation string) (n int, min, max time.Time, err error) { func (r *Replica) snapshotStats(generation string) (n int, min, max time.Time, err error) {
var generations []string
if err := r.s3.ListObjectsPages(&s3.ListObjectsInput{ if err := r.s3.ListObjectsPages(&s3.ListObjectsInput{
Bucket: aws.String(r.Bucket), Bucket: aws.String(r.Bucket),
Prefix: aws.String(r.SnapshotDir(generation)), Prefix: aws.String(r.SnapshotDir(generation)),
@@ -224,7 +223,6 @@ func (r *Replica) snapshotStats(generation string) (n int, min, max time.Time, e
} }
func (r *Replica) walStats(generation string) (n int, min, max time.Time, err error) { func (r *Replica) walStats(generation string) (n int, min, max time.Time, err error) {
var generations []string
if err := r.s3.ListObjectsPages(&s3.ListObjectsInput{ if err := r.s3.ListObjectsPages(&s3.ListObjectsInput{
Bucket: aws.String(r.Bucket), Bucket: aws.String(r.Bucket),
Prefix: aws.String(r.WALDir(generation)), Prefix: aws.String(r.WALDir(generation)),
@@ -623,74 +621,6 @@ func (r *Replica) syncWAL(ctx context.Context) (err error) {
return nil return nil
} }
// SnapsotIndexAt returns the highest index for a snapshot within a generation
// that occurs before timestamp. If timestamp is zero, returns the latest snapshot.
func (r *Replica) SnapshotIndexAt(ctx context.Context, generation string, timestamp time.Time) (int, error) {
fis, err := ioutil.ReadDir(r.SnapshotDir(generation))
if os.IsNotExist(err) {
return 0, litestream.ErrNoSnapshots
} else if err != nil {
return 0, err
}
index := -1
var max time.Time
for _, fi := range fis {
// Read index from snapshot filename.
idx, _, err := litestream.ParseSnapshotPath(fi.Name())
if err != nil {
continue // not a snapshot, skip
} else if !timestamp.IsZero() && fi.ModTime().After(timestamp) {
continue // after timestamp, skip
}
// Use snapshot if it newer.
if max.IsZero() || fi.ModTime().After(max) {
index, max = idx, fi.ModTime()
}
}
if index == -1 {
return 0, litestream.ErrNoSnapshots
}
return index, nil
}
// Returns the highest index for a WAL file that occurs before maxIndex & timestamp.
// If timestamp is zero, returns the highest WAL index.
func (r *Replica) WALIndexAt(ctx context.Context, generation string, maxIndex int, timestamp time.Time) (int, error) {
var index int
fis, err := ioutil.ReadDir(r.WALDir(generation))
if os.IsNotExist(err) {
return 0, nil
} else if err != nil {
return 0, err
}
for _, fi := range fis {
// Read index from snapshot filename.
idx, _, _, _, err := litestream.ParseWALPath(fi.Name())
if err != nil {
continue // not a snapshot, skip
} else if !timestamp.IsZero() && fi.ModTime().After(timestamp) {
continue // after timestamp, skip
} else if idx > maxIndex {
continue // after timestamp, skip
} else if idx < index {
continue // earlier index, skip
}
index = idx
}
// If max index is specified but not found, return an error.
if maxIndex != math.MaxInt64 && index != maxIndex {
return index, fmt.Errorf("unable to locate index %d in generation %q, highest index was %d", maxIndex, generation, index)
}
return index, nil
}
// SnapshotReader returns a reader for snapshot data at the given generation/index. // SnapshotReader returns a reader for snapshot data at the given generation/index.
// Returns os.ErrNotExist if no matching index is found. // Returns os.ErrNotExist if no matching index is found.
func (r *Replica) SnapshotReader(ctx context.Context, generation string, index int) (io.ReadCloser, error) { func (r *Replica) SnapshotReader(ctx context.Context, generation string, index int) (io.ReadCloser, error) {
@@ -723,7 +653,7 @@ func (r *Replica) SnapshotReader(ctx context.Context, generation string, index i
f.Close() f.Close()
return nil, err return nil, err
} }
return &gzipReadCloser{r: r, closer: f}, nil return internal.NewReadCloser(r, f), nil
} }
return nil, os.ErrNotExist return nil, os.ErrNotExist
} }
@@ -754,7 +684,7 @@ func (r *Replica) WALReader(ctx context.Context, generation string, index int) (
f.Close() f.Close()
return nil, err return nil, err
} }
return &gzipReadCloser{r: rd, closer: f}, nil return internal.NewReadCloser(rd, f), nil
} }
// EnforceRetention forces a new snapshot once the retention interval has passed. // EnforceRetention forces a new snapshot once the retention interval has passed.