diff --git a/cmd/litestream/main.go b/cmd/litestream/main.go index 5e6c3ec..5f7671c 100644 --- a/cmd/litestream/main.go +++ b/cmd/litestream/main.go @@ -243,6 +243,7 @@ type ReplicaConfig struct { Retention time.Duration `yaml:"retention"` RetentionCheckInterval time.Duration `yaml:"retention-check-interval"` SyncInterval time.Duration `yaml:"sync-interval"` // s3 only + SnapshotInterval time.Duration `yaml:"snapshot-interval"` ValidationInterval time.Duration `yaml:"validation-interval"` // S3 settings @@ -304,6 +305,9 @@ func newFileReplicaFromConfig(c *ReplicaConfig, db *litestream.DB) (_ *litestrea if v := c.RetentionCheckInterval; v > 0 { r.RetentionCheckInterval = v } + if v := c.SnapshotInterval; v > 0 { + r.SnapshotInterval = v + } if v := c.ValidationInterval; v > 0 { r.ValidationInterval = v } @@ -372,6 +376,9 @@ func newS3ReplicaFromConfig(c *ReplicaConfig, db *litestream.DB) (_ *s3.Replica, if v := c.SyncInterval; v > 0 { r.SyncInterval = v } + if v := c.SnapshotInterval; v > 0 { + r.SnapshotInterval = v + } if v := c.ValidationInterval; v > 0 { r.ValidationInterval = v } diff --git a/litestream.go b/litestream.go index edca37a..22d4cd1 100644 --- a/litestream.go +++ b/litestream.go @@ -36,6 +36,7 @@ const ( // Litestream errors. var ( + ErrNoGeneration = errors.New("no generation available") ErrNoSnapshots = errors.New("no snapshots available") ErrChecksumMismatch = errors.New("invalid replica, checksum mismatch") ) diff --git a/replica.go b/replica.go index 6ae8f3b..1023eec 100644 --- a/replica.go +++ b/replica.go @@ -98,8 +98,11 @@ type FileReplica struct { walIndexGauge prometheus.Gauge walOffsetGauge prometheus.Gauge + // Frequency to create new snapshots. + SnapshotInterval time.Duration + // Time to keep snapshots and related WAL files. - // Database is snapshotted after interval and older WAL files are discarded. + // Database is snapshotted after interval, if needed, and older WAL files are discarded. Retention time.Duration // Time between checks for retention. @@ -402,9 +405,10 @@ func (r *FileReplica) Start(ctx context.Context) { ctx, r.cancel = context.WithCancel(ctx) // Start goroutine to replicate data. - r.wg.Add(3) + r.wg.Add(4) go func() { defer r.wg.Done(); r.monitor(ctx) }() go func() { defer r.wg.Done(); r.retainer(ctx) }() + go func() { defer r.wg.Done(); r.snapshotter(ctx) }() go func() { defer r.wg.Done(); r.validator(ctx) }() } @@ -446,7 +450,18 @@ func (r *FileReplica) monitor(ctx context.Context) { // retainer runs in a separate goroutine and handles retention. func (r *FileReplica) retainer(ctx context.Context) { - ticker := time.NewTicker(r.RetentionCheckInterval) + // Disable retention enforcement if retention period is non-positive. + if r.Retention <= 0 { + return + } + + // Ensure check interval is not longer than retention period. + checkInterval := r.RetentionCheckInterval + if checkInterval > r.Retention { + checkInterval = r.Retention + } + + ticker := time.NewTicker(checkInterval) defer ticker.Stop() for { @@ -462,6 +477,28 @@ func (r *FileReplica) retainer(ctx context.Context) { } } +// snapshotter runs in a separate goroutine and handles snapshotting. +func (r *FileReplica) snapshotter(ctx context.Context) { + if r.SnapshotInterval <= 0 { + return + } + + ticker := time.NewTicker(r.SnapshotInterval) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + if err := r.Snapshot(ctx); err != nil && err != ErrNoGeneration { + log.Printf("%s(%s): snapshotter error: %s", r.db.Path(), r.Name(), err) + continue + } + } + } +} + // validator runs in a separate goroutine and handles periodic validation. func (r *FileReplica) validator(ctx context.Context) { // Initialize counters since validation occurs infrequently. @@ -531,6 +568,18 @@ func (r *FileReplica) CalcPos(ctx context.Context, generation string) (pos Pos, return pos, nil } +// Snapshot copies the entire database to the replica path. +func (r *FileReplica) Snapshot(ctx context.Context) error { + // Find current position of database. + pos, err := r.db.Pos() + if err != nil { + return fmt.Errorf("cannot determine current db generation: %w", err) + } else if pos.IsZero() { + return ErrNoGeneration + } + return r.snapshot(ctx, pos.Generation, pos.Index) +} + // snapshot copies the entire database to the replica path. func (r *FileReplica) snapshot(ctx context.Context, generation string, index int) error { // Acquire a read lock on the database during snapshot to prevent checkpoints. @@ -557,7 +606,7 @@ func (r *FileReplica) snapshot(ctx context.Context, generation string, index int return err } - log.Printf("%s(%s): snapshot: creating %s/%08x t=%s", r.db.Path(), r.Name(), generation, index, time.Since(startTime)) + log.Printf("%s(%s): snapshot: creating %s/%08x t=%s", r.db.Path(), r.Name(), generation, index, time.Since(startTime).Truncate(time.Millisecond)) return nil } diff --git a/s3/s3.go b/s3/s3.go index e9eb01b..d1e3b4d 100644 --- a/s3/s3.go +++ b/s3/s3.go @@ -80,6 +80,9 @@ type Replica struct { // Time between syncs with the shadow WAL. SyncInterval time.Duration + // Frequency to create new snapshots. + SnapshotInterval time.Duration + // Time to keep snapshots and related WAL files. // Database is snapshotted after interval and older WAL files are discarded. Retention time.Duration @@ -427,9 +430,10 @@ func (r *Replica) Start(ctx context.Context) { ctx, r.cancel = context.WithCancel(ctx) // Start goroutines to manage replica data. - r.wg.Add(3) + r.wg.Add(4) go func() { defer r.wg.Done(); r.monitor(ctx) }() go func() { defer r.wg.Done(); r.retainer(ctx) }() + go func() { defer r.wg.Done(); r.snapshotter(ctx) }() go func() { defer r.wg.Done(); r.validator(ctx) }() } @@ -479,7 +483,18 @@ func (r *Replica) monitor(ctx context.Context) { // retainer runs in a separate goroutine and handles retention. func (r *Replica) retainer(ctx context.Context) { - ticker := time.NewTicker(r.RetentionCheckInterval) + // Disable retention enforcement if retention period is non-positive. + if r.Retention <= 0 { + return + } + + // Ensure check interval is not longer than retention period. + checkInterval := r.RetentionCheckInterval + if checkInterval > r.Retention { + checkInterval = r.Retention + } + + ticker := time.NewTicker(checkInterval) defer ticker.Stop() for { @@ -495,6 +510,28 @@ func (r *Replica) retainer(ctx context.Context) { } } +// snapshotter runs in a separate goroutine and handles snapshotting. +func (r *Replica) snapshotter(ctx context.Context) { + if r.SnapshotInterval <= 0 { + return + } + + ticker := time.NewTicker(r.SnapshotInterval) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + if err := r.Snapshot(ctx); err != nil && err != litestream.ErrNoGeneration { + log.Printf("%s(%s): snapshotter error: %s", r.db.Path(), r.Name(), err) + continue + } + } + } +} + // validator runs in a separate goroutine and handles periodic validation. func (r *Replica) validator(ctx context.Context) { // Initialize counters since validation occurs infrequently. @@ -572,6 +609,18 @@ func (r *Replica) CalcPos(ctx context.Context, generation string) (pos litestrea return pos, nil } +// Snapshot copies the entire database to the replica path. +func (r *Replica) Snapshot(ctx context.Context) error { + // Find current position of database. + pos, err := r.db.Pos() + if err != nil { + return fmt.Errorf("cannot determine current db generation: %w", err) + } else if pos.IsZero() { + return litestream.ErrNoGeneration + } + return r.snapshot(ctx, pos.Generation, pos.Index) +} + // snapshot copies the entire database to the replica path. func (r *Replica) snapshot(ctx context.Context, generation string, index int) error { // Acquire a read lock on the database during snapshot to prevent checkpoints. @@ -620,7 +669,7 @@ func (r *Replica) snapshot(ctx context.Context, generation string, index int) er r.putOperationTotalCounter.Inc() r.putOperationBytesCounter.Add(float64(fi.Size())) - log.Printf("%s(%s): snapshot: creating %s/%08x t=%s", r.db.Path(), r.Name(), generation, index, time.Since(startTime)) + log.Printf("%s(%s): snapshot: creating %s/%08x t=%s", r.db.Path(), r.Name(), generation, index, time.Since(startTime).Truncate(time.Millisecond)) return nil }