This commit changes `litestream replicate` to accept a database path and a replica URL instead of requiring a config file. This lets people quickly try out the tool without first learning the config file syntax.
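For example, replication could then be started with a single command along the following lines (the database path and bucket URL here are illustrative placeholders):

    litestream replicate /path/to/db.sqlite s3://mybucket/db
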
package s3

import (
	"bytes"
	"context"
	"fmt"
	"io"
	"io/ioutil"
	"log"
	"os"
	"path"
	"sync"
	"time"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/credentials"
	"github.com/aws/aws-sdk-go/aws/defaults"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/s3"
	"github.com/aws/aws-sdk-go/service/s3/s3manager"
	"github.com/benbjohnson/litestream"
	"github.com/benbjohnson/litestream/internal"
	"github.com/pierrec/lz4/v4"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
)

// S3 replica default settings.
const (
	DefaultSyncInterval = 10 * time.Second

	DefaultRetention = 24 * time.Hour

	DefaultRetentionCheckInterval = 1 * time.Hour
)

// MaxKeys is the number of keys S3 can operate on per batch.
const MaxKeys = 1000

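// Ensure Replica implements the litestream.Replica interface at compile time.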
var _ litestream.Replica = (*Replica)(nil)

// Replica is a replica that replicates a DB to an S3 bucket.
type Replica struct {
	db       *litestream.DB // source database
	name     string         // replica name, optional
	s3       *s3.S3         // s3 service
	uploader *s3manager.Uploader

	mu         sync.RWMutex
	snapshotMu sync.Mutex
	pos        litestream.Pos // last position

	wg     sync.WaitGroup
	cancel func()

	snapshotTotalGauge          prometheus.Gauge
	walBytesCounter             prometheus.Counter
	walIndexGauge               prometheus.Gauge
	walOffsetGauge              prometheus.Gauge
	putOperationTotalCounter    prometheus.Counter
	putOperationBytesCounter    prometheus.Counter
	getOperationTotalCounter    prometheus.Counter
	getOperationBytesCounter    prometheus.Counter
	listOperationTotalCounter   prometheus.Counter
	deleteOperationTotalCounter prometheus.Counter

	// AWS authentication keys.
	AccessKeyID     string
	SecretAccessKey string

	// S3 bucket information.
	Region string
	Bucket string
	Path   string

	// Time between syncs with the shadow WAL.
	SyncInterval time.Duration

	// Time to keep snapshots and related WAL files.
	// Database is snapshotted after interval and older WAL files are discarded.
	Retention time.Duration

	// Time between retention checks.
	RetentionCheckInterval time.Duration

	// Time between validation checks.
	ValidationInterval time.Duration

	// If true, replica monitors database for changes automatically.
	// Set to false if replica is being used synchronously (such as in tests).
	MonitorEnabled bool
}

// NewReplica returns a new instance of Replica.
func NewReplica(db *litestream.DB, name string) *Replica {
	r := &Replica{
		db:     db,
		name:   name,
		cancel: func() {},

		SyncInterval:           DefaultSyncInterval,
		Retention:              DefaultRetention,
		RetentionCheckInterval: DefaultRetentionCheckInterval,

		MonitorEnabled: true,
	}

	r.snapshotTotalGauge = internal.ReplicaSnapshotTotalGaugeVec.WithLabelValues(db.Path(), r.Name())
	r.walBytesCounter = internal.ReplicaWALBytesCounterVec.WithLabelValues(db.Path(), r.Name())
	r.walIndexGauge = internal.ReplicaWALIndexGaugeVec.WithLabelValues(db.Path(), r.Name())
	r.walOffsetGauge = internal.ReplicaWALOffsetGaugeVec.WithLabelValues(db.Path(), r.Name())
	r.putOperationTotalCounter = operationTotalCounterVec.WithLabelValues(db.Path(), r.Name(), "PUT")
	r.putOperationBytesCounter = operationBytesCounterVec.WithLabelValues(db.Path(), r.Name(), "PUT")
	r.getOperationTotalCounter = operationTotalCounterVec.WithLabelValues(db.Path(), r.Name(), "GET")
	r.getOperationBytesCounter = operationBytesCounterVec.WithLabelValues(db.Path(), r.Name(), "GET")
	r.listOperationTotalCounter = operationTotalCounterVec.WithLabelValues(db.Path(), r.Name(), "LIST")
	r.deleteOperationTotalCounter = operationTotalCounterVec.WithLabelValues(db.Path(), r.Name(), "DELETE")

	return r
}
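
// Illustrative sketch (not part of the original file): given a
// *litestream.DB named db, a context ctx, and placeholder bucket/path
// values, a replica could be configured and started in code roughly
// as follows, using only the API defined in this file:
//
//	r := NewReplica(db, "")
//	r.Bucket = "mybucket"
//	r.Path = "db"
//	r.Start(ctx)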

// Name returns the name of the replica. Returns the type if no name set.
func (r *Replica) Name() string {
	if r.name != "" {
		return r.name
	}
	return r.Type()
}

// Type returns the type of replica.
func (r *Replica) Type() string {
	return "s3"
}

// DB returns the parent database reference.
func (r *Replica) DB() *litestream.DB {
	return r.db
}

// LastPos returns the last successfully replicated position.
func (r *Replica) LastPos() litestream.Pos {
	r.mu.RLock()
	defer r.mu.RUnlock()
	return r.pos
}

// GenerationDir returns the path to a generation's root directory.
func (r *Replica) GenerationDir(generation string) string {
	return path.Join(r.Path, "generations", generation)
}

// SnapshotDir returns the path to a generation's snapshot directory.
func (r *Replica) SnapshotDir(generation string) string {
	return path.Join(r.GenerationDir(generation), "snapshots")
}

// SnapshotPath returns the path to a snapshot file.
func (r *Replica) SnapshotPath(generation string, index int) string {
	return path.Join(r.SnapshotDir(generation), fmt.Sprintf("%08x.snapshot.lz4", index))
}

// MaxSnapshotIndex returns the highest index for the snapshots.
func (r *Replica) MaxSnapshotIndex(generation string) (int, error) {
	snapshots, err := r.Snapshots(context.Background())
	if err != nil {
		return 0, err
	}

	index := -1
	for _, snapshot := range snapshots {
		if snapshot.Generation != generation {
			continue
		} else if index == -1 || snapshot.Index > index {
			index = snapshot.Index
		}
	}
	if index == -1 {
		return 0, fmt.Errorf("no snapshots found")
	}
	return index, nil
}

// WALDir returns the path to a generation's WAL directory.
func (r *Replica) WALDir(generation string) string {
	return path.Join(r.GenerationDir(generation), "wal")
}

// Generations returns a list of available generation names.
func (r *Replica) Generations(ctx context.Context) ([]string, error) {
	if err := r.Init(ctx); err != nil {
		return nil, err
	}

	var generations []string
	if err := r.s3.ListObjectsPagesWithContext(ctx, &s3.ListObjectsInput{
		Bucket:    aws.String(r.Bucket),
		Prefix:    aws.String(path.Join(r.Path, "generations") + "/"),
		Delimiter: aws.String("/"),
	}, func(page *s3.ListObjectsOutput, lastPage bool) bool {
		r.listOperationTotalCounter.Inc()

		for _, prefix := range page.CommonPrefixes {
			name := path.Base(*prefix.Prefix)
			if !litestream.IsGenerationName(name) {
				continue
			}
			generations = append(generations, name)
		}
		return true
	}); err != nil {
		return nil, err
	}

	return generations, nil
}

// GenerationStats returns stats for a generation.
func (r *Replica) GenerationStats(ctx context.Context, generation string) (stats litestream.GenerationStats, err error) {
	if err := r.Init(ctx); err != nil {
		return stats, err
	}

	// Determine stats for all snapshots.
	n, min, max, err := r.snapshotStats(ctx, generation)
	if err != nil {
		return stats, err
	}
	stats.SnapshotN = n
	stats.CreatedAt, stats.UpdatedAt = min, max

	// Update stats if we have WAL files.
	n, min, max, err = r.walStats(ctx, generation)
	if err != nil {
		return stats, err
	} else if n == 0 {
		return stats, nil
	}

	stats.WALN = n
	if stats.CreatedAt.IsZero() || min.Before(stats.CreatedAt) {
		stats.CreatedAt = min
	}
	if stats.UpdatedAt.IsZero() || max.After(stats.UpdatedAt) {
		stats.UpdatedAt = max
	}
	return stats, nil
}

func (r *Replica) snapshotStats(ctx context.Context, generation string) (n int, min, max time.Time, err error) {
	if err := r.s3.ListObjectsPagesWithContext(ctx, &s3.ListObjectsInput{
		Bucket: aws.String(r.Bucket),
		Prefix: aws.String(r.SnapshotDir(generation) + "/"),
	}, func(page *s3.ListObjectsOutput, lastPage bool) bool {
		r.listOperationTotalCounter.Inc()

		for _, obj := range page.Contents {
			if !litestream.IsSnapshotPath(path.Base(*obj.Key)) {
				continue
			}
			modTime := obj.LastModified.UTC()

			n++
			if min.IsZero() || modTime.Before(min) {
				min = modTime
			}
			if max.IsZero() || modTime.After(max) {
				max = modTime
			}
		}
		return true
	}); err != nil {
		return n, min, max, err
	}
	return n, min, max, nil
}

func (r *Replica) walStats(ctx context.Context, generation string) (n int, min, max time.Time, err error) {
	if err := r.s3.ListObjectsPagesWithContext(ctx, &s3.ListObjectsInput{
		Bucket: aws.String(r.Bucket),
		Prefix: aws.String(r.WALDir(generation) + "/"),
	}, func(page *s3.ListObjectsOutput, lastPage bool) bool {
		r.listOperationTotalCounter.Inc()

		for _, obj := range page.Contents {
			if !litestream.IsWALPath(path.Base(*obj.Key)) {
				continue
			}
			modTime := obj.LastModified.UTC()

			n++
			if min.IsZero() || modTime.Before(min) {
				min = modTime
			}
			if max.IsZero() || modTime.After(max) {
				max = modTime
			}
		}
		return true
	}); err != nil {
		return n, min, max, err
	}
	return n, min, max, nil
}

// Snapshots returns a list of available snapshots in the replica.
func (r *Replica) Snapshots(ctx context.Context) ([]*litestream.SnapshotInfo, error) {
	if err := r.Init(ctx); err != nil {
		return nil, err
	}

	generations, err := r.Generations(ctx)
	if err != nil {
		return nil, err
	}

	var infos []*litestream.SnapshotInfo
	for _, generation := range generations {
		if err := r.s3.ListObjectsPagesWithContext(ctx, &s3.ListObjectsInput{
			Bucket:    aws.String(r.Bucket),
			Prefix:    aws.String(r.SnapshotDir(generation) + "/"),
			Delimiter: aws.String("/"),
		}, func(page *s3.ListObjectsOutput, lastPage bool) bool {
			r.listOperationTotalCounter.Inc()

			for _, obj := range page.Contents {
				key := path.Base(*obj.Key)
				index, _, err := litestream.ParseSnapshotPath(key)
				if err != nil {
					continue
				}

				infos = append(infos, &litestream.SnapshotInfo{
					Name:       key,
					Replica:    r.Name(),
					Generation: generation,
					Index:      index,
					Size:       *obj.Size,
					CreatedAt:  obj.LastModified.UTC(),
				})
			}
			return true
		}); err != nil {
			return nil, err
		}
	}

	return infos, nil
}

// WALs returns a list of available WAL files in the replica.
func (r *Replica) WALs(ctx context.Context) ([]*litestream.WALInfo, error) {
	if err := r.Init(ctx); err != nil {
		return nil, err
	}

	generations, err := r.Generations(ctx)
	if err != nil {
		return nil, err
	}

	var infos []*litestream.WALInfo
	for _, generation := range generations {
		var prev *litestream.WALInfo
		if err := r.s3.ListObjectsPagesWithContext(ctx, &s3.ListObjectsInput{
			Bucket:    aws.String(r.Bucket),
			Prefix:    aws.String(r.WALDir(generation) + "/"),
			Delimiter: aws.String("/"),
		}, func(page *s3.ListObjectsOutput, lastPage bool) bool {
			r.listOperationTotalCounter.Inc()

			for _, obj := range page.Contents {
				key := path.Base(*obj.Key)

				index, offset, _, err := litestream.ParseWALPath(key)
				if err != nil {
					continue
				}

				// Update previous record if generation & index match.
				if prev != nil && prev.Index == index {
					prev.Size += *obj.Size
					prev.CreatedAt = obj.LastModified.UTC()
					continue
				}

				// Append new WAL record and keep reference to append additional
				// size for segmented WAL files.
				prev = &litestream.WALInfo{
					Name:       key,
					Replica:    r.Name(),
					Generation: generation,
					Index:      index,
					Offset:     offset,
					Size:       *obj.Size,
					CreatedAt:  obj.LastModified.UTC(),
				}
				infos = append(infos, prev)
			}
			return true
		}); err != nil {
			return nil, err
		}
	}

	return infos, nil
}

// Start starts replication for a given generation.
func (r *Replica) Start(ctx context.Context) {
	// Ignore if replica is being used synchronously.
	if !r.MonitorEnabled {
		return
	}

	// Stop previous replication.
	r.Stop()

	// Wrap context with cancelation.
	ctx, r.cancel = context.WithCancel(ctx)

	// Start goroutines to manage replica data.
	r.wg.Add(3)
	go func() { defer r.wg.Done(); r.monitor(ctx) }()
	go func() { defer r.wg.Done(); r.retainer(ctx) }()
	go func() { defer r.wg.Done(); r.validator(ctx) }()
}

// Stop cancels any outstanding replication and blocks until finished.
func (r *Replica) Stop() {
	r.cancel()
	r.wg.Wait()
}

// monitor runs in a separate goroutine and continuously replicates the DB.
func (r *Replica) monitor(ctx context.Context) {
	ticker := time.NewTicker(r.SyncInterval)
	defer ticker.Stop()

	// Continuously check for new data to replicate.
	ch := make(chan struct{})
	close(ch)
	var notify <-chan struct{} = ch
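	// The channel starts closed so the first loop iteration fires
	// immediately; afterwards it is replaced by the database's notify
	// channel so a sync only runs after the database reports a change.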

	for initial := true; ; initial = false {
		// Enforce a minimum time between synchronization.
		if !initial {
			select {
			case <-ctx.Done():
				return
			case <-ticker.C:
			}
		}

		// Wait for changes to the database.
		select {
		case <-ctx.Done():
			return
		case <-notify:
		}

		// Fetch new notify channel before replicating data.
		notify = r.db.Notify()

		// Synchronize the shadow wal into the replication directory.
		if err := r.Sync(ctx); err != nil {
			log.Printf("%s(%s): sync error: %s", r.db.Path(), r.Name(), err)
			continue
		}
	}
}

// retainer runs in a separate goroutine and handles retention.
func (r *Replica) retainer(ctx context.Context) {
	ticker := time.NewTicker(r.RetentionCheckInterval)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			if err := r.EnforceRetention(ctx); err != nil {
				log.Printf("%s(%s): retain error: %s", r.db.Path(), r.Name(), err)
				continue
			}
		}
	}
}

// validator runs in a separate goroutine and handles periodic validation.
func (r *Replica) validator(ctx context.Context) {
	// Initialize counters since validation occurs infrequently.
	for _, status := range []string{"ok", "error"} {
		internal.ReplicaValidationTotalCounterVec.WithLabelValues(r.db.Path(), r.Name(), status).Add(0)
	}

	if r.ValidationInterval <= 0 {
		return
	}

	ticker := time.NewTicker(r.ValidationInterval)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			if err := litestream.ValidateReplica(ctx, r); err != nil {
				log.Printf("%s(%s): validation error: %s", r.db.Path(), r.Name(), err)
				continue
			}
		}
	}
}

// CalcPos returns the position for the replica for the current generation.
// Returns a zero value if there is no active generation.
func (r *Replica) CalcPos(ctx context.Context, generation string) (pos litestream.Pos, err error) {
	if err := r.Init(ctx); err != nil {
		return pos, err
	}

	pos.Generation = generation

	// Find maximum snapshot index.
	if pos.Index, err = r.MaxSnapshotIndex(generation); err != nil {
		return litestream.Pos{}, err
	}

	index := -1
	var offset int64
	if err := r.s3.ListObjectsPagesWithContext(ctx, &s3.ListObjectsInput{
		Bucket:    aws.String(r.Bucket),
		Prefix:    aws.String(r.WALDir(generation) + "/"),
		Delimiter: aws.String("/"),
	}, func(page *s3.ListObjectsOutput, lastPage bool) bool {
		r.listOperationTotalCounter.Inc()

		for _, obj := range page.Contents {
			key := path.Base(*obj.Key)

			idx, off, _, err := litestream.ParseWALPath(key)
			if err != nil {
				continue // invalid wal filename
			}

			if index == -1 || idx > index {
				index, offset = idx, 0 // start tracking new wal
			} else if idx == index && off > offset {
				offset = off // update offset
			}
		}
		return true
	}); err != nil {
		return litestream.Pos{}, err
	}
	if index == -1 {
		return pos, nil // no wal files
	}
	pos.Index = index
	pos.Offset = offset

	return pos, nil
}

// snapshot copies the entire database to the replica path.
func (r *Replica) snapshot(ctx context.Context, generation string, index int) error {
	// Acquire a read lock on the database during snapshot to prevent checkpoints.
	tx, err := r.db.SQLDB().Begin()
	if err != nil {
		return err
	} else if _, err := tx.ExecContext(ctx, `SELECT COUNT(1) FROM _litestream_seq;`); err != nil {
		_ = tx.Rollback()
		return err
	}
	defer func() { _ = tx.Rollback() }()

	// Open database file handle.
	f, err := os.Open(r.db.Path())
	if err != nil {
		return err
	}
	defer f.Close()

	fi, err := f.Stat()
	if err != nil {
		return err
	}

	pr, pw := io.Pipe()
	zw := lz4.NewWriter(pw)
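	// Compress into the pipe from a goroutine so the uploader below can
	// stream the snapshot without buffering the entire file in memory.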
	go func() {
		if _, err := io.Copy(zw, f); err != nil {
			_ = pw.CloseWithError(err)
			return
		}
		_ = pw.CloseWithError(zw.Close())
	}()

	snapshotPath := r.SnapshotPath(generation, index)

	if _, err := r.uploader.UploadWithContext(ctx, &s3manager.UploadInput{
		Bucket: aws.String(r.Bucket),
		Key:    aws.String(snapshotPath),
		Body:   pr,
	}); err != nil {
		return err
	}

	r.putOperationTotalCounter.Inc()
	r.putOperationBytesCounter.Add(float64(fi.Size()))

	return nil
}

// snapshotN returns the number of snapshots for a generation.
func (r *Replica) snapshotN(generation string) (int, error) {
	snapshots, err := r.Snapshots(context.Background())
	if err != nil {
		return 0, err
	}

	var n int
	for _, snapshot := range snapshots {
		if snapshot.Generation == generation {
			n++
		}
	}
	return n, nil
}

// Init initializes the connection to S3. No-op if already initialized.
func (r *Replica) Init(ctx context.Context) (err error) {
	r.mu.Lock()
	defer r.mu.Unlock()

	if r.s3 != nil {
		return nil
	}

	// Look up region if not specified.
	region := r.Region
	if region == "" {
		if region, err = r.findBucketRegion(ctx, r.Bucket); err != nil {
			return fmt.Errorf("cannot lookup bucket region: %w", err)
		}
	}

	// Create new AWS session.
	config := r.config()
	config.Region = aws.String(region)
	sess, err := session.NewSession(config)
	if err != nil {
		return fmt.Errorf("cannot create aws session: %w", err)
	}
	r.s3 = s3.New(sess)
	r.uploader = s3manager.NewUploader(sess)
	return nil
}

// config returns the AWS configuration. Uses the default credential chain
// unless a key/secret are explicitly set.
func (r *Replica) config() *aws.Config {
	config := defaults.Get().Config
	if r.AccessKeyID != "" || r.SecretAccessKey != "" {
		config.Credentials = credentials.NewStaticCredentials(r.AccessKeyID, r.SecretAccessKey, "")
	}
	return config
}

func (r *Replica) findBucketRegion(ctx context.Context, bucket string) (string, error) {
	// Connect to US standard region to fetch info.
	config := r.config()
	config.Region = aws.String("us-east-1")
	sess, err := session.NewSession(config)
	if err != nil {
		return "", err
	}

	// Fetch bucket location, if possible. Must be bucket owner.
	// This call can return a nil location which means it's in us-east-1.
	if out, err := s3.New(sess).GetBucketLocation(&s3.GetBucketLocationInput{
		Bucket: aws.String(bucket),
	}); err != nil {
		return "", err
	} else if out.LocationConstraint != nil {
		return *out.LocationConstraint, nil
	}
	return "us-east-1", nil
}

// Sync replays data from the shadow WAL and uploads it to S3.
func (r *Replica) Sync(ctx context.Context) (err error) {
	// Clear last position if an error occurs during sync.
	defer func() {
		if err != nil {
			r.mu.Lock()
			r.pos = litestream.Pos{}
			r.mu.Unlock()
		}
	}()

	// Connect to S3, if necessary.
	if err := r.Init(ctx); err != nil {
		return err
	}

	// Find current position of database.
	dpos, err := r.db.Pos()
	if err != nil {
		return fmt.Errorf("cannot determine current generation: %w", err)
	} else if dpos.IsZero() {
		return fmt.Errorf("no generation, waiting for data")
	}
	generation := dpos.Generation

	// Calculate position if we don't have a previous position or if the generation changes.
	// Ensure sync & retainer do not snapshot at the same time.
	if lastPos := r.LastPos(); lastPos.IsZero() || lastPos.Generation != generation {
		if err := func() error {
			r.snapshotMu.Lock()
			defer r.snapshotMu.Unlock()

			// Create snapshot if no snapshots exist for generation.
			if n, err := r.snapshotN(generation); err != nil {
				return err
			} else if n == 0 {
				if err := r.snapshot(ctx, generation, dpos.Index); err != nil {
					return err
				}
				r.snapshotTotalGauge.Set(1.0)
			} else {
				r.snapshotTotalGauge.Set(float64(n))
			}

			// Determine position, if necessary.
			pos, err := r.CalcPos(ctx, generation)
			if err != nil {
				return fmt.Errorf("cannot determine replica position: %w", err)
			}

			r.mu.Lock()
			defer r.mu.Unlock()
			r.pos = pos

			return nil
		}(); err != nil {
			return err
		}
	}

	// Read all WAL files since the last position.
	for {
		if err = r.syncWAL(ctx); err == io.EOF {
			break
		} else if err != nil {
			return err
		}
	}

	return nil
}

func (r *Replica) syncWAL(ctx context.Context) (err error) {
	rd, err := r.db.ShadowWALReader(r.LastPos())
	if err == io.EOF {
		return err
	} else if err != nil {
		return fmt.Errorf("wal reader: %w", err)
	}
	defer rd.Close()

	// Read to intermediate buffer to determine size.
	pos := rd.Pos()
	b, err := ioutil.ReadAll(rd)
	if err != nil {
		return err
	}

	var buf bytes.Buffer
	zw := lz4.NewWriter(&buf)
	n, err := zw.Write(b)
	if err != nil {
		return err
	} else if err := zw.Close(); err != nil {
		return err
	}

	// Build a WAL path with the index/offset as well as size so we can ensure
	// that files are contiguous without having to decompress.
	walPath := path.Join(
		r.WALDir(rd.Pos().Generation),
		litestream.FormatWALPathWithOffset(pos.Index, pos.Offset)+".lz4",
	)
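	// The resulting key ends with the WAL index and offset (assumed to be
	// zero-padded hex, matching the "%08x_" prefix that WALReader lists by).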

	if _, err := r.uploader.UploadWithContext(ctx, &s3manager.UploadInput{
		Bucket: aws.String(r.Bucket),
		Key:    aws.String(walPath),
		Body:   &buf,
	}); err != nil {
		return err
	}
	r.putOperationTotalCounter.Inc()
	r.putOperationBytesCounter.Add(float64(n)) // compressed bytes

	// Save last replicated position.
	r.mu.Lock()
	r.pos = rd.Pos()
	r.mu.Unlock()

	// Track raw bytes processed & current position.
	r.walBytesCounter.Add(float64(len(b))) // raw bytes
	r.walIndexGauge.Set(float64(rd.Pos().Index))
	r.walOffsetGauge.Set(float64(rd.Pos().Offset))

	return nil
}

// SnapshotReader returns a reader for snapshot data at the given generation/index.
func (r *Replica) SnapshotReader(ctx context.Context, generation string, index int) (io.ReadCloser, error) {
	if err := r.Init(ctx); err != nil {
		return nil, err
	}

	// Pipe download to return an io.Reader.
	out, err := r.s3.GetObjectWithContext(ctx, &s3.GetObjectInput{
		Bucket: aws.String(r.Bucket),
		Key:    aws.String(r.SnapshotPath(generation, index)),
	})
	if err != nil {
		return nil, err
	}
	r.getOperationTotalCounter.Inc()
	r.getOperationBytesCounter.Add(float64(*out.ContentLength))

	// Decompress the snapshot file.
	return internal.NewReadCloser(lz4.NewReader(out.Body), out.Body), nil
}

// WALReader returns a reader for WAL data at the given index.
// Returns os.ErrNotExist if no matching index is found.
func (r *Replica) WALReader(ctx context.Context, generation string, index int) (io.ReadCloser, error) {
	if err := r.Init(ctx); err != nil {
		return nil, err
	}

	// Collect all files for the index.
	var keys []string
	if err := r.s3.ListObjectsPagesWithContext(ctx, &s3.ListObjectsInput{
		Bucket: aws.String(r.Bucket),
		Prefix: aws.String(path.Join(r.WALDir(generation), fmt.Sprintf("%08x_", index))),
	}, func(page *s3.ListObjectsOutput, lastPage bool) bool {
		r.listOperationTotalCounter.Inc()

		for _, obj := range page.Contents {
			_, _, _, err := litestream.ParseWALPath(path.Base(*obj.Key))
			if err != nil {
				continue
			}
			keys = append(keys, *obj.Key)
		}
		return true
	}); err != nil {
		return nil, err
	} else if len(keys) == 0 {
		return nil, os.ErrNotExist
	}

	// Download each segment and concatenate into a single buffer.
	var buf bytes.Buffer
	var offset int64
	for _, key := range keys {
		// Ensure offset is correct as we copy segments into buffer.
		_, off, _, _ := litestream.ParseWALPath(path.Base(key))
		if off != offset {
			return nil, fmt.Errorf("out of sequence wal segments: %s/%08x at remote offset %d, expected offset %d", generation, index, off, offset)
		}

		// Pipe download to return an io.Reader.
		out, err := r.s3.GetObjectWithContext(ctx, &s3.GetObjectInput{
			Bucket: aws.String(r.Bucket),
			Key:    aws.String(key),
		})
		if err != nil {
			return nil, err
		}
		defer out.Body.Close()

		r.getOperationTotalCounter.Inc()
		r.getOperationBytesCounter.Add(float64(*out.ContentLength))

		zr := lz4.NewReader(out.Body)

		n, err := io.Copy(&buf, zr)
		if err != nil {
			return nil, err
		}
		offset += int64(n)
	}

	return ioutil.NopCloser(&buf), nil
}

// EnforceRetention forces a new snapshot once the retention interval has passed.
// Older snapshots and WAL files are then removed.
func (r *Replica) EnforceRetention(ctx context.Context) (err error) {
	if err := r.Init(ctx); err != nil {
		return err
	}

	// Ensure sync & retainer do not snapshot at the same time.
	var snapshots []*litestream.SnapshotInfo
	if err := func() error {
		r.snapshotMu.Lock()
		defer r.snapshotMu.Unlock()

		// Find current position of database.
		pos, err := r.db.Pos()
		if err != nil {
			return fmt.Errorf("cannot determine current generation: %w", err)
		} else if pos.IsZero() {
			return fmt.Errorf("no generation, waiting for data")
		}

		// Obtain list of snapshots that are within the retention period.
		if snapshots, err = r.Snapshots(ctx); err != nil {
			return fmt.Errorf("cannot obtain snapshot list: %w", err)
		}
		snapshots = litestream.FilterSnapshotsAfter(snapshots, time.Now().Add(-r.Retention))

		// If no retained snapshots exist, create a new snapshot.
		if len(snapshots) == 0 {
			log.Printf("%s(%s): snapshots exceed retention, creating new snapshot", r.db.Path(), r.Name())
			if err := r.snapshot(ctx, pos.Generation, pos.Index); err != nil {
				return fmt.Errorf("cannot snapshot: %w", err)
			}
			snapshots = append(snapshots, &litestream.SnapshotInfo{Generation: pos.Generation, Index: pos.Index})
		}

		return nil
	}(); err != nil {
		return err
	}

	// Loop over generations and delete unretained snapshots & WAL files.
	generations, err := r.Generations(ctx)
	if err != nil {
		return fmt.Errorf("cannot obtain generations: %w", err)
	}
	for _, generation := range generations {
		// Find earliest retained snapshot for this generation.
		snapshot := litestream.FindMinSnapshotByGeneration(snapshots, generation)

		// Delete the entire generation if it has no snapshots being retained.
		if snapshot == nil {
			log.Printf("%s(%s): generation %q has no retained snapshots, deleting", r.db.Path(), r.Name(), generation)
			if err := r.deleteGenerationBefore(ctx, generation, -1); err != nil {
				return fmt.Errorf("cannot delete generation %q dir: %w", generation, err)
			}
			continue
		}

		// Otherwise delete all snapshots & WAL files before the lowest retained index.
		if err := r.deleteGenerationBefore(ctx, generation, snapshot.Index); err != nil {
			return fmt.Errorf("cannot delete generation %q files before index %d: %w", generation, snapshot.Index, err)
		}
	}

	return nil
}

func (r *Replica) deleteGenerationBefore(ctx context.Context, generation string, index int) (err error) {
	// Collect all files for the generation.
	var objIDs []*s3.ObjectIdentifier
	if err := r.s3.ListObjectsPagesWithContext(ctx, &s3.ListObjectsInput{
		Bucket: aws.String(r.Bucket),
		Prefix: aws.String(r.GenerationDir(generation)),
	}, func(page *s3.ListObjectsOutput, lastPage bool) bool {
		r.listOperationTotalCounter.Inc()

		for _, obj := range page.Contents {
			// Skip snapshots or WALs at or after the search index, unless
			// index is -1 (which deletes the entire generation).
			if index != -1 {
				if idx, _, err := litestream.ParseSnapshotPath(path.Base(*obj.Key)); err == nil && idx >= index {
					continue
				} else if idx, _, _, err := litestream.ParseWALPath(path.Base(*obj.Key)); err == nil && idx >= index {
					continue
				}
			}

			objIDs = append(objIDs, &s3.ObjectIdentifier{Key: obj.Key})
		}
		return true
	}); err != nil {
		return err
	}

	// Delete all files in batches.
	for i := 0; i < len(objIDs); i += MaxKeys {
		j := i + MaxKeys
		if j > len(objIDs) {
			j = len(objIDs)
		}

		for _, objID := range objIDs[i:j] {
			log.Printf("%s(%s): retention exceeded, deleting from generation %q: %s", r.db.Path(), r.Name(), generation, path.Base(*objID.Key))
		}

		if _, err := r.s3.DeleteObjectsWithContext(ctx, &s3.DeleteObjectsInput{
			Bucket: aws.String(r.Bucket),
			Delete: &s3.Delete{
				Objects: objIDs[i:j],
				Quiet:   aws.Bool(true),
			},
		}); err != nil {
			return err
		}
		r.deleteOperationTotalCounter.Inc()
	}

	return nil
}

// S3 metrics.
var (
	operationTotalCounterVec = promauto.NewCounterVec(prometheus.CounterOpts{
		Namespace: "litestream",
		Subsystem: "s3",
		Name:      "operation_total",
		Help:      "The number of S3 operations performed",
	}, []string{"db", "name", "type"})

	operationBytesCounterVec = promauto.NewCounterVec(prometheus.CounterOpts{
		Namespace: "litestream",
		Subsystem: "s3",
		Name:      "operation_bytes",
		Help:      "The number of bytes used by S3 operations",
	}, []string{"db", "name", "type"})
)
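
// With the Prometheus client library, these register under the
// fully-qualified names litestream_s3_operation_total and
// litestream_s3_operation_bytes, labeled by database path ("db"),
// replica name ("name"), and operation ("type": "PUT", "GET", "LIST",
// or "DELETE").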