litestream/s3/s3.go
Commit 3183cf0e2e by Ben Johnson (2021-03-07 08:47:24 -07:00)

Add support for Linode Object Storage replica URLs

This commit adds the ability to specify Linode Object Storage
as replica URLs in the command line and configuration file:

	s3://MYBKT.us-east-1.linodeobjects.com/MYPATH

1192 lines, 32 KiB, Go

package s3

import (
	"bytes"
	"context"
	"fmt"
	"io"
	"io/ioutil"
	"log"
	"net"
	"os"
	"path"
	"regexp"
	"sync"
	"time"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/credentials"
	"github.com/aws/aws-sdk-go/aws/defaults"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/s3"
	"github.com/aws/aws-sdk-go/service/s3/s3manager"

	"github.com/benbjohnson/litestream"
	"github.com/benbjohnson/litestream/internal"
	"github.com/pierrec/lz4/v4"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
)

// S3 replica default settings.
const (
	DefaultSyncInterval           = 10 * time.Second
	DefaultRetention              = 24 * time.Hour
	DefaultRetentionCheckInterval = 1 * time.Hour
)

// MaxKeys is the maximum number of keys that a single S3 DeleteObjects
// request can operate on.
const MaxKeys = 1000
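
// Ensure Replica implements the litestream.Replica interface at compile time.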
var _ litestream.Replica = (*Replica)(nil)

// Replica is a replica that replicates a DB to an S3 bucket.
type Replica struct {
	db   *litestream.DB // source database
	name string         // replica name, optional

	s3       *s3.S3 // s3 service
	uploader *s3manager.Uploader

	mu         sync.RWMutex
	snapshotMu sync.Mutex
	pos        litestream.Pos // last position

	muf sync.Mutex
	f   *os.File // long-lived read-only db file descriptor

	wg     sync.WaitGroup
	cancel func()

	snapshotTotalGauge          prometheus.Gauge
	walBytesCounter             prometheus.Counter
	walIndexGauge               prometheus.Gauge
	walOffsetGauge              prometheus.Gauge
	putOperationTotalCounter    prometheus.Counter
	putOperationBytesCounter    prometheus.Counter
	getOperationTotalCounter    prometheus.Counter
	getOperationBytesCounter    prometheus.Counter
	listOperationTotalCounter   prometheus.Counter
	deleteOperationTotalCounter prometheus.Counter

	// AWS authentication keys.
	AccessKeyID     string
	SecretAccessKey string

	// S3 bucket information
	Region         string
	Bucket         string
	Path           string
	Endpoint       string
	ForcePathStyle bool

	// Time between syncs with the shadow WAL.
	SyncInterval time.Duration

	// Frequency to create new snapshots.
	SnapshotInterval time.Duration

	// Time to keep snapshots and related WAL files.
	// Database is snapshotted after interval and older WAL files are discarded.
	Retention time.Duration

	// Time between retention checks.
	RetentionCheckInterval time.Duration

	// Time between validation checks.
	ValidationInterval time.Duration

	// If true, replica monitors database for changes automatically.
	// Set to false if replica is being used synchronously (such as in tests).
	MonitorEnabled bool
}

// NewReplica returns a new instance of Replica.
func NewReplica(db *litestream.DB, name string) *Replica {
	r := &Replica{
		db:     db,
		name:   name,
		cancel: func() {},

		SyncInterval:           DefaultSyncInterval,
		Retention:              DefaultRetention,
		RetentionCheckInterval: DefaultRetentionCheckInterval,
		MonitorEnabled:         true,
	}

	var dbPath string
	if db != nil {
		dbPath = db.Path()
	}
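
	// Initialize metrics, labeled by database path and replica name;
	// the operation counters carry an additional S3 operation type label.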
	r.snapshotTotalGauge = internal.ReplicaSnapshotTotalGaugeVec.WithLabelValues(dbPath, r.Name())
	r.walBytesCounter = internal.ReplicaWALBytesCounterVec.WithLabelValues(dbPath, r.Name())
	r.walIndexGauge = internal.ReplicaWALIndexGaugeVec.WithLabelValues(dbPath, r.Name())
	r.walOffsetGauge = internal.ReplicaWALOffsetGaugeVec.WithLabelValues(dbPath, r.Name())
	r.putOperationTotalCounter = operationTotalCounterVec.WithLabelValues(dbPath, r.Name(), "PUT")
	r.putOperationBytesCounter = operationBytesCounterVec.WithLabelValues(dbPath, r.Name(), "PUT")
	r.getOperationTotalCounter = operationTotalCounterVec.WithLabelValues(dbPath, r.Name(), "GET")
	r.getOperationBytesCounter = operationBytesCounterVec.WithLabelValues(dbPath, r.Name(), "GET")
	r.listOperationTotalCounter = operationTotalCounterVec.WithLabelValues(dbPath, r.Name(), "LIST")
	r.deleteOperationTotalCounter = operationTotalCounterVec.WithLabelValues(dbPath, r.Name(), "DELETE")

	return r
}

// Name returns the name of the replica. Returns the type if no name set.
func (r *Replica) Name() string {
	if r.name != "" {
		return r.name
	}
	return r.Type()
}

// Type returns the type of replica.
func (r *Replica) Type() string {
	return "s3"
}

// DB returns the parent database reference.
func (r *Replica) DB() *litestream.DB {
	return r.db
}

// LastPos returns the last successfully replicated position.
func (r *Replica) LastPos() litestream.Pos {
	r.mu.RLock()
	defer r.mu.RUnlock()
	return r.pos
}

// GenerationDir returns the path to a generation's root directory.
func (r *Replica) GenerationDir(generation string) string {
	return path.Join(r.Path, "generations", generation)
}

// SnapshotDir returns the path to a generation's snapshot directory.
func (r *Replica) SnapshotDir(generation string) string {
	return path.Join(r.GenerationDir(generation), "snapshots")
}

// SnapshotPath returns the path to a snapshot file.
func (r *Replica) SnapshotPath(generation string, index int) string {
	return path.Join(r.SnapshotDir(generation), fmt.Sprintf("%08x.snapshot.lz4", index))
}

// MaxSnapshotIndex returns the highest index for the snapshots.
func (r *Replica) MaxSnapshotIndex(generation string) (int, error) {
	snapshots, err := r.Snapshots(context.Background())
	if err != nil {
		return 0, err
	}

	index := -1
	for _, snapshot := range snapshots {
		if snapshot.Generation != generation {
			continue
		} else if index == -1 || snapshot.Index > index {
			index = snapshot.Index
		}
	}
	if index == -1 {
		return 0, fmt.Errorf("no snapshots found")
	}
	return index, nil
}

// WALDir returns the path to a generation's WAL directory.
func (r *Replica) WALDir(generation string) string {
	return path.Join(r.GenerationDir(generation), "wal")
}

// Generations returns a list of available generation names.
func (r *Replica) Generations(ctx context.Context) ([]string, error) {
	if err := r.Init(ctx); err != nil {
		return nil, err
	}

	var generations []string
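	// List with a delimiter so each generation directory comes back as a
	// single common prefix instead of as individual object keys.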
	if err := r.s3.ListObjectsPagesWithContext(ctx, &s3.ListObjectsInput{
		Bucket:    aws.String(r.Bucket),
		Prefix:    aws.String(path.Join(r.Path, "generations") + "/"),
		Delimiter: aws.String("/"),
	}, func(page *s3.ListObjectsOutput, lastPage bool) bool {
		r.listOperationTotalCounter.Inc()

		for _, prefix := range page.CommonPrefixes {
			name := path.Base(*prefix.Prefix)
			if !litestream.IsGenerationName(name) {
				continue
			}
			generations = append(generations, name)
		}
		return true
	}); err != nil {
		return nil, err
	}

	return generations, nil
}

// GenerationStats returns stats for a generation.
func (r *Replica) GenerationStats(ctx context.Context, generation string) (stats litestream.GenerationStats, err error) {
	if err := r.Init(ctx); err != nil {
		return stats, err
	}

	// Determine stats for all snapshots.
	n, min, max, err := r.snapshotStats(ctx, generation)
	if err != nil {
		return stats, err
	}
	stats.SnapshotN = n
	stats.CreatedAt, stats.UpdatedAt = min, max

	// Update stats if we have WAL files.
	n, min, max, err = r.walStats(ctx, generation)
	if err != nil {
		return stats, err
	} else if n == 0 {
		return stats, nil
	}

	stats.WALN = n
	if stats.CreatedAt.IsZero() || min.Before(stats.CreatedAt) {
		stats.CreatedAt = min
	}
	if stats.UpdatedAt.IsZero() || max.After(stats.UpdatedAt) {
		stats.UpdatedAt = max
	}
	return stats, nil
}

func (r *Replica) snapshotStats(ctx context.Context, generation string) (n int, min, max time.Time, err error) {
	if err := r.s3.ListObjectsPagesWithContext(ctx, &s3.ListObjectsInput{
		Bucket: aws.String(r.Bucket),
		Prefix: aws.String(r.SnapshotDir(generation) + "/"),
	}, func(page *s3.ListObjectsOutput, lastPage bool) bool {
		r.listOperationTotalCounter.Inc()

		for _, obj := range page.Contents {
			if !litestream.IsSnapshotPath(path.Base(*obj.Key)) {
				continue
			}
			modTime := obj.LastModified.UTC()

			n++
			if min.IsZero() || modTime.Before(min) {
				min = modTime
			}
			if max.IsZero() || modTime.After(max) {
				max = modTime
			}
		}
		return true
	}); err != nil {
		return n, min, max, err
	}
	return n, min, max, nil
}

func (r *Replica) walStats(ctx context.Context, generation string) (n int, min, max time.Time, err error) {
	if err := r.s3.ListObjectsPagesWithContext(ctx, &s3.ListObjectsInput{
		Bucket: aws.String(r.Bucket),
		Prefix: aws.String(r.WALDir(generation) + "/"),
	}, func(page *s3.ListObjectsOutput, lastPage bool) bool {
		r.listOperationTotalCounter.Inc()

		for _, obj := range page.Contents {
			if !litestream.IsWALPath(path.Base(*obj.Key)) {
				continue
			}
			modTime := obj.LastModified.UTC()

			n++
			if min.IsZero() || modTime.Before(min) {
				min = modTime
			}
			if max.IsZero() || modTime.After(max) {
				max = modTime
			}
		}
		return true
	}); err != nil {
		return n, min, max, err
	}
	return n, min, max, nil
}

// Snapshots returns a list of available snapshots in the replica.
func (r *Replica) Snapshots(ctx context.Context) ([]*litestream.SnapshotInfo, error) {
	if err := r.Init(ctx); err != nil {
		return nil, err
	}

	generations, err := r.Generations(ctx)
	if err != nil {
		return nil, err
	}

	var infos []*litestream.SnapshotInfo
	for _, generation := range generations {
		if err := r.s3.ListObjectsPagesWithContext(ctx, &s3.ListObjectsInput{
			Bucket:    aws.String(r.Bucket),
			Prefix:    aws.String(r.SnapshotDir(generation) + "/"),
			Delimiter: aws.String("/"),
		}, func(page *s3.ListObjectsOutput, lastPage bool) bool {
			r.listOperationTotalCounter.Inc()

			for _, obj := range page.Contents {
				key := path.Base(*obj.Key)
				index, _, err := litestream.ParseSnapshotPath(key)
				if err != nil {
					continue
				}

				infos = append(infos, &litestream.SnapshotInfo{
					Name:       key,
					Replica:    r.Name(),
					Generation: generation,
					Index:      index,
					Size:       *obj.Size,
					CreatedAt:  obj.LastModified.UTC(),
				})
			}
			return true
		}); err != nil {
			return nil, err
		}
	}

	return infos, nil
}

// WALs returns a list of available WAL files in the replica.
func (r *Replica) WALs(ctx context.Context) ([]*litestream.WALInfo, error) {
	if err := r.Init(ctx); err != nil {
		return nil, err
	}

	generations, err := r.Generations(ctx)
	if err != nil {
		return nil, err
	}

	var infos []*litestream.WALInfo
	for _, generation := range generations {
		var prev *litestream.WALInfo

		if err := r.s3.ListObjectsPagesWithContext(ctx, &s3.ListObjectsInput{
			Bucket:    aws.String(r.Bucket),
			Prefix:    aws.String(r.WALDir(generation) + "/"),
			Delimiter: aws.String("/"),
		}, func(page *s3.ListObjectsOutput, lastPage bool) bool {
			r.listOperationTotalCounter.Inc()

			for _, obj := range page.Contents {
				key := path.Base(*obj.Key)
				index, offset, _, err := litestream.ParseWALPath(key)
				if err != nil {
					continue
				}

				// Update previous record if generation & index match.
				if prev != nil && prev.Index == index {
					prev.Size += *obj.Size
					prev.CreatedAt = obj.LastModified.UTC()
					continue
				}

				// Append new WAL record and keep reference to append additional
				// size for segmented WAL files.
				prev = &litestream.WALInfo{
					Name:       key,
					Replica:    r.Name(),
					Generation: generation,
					Index:      index,
					Offset:     offset,
					Size:       *obj.Size,
					CreatedAt:  obj.LastModified.UTC(),
				}
				infos = append(infos, prev)
			}
			return true
		}); err != nil {
			return nil, err
		}
	}

	return infos, nil
}

// Start starts replication for a given generation.
func (r *Replica) Start(ctx context.Context) (err error) {
	// Ignore if replica is being used synchronously.
	if !r.MonitorEnabled {
		return nil
	}

	// Stop previous replication.
	r.Stop(false)

	// Wrap context with cancelation.
	ctx, r.cancel = context.WithCancel(ctx)

	// Start goroutines to manage replica data.
	r.wg.Add(4)
	go func() { defer r.wg.Done(); r.monitor(ctx) }()
	go func() { defer r.wg.Done(); r.retainer(ctx) }()
	go func() { defer r.wg.Done(); r.snapshotter(ctx) }()
	go func() { defer r.wg.Done(); r.validator(ctx) }()

	return nil
}

// Stop cancels any outstanding replication and blocks until finished.
//
// Performing a hard stop will close the DB file descriptor, which could
// release per-process locks held on the database. Hard stops should only
// be performed when stopping the entire process.
func (r *Replica) Stop(hard bool) (err error) {
	r.cancel()
	r.wg.Wait()

	r.muf.Lock()
	defer r.muf.Unlock()
	if hard && r.f != nil {
		if e := r.f.Close(); e != nil && err == nil {
			err = e
		}
	}
	return err
}

// monitor runs in a separate goroutine and continuously replicates the DB.
func (r *Replica) monitor(ctx context.Context) {
	ticker := time.NewTicker(r.SyncInterval)
	defer ticker.Stop()

	// Continuously check for new data to replicate.
	ch := make(chan struct{})
	close(ch)
	var notify <-chan struct{} = ch
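	// The channel starts out closed so the first wait below returns
	// immediately and an initial sync runs without a change notification.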
	for initial := true; ; initial = false {
		// Enforce a minimum time between synchronization.
		if !initial {
			select {
			case <-ctx.Done():
				return
			case <-ticker.C:
			}
		}

		// Wait for changes to the database.
		select {
		case <-ctx.Done():
			return
		case <-notify:
		}

		// Fetch new notify channel before replicating data.
		notify = r.db.Notify()

		// Synchronize the shadow wal into the replication directory.
		if err := r.Sync(ctx); err != nil {
			log.Printf("%s(%s): monitor error: %s", r.db.Path(), r.Name(), err)
			continue
		}
	}
}

// retainer runs in a separate goroutine and handles retention.
func (r *Replica) retainer(ctx context.Context) {
	// Disable retention enforcement if retention period is non-positive.
	if r.Retention <= 0 {
		return
	}

	// Ensure check interval is not longer than retention period.
	checkInterval := r.RetentionCheckInterval
	if checkInterval > r.Retention {
		checkInterval = r.Retention
	}

	ticker := time.NewTicker(checkInterval)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			if err := r.EnforceRetention(ctx); err != nil {
				log.Printf("%s(%s): retain error: %s", r.db.Path(), r.Name(), err)
				continue
			}
		}
	}
}

// snapshotter runs in a separate goroutine and handles snapshotting.
func (r *Replica) snapshotter(ctx context.Context) {
	if r.SnapshotInterval <= 0 {
		return
	}

	ticker := time.NewTicker(r.SnapshotInterval)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			if err := r.Snapshot(ctx); err != nil && err != litestream.ErrNoGeneration {
				log.Printf("%s(%s): snapshotter error: %s", r.db.Path(), r.Name(), err)
				continue
			}
		}
	}
}

// validator runs in a separate goroutine and handles periodic validation.
func (r *Replica) validator(ctx context.Context) {
	// Initialize counters since validation occurs infrequently.
	for _, status := range []string{"ok", "error"} {
		internal.ReplicaValidationTotalCounterVec.WithLabelValues(r.db.Path(), r.Name(), status).Add(0)
	}

	if r.ValidationInterval <= 0 {
		return
	}

	ticker := time.NewTicker(r.ValidationInterval)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			if err := litestream.ValidateReplica(ctx, r); err != nil {
				log.Printf("%s(%s): validation error: %s", r.db.Path(), r.Name(), err)
				continue
			}
		}
	}
}

// CalcPos returns the position for the replica for the current generation.
// Returns a zero value if there is no active generation.
func (r *Replica) CalcPos(ctx context.Context, generation string) (pos litestream.Pos, err error) {
	if err := r.Init(ctx); err != nil {
		return pos, err
	}

	pos.Generation = generation

	// Find maximum snapshot index.
	if pos.Index, err = r.MaxSnapshotIndex(generation); err != nil {
		return litestream.Pos{}, err
	}

	index := -1
	var offset int64
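	// Scan all WAL segments for the generation to find the highest index
	// and, within that index, the highest segment offset.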
	if err := r.s3.ListObjectsPagesWithContext(ctx, &s3.ListObjectsInput{
		Bucket:    aws.String(r.Bucket),
		Prefix:    aws.String(r.WALDir(generation) + "/"),
		Delimiter: aws.String("/"),
	}, func(page *s3.ListObjectsOutput, lastPage bool) bool {
		r.listOperationTotalCounter.Inc()

		for _, obj := range page.Contents {
			key := path.Base(*obj.Key)
			idx, off, _, err := litestream.ParseWALPath(key)
			if err != nil {
				continue // invalid wal filename
			}

			if index == -1 || idx > index {
				index, offset = idx, 0 // start tracking new wal
			} else if idx == index && off > offset {
				offset = off // update offset
			}
		}
		return true
	}); err != nil {
		return litestream.Pos{}, err
	}

	if index == -1 {
		return pos, nil // no wal files
	}
	pos.Index = index
	pos.Offset = offset

	return pos, nil
}

// Snapshot copies the entire database to the replica path.
func (r *Replica) Snapshot(ctx context.Context) error {
	// Find current position of database.
	pos, err := r.db.Pos()
	if err != nil {
		return fmt.Errorf("cannot determine current db generation: %w", err)
	} else if pos.IsZero() {
		return litestream.ErrNoGeneration
	}
	return r.snapshot(ctx, pos.Generation, pos.Index)
}

// snapshot copies the entire database to the replica path.
func (r *Replica) snapshot(ctx context.Context, generation string, index int) error {
	r.muf.Lock()
	defer r.muf.Unlock()

	// Acquire a read lock on the database during snapshot to prevent checkpoints.
	tx, err := r.db.SQLDB().Begin()
	if err != nil {
		return err
	} else if _, err := tx.ExecContext(ctx, `SELECT COUNT(1) FROM _litestream_seq;`); err != nil {
		_ = tx.Rollback()
		return err
	}
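	// The read above matters: SQLite transactions are deferred, so the
	// shared read lock is only taken once the transaction actually reads.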
	defer func() { _ = tx.Rollback() }()

	// Open long-lived file descriptor on database.
	if r.f == nil {
		if r.f, err = os.Open(r.db.Path()); err != nil {
			return err
		}
	}

	// Move the file descriptor to the beginning. We only use one long lived
	// file descriptor because some operating systems will remove the database
	// lock when closing a separate file descriptor on the DB.
	if _, err := r.f.Seek(0, io.SeekStart); err != nil {
		return err
	}

	fi, err := r.f.Stat()
	if err != nil {
		return err
	}
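	// Stream the database file through an LZ4 writer into a pipe so the
	// upload can proceed without buffering the whole snapshot in memory.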
	pr, pw := io.Pipe()
	zw := lz4.NewWriter(pw)
	go func() {
		if _, err := io.Copy(zw, r.f); err != nil {
			_ = pw.CloseWithError(err)
			return
		}
		_ = pw.CloseWithError(zw.Close())
	}()

	snapshotPath := r.SnapshotPath(generation, index)
	startTime := time.Now()

	if _, err := r.uploader.UploadWithContext(ctx, &s3manager.UploadInput{
		Bucket: aws.String(r.Bucket),
		Key:    aws.String(snapshotPath),
		Body:   pr,
	}); err != nil {
		return err
	}
	r.putOperationTotalCounter.Inc()
	r.putOperationBytesCounter.Add(float64(fi.Size()))

	log.Printf("%s(%s): snapshot: creating %s/%08x t=%s", r.db.Path(), r.Name(), generation, index, time.Since(startTime).Truncate(time.Millisecond))

	return nil
}

// snapshotN returns the number of snapshots for a generation.
func (r *Replica) snapshotN(generation string) (int, error) {
	snapshots, err := r.Snapshots(context.Background())
	if err != nil {
		return 0, err
	}

	var n int
	for _, snapshot := range snapshots {
		if snapshot.Generation == generation {
			n++
		}
	}
	return n, nil
}

// Init initializes the connection to S3. No-op if already initialized.
func (r *Replica) Init(ctx context.Context) (err error) {
	r.mu.Lock()
	defer r.mu.Unlock()

	if r.s3 != nil {
		return nil
	}

	// Look up region if not specified and no endpoint is used.
	// Endpoints are typically used for non-S3 object stores and do not
	// necessarily require a region.
	region := r.Region
	if region == "" && r.Endpoint == "" {
		if region, err = r.findBucketRegion(ctx, r.Bucket); err != nil {
			return fmt.Errorf("cannot lookup bucket region: %w", err)
		}
	}

	// Create new AWS session.
	config := r.config()
	if region != "" {
		config.Region = aws.String(region)
	}

	sess, err := session.NewSession(config)
	if err != nil {
		return fmt.Errorf("cannot create aws session: %w", err)
	}
	r.s3 = s3.New(sess)
	r.uploader = s3manager.NewUploader(sess)
	return nil
}

// config returns the AWS configuration. Uses the default credential chain
// unless a key/secret are explicitly set.
func (r *Replica) config() *aws.Config {
	config := defaults.Get().Config
	if r.AccessKeyID != "" || r.SecretAccessKey != "" {
		config.Credentials = credentials.NewStaticCredentials(r.AccessKeyID, r.SecretAccessKey, "")
	}
	if r.Endpoint != "" {
		config.Endpoint = aws.String(r.Endpoint)
	}
	if r.ForcePathStyle {
		config.S3ForcePathStyle = aws.Bool(r.ForcePathStyle)
	}
	return config
}

func (r *Replica) findBucketRegion(ctx context.Context, bucket string) (string, error) {
	// Connect to US standard region to fetch info.
	config := r.config()
	config.Region = aws.String("us-east-1")
	sess, err := session.NewSession(config)
	if err != nil {
		return "", err
	}

	// Fetch bucket location, if possible. Must be bucket owner.
	// This call can return a nil location which means it's in us-east-1.
	if out, err := s3.New(sess).GetBucketLocation(&s3.GetBucketLocationInput{
		Bucket: aws.String(bucket),
	}); err != nil {
		return "", err
	} else if out.LocationConstraint != nil {
		return *out.LocationConstraint, nil
	}
	return "us-east-1", nil
}

// Sync replays data from the shadow WAL and uploads it to S3.
func (r *Replica) Sync(ctx context.Context) (err error) {
	// Clear last position if an error occurs during sync.
	defer func() {
		if err != nil {
			r.mu.Lock()
			r.pos = litestream.Pos{}
			r.mu.Unlock()
		}
	}()

	// Connect to S3, if necessary.
	if err := r.Init(ctx); err != nil {
		return err
	}

	// Find current position of database.
	dpos, err := r.db.Pos()
	if err != nil {
		return fmt.Errorf("cannot determine current generation: %w", err)
	} else if dpos.IsZero() {
		return fmt.Errorf("no generation, waiting for data")
	}
	generation := dpos.Generation

	// Calculate position if we don't have a previous position or if the generation changes.
	// Ensure sync & retainer do not snapshot at the same time.
	if lastPos := r.LastPos(); lastPos.IsZero() || lastPos.Generation != generation {
		if err := func() error {
			r.snapshotMu.Lock()
			defer r.snapshotMu.Unlock()

			// Create snapshot if no snapshots exist for generation.
			if n, err := r.snapshotN(generation); err != nil {
				return err
			} else if n == 0 {
				if err := r.snapshot(ctx, generation, dpos.Index); err != nil {
					return err
				}
				r.snapshotTotalGauge.Set(1.0)
			} else {
				r.snapshotTotalGauge.Set(float64(n))
			}

			// Determine position, if necessary.
			pos, err := r.CalcPos(ctx, generation)
			if err != nil {
				return fmt.Errorf("cannot determine replica position: %s", err)
			}

			r.mu.Lock()
			defer r.mu.Unlock()
			r.pos = pos
			return nil
		}(); err != nil {
			return err
		}
	}

	// Read all WAL files since the last position.
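	// syncWAL returns io.EOF once the shadow WAL has no more data to replay.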
	for {
		if err = r.syncWAL(ctx); err == io.EOF {
			break
		} else if err != nil {
			return err
		}
	}

	return nil
}

func (r *Replica) syncWAL(ctx context.Context) (err error) {
	rd, err := r.db.ShadowWALReader(r.LastPos())
	if err == io.EOF {
		return err
	} else if err != nil {
		return fmt.Errorf("wal reader: %w", err)
	}
	defer rd.Close()

	// Read to intermediate buffer to determine size.
	pos := rd.Pos()
	b, err := ioutil.ReadAll(rd)
	if err != nil {
		return err
	}

	var buf bytes.Buffer
	zw := lz4.NewWriter(&buf)
	if _, err := zw.Write(b); err != nil {
		return err
	} else if err := zw.Close(); err != nil {
		return err
	}

	// Capture the compressed size now; the uploader drains buf below.
	compressedLen := buf.Len()

	// Build a WAL path with the index/offset so we can ensure that files
	// are contiguous without having to decompress.
	walPath := path.Join(
		r.WALDir(rd.Pos().Generation),
		litestream.FormatWALPathWithOffset(pos.Index, pos.Offset)+".lz4",
	)

	if _, err := r.uploader.UploadWithContext(ctx, &s3manager.UploadInput{
		Bucket: aws.String(r.Bucket),
		Key:    aws.String(walPath),
		Body:   &buf,
	}); err != nil {
		return err
	}
	r.putOperationTotalCounter.Inc()
	r.putOperationBytesCounter.Add(float64(compressedLen)) // compressed bytes

	// Save last replicated position.
	r.mu.Lock()
	r.pos = rd.Pos()
	r.mu.Unlock()

	// Track raw bytes processed & current position.
	r.walBytesCounter.Add(float64(len(b))) // raw bytes
	r.walIndexGauge.Set(float64(rd.Pos().Index))
	r.walOffsetGauge.Set(float64(rd.Pos().Offset))

	return nil
}

// SnapshotReader returns a reader for snapshot data at the given generation/index.
func (r *Replica) SnapshotReader(ctx context.Context, generation string, index int) (io.ReadCloser, error) {
	if err := r.Init(ctx); err != nil {
		return nil, err
	}

	// Pipe download to return an io.Reader.
	out, err := r.s3.GetObjectWithContext(ctx, &s3.GetObjectInput{
		Bucket: aws.String(r.Bucket),
		Key:    aws.String(r.SnapshotPath(generation, index)),
	})
	if err != nil {
		return nil, err
	}
	r.getOperationTotalCounter.Inc()
	r.getOperationBytesCounter.Add(float64(*out.ContentLength))

	// Decompress the snapshot file.
	return internal.NewReadCloser(lz4.NewReader(out.Body), out.Body), nil
}

// WALReader returns a reader for WAL data at the given index.
// Returns os.ErrNotExist if no matching index is found.
func (r *Replica) WALReader(ctx context.Context, generation string, index int) (io.ReadCloser, error) {
	if err := r.Init(ctx); err != nil {
		return nil, err
	}

	// Collect all files for the index.
	var keys []string
	if err := r.s3.ListObjectsPagesWithContext(ctx, &s3.ListObjectsInput{
		Bucket: aws.String(r.Bucket),
		Prefix: aws.String(path.Join(r.WALDir(generation), fmt.Sprintf("%08x_", index))),
	}, func(page *s3.ListObjectsOutput, lastPage bool) bool {
		r.listOperationTotalCounter.Inc()

		for _, obj := range page.Contents {
			_, _, _, err := litestream.ParseWALPath(path.Base(*obj.Key))
			if err != nil {
				continue
			}
			keys = append(keys, *obj.Key)
		}
		return true
	}); err != nil {
		return nil, err
	} else if len(keys) == 0 {
		return nil, os.ErrNotExist
	}

	// Download each segment and concatenate into a single buffer.
	var buf bytes.Buffer
	var offset int64
	for _, key := range keys {
		// Ensure offset is correct as we copy segments into buffer.
		_, off, _, _ := litestream.ParseWALPath(path.Base(key))
		if off != offset {
			return nil, fmt.Errorf("out of sequence wal segments: %s/%08x at remote offset %d, expected offset %d", generation, index, off, offset)
		}

		// Pipe download to return an io.Reader.
		out, err := r.s3.GetObjectWithContext(ctx, &s3.GetObjectInput{
			Bucket: aws.String(r.Bucket),
			Key:    aws.String(key),
		})
		if err != nil {
			return nil, err
		}
		defer out.Body.Close()

		r.getOperationTotalCounter.Inc()
		r.getOperationBytesCounter.Add(float64(*out.ContentLength))

		zr := lz4.NewReader(out.Body)
		n, err := io.Copy(&buf, zr)
		if err != nil {
			return nil, err
		}
		offset += int64(n)
	}

	return ioutil.NopCloser(&buf), nil
}

// EnforceRetention forces a new snapshot once the retention interval has passed.
// Older snapshots and WAL files are then removed.
func (r *Replica) EnforceRetention(ctx context.Context) (err error) {
	if err := r.Init(ctx); err != nil {
		return err
	}

	// Ensure sync & retainer do not snapshot at the same time.
	var snapshots []*litestream.SnapshotInfo
	if err := func() error {
		r.snapshotMu.Lock()
		defer r.snapshotMu.Unlock()

		// Find current position of database.
		pos, err := r.db.Pos()
		if err != nil {
			return fmt.Errorf("cannot determine current generation: %w", err)
		} else if pos.IsZero() {
			return fmt.Errorf("no generation, waiting for data")
		}

		// Obtain list of snapshots that are within the retention period.
		if snapshots, err = r.Snapshots(ctx); err != nil {
			return fmt.Errorf("cannot obtain snapshot list: %w", err)
		}
		snapshots = litestream.FilterSnapshotsAfter(snapshots, time.Now().Add(-r.Retention))

		// If no retained snapshots exist, create a new snapshot.
		if len(snapshots) == 0 {
			if err := r.snapshot(ctx, pos.Generation, pos.Index); err != nil {
				return fmt.Errorf("cannot snapshot: %w", err)
			}
			snapshots = append(snapshots, &litestream.SnapshotInfo{Generation: pos.Generation, Index: pos.Index})
		}
		return nil
	}(); err != nil {
		return err
	}

	// Loop over generations and delete unretained snapshots & WAL files.
	generations, err := r.Generations(ctx)
	if err != nil {
		return fmt.Errorf("cannot obtain generations: %w", err)
	}
	for _, generation := range generations {
		// Find earliest retained snapshot for this generation.
		snapshot := litestream.FindMinSnapshotByGeneration(snapshots, generation)

		// Delete the entire generation if it has no snapshots being retained.
		if snapshot == nil {
			if err := r.deleteGenerationBefore(ctx, generation, -1); err != nil {
				return fmt.Errorf("cannot delete generation %q dir: %w", generation, err)
			}
			continue
		}

		// Otherwise delete all snapshots & WAL files before the lowest retained index.
		if err := r.deleteGenerationBefore(ctx, generation, snapshot.Index); err != nil {
			return fmt.Errorf("cannot delete generation %q files before index %d: %w", generation, snapshot.Index, err)
		}
	}

	return nil
}

func (r *Replica) deleteGenerationBefore(ctx context.Context, generation string, index int) (err error) {
	// Collect all files for the generation.
	var objIDs []*s3.ObjectIdentifier
	if err := r.s3.ListObjectsPagesWithContext(ctx, &s3.ListObjectsInput{
		Bucket: aws.String(r.Bucket),
		Prefix: aws.String(r.GenerationDir(generation)),
	}, func(page *s3.ListObjectsOutput, lastPage bool) bool {
		r.listOperationTotalCounter.Inc()

		for _, obj := range page.Contents {
			// Skip snapshots & WAL files at or after the search index,
			// unless the index is -1 (delete everything).
			if index != -1 {
				if idx, _, err := litestream.ParseSnapshotPath(path.Base(*obj.Key)); err == nil && idx >= index {
					continue
				} else if idx, _, _, err := litestream.ParseWALPath(path.Base(*obj.Key)); err == nil && idx >= index {
					continue
				}
			}
			objIDs = append(objIDs, &s3.ObjectIdentifier{Key: obj.Key})
		}
		return true
	}); err != nil {
		return err
	}

	// Delete all files in batches.
	var n int
	for i := 0; i < len(objIDs); i += MaxKeys {
		j := i + MaxKeys
		if j > len(objIDs) {
			j = len(objIDs)
		}

		if _, err := r.s3.DeleteObjectsWithContext(ctx, &s3.DeleteObjectsInput{
			Bucket: aws.String(r.Bucket),
			Delete: &s3.Delete{
				Objects: objIDs[i:j],
				Quiet:   aws.Bool(true),
			},
		}); err != nil {
			return err
		}
		n += len(objIDs[i:j])
		r.deleteOperationTotalCounter.Inc()
	}

	log.Printf("%s(%s): retainer: deleting wal files before %s/%08x n=%d", r.db.Path(), r.Name(), generation, index, n)
	return nil
}

// ParseHost extracts data from a hostname depending on the service provider.
func ParseHost(s string) (bucket, region, endpoint string, forcePathStyle bool) {
	// Extract port if one is specified.
	host, port, err := net.SplitHostPort(s)
	if err != nil {
		host = s
	}

	// Default to path-style URLs, except for AWS S3 itself.
	forcePathStyle = true

	// Extract fields from provider-specific host formats.
	scheme := "https"
	if a := localhostRegex.FindStringSubmatch(host); a != nil {
		bucket, region = a[1], "us-east-1"
		scheme, endpoint = "http", "localhost"
	} else if a := gcsRegex.FindStringSubmatch(host); a != nil {
		bucket, region = a[1], "us-east-1"
		endpoint = "storage.googleapis.com"
	} else if a := digitalOceanRegex.FindStringSubmatch(host); a != nil {
		bucket, region = a[1], a[2]
		endpoint = fmt.Sprintf("%s.digitaloceanspaces.com", region)
	} else if a := linodeRegex.FindStringSubmatch(host); a != nil {
		bucket, region = a[1], a[2]
		endpoint = fmt.Sprintf("%s.linodeobjects.com", region)
	} else if a := backblazeRegex.FindStringSubmatch(host); a != nil {
		bucket, region = a[1], a[2]
		endpoint = fmt.Sprintf("s3.%s.backblazeb2.com", region)
	} else {
		bucket = host
		forcePathStyle = false
	}

	// Add port back to endpoint, if available.
	if endpoint != "" && port != "" {
		endpoint = net.JoinHostPort(endpoint, port)
	}

	// Prepend scheme to endpoint.
	if endpoint != "" {
		endpoint = scheme + "://" + endpoint
	}

	return bucket, region, endpoint, forcePathStyle
}
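
// As an illustration, the Linode host from the commit message above parses as:
//
//	ParseHost("MYBKT.us-east-1.linodeobjects.com")
//	// bucket="MYBKT", region="us-east-1",
//	// endpoint="https://us-east-1.linodeobjects.com", forcePathStyle=true
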
var (
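	// Provider-specific host patterns recognized by ParseHost.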
	localhostRegex    = regexp.MustCompile(`^(?:(.+)\.)?localhost$`)
	digitalOceanRegex = regexp.MustCompile(`^(?:(.+)\.)?([^.]+)\.digitaloceanspaces.com$`)
	linodeRegex       = regexp.MustCompile(`^(?:(.+)\.)?([^.]+)\.linodeobjects.com$`)
	backblazeRegex    = regexp.MustCompile(`^(?:(.+)\.)?s3.([^.]+)\.backblazeb2.com$`)
	gcsRegex          = regexp.MustCompile(`^(?:(.+)\.)?storage.googleapis.com$`)
)

// S3 metrics.
var (
	operationTotalCounterVec = promauto.NewCounterVec(prometheus.CounterOpts{
		Namespace: "litestream",
		Subsystem: "s3",
		Name:      "operation_total",
		Help:      "The number of S3 operations performed",
	}, []string{"db", "name", "type"})

	operationBytesCounterVec = promauto.NewCounterVec(prometheus.CounterOpts{
		Namespace: "litestream",
		Subsystem: "s3",
		Name:      "operation_bytes",
		Help:      "The number of bytes used by S3 operations",
	}, []string{"db", "name", "type"})
)