diff --git a/cmd/litestream/main.go b/cmd/litestream/main.go
index 839d76c..aa35740 100644
--- a/cmd/litestream/main.go
+++ b/cmd/litestream/main.go
@@ -154,10 +154,12 @@ type DBConfig struct {
 }
 
 type ReplicaConfig struct {
-	Type      string        `yaml:"type"` // "file", "s3"
-	Name      string        `yaml:"name"` // name of replica, optional.
-	Path      string        `yaml:"path"`
-	Retention time.Duration `yaml:"retention"`
+	Type                   string        `yaml:"type"` // "file", "s3"
+	Name                   string        `yaml:"name"` // name of replica, optional.
+	Path                   string        `yaml:"path"`
+	Retention              time.Duration `yaml:"retention"`
+	RetentionCheckInterval time.Duration `yaml:"retention-check-interval"`
+	SyncInterval           time.Duration `yaml:"sync-interval"` // s3 only
 
 	// S3 settings
 	AccessKeyID     string `yaml:"access-key-id"`
@@ -215,7 +217,10 @@ func newFileReplicaFromConfig(db *litestream.DB, config *ReplicaConfig) (*litest
 
 	r := litestream.NewFileReplica(db, config.Name, config.Path)
 	if v := config.Retention; v > 0 {
-		r.RetentionInterval = v
+		r.Retention = v
+	}
+	if v := config.RetentionCheckInterval; v > 0 {
+		r.RetentionCheckInterval = v
 	}
 	return r, nil
 }
@@ -240,7 +245,13 @@ func newS3ReplicaFromConfig(db *litestream.DB, config *ReplicaConfig) (*s3.Repli
 	r.Path = config.Path
 
 	if v := config.Retention; v > 0 {
-		r.RetentionInterval = v
+		r.Retention = v
+	}
+	if v := config.RetentionCheckInterval; v > 0 {
+		r.RetentionCheckInterval = v
+	}
+	if v := config.SyncInterval; v > 0 {
+		r.SyncInterval = v
 	}
 	return r, nil
 }
diff --git a/db.go b/db.go
index 81da17a..d84b899 100644
--- a/db.go
+++ b/db.go
@@ -31,7 +31,6 @@ const (
 	DefaultCheckpointInterval = 1 * time.Minute
 	DefaultMinCheckpointPageN = 1000
 	DefaultMaxCheckpointPageN = 10000
-	DefaultRetentionInterval  = 24 * time.Hour
 )
 
 // DB represents a managed instance of a SQLite database in the file system.
@@ -1571,7 +1570,7 @@ func (db *DB) waitForReplica(ctx context.Context, r Replica, pos Pos, logger *lo
 		}
 
 		// Obtain current position of replica, check if past target position.
-		curr, err := r.CalcPos(pos.Generation)
+		curr, err := r.CalcPos(ctx, pos.Generation)
 		if err != nil {
 			logger.Printf("cannot obtain replica position: %s", err)
 			continue
diff --git a/replica.go b/replica.go
index e50c691..87f8758 100644
--- a/replica.go
+++ b/replica.go
@@ -68,6 +68,12 @@ type GenerationStats struct {
 	UpdatedAt time.Time
 }
 
+// Default file replica settings.
+const (
+	DefaultRetention              = 24 * time.Hour
+	DefaultRetentionCheckInterval = 1 * time.Hour
+)
+
 var _ Replica = (*FileReplica)(nil)
 
 // FileReplica is a replica that replicates a DB to a local file path.
@@ -85,7 +91,10 @@ type FileReplica struct {
 
 	// Time to keep snapshots and related WAL files.
 	// Database is snapshotted after interval and older WAL files are discarded.
-	RetentionInterval time.Duration
+	Retention time.Duration
+
+	// Time between checks for retention.
+	RetentionCheckInterval time.Duration
 
 	// If true, replica monitors database for changes automatically.
 	// Set to false if replica is being used synchronously (such as in tests).
@@ -100,8 +109,9 @@ func NewFileReplica(db *DB, name, dst string) *FileReplica {
 		dst:    dst,
 		cancel: func() {},
 
-		RetentionInterval: DefaultRetentionInterval,
-		MonitorEnabled:    true,
+		Retention:              DefaultRetention,
+		RetentionCheckInterval: DefaultRetentionCheckInterval,
+		MonitorEnabled:         true,
 	}
 }
 
@@ -404,7 +414,7 @@ func (r *FileReplica) monitor(ctx context.Context) {
 
 // retainer runs in a separate goroutine and handles retention.
func (r *FileReplica) retainer(ctx context.Context) { - ticker := time.NewTicker(1 * time.Minute) + ticker := time.NewTicker(r.RetentionCheckInterval) defer ticker.Stop() for { @@ -525,7 +533,7 @@ func (r *FileReplica) Sync(ctx context.Context) (err error) { // Determine position, if necessary. if r.LastPos().IsZero() { - pos, err := r.CalcPos(generation) + pos, err := r.CalcPos(ctx, generation) if err != nil { return fmt.Errorf("cannot determine replica position: %s", err) } @@ -709,7 +717,7 @@ func (r *FileReplica) EnforceRetention(ctx context.Context) (err error) { if err != nil { return fmt.Errorf("cannot obtain snapshot list: %w", err) } - snapshots = FilterSnapshotsAfter(snapshots, time.Now().Add(-r.RetentionInterval)) + snapshots = FilterSnapshotsAfter(snapshots, time.Now().Add(-r.Retention)) // If no retained snapshots exist, create a new snapshot. if len(snapshots) == 0 { @@ -876,7 +884,7 @@ func compressFile(src, dst string, uid, gid int) error { } defer w.Close() - gz := gzip.NewWriter(w) + gz, _ := gzip.NewWriterLevel(w, gzip.BestSpeed) defer gz.Close() // Copy & compress file contents to temporary file. diff --git a/s3/s3.go b/s3/s3.go index fea4bf8..f866eca 100644 --- a/s3/s3.go +++ b/s3/s3.go @@ -22,8 +22,13 @@ import ( "github.com/benbjohnson/litestream/internal" ) +// S3 replica default settings. const ( - DefaultRetentionInterval = 1 * time.Hour + DefaultSyncInterval = 10 * time.Second + + DefaultRetention = 24 * time.Hour + + DefaultRetentionCheckInterval = 1 * time.Hour ) // MaxKeys is the number of keys S3 can operate on per batch. @@ -38,8 +43,9 @@ type Replica struct { s3 *s3.S3 // s3 service uploader *s3manager.Uploader - mu sync.RWMutex - pos litestream.Pos // last position + mu sync.RWMutex + snapshotMu sync.Mutex + pos litestream.Pos // last position wg sync.WaitGroup cancel func() @@ -53,9 +59,15 @@ type Replica struct { Bucket string Path string + // Time between syncs with the shadow WAL. + SyncInterval time.Duration + // Time to keep snapshots and related WAL files. // Database is snapshotted after interval and older WAL files are discarded. - RetentionInterval time.Duration + Retention time.Duration + + // Time between retention checks. + RetentionCheckInterval time.Duration // If true, replica monitors database for changes automatically. // Set to false if replica is being used synchronously (such as in tests). @@ -69,8 +81,11 @@ func NewReplica(db *litestream.DB, name string) *Replica { name: name, cancel: func() {}, - RetentionInterval: DefaultRetentionInterval, - MonitorEnabled: true, + SyncInterval: DefaultSyncInterval, + Retention: DefaultRetention, + RetentionCheckInterval: DefaultRetentionCheckInterval, + + MonitorEnabled: true, } } @@ -374,12 +389,25 @@ func (r *Replica) Stop() { // monitor runs in a separate goroutine and continuously replicates the DB. func (r *Replica) monitor(ctx context.Context) { + ticker := time.NewTicker(r.SyncInterval) + defer ticker.Stop() + // Continuously check for new data to replicate. ch := make(chan struct{}) close(ch) var notify <-chan struct{} = ch - for { + for initial := true; ; initial = false { + // Enforce a minimum time between synchronization. + if !initial { + select { + case <-ctx.Done(): + return + case <-ticker.C: + } + } + + // Wait for changes to the database. select { case <-ctx.Done(): return @@ -399,7 +427,7 @@ func (r *Replica) monitor(ctx context.Context) { // retainer runs in a separate goroutine and handles retention. 
func (r *Replica) retainer(ctx context.Context) { - ticker := time.NewTicker(1 * time.Minute) + ticker := time.NewTicker(r.RetentionCheckInterval) defer ticker.Stop() for { @@ -417,8 +445,8 @@ func (r *Replica) retainer(ctx context.Context) { // CalcPos returns the position for the replica for the current generation. // Returns a zero value if there is no active generation. -func (r *Replica) CalcPos(generation string) (pos litestream.Pos, err error) { - if err := r.Init(context.Background()); err != nil { +func (r *Replica) CalcPos(ctx context.Context, generation string) (pos litestream.Pos, err error) { + if err := r.Init(ctx); err != nil { return pos, err } @@ -483,7 +511,7 @@ func (r *Replica) snapshot(ctx context.Context, generation string, index int) er defer f.Close() pr, pw := io.Pipe() - gw := gzip.NewWriter(pw) + gw, _ := gzip.NewWriterLevel(pw, gzip.BestSpeed) go func() { if _, err := io.Copy(gw, f); err != nil { _ = pw.CloseWithError(err) @@ -494,7 +522,7 @@ func (r *Replica) snapshot(ctx context.Context, generation string, index int) er snapshotPath := r.SnapshotPath(generation, index) - if _, err := r.uploader.Upload(&s3manager.UploadInput{ + if _, err := r.uploader.UploadWithContext(ctx, &s3manager.UploadInput{ Bucket: aws.String(r.Bucket), Key: aws.String(snapshotPath), Body: pr, @@ -547,34 +575,44 @@ func (r *Replica) Sync(ctx context.Context) (err error) { return err } - // Find current position of database. - dpos, err := r.db.Pos() - if err != nil { - return fmt.Errorf("cannot determine current generation: %w", err) - } else if dpos.IsZero() { - return fmt.Errorf("no generation, waiting for data") - } - generation := dpos.Generation + // Ensure sync & retainer do not calculate position or snapshot at the same time. + if err := func() error { + r.snapshotMu.Lock() + defer r.snapshotMu.Unlock() - // Create snapshot if no snapshots exist for generation. - if n, err := r.snapshotN(generation); err != nil { - return err - } else if n == 0 { - if err := r.snapshot(ctx, generation, dpos.Index); err != nil { - return err - } - } - - // Determine position, if necessary. - if r.LastPos().IsZero() { - pos, err := r.CalcPos(generation) + // Find current position of database. + dpos, err := r.db.Pos() if err != nil { - return fmt.Errorf("cannot determine replica position: %s", err) + return fmt.Errorf("cannot determine current generation: %w", err) + } else if dpos.IsZero() { + return fmt.Errorf("no generation, waiting for data") + } + generation := dpos.Generation + + // Create snapshot if no snapshots exist for generation. + if n, err := r.snapshotN(generation); err != nil { + return err + } else if n == 0 { + if err := r.snapshot(ctx, generation, dpos.Index); err != nil { + return err + } } - r.mu.Lock() - r.pos = pos - r.mu.Unlock() + // Determine position, if necessary. + if r.LastPos().IsZero() { + pos, err := r.CalcPos(ctx, generation) + if err != nil { + return fmt.Errorf("cannot determine replica position: %s", err) + } + + r.mu.Lock() + r.pos = pos + r.mu.Unlock() + } + + return nil + }(); err != nil { + return err } // Read all WAL files since the last position. 
@@ -605,7 +643,7 @@ func (r *Replica) syncWAL(ctx context.Context) (err error) { } var buf bytes.Buffer - gw := gzip.NewWriter(&buf) + gw, _ := gzip.NewWriterLevel(&buf, gzip.BestSpeed) if _, err := gw.Write(b); err != nil { return err } @@ -617,7 +655,7 @@ func (r *Replica) syncWAL(ctx context.Context) (err error) { litestream.FormatWALPathWithOffsetSize(rd.Pos().Index, rd.Pos().Offset, int64(len(b)))+".gz", ) - if _, err := r.uploader.Upload(&s3manager.UploadInput{ + if _, err := r.uploader.UploadWithContext(ctx, &s3manager.UploadInput{ Bucket: aws.String(r.Bucket), Key: aws.String(walPath), Body: &buf, @@ -727,28 +765,38 @@ func (r *Replica) EnforceRetention(ctx context.Context) (err error) { return err } - // Find current position of database. - pos, err := r.db.Pos() - if err != nil { - return fmt.Errorf("cannot determine current generation: %w", err) - } else if pos.IsZero() { - return fmt.Errorf("no generation, waiting for data") - } + // Ensure sync & retainer do not snapshot at the same time. + var snapshots []*litestream.SnapshotInfo + if err := func() error { + r.snapshotMu.Lock() + defer r.snapshotMu.Unlock() - // Obtain list of snapshots that are within the retention period. - snapshots, err := r.Snapshots(ctx) - if err != nil { - return fmt.Errorf("cannot obtain snapshot list: %w", err) - } - snapshots = litestream.FilterSnapshotsAfter(snapshots, time.Now().Add(-r.RetentionInterval)) - - // If no retained snapshots exist, create a new snapshot. - if len(snapshots) == 0 { - log.Printf("%s(%s): snapshots exceeds retention, creating new snapshot", r.db.Path(), r.Name()) - if err := r.snapshot(ctx, pos.Generation, pos.Index); err != nil { - return fmt.Errorf("cannot snapshot: %w", err) + // Find current position of database. + pos, err := r.db.Pos() + if err != nil { + return fmt.Errorf("cannot determine current generation: %w", err) + } else if pos.IsZero() { + return fmt.Errorf("no generation, waiting for data") } - snapshots = append(snapshots, &litestream.SnapshotInfo{Generation: pos.Generation, Index: pos.Index}) + + // Obtain list of snapshots that are within the retention period. + if snapshots, err = r.Snapshots(ctx); err != nil { + return fmt.Errorf("cannot obtain snapshot list: %w", err) + } + snapshots = litestream.FilterSnapshotsAfter(snapshots, time.Now().Add(-r.Retention)) + + // If no retained snapshots exist, create a new snapshot. + if len(snapshots) == 0 { + log.Printf("%s(%s): snapshots exceeds retention, creating new snapshot", r.db.Path(), r.Name()) + if err := r.snapshot(ctx, pos.Generation, pos.Index); err != nil { + return fmt.Errorf("cannot snapshot: %w", err) + } + snapshots = append(snapshots, &litestream.SnapshotInfo{Generation: pos.Generation, Index: pos.Index}) + } + + return nil + }(); err != nil { + return err } // Loop over generations and delete unretained snapshots & WAL files.
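
Below is a standalone sketch of the throttled notification loop that the S3 replica's monitor() now uses: a ticker enforces the new SyncInterval as a minimum time between syncs, while a change-notification channel gates each pass on actual database activity (in the patch the channel starts out closed, so the first pass runs immediately). The identifiers monitorLoop, notify, and doSync are illustrative stand-ins, not litestream APIs.

package sketch

import (
	"context"
	"log"
	"time"
)

// monitorLoop mirrors the shape of the new monitor(): wait at least
// syncInterval between passes, then wait for a change notification,
// then run one sync. notify returns the current notification channel.
func monitorLoop(ctx context.Context, syncInterval time.Duration, notify func() <-chan struct{}, doSync func(context.Context) error) {
	ticker := time.NewTicker(syncInterval)
	defer ticker.Stop()

	for initial := true; ; initial = false {
		// Enforce a minimum time between syncs, except on the first pass.
		if !initial {
			select {
			case <-ctx.Done():
				return
			case <-ticker.C:
			}
		}

		// Wait for a change to the database (or shutdown).
		select {
		case <-ctx.Done():
			return
		case <-notify():
		}

		if err := doSync(ctx); err != nil {
			log.Printf("sync error: %s", err)
		}
	}
}

With the defaults added here, a write-heavy database is therefore synced at most about once per SyncInterval (10s) rather than on every change notification.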
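Similarly, Sync() and EnforceRetention() now take the new snapshotMu inside an immediately invoked closure, so the lock covers only position calculation and snapshotting and is released before the slower WAL upload or retention deletes that follow. A minimal sketch of that scoped-lock shape, assuming stand-in names (replica, prepare, uploadWAL are not litestream identifiers):

package sketch

import (
	"context"
	"sync"
)

// replica stands in for the S3 replica type.
type replica struct {
	snapshotMu sync.Mutex // serializes snapshot decisions between Sync and the retainer
}

func (r *replica) prepare(ctx context.Context) error   { return nil } // position + snapshot work
func (r *replica) uploadWAL(ctx context.Context) error { return nil } // long-running upload

// sync holds snapshotMu only while preparing; the deferred unlock fires when
// the closure returns, so the WAL upload below does not block the retention
// goroutine.
func (r *replica) sync(ctx context.Context) error {
	if err := func() error {
		r.snapshotMu.Lock()
		defer r.snapshotMu.Unlock()
		return r.prepare(ctx)
	}(); err != nil {
		return err
	}
	return r.uploadWAL(ctx)
}

The closure is only a way to scope the deferred unlock; an explicit Unlock before the upload would behave the same.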