Files
litestream/s3/s3.go
2021-01-15 13:31:04 -07:00

1029 lines
27 KiB
Go

package s3
import (
"bytes"
"compress/gzip"
"context"
"fmt"
"io"
"io/ioutil"
"log"
"os"
"path"
"sync"
"time"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/aws/aws-sdk-go/service/s3/s3manager"
"github.com/benbjohnson/litestream"
"github.com/benbjohnson/litestream/internal"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)
// S3 replica default settings.
const (
	// DefaultSyncInterval is the default time between shadow WAL syncs.
	DefaultSyncInterval = 10 * time.Second

	// DefaultRetention is the default time to keep snapshots and related WAL files.
	DefaultRetention = 24 * time.Hour

	// DefaultRetentionCheckInterval is the default time between retention enforcement checks.
	DefaultRetentionCheckInterval = 1 * time.Hour
)

// MaxKeys is the number of keys S3 can operate on per batch.
// Used to chunk DeleteObjects calls in deleteGenerationBefore().
const MaxKeys = 1000
// Ensure Replica implements the litestream.Replica interface.
var _ litestream.Replica = (*Replica)(nil)

// Replica is a replica that replicates a DB to an S3 bucket.
type Replica struct {
	db   *litestream.DB // source database
	name string         // replica name, optional

	s3       *s3.S3              // s3 service; nil until Init() succeeds
	uploader *s3manager.Uploader // multipart uploader, created alongside s3

	mu         sync.RWMutex   // guards pos; also held during lazy init (Init)
	snapshotMu sync.Mutex     // serializes snapshotting between Sync() & EnforceRetention()
	pos        litestream.Pos // last position
	wg         sync.WaitGroup // tracks monitor & retainer goroutines
	cancel     func()         // cancels the monitor/retainer context; never nil

	// Prometheus collectors, pre-bound to this db/replica pair in NewReplica().
	snapshotTotalGauge          prometheus.Gauge
	walBytesCounter             prometheus.Counter
	walIndexGauge               prometheus.Gauge
	walOffsetGauge              prometheus.Gauge
	putOperationTotalCounter    prometheus.Counter
	putOperationBytesCounter    prometheus.Counter
	getOperationTotalCounter    prometheus.Counter
	getOperationBytesCounter    prometheus.Counter
	listOperationTotalCounter   prometheus.Counter
	deleteOperationTotalCounter prometheus.Counter

	// AWS authentication keys.
	AccessKeyID     string
	SecretAccessKey string

	// S3 bucket information
	Region string // if empty, looked up via GetBucketLocation in Init()
	Bucket string
	Path   string // key prefix within the bucket

	// Time between syncs with the shadow WAL.
	SyncInterval time.Duration

	// Time to keep snapshots and related WAL files.
	// Database is snapshotted after interval and older WAL files are discarded.
	Retention time.Duration

	// Time between retention checks.
	RetentionCheckInterval time.Duration

	// If true, replica monitors database for changes automatically.
	// Set to false if replica is being used synchronously (such as in tests).
	MonitorEnabled bool
}
// NewReplica returns a new instance of Replica with default settings and
// metric collectors bound to the database path & replica name.
func NewReplica(db *litestream.DB, name string) *Replica {
	r := &Replica{
		db:     db,
		name:   name,
		cancel: func() {},

		SyncInterval:           DefaultSyncInterval,
		Retention:              DefaultRetention,
		RetentionCheckInterval: DefaultRetentionCheckInterval,
		MonitorEnabled:         true,
	}

	// Bind all collectors once up front so hot paths avoid label lookups.
	dbPath, replicaName := db.Path(), r.Name()
	r.snapshotTotalGauge = internal.ReplicaSnapshotTotalGaugeVec.WithLabelValues(dbPath, replicaName)
	r.walBytesCounter = internal.ReplicaWALBytesCounterVec.WithLabelValues(dbPath, replicaName)
	r.walIndexGauge = internal.ReplicaWALIndexGaugeVec.WithLabelValues(dbPath, replicaName)
	r.walOffsetGauge = internal.ReplicaWALOffsetGaugeVec.WithLabelValues(dbPath, replicaName)

	r.putOperationTotalCounter = operationTotalCounterVec.WithLabelValues(dbPath, replicaName, "PUT")
	r.putOperationBytesCounter = operationBytesCounterVec.WithLabelValues(dbPath, replicaName, "PUT")
	r.getOperationTotalCounter = operationTotalCounterVec.WithLabelValues(dbPath, replicaName, "GET")
	r.getOperationBytesCounter = operationBytesCounterVec.WithLabelValues(dbPath, replicaName, "GET")
	r.listOperationTotalCounter = operationTotalCounterVec.WithLabelValues(dbPath, replicaName, "LIST")
	r.deleteOperationTotalCounter = operationTotalCounterVec.WithLabelValues(dbPath, replicaName, "DELETE")

	return r
}
// Name returns the name of the replica. Returns the type if no name set.
func (r *Replica) Name() string {
	if r.name == "" {
		return r.Type()
	}
	return r.name
}
// Type returns the type of replica ("s3"). Also used as the default
// replica name when none is configured (see Name).
func (r *Replica) Type() string {
	return "s3"
}
// LastPos returns the last successfully replicated position.
func (r *Replica) LastPos() litestream.Pos {
	r.mu.RLock()
	pos := r.pos
	r.mu.RUnlock()
	return pos
}
// GenerationDir returns the path to a generation's root directory.
// The path is an S3 key prefix rooted at r.Path.
func (r *Replica) GenerationDir(generation string) string {
	return path.Join(r.Path, "generations", generation)
}
// SnapshotDir returns the path to a generation's snapshot directory,
// nested under the generation's root directory.
func (r *Replica) SnapshotDir(generation string) string {
	return path.Join(r.GenerationDir(generation), "snapshots")
}
// SnapshotPath returns the path to a snapshot file.
// Snapshot filenames are the zero-padded hex index plus ".snapshot.gz".
func (r *Replica) SnapshotPath(generation string, index int) string {
	filename := fmt.Sprintf("%08x.snapshot.gz", index)
	return path.Join(r.SnapshotDir(generation), filename)
}
// MaxSnapshotIndex returns the highest index for the snapshots.
// Returns an error if the generation has no snapshots.
func (r *Replica) MaxSnapshotIndex(generation string) (int, error) {
	snapshots, err := r.Snapshots(context.Background())
	if err != nil {
		return 0, err
	}

	// Track the largest index among this generation's snapshots;
	// -1 is the "none seen yet" sentinel.
	max := -1
	for _, s := range snapshots {
		if s.Generation != generation {
			continue
		}
		if max == -1 || s.Index > max {
			max = s.Index
		}
	}

	if max == -1 {
		return 0, fmt.Errorf("no snapshots found")
	}
	return max, nil
}
// WALDir returns the path to a generation's WAL directory,
// nested under the generation's root directory.
func (r *Replica) WALDir(generation string) string {
	return path.Join(r.GenerationDir(generation), "wal")
}
// Generations returns a list of available generation names by listing the
// common prefixes directly under the "generations/" key prefix.
func (r *Replica) Generations(ctx context.Context) ([]string, error) {
	if err := r.Init(ctx); err != nil {
		return nil, err
	}

	var generations []string
	input := &s3.ListObjectsInput{
		Bucket:    aws.String(r.Bucket),
		Prefix:    aws.String(path.Join(r.Path, "generations") + "/"),
		Delimiter: aws.String("/"),
	}
	fn := func(page *s3.ListObjectsOutput, lastPage bool) bool {
		r.listOperationTotalCounter.Inc()
		for _, prefix := range page.CommonPrefixes {
			// Only keep prefixes that look like generation names.
			if name := path.Base(*prefix.Prefix); litestream.IsGenerationName(name) {
				generations = append(generations, name)
			}
		}
		return true
	}
	if err := r.s3.ListObjectsPagesWithContext(ctx, input, fn); err != nil {
		return nil, err
	}
	return generations, nil
}
// GenerationStats returns stats for a generation, combining snapshot
// counts/timestamps with WAL counts/timestamps.
func (r *Replica) GenerationStats(ctx context.Context, generation string) (stats litestream.GenerationStats, err error) {
	if err := r.Init(ctx); err != nil {
		return stats, err
	}

	// Seed stats from the snapshot listing.
	snapshotN, snapshotMin, snapshotMax, err := r.snapshotStats(ctx, generation)
	if err != nil {
		return stats, err
	}
	stats.SnapshotN = snapshotN
	stats.CreatedAt, stats.UpdatedAt = snapshotMin, snapshotMax

	// Merge in WAL stats, if any WAL files exist.
	walN, walMin, walMax, err := r.walStats(ctx, generation)
	if err != nil {
		return stats, err
	}
	if walN == 0 {
		return stats, nil
	}
	stats.WALN = walN

	// Widen the created/updated window to cover the WAL timestamps.
	if stats.CreatedAt.IsZero() || walMin.Before(stats.CreatedAt) {
		stats.CreatedAt = walMin
	}
	if stats.UpdatedAt.IsZero() || walMax.After(stats.UpdatedAt) {
		stats.UpdatedAt = walMax
	}
	return stats, nil
}
// snapshotStats returns the count and the earliest/latest modification
// times of snapshot objects for a generation.
func (r *Replica) snapshotStats(ctx context.Context, generation string) (n int, min, max time.Time, err error) {
	err = r.s3.ListObjectsPagesWithContext(ctx, &s3.ListObjectsInput{
		Bucket: aws.String(r.Bucket),
		Prefix: aws.String(r.SnapshotDir(generation) + "/"),
	}, func(page *s3.ListObjectsOutput, lastPage bool) bool {
		r.listOperationTotalCounter.Inc()
		for _, obj := range page.Contents {
			// Skip objects that don't parse as snapshot files.
			if !litestream.IsSnapshotPath(path.Base(*obj.Key)) {
				continue
			}
			n++
			t := obj.LastModified.UTC()
			if min.IsZero() || t.Before(min) {
				min = t
			}
			if max.IsZero() || t.After(max) {
				max = t
			}
		}
		return true
	})
	return n, min, max, err
}
// walStats returns the count and the earliest/latest modification times
// of WAL objects for a generation. Mirrors snapshotStats().
func (r *Replica) walStats(ctx context.Context, generation string) (n int, min, max time.Time, err error) {
	err = r.s3.ListObjectsPagesWithContext(ctx, &s3.ListObjectsInput{
		Bucket: aws.String(r.Bucket),
		Prefix: aws.String(r.WALDir(generation) + "/"),
	}, func(page *s3.ListObjectsOutput, lastPage bool) bool {
		r.listOperationTotalCounter.Inc()
		for _, obj := range page.Contents {
			// Skip objects that don't parse as WAL files.
			if !litestream.IsWALPath(path.Base(*obj.Key)) {
				continue
			}
			n++
			t := obj.LastModified.UTC()
			if min.IsZero() || t.Before(min) {
				min = t
			}
			if max.IsZero() || t.After(max) {
				max = t
			}
		}
		return true
	})
	return n, min, max, err
}
// Snapshots returns a list of available snapshots in the replica.
// It lists the snapshot directory of every generation and collects
// metadata for each object whose name parses as a snapshot file.
func (r *Replica) Snapshots(ctx context.Context) ([]*litestream.SnapshotInfo, error) {
	if err := r.Init(ctx); err != nil {
		return nil, err
	}

	generations, err := r.Generations(ctx)
	if err != nil {
		return nil, err
	}

	var infos []*litestream.SnapshotInfo
	for _, generation := range generations {
		if err := r.s3.ListObjectsPagesWithContext(ctx, &s3.ListObjectsInput{
			Bucket:    aws.String(r.Bucket),
			Prefix:    aws.String(r.SnapshotDir(generation) + "/"),
			Delimiter: aws.String("/"),
		}, func(page *s3.ListObjectsOutput, lastPage bool) bool {
			r.listOperationTotalCounter.Inc()

			for _, obj := range page.Contents {
				key := path.Base(*obj.Key)
				index, _, err := litestream.ParseSnapshotPath(key)
				if err != nil {
					continue // not a snapshot file
				}

				infos = append(infos, &litestream.SnapshotInfo{
					Name:       key,
					Replica:    r.Name(),
					Generation: generation,
					Index:      index,
					Size:       *obj.Size,
					CreatedAt:  obj.LastModified.UTC(),
				})
			}
			return true
		}); err != nil {
			return nil, err
		}
	}
	return infos, nil
}
// WALs returns a list of available WAL files in the replica.
// Segmented WAL files sharing the same index are merged into a single
// record whose Size is the sum of all segments.
func (r *Replica) WALs(ctx context.Context) ([]*litestream.WALInfo, error) {
	if err := r.Init(ctx); err != nil {
		return nil, err
	}

	generations, err := r.Generations(ctx)
	if err != nil {
		return nil, err
	}

	var infos []*litestream.WALInfo
	for _, generation := range generations {
		// prev holds the record currently being accumulated; relies on the
		// listing returning keys (and therefore indexes) in sorted order.
		var prev *litestream.WALInfo
		if err := r.s3.ListObjectsPagesWithContext(ctx, &s3.ListObjectsInput{
			Bucket:    aws.String(r.Bucket),
			Prefix:    aws.String(r.WALDir(generation) + "/"),
			Delimiter: aws.String("/"),
		}, func(page *s3.ListObjectsOutput, lastPage bool) bool {
			r.listOperationTotalCounter.Inc()

			for _, obj := range page.Contents {
				key := path.Base(*obj.Key)
				index, offset, _, _, err := litestream.ParseWALPath(key)
				if err != nil {
					continue // not a wal file
				}

				// Update previous record if generation & index match.
				if prev != nil && prev.Index == index {
					prev.Size += *obj.Size
					prev.CreatedAt = obj.LastModified.UTC()
					continue
				}

				// Append new WAL record and keep reference to append additional
				// size for segmented WAL files.
				prev = &litestream.WALInfo{
					Name:       key,
					Replica:    r.Name(),
					Generation: generation,
					Index:      index,
					Offset:     offset,
					Size:       *obj.Size,
					CreatedAt:  obj.LastModified.UTC(),
				}
				infos = append(infos, prev)
			}
			return true
		}); err != nil {
			return nil, err
		}
	}
	return infos, nil
}
// Start starts replication for a given generation.
func (r *Replica) Start(ctx context.Context) {
	if !r.MonitorEnabled {
		return // replica is being used synchronously; nothing to monitor
	}

	// Halt any replication that is already running.
	r.Stop()

	// Derive a cancelable context so Stop() can halt the goroutines.
	ctx, r.cancel = context.WithCancel(ctx)

	// Launch the background sync & retention loops.
	r.wg.Add(2)
	go func() { defer r.wg.Done(); r.monitor(ctx) }()
	go func() { defer r.wg.Done(); r.retainer(ctx) }()
}
// Stop cancels any outstanding replication and blocks until finished.
// Safe to call even if Start was never called: cancel is initialized to a
// no-op in NewReplica and the WaitGroup is empty.
func (r *Replica) Stop() {
	r.cancel()
	r.wg.Wait()
}
// monitor runs in a separate goroutine and continuously replicates the DB.
// It waits for a database change notification, then syncs, rate-limited to
// at most one sync per SyncInterval.
func (r *Replica) monitor(ctx context.Context) {
	ticker := time.NewTicker(r.SyncInterval)
	defer ticker.Stop()

	// Continuously check for new data to replicate.
	// The notify channel starts closed so the very first iteration syncs
	// immediately without waiting for a database change.
	ch := make(chan struct{})
	close(ch)
	var notify <-chan struct{} = ch

	for initial := true; ; initial = false {
		// Enforce a minimum time between synchronization.
		if !initial {
			select {
			case <-ctx.Done():
				return
			case <-ticker.C:
			}
		}

		// Wait for changes to the database.
		select {
		case <-ctx.Done():
			return
		case <-notify:
		}

		// Fetch new notify channel before replicating data.
		notify = r.db.Notify()

		// Synchronize the shadow wal into the replication directory.
		// Errors are logged and the loop continues on the next tick.
		if err := r.Sync(ctx); err != nil {
			log.Printf("%s(%s): sync error: %s", r.db.Path(), r.Name(), err)
			continue
		}
	}
}
// retainer runs in a separate goroutine and handles retention by invoking
// EnforceRetention on every tick until the context is canceled.
func (r *Replica) retainer(ctx context.Context) {
	ticker := time.NewTicker(r.RetentionCheckInterval)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
		}

		// Errors are logged; the loop retries on the next tick.
		if err := r.EnforceRetention(ctx); err != nil {
			log.Printf("%s(%s): retain error: %s", r.db.Path(), r.Name(), err)
		}
	}
}
// CalcPos returns the position for the replica for the current generation.
// Returns a zero value if there is no active generation.
func (r *Replica) CalcPos(ctx context.Context, generation string) (pos litestream.Pos, err error) {
	if err := r.Init(ctx); err != nil {
		return pos, err
	}

	pos.Generation = generation

	// Find maximum snapshot index.
	if pos.Index, err = r.MaxSnapshotIndex(generation); err != nil {
		return litestream.Pos{}, err
	}

	// Find the highest WAL index and the total size of its segments.
	index := -1
	var offset int64
	if err := r.s3.ListObjectsPagesWithContext(ctx, &s3.ListObjectsInput{
		Bucket:    aws.String(r.Bucket),
		Prefix:    aws.String(r.WALDir(generation) + "/"),
		Delimiter: aws.String("/"),
	}, func(page *s3.ListObjectsOutput, lastPage bool) bool {
		r.listOperationTotalCounter.Inc()

		for _, obj := range page.Contents {
			key := path.Base(*obj.Key)
			idx, _, sz, _, err := litestream.ParseWALPath(key)
			if err != nil {
				continue // invalid wal filename
			}

			// Reset offset tracking when a higher WAL index is found.
			// NOTE: previously the parsed offset was declared with ":=",
			// shadowing the outer "offset" so the computed position offset
			// was always zero; and the "else if" meant the segment that set
			// a new max index never had its own size counted.
			if index == -1 || idx > index {
				index, offset = idx, 0
			}
			if idx == index {
				offset += sz // accumulate segment size for current wal index
			}
		}
		return true
	}); err != nil {
		return litestream.Pos{}, err
	}

	if index == -1 {
		return pos, nil // no wal files; snapshot index is the position
	}
	pos.Index = index
	pos.Offset = offset
	return pos, nil
}
// snapshot copies the entire database to the replica path, gzip-compressed,
// under the snapshot path for the given generation/index.
func (r *Replica) snapshot(ctx context.Context, generation string, index int) error {
	// Acquire a read lock on the database during snapshot to prevent checkpoints.
	tx, err := r.db.SQLDB().Begin()
	if err != nil {
		return err
	} else if _, err := tx.ExecContext(ctx, `SELECT COUNT(1) FROM _litestream_seq;`); err != nil {
		tx.Rollback()
		return err
	}
	defer tx.Rollback()

	// Open database file handle.
	f, err := os.Open(r.db.Path())
	if err != nil {
		return err
	}
	defer f.Close()

	fi, err := f.Stat()
	if err != nil {
		return err
	}

	// Compress the database file into the pipe in a separate goroutine.
	pr, pw := io.Pipe()
	// NOTE: closing the read side on exit unblocks the compressor goroutine
	// (its writes fail with ErrClosedPipe) if the upload returns an error
	// without draining the pipe; previously that goroutine could block forever.
	defer pr.Close()
	gw := gzip.NewWriter(pw)
	go func() {
		if _, err := io.Copy(gw, f); err != nil {
			_ = pw.CloseWithError(err)
			return
		}
		_ = pw.CloseWithError(gw.Close())
	}()

	snapshotPath := r.SnapshotPath(generation, index)

	if _, err := r.uploader.UploadWithContext(ctx, &s3manager.UploadInput{
		Bucket: aws.String(r.Bucket),
		Key:    aws.String(snapshotPath),
		Body:   pr,
	}); err != nil {
		return err
	}

	r.putOperationTotalCounter.Inc()
	r.putOperationBytesCounter.Add(float64(fi.Size())) // uncompressed database size
	return nil
}
// snapshotN returns the number of snapshots for a generation.
func (r *Replica) snapshotN(generation string) (int, error) {
	snapshots, err := r.Snapshots(context.Background())
	if err != nil {
		return 0, err
	}

	// Count only the snapshots belonging to this generation.
	n := 0
	for _, s := range snapshots {
		if s.Generation == generation {
			n++
		}
	}
	return n, nil
}
// Init initializes the connection to S3. No-op if already initialized.
// Called lazily by every public method so callers never need to invoke it
// directly.
func (r *Replica) Init(ctx context.Context) (err error) {
	r.mu.Lock()
	defer r.mu.Unlock()

	// The s3 client doubles as the "already initialized" sentinel.
	if r.s3 != nil {
		return nil
	}

	// Look up region if not specified.
	region := r.Region
	if region == "" {
		if region, err = r.findBucketRegion(ctx, r.Bucket); err != nil {
			return fmt.Errorf("cannot lookup bucket region: %w", err)
		}
		log.Printf("%s(%s): s3 bucket region found: %q", r.db.Path(), r.Name(), region)
	}

	// Create new AWS session with static credentials.
	sess, err := session.NewSession(&aws.Config{
		Credentials: credentials.NewStaticCredentials(r.AccessKeyID, r.SecretAccessKey, ""),
		Region:      aws.String(region),
	})
	if err != nil {
		return fmt.Errorf("cannot create aws session: %w", err)
	}
	r.s3 = s3.New(sess)
	r.uploader = s3manager.NewUploader(sess)
	return nil
}
// findBucketRegion determines the AWS region of a bucket via the
// GetBucketLocation API.
func (r *Replica) findBucketRegion(ctx context.Context, bucket string) (string, error) {
	// Connect to the US standard region to look up bucket metadata.
	sess, err := session.NewSession(&aws.Config{
		Credentials: credentials.NewStaticCredentials(r.AccessKeyID, r.SecretAccessKey, ""),
		Region:      aws.String("us-east-1"),
	})
	if err != nil {
		return "", err
	}

	// Fetch bucket location, if possible. Must be bucket owner.
	out, err := s3.New(sess).GetBucketLocation(&s3.GetBucketLocationInput{
		Bucket: aws.String(bucket),
	})
	if err != nil {
		return "", err
	}

	// A nil location constraint means the bucket lives in us-east-1.
	if out.LocationConstraint == nil {
		return "us-east-1", nil
	}
	return *out.LocationConstraint, nil
}
// Sync replicates the shadow WAL to S3. If the replica has no position or
// the database generation has changed, it first snapshots the database (if
// needed) and recalculates the replica position.
func (r *Replica) Sync(ctx context.Context) (err error) {
	if err := r.Init(ctx); err != nil {
		return err
	}

	// Find current position of database.
	dpos, err := r.db.Pos()
	if err != nil {
		return fmt.Errorf("cannot determine current generation: %w", err)
	} else if dpos.IsZero() {
		return fmt.Errorf("no generation, waiting for data")
	}
	generation := dpos.Generation

	// Calculate position if we don't have a previous position or if the generation changes.
	// Ensure sync & retainer do not snapshot at the same time.
	if lastPos := r.LastPos(); lastPos.IsZero() || lastPos.Generation != generation {
		if err := func() error {
			r.snapshotMu.Lock()
			defer r.snapshotMu.Unlock()

			// Create snapshot if no snapshots exist for generation.
			if n, err := r.snapshotN(generation); err != nil {
				return err
			} else if n == 0 {
				if err := r.snapshot(ctx, generation, dpos.Index); err != nil {
					return err
				}
				r.snapshotTotalGauge.Set(1.0)
			} else {
				r.snapshotTotalGauge.Set(float64(n))
			}

			// Determine position, if necessary.
			pos, err := r.CalcPos(ctx, generation)
			if err != nil {
				// Wrap with %w (was %s) so callers can unwrap the cause.
				return fmt.Errorf("cannot determine replica position: %w", err)
			}

			r.mu.Lock()
			defer r.mu.Unlock()
			r.pos = pos
			return nil
		}(); err != nil {
			return err
		}
	}

	// Read all WAL files since the last position until caught up (io.EOF).
	for {
		if err = r.syncWAL(ctx); err == io.EOF {
			break
		} else if err != nil {
			return err
		}
	}

	return nil
}
// syncWAL copies the next pending segment of the shadow WAL to S3,
// gzip-compressed. Returns io.EOF when the replica has caught up.
func (r *Replica) syncWAL(ctx context.Context) (err error) {
	rd, err := r.db.ShadowWALReader(r.LastPos())
	if err == io.EOF {
		return err
	} else if err != nil {
		return fmt.Errorf("wal reader: %w", err)
	}
	defer rd.Close()

	// Read to intermediate buffer to determine size.
	pos := rd.Pos()
	b, err := ioutil.ReadAll(rd)
	if err != nil {
		return err
	}

	// Compress the WAL segment in memory before uploading.
	var buf bytes.Buffer
	gw := gzip.NewWriter(&buf)
	if _, err := gw.Write(b); err != nil {
		return err
	} else if err := gw.Close(); err != nil {
		return err
	}

	// Capture the compressed size before the upload drains the buffer.
	// NOTE: previously the return value of gw.Write() was recorded as
	// "compressed bytes", but gzip.Writer.Write() reports the number of
	// *uncompressed* bytes consumed (len(b)), overstating the PUT metric.
	compressedN := buf.Len()

	// Build a WAL path with the index/offset as well as size so we can ensure
	// that files are contiguous without having to decompress.
	walPath := path.Join(
		r.WALDir(rd.Pos().Generation),
		litestream.FormatWALPathWithOffsetSize(pos.Index, pos.Offset, int64(len(b)))+".gz",
	)

	if _, err := r.uploader.UploadWithContext(ctx, &s3manager.UploadInput{
		Bucket: aws.String(r.Bucket),
		Key:    aws.String(walPath),
		Body:   &buf,
	}); err != nil {
		return err
	}
	r.putOperationTotalCounter.Inc()
	r.putOperationBytesCounter.Add(float64(compressedN)) // compressed bytes

	// Save last replicated position.
	r.mu.Lock()
	r.pos = rd.Pos()
	r.mu.Unlock()

	// Track raw bytes processed & current position.
	r.walBytesCounter.Add(float64(len(b))) // raw bytes
	r.walIndexGauge.Set(float64(rd.Pos().Index))
	r.walOffsetGauge.Set(float64(rd.Pos().Offset))
	return nil
}
// SnapshotReader returns a reader for snapshot data at the given generation/index.
// The returned reader decompresses the snapshot; callers must close it, which
// also closes the underlying S3 response body.
func (r *Replica) SnapshotReader(ctx context.Context, generation string, index int) (io.ReadCloser, error) {
	if err := r.Init(ctx); err != nil {
		return nil, err
	}

	// Fetch the compressed snapshot object.
	out, err := r.s3.GetObjectWithContext(ctx, &s3.GetObjectInput{
		Bucket: aws.String(r.Bucket),
		Key:    aws.String(r.SnapshotPath(generation, index)),
	})
	if err != nil {
		return nil, err
	}
	r.getOperationTotalCounter.Inc()
	// NOTE: fixed — the content length was previously added to the *total*
	// counter, inflating the GET operation count and never recording GET bytes.
	r.getOperationBytesCounter.Add(float64(*out.ContentLength))

	// Decompress the snapshot file.
	gr, err := gzip.NewReader(out.Body)
	if err != nil {
		out.Body.Close()
		return nil, err
	}
	return internal.NewReadCloser(gr, out.Body), nil
}
// WALReader returns a reader for WAL data at the given index.
// Returns os.ErrNotExist if no matching index is found.
// Segments for the index are validated to be contiguous (each segment's
// filename offset must match the running total of preceding sizes) and
// concatenated, decompressed, into a single in-memory buffer.
func (r *Replica) WALReader(ctx context.Context, generation string, index int) (io.ReadCloser, error) {
	if err := r.Init(ctx); err != nil {
		return nil, err
	}

	// Collect all files for the index.
	var keys []string
	var offset int64
	var innerErr error
	if err := r.s3.ListObjectsPagesWithContext(ctx, &s3.ListObjectsInput{
		Bucket: aws.String(r.Bucket),
		Prefix: aws.String(path.Join(r.WALDir(generation), fmt.Sprintf("%08x_", index))),
	}, func(page *s3.ListObjectsOutput, lastPage bool) bool {
		r.listOperationTotalCounter.Inc()

		for _, obj := range page.Contents {
			// Read the offset & size from the filename. We need to check this
			// against a running offset to ensure there are no gaps.
			_, off, sz, _, err := litestream.ParseWALPath(path.Base(*obj.Key))
			if err != nil {
				continue
			} else if off != offset {
				innerErr = fmt.Errorf("out of sequence wal segments: %s/%08x", generation, index)
				return false
			}
			keys = append(keys, *obj.Key)
			offset += sz
		}
		return true
	}); err != nil {
		return nil, err
	} else if innerErr != nil {
		return nil, innerErr
	}

	// Download each segment and concatenate into a buffer. A helper is used
	// so that each segment's response body & gzip reader are closed as soon
	// as the segment is consumed (previously the defers accumulated until
	// the whole function returned).
	var buf bytes.Buffer
	for _, key := range keys {
		if err := r.downloadWALSegment(ctx, key, &buf); err != nil {
			return nil, err
		}
	}
	return ioutil.NopCloser(&buf), nil
}

// downloadWALSegment fetches a single compressed WAL segment object and
// appends its decompressed contents to w.
func (r *Replica) downloadWALSegment(ctx context.Context, key string, w io.Writer) error {
	out, err := r.s3.GetObjectWithContext(ctx, &s3.GetObjectInput{
		Bucket: aws.String(r.Bucket),
		Key:    aws.String(key),
	})
	if err != nil {
		return err
	}
	defer out.Body.Close()

	r.getOperationTotalCounter.Inc()
	// NOTE: fixed — bytes were previously added to the *total* counter,
	// inflating the GET operation count and never recording GET bytes.
	r.getOperationBytesCounter.Add(float64(*out.ContentLength))

	gr, err := gzip.NewReader(out.Body)
	if err != nil {
		return err
	}
	defer gr.Close()

	_, err = io.Copy(w, gr)
	return err
}
// EnforceRetention forces a new snapshot once the retention interval has passed.
// Older snapshots and WAL files are then removed.
func (r *Replica) EnforceRetention(ctx context.Context) (err error) {
	if err := r.Init(ctx); err != nil {
		return err
	}

	// Ensure sync & retainer do not snapshot at the same time.
	var snapshots []*litestream.SnapshotInfo
	if err := func() error {
		r.snapshotMu.Lock()
		defer r.snapshotMu.Unlock()

		// Find current position of database.
		pos, err := r.db.Pos()
		if err != nil {
			return fmt.Errorf("cannot determine current generation: %w", err)
		} else if pos.IsZero() {
			return fmt.Errorf("no generation, waiting for data")
		}

		// Obtain list of snapshots that are within the retention period.
		if snapshots, err = r.Snapshots(ctx); err != nil {
			return fmt.Errorf("cannot obtain snapshot list: %w", err)
		}
		snapshots = litestream.FilterSnapshotsAfter(snapshots, time.Now().Add(-r.Retention))

		// If no retained snapshots exist, create a new snapshot.
		if len(snapshots) == 0 {
			log.Printf("%s(%s): snapshots exceeds retention, creating new snapshot", r.db.Path(), r.Name())

			if err := r.snapshot(ctx, pos.Generation, pos.Index); err != nil {
				return fmt.Errorf("cannot snapshot: %w", err)
			}
			// Record the new snapshot locally so its generation is retained
			// by the deletion pass below.
			snapshots = append(snapshots, &litestream.SnapshotInfo{Generation: pos.Generation, Index: pos.Index})
		}
		return nil
	}(); err != nil {
		return err
	}

	// Loop over generations and delete unretained snapshots & WAL files.
	generations, err := r.Generations(ctx)
	if err != nil {
		return fmt.Errorf("cannot obtain generations: %w", err)
	}
	for _, generation := range generations {
		// Find earliest retained snapshot for this generation.
		snapshot := litestream.FindMinSnapshotByGeneration(snapshots, generation)

		// Delete generations if it has no snapshots being retained.
		// An index of -1 removes every object under the generation dir.
		if snapshot == nil {
			log.Printf("%s(%s): generation %q has no retained snapshots, deleting", r.db.Path(), r.Name(), generation)
			if err := r.deleteGenerationBefore(ctx, generation, -1); err != nil {
				return fmt.Errorf("cannot delete generation %q dir: %w", generation, err)
			}
			continue
		}

		// Otherwise delete all snapshots & WAL files before a lowest retained index.
		if err := r.deleteGenerationBefore(ctx, generation, snapshot.Index); err != nil {
			return fmt.Errorf("cannot delete generation %q files before index %d: %w", generation, snapshot.Index, err)
		}
	}

	return nil
}
// deleteGenerationBefore deletes snapshot & WAL objects in a generation
// whose parsed index is lower than the given index. An index of -1 deletes
// every object under the generation directory.
func (r *Replica) deleteGenerationBefore(ctx context.Context, generation string, index int) (err error) {
	// Collect all files for the generation.
	var objIDs []*s3.ObjectIdentifier
	if err := r.s3.ListObjectsPagesWithContext(ctx, &s3.ListObjectsInput{
		Bucket: aws.String(r.Bucket),
		Prefix: aws.String(r.GenerationDir(generation)),
	}, func(page *s3.ListObjectsOutput, lastPage bool) bool {
		r.listOperationTotalCounter.Inc()

		for _, obj := range page.Contents {
			// Skip snapshots or WALs that are after the search index unless -1.
			// A key that parses as neither falls through and is deleted.
			if index != -1 {
				if idx, _, err := litestream.ParseSnapshotPath(path.Base(*obj.Key)); err == nil && idx >= index {
					continue
				} else if idx, _, _, _, err := litestream.ParseWALPath(path.Base(*obj.Key)); err == nil && idx >= index {
					continue
				}
			}
			objIDs = append(objIDs, &s3.ObjectIdentifier{Key: obj.Key})
		}
		return true
	}); err != nil {
		return err
	}

	// Delete all files in batches of at most MaxKeys objects, the S3 limit
	// for a single DeleteObjects request.
	for i := 0; i < len(objIDs); i += MaxKeys {
		j := i + MaxKeys
		if j > len(objIDs) {
			j = len(objIDs)
		}

		for _, objID := range objIDs[i:j] {
			log.Printf("%s(%s): retention exceeded, deleting from generation %q: %s", r.db.Path(), r.Name(), generation, path.Base(*objID.Key))
		}

		if _, err := r.s3.DeleteObjectsWithContext(ctx, &s3.DeleteObjectsInput{
			Bucket: aws.String(r.Bucket),
			Delete: &s3.Delete{
				Objects: objIDs[i:j],
				Quiet:   aws.Bool(true),
			},
		}); err != nil {
			return err
		}
		r.deleteOperationTotalCounter.Inc()
	}

	return nil
}
// multiReadCloser concatenates multiple readers, closing each reader as it
// is exhausted. Close releases any remaining unread readers.
// NOTE(review): not referenced anywhere in this file — possibly used
// elsewhere or dead code; confirm before removing.
type multiReadCloser struct {
	readers []io.ReadCloser
}

// Read reads from the first remaining reader, advancing to the next reader
// when the current one returns io.EOF. EOF is only reported to the caller
// once all readers are exhausted.
func (mr *multiReadCloser) Read(p []byte) (n int, err error) {
	for len(mr.readers) > 0 {
		n, err = mr.readers[0].Read(p)
		if err == io.EOF {
			// Current reader exhausted; close it and drop it from the list.
			if e := mr.readers[0].Close(); e != nil {
				return n, e
			}
			mr.readers[0] = nil // release reference for GC
			mr.readers = mr.readers[1:]
		}
		if n > 0 || err != io.EOF {
			// Suppress EOF if more readers remain so the caller keeps reading.
			if err == io.EOF && len(mr.readers) > 0 {
				err = nil
			}
			return
		}
	}
	return 0, io.EOF
}

// Close closes all remaining readers, returning the first error encountered
// while still attempting to close the rest.
func (mr *multiReadCloser) Close() (err error) {
	for _, r := range mr.readers {
		if e := r.Close(); e != nil && err == nil {
			err = e
		}
	}
	return err
}
// S3 metrics.
var (
	// operationTotalCounterVec counts S3 API calls, labeled by database
	// path, replica name, and operation type (PUT/GET/LIST/DELETE).
	operationTotalCounterVec = promauto.NewCounterVec(prometheus.CounterOpts{
		Namespace: "litestream",
		Subsystem: "s3",
		Name:      "operation_total",
		Help:      "The number of S3 operations performed",
	}, []string{"db", "name", "type"})

	// operationBytesCounterVec counts bytes transferred by S3 operations,
	// using the same labels as operationTotalCounterVec.
	operationBytesCounterVec = promauto.NewCounterVec(prometheus.CounterOpts{
		Namespace: "litestream",
		Subsystem: "s3",
		Name:      "operation_bytes",
		Help:      "The number of bytes used by S3 operations",
	}, []string{"db", "name", "type"})
)