Add s3 sync interval

Commit 1e4e9633cc (parent 294846cce2)
Author: Ben Johnson
Date:   2021-01-14 15:04:26 -07:00

4 changed files with 139 additions and 73 deletions

s3/s3.go

@@ -22,8 +22,13 @@ import (
 	"github.com/benbjohnson/litestream/internal"
 )
 
 // S3 replica default settings.
 const (
-	DefaultRetentionInterval = 1 * time.Hour
+	DefaultSyncInterval = 10 * time.Second
+
+	DefaultRetention = 24 * time.Hour
+
+	DefaultRetentionCheckInterval = 1 * time.Hour
 )
 
 // MaxKeys is the number of keys S3 can operate on per batch.
@@ -38,8 +43,9 @@ type Replica struct {
 	s3       *s3.S3 // s3 service
 	uploader *s3manager.Uploader
 
-	mu  sync.RWMutex
-	pos litestream.Pos // last position
+	mu         sync.RWMutex
+	snapshotMu sync.Mutex
+	pos        litestream.Pos // last position
 
 	wg     sync.WaitGroup
 	cancel func()
@@ -53,9 +59,15 @@ type Replica struct {
 	Bucket string
 	Path   string
 
+	// Time between syncs with the shadow WAL.
+	SyncInterval time.Duration
+
 	// Time to keep snapshots and related WAL files.
 	// Database is snapshotted after interval and older WAL files are discarded.
-	RetentionInterval time.Duration
+	Retention time.Duration
+
+	// Time between retention checks.
+	RetentionCheckInterval time.Duration
 
 	// If true, replica monitors database for changes automatically.
 	// Set to false if replica is being used synchronously (such as in tests).
@@ -69,8 +81,11 @@ func NewReplica(db *litestream.DB, name string) *Replica {
 		name:   name,
 		cancel: func() {},
 
-		RetentionInterval: DefaultRetentionInterval,
-		MonitorEnabled:    true,
+		SyncInterval:           DefaultSyncInterval,
+		Retention:              DefaultRetention,
+		RetentionCheckInterval: DefaultRetentionCheckInterval,
+
+		MonitorEnabled: true,
 	}
 }
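
These defaults apply unless the caller overrides them before the replica starts. A minimal wiring sketch follows, assuming the v0.3-era API (litestream.NewDB, DB.Replicas, DB.Open — the wiring outside this diff is an assumption, and the bucket and paths are placeholders):

// Sketch: attaching an S3 replica with non-default intervals.
package main

import (
	"log"
	"time"

	"github.com/benbjohnson/litestream"
	"github.com/benbjohnson/litestream/s3"
)

func main() {
	db := litestream.NewDB("/var/lib/app/app.db")

	r := s3.NewReplica(db, "s3")
	r.Bucket = "my-backup-bucket"
	r.Path = "app.db"
	r.SyncInterval = 1 * time.Second         // upload more often than the 10s default
	r.Retention = 72 * time.Hour             // keep three days of snapshots & WAL
	r.RetentionCheckInterval = 6 * time.Hour // enforce retention less frequently

	db.Replicas = append(db.Replicas, r)
	if err := db.Open(); err != nil {
		log.Fatal(err)
	}
}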
@@ -374,12 +389,25 @@ func (r *Replica) Stop() {
 
 // monitor runs in a separate goroutine and continuously replicates the DB.
 func (r *Replica) monitor(ctx context.Context) {
+	ticker := time.NewTicker(r.SyncInterval)
+	defer ticker.Stop()
+
 	// Continuously check for new data to replicate.
 	ch := make(chan struct{})
 	close(ch)
 	var notify <-chan struct{} = ch
 
-	for {
+	for initial := true; ; initial = false {
+		// Enforce a minimum time between synchronization.
+		if !initial {
+			select {
+			case <-ctx.Done():
+				return
+			case <-ticker.C:
+			}
+		}
+
 		// Wait for changes to the database.
 		select {
 		case <-ctx.Done():
 			return
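
The shape of this loop is worth calling out: notify starts as an already-closed channel, so the first receive succeeds immediately and the initial sync runs at startup, while the new ticker enforces a floor of SyncInterval between later passes. A standalone sketch of the same pattern (the loop function and its parameters are illustrative; the real monitor re-arms notify from the database each iteration):

package main

import (
	"context"
	"time"
)

// loop sketches monitor()'s rate-limited notify pattern.
func loop(ctx context.Context, interval time.Duration, sync func()) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	// A closed channel always receives immediately, so the first
	// iteration syncs without waiting. The real monitor swaps this
	// for the DB's change-notification channel on each pass.
	ch := make(chan struct{})
	close(ch)
	var notify <-chan struct{} = ch

	for initial := true; ; initial = false {
		// Enforce a minimum time between synchronizations.
		if !initial {
			select {
			case <-ctx.Done():
				return
			case <-ticker.C:
			}
		}

		// Wait for a change notification (immediate here).
		select {
		case <-ctx.Done():
			return
		case <-notify:
		}

		sync()
	}
}

Without the initial escape hatch, a freshly started replica would sit idle for a full SyncInterval before its first sync.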
@@ -399,7 +427,7 @@ func (r *Replica) monitor(ctx context.Context) {
 
 // retainer runs in a separate goroutine and handles retention.
 func (r *Replica) retainer(ctx context.Context) {
-	ticker := time.NewTicker(1 * time.Minute)
+	ticker := time.NewTicker(r.RetentionCheckInterval)
 	defer ticker.Stop()
 
 	for {
@@ -417,8 +445,8 @@ func (r *Replica) retainer(ctx context.Context) {
 
 // CalcPos returns the position for the replica for the current generation.
 // Returns a zero value if there is no active generation.
-func (r *Replica) CalcPos(generation string) (pos litestream.Pos, err error) {
-	if err := r.Init(context.Background()); err != nil {
+func (r *Replica) CalcPos(ctx context.Context, generation string) (pos litestream.Pos, err error) {
+	if err := r.Init(ctx); err != nil {
 		return pos, err
 	}
@@ -483,7 +511,7 @@ func (r *Replica) snapshot(ctx context.Context, generation string, index int) er
 	defer f.Close()
 
 	pr, pw := io.Pipe()
-	gw := gzip.NewWriter(pw)
+	gw, _ := gzip.NewWriterLevel(pw, gzip.BestSpeed)
 	go func() {
 		if _, err := io.Copy(gw, f); err != nil {
 			_ = pw.CloseWithError(err)
@@ -494,7 +522,7 @@ func (r *Replica) snapshot(ctx context.Context, generation string, index int) er
 
 	snapshotPath := r.SnapshotPath(generation, index)
-	if _, err := r.uploader.Upload(&s3manager.UploadInput{
+	if _, err := r.uploader.UploadWithContext(ctx, &s3manager.UploadInput{
 		Bucket: aws.String(r.Bucket),
 		Key:    aws.String(snapshotPath),
 		Body:   pr,
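
Two upgrades land together here: compression drops to gzip.BestSpeed, trading ratio for CPU, and the upload becomes cancelable via UploadWithContext. A self-contained sketch of the io.Pipe streaming pattern used above, assuming stock aws-sdk-go; the function name and its bucket/key/path parameters are placeholders:

package main

import (
	"compress/gzip"
	"context"
	"io"
	"os"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/service/s3/s3manager"
)

// uploadCompressed streams a file through gzip into S3 without
// buffering the whole object in memory.
func uploadCompressed(ctx context.Context, up *s3manager.Uploader, bucket, key, path string) error {
	f, err := os.Open(path)
	if err != nil {
		return err
	}
	defer f.Close()

	pr, pw := io.Pipe()
	gw, _ := gzip.NewWriterLevel(pw, gzip.BestSpeed) // error only for an invalid level
	go func() {
		if _, err := io.Copy(gw, f); err != nil {
			_ = pw.CloseWithError(err)
			return
		}
		// Close the gzip writer first so its footer is flushed, then the pipe.
		if err := gw.Close(); err != nil {
			_ = pw.CloseWithError(err)
			return
		}
		_ = pw.Close()
	}()

	_, err = up.UploadWithContext(ctx, &s3manager.UploadInput{
		Bucket: aws.String(bucket),
		Key:    aws.String(key),
		Body:   pr,
	})
	return err
}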
@@ -547,34 +575,44 @@ func (r *Replica) Sync(ctx context.Context) (err error) {
 		return err
 	}
 
-	// Find current position of database.
-	dpos, err := r.db.Pos()
-	if err != nil {
-		return fmt.Errorf("cannot determine current generation: %w", err)
-	} else if dpos.IsZero() {
-		return fmt.Errorf("no generation, waiting for data")
-	}
-	generation := dpos.Generation
+	// Ensure sync & retainer do not calculate position or snapshot at the same time.
+	if err := func() error {
+		r.snapshotMu.Lock()
+		defer r.snapshotMu.Unlock()
 
-	// Create snapshot if no snapshots exist for generation.
-	if n, err := r.snapshotN(generation); err != nil {
-		return err
-	} else if n == 0 {
-		if err := r.snapshot(ctx, generation, dpos.Index); err != nil {
-			return err
-		}
-	}
-
-	// Determine position, if necessary.
-	if r.LastPos().IsZero() {
-		pos, err := r.CalcPos(generation)
-		if err != nil {
-			return fmt.Errorf("cannot determine replica position: %s", err)
-		}
+		// Find current position of database.
+		dpos, err := r.db.Pos()
+		if err != nil {
+			return fmt.Errorf("cannot determine current generation: %w", err)
+		} else if dpos.IsZero() {
+			return fmt.Errorf("no generation, waiting for data")
+		}
+		generation := dpos.Generation
+
+		// Create snapshot if no snapshots exist for generation.
+		if n, err := r.snapshotN(generation); err != nil {
+			return err
+		} else if n == 0 {
+			if err := r.snapshot(ctx, generation, dpos.Index); err != nil {
+				return err
+			}
+		}
 
-		r.mu.Lock()
-		r.pos = pos
-		r.mu.Unlock()
-	}
+		// Determine position, if necessary.
+		if r.LastPos().IsZero() {
+			pos, err := r.CalcPos(ctx, generation)
+			if err != nil {
+				return fmt.Errorf("cannot determine replica position: %s", err)
+			}
+
+			r.mu.Lock()
+			r.pos = pos
+			r.mu.Unlock()
+		}
+		return nil
+	}(); err != nil {
+		return err
+	}
 
 	// Read all WAL files since the last position.
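
The immediately-invoked func() error closure gives the deferred snapshotMu.Unlock a scope that ends before the WAL copying begins, while keeping plain return err error handling inside the critical section. A minimal sketch of the idiom, with hypothetical store/reloadLocked/flush names:

package main

import "sync"

type store struct {
	mu sync.Mutex
}

func (s *store) reloadLocked() error { return nil } // placeholder
func (s *store) flush() error        { return nil } // placeholder

// refresh scopes the deferred Unlock to the critical section only.
func (s *store) refresh() error {
	if err := func() error {
		s.mu.Lock()
		defer s.mu.Unlock() // released when the closure returns, not at the end of refresh

		return s.reloadLocked()
	}(); err != nil {
		return err
	}

	// Slow work continues here without holding the lock.
	return s.flush()
}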
@@ -605,7 +643,7 @@ func (r *Replica) syncWAL(ctx context.Context) (err error) {
 	}
 
 	var buf bytes.Buffer
-	gw := gzip.NewWriter(&buf)
+	gw, _ := gzip.NewWriterLevel(&buf, gzip.BestSpeed)
 	if _, err := gw.Write(b); err != nil {
 		return err
 	}
@@ -617,7 +655,7 @@ func (r *Replica) syncWAL(ctx context.Context) (err error) {
 		litestream.FormatWALPathWithOffsetSize(rd.Pos().Index, rd.Pos().Offset, int64(len(b)))+".gz",
 	)
 
-	if _, err := r.uploader.Upload(&s3manager.UploadInput{
+	if _, err := r.uploader.UploadWithContext(ctx, &s3manager.UploadInput{
 		Bucket: aws.String(r.Bucket),
 		Key:    aws.String(walPath),
 		Body:   &buf,
@@ -727,28 +765,38 @@ func (r *Replica) EnforceRetention(ctx context.Context) (err error) {
 		return err
 	}
 
-	// Find current position of database.
-	pos, err := r.db.Pos()
-	if err != nil {
-		return fmt.Errorf("cannot determine current generation: %w", err)
-	} else if pos.IsZero() {
-		return fmt.Errorf("no generation, waiting for data")
-	}
+	// Ensure sync & retainer do not snapshot at the same time.
+	var snapshots []*litestream.SnapshotInfo
+	if err := func() error {
+		r.snapshotMu.Lock()
+		defer r.snapshotMu.Unlock()
 
-	// Obtain list of snapshots that are within the retention period.
-	snapshots, err := r.Snapshots(ctx)
-	if err != nil {
-		return fmt.Errorf("cannot obtain snapshot list: %w", err)
-	}
-	snapshots = litestream.FilterSnapshotsAfter(snapshots, time.Now().Add(-r.RetentionInterval))
-
-	// If no retained snapshots exist, create a new snapshot.
-	if len(snapshots) == 0 {
-		log.Printf("%s(%s): snapshots exceeds retention, creating new snapshot", r.db.Path(), r.Name())
-		if err := r.snapshot(ctx, pos.Generation, pos.Index); err != nil {
-			return fmt.Errorf("cannot snapshot: %w", err)
+		// Find current position of database.
+		pos, err := r.db.Pos()
+		if err != nil {
+			return fmt.Errorf("cannot determine current generation: %w", err)
+		} else if pos.IsZero() {
+			return fmt.Errorf("no generation, waiting for data")
 		}
-		snapshots = append(snapshots, &litestream.SnapshotInfo{Generation: pos.Generation, Index: pos.Index})
-	}
 
+		// Obtain list of snapshots that are within the retention period.
+		if snapshots, err = r.Snapshots(ctx); err != nil {
+			return fmt.Errorf("cannot obtain snapshot list: %w", err)
+		}
+		snapshots = litestream.FilterSnapshotsAfter(snapshots, time.Now().Add(-r.Retention))
+
+		// If no retained snapshots exist, create a new snapshot.
+		if len(snapshots) == 0 {
+			log.Printf("%s(%s): snapshots exceeds retention, creating new snapshot", r.db.Path(), r.Name())
+			if err := r.snapshot(ctx, pos.Generation, pos.Index); err != nil {
+				return fmt.Errorf("cannot snapshot: %w", err)
+			}
+			snapshots = append(snapshots, &litestream.SnapshotInfo{Generation: pos.Generation, Index: pos.Index})
+		}
+		return nil
+	}(); err != nil {
+		return err
+	}
 
 	// Loop over generations and delete unretained snapshots & WAL files.
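
An empty result from FilterSnapshotsAfter means every snapshot has aged past Retention, so a fresh snapshot is taken before older generations are purged. A sketch of what such a filter plausibly looks like, assuming SnapshotInfo carries a CreatedAt time.Time field (the field name is an assumption; litestream.FilterSnapshotsAfter is the real implementation):

package main

import (
	"time"

	"github.com/benbjohnson/litestream"
)

// filterSnapshotsAfter keeps only snapshots created after the cutoff.
func filterSnapshotsAfter(a []*litestream.SnapshotInfo, t time.Time) []*litestream.SnapshotInfo {
	other := make([]*litestream.SnapshotInfo, 0, len(a))
	for _, snapshot := range a {
		if snapshot.CreatedAt.After(t) {
			other = append(other, snapshot)
		}
	}
	return other
}

EnforceRetention passes time.Now().Add(-r.Retention) as the cutoff, so Retention directly bounds how far back a point-in-time restore can reach.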