From afb8731eadae0a4204f909851f474ce67fac69d1 Mon Sep 17 00:00:00 2001 From: Ben Johnson Date: Thu, 25 Feb 2021 15:34:13 -0700 Subject: [PATCH] Add snapshot interval This commit adds the ability to periodically perform snapshots on an interval that is separate from retention. For example, this lets you retain backups for 24 hours but you can snapshot your database every six hours to improve recovery time. --- cmd/litestream/main.go | 7 ++++++ litestream.go | 1 + replica.go | 57 +++++++++++++++++++++++++++++++++++++++--- s3/s3.go | 55 +++++++++++++++++++++++++++++++++++++--- 4 files changed, 113 insertions(+), 7 deletions(-) diff --git a/cmd/litestream/main.go b/cmd/litestream/main.go index 5e6c3ec..5f7671c 100644 --- a/cmd/litestream/main.go +++ b/cmd/litestream/main.go @@ -243,6 +243,7 @@ type ReplicaConfig struct { Retention time.Duration `yaml:"retention"` RetentionCheckInterval time.Duration `yaml:"retention-check-interval"` SyncInterval time.Duration `yaml:"sync-interval"` // s3 only + SnapshotInterval time.Duration `yaml:"snapshot-interval"` ValidationInterval time.Duration `yaml:"validation-interval"` // S3 settings @@ -304,6 +305,9 @@ func newFileReplicaFromConfig(c *ReplicaConfig, db *litestream.DB) (_ *litestrea if v := c.RetentionCheckInterval; v > 0 { r.RetentionCheckInterval = v } + if v := c.SnapshotInterval; v > 0 { + r.SnapshotInterval = v + } if v := c.ValidationInterval; v > 0 { r.ValidationInterval = v } @@ -372,6 +376,9 @@ func newS3ReplicaFromConfig(c *ReplicaConfig, db *litestream.DB) (_ *s3.Replica, if v := c.SyncInterval; v > 0 { r.SyncInterval = v } + if v := c.SnapshotInterval; v > 0 { + r.SnapshotInterval = v + } if v := c.ValidationInterval; v > 0 { r.ValidationInterval = v } diff --git a/litestream.go b/litestream.go index edca37a..22d4cd1 100644 --- a/litestream.go +++ b/litestream.go @@ -36,6 +36,7 @@ const ( // Litestream errors. var ( + ErrNoGeneration = errors.New("no generation available") ErrNoSnapshots = errors.New("no snapshots available") ErrChecksumMismatch = errors.New("invalid replica, checksum mismatch") ) diff --git a/replica.go b/replica.go index 6ae8f3b..1023eec 100644 --- a/replica.go +++ b/replica.go @@ -98,8 +98,11 @@ type FileReplica struct { walIndexGauge prometheus.Gauge walOffsetGauge prometheus.Gauge + // Frequency to create new snapshots. + SnapshotInterval time.Duration + // Time to keep snapshots and related WAL files. - // Database is snapshotted after interval and older WAL files are discarded. + // Database is snapshotted after interval, if needed, and older WAL files are discarded. Retention time.Duration // Time between checks for retention. @@ -402,9 +405,10 @@ func (r *FileReplica) Start(ctx context.Context) { ctx, r.cancel = context.WithCancel(ctx) // Start goroutine to replicate data. - r.wg.Add(3) + r.wg.Add(4) go func() { defer r.wg.Done(); r.monitor(ctx) }() go func() { defer r.wg.Done(); r.retainer(ctx) }() + go func() { defer r.wg.Done(); r.snapshotter(ctx) }() go func() { defer r.wg.Done(); r.validator(ctx) }() } @@ -446,7 +450,18 @@ func (r *FileReplica) monitor(ctx context.Context) { // retainer runs in a separate goroutine and handles retention. func (r *FileReplica) retainer(ctx context.Context) { - ticker := time.NewTicker(r.RetentionCheckInterval) + // Disable retention enforcement if retention period is non-positive. + if r.Retention <= 0 { + return + } + + // Ensure check interval is not longer than retention period. + checkInterval := r.RetentionCheckInterval + if checkInterval > r.Retention { + checkInterval = r.Retention + } + + ticker := time.NewTicker(checkInterval) defer ticker.Stop() for { @@ -462,6 +477,28 @@ func (r *FileReplica) retainer(ctx context.Context) { } } +// snapshotter runs in a separate goroutine and handles snapshotting. +func (r *FileReplica) snapshotter(ctx context.Context) { + if r.SnapshotInterval <= 0 { + return + } + + ticker := time.NewTicker(r.SnapshotInterval) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + if err := r.Snapshot(ctx); err != nil && err != ErrNoGeneration { + log.Printf("%s(%s): snapshotter error: %s", r.db.Path(), r.Name(), err) + continue + } + } + } +} + // validator runs in a separate goroutine and handles periodic validation. func (r *FileReplica) validator(ctx context.Context) { // Initialize counters since validation occurs infrequently. @@ -531,6 +568,18 @@ func (r *FileReplica) CalcPos(ctx context.Context, generation string) (pos Pos, return pos, nil } +// Snapshot copies the entire database to the replica path. +func (r *FileReplica) Snapshot(ctx context.Context) error { + // Find current position of database. + pos, err := r.db.Pos() + if err != nil { + return fmt.Errorf("cannot determine current db generation: %w", err) + } else if pos.IsZero() { + return ErrNoGeneration + } + return r.snapshot(ctx, pos.Generation, pos.Index) +} + // snapshot copies the entire database to the replica path. func (r *FileReplica) snapshot(ctx context.Context, generation string, index int) error { // Acquire a read lock on the database during snapshot to prevent checkpoints. @@ -557,7 +606,7 @@ func (r *FileReplica) snapshot(ctx context.Context, generation string, index int return err } - log.Printf("%s(%s): snapshot: creating %s/%08x t=%s", r.db.Path(), r.Name(), generation, index, time.Since(startTime)) + log.Printf("%s(%s): snapshot: creating %s/%08x t=%s", r.db.Path(), r.Name(), generation, index, time.Since(startTime).Truncate(time.Millisecond)) return nil } diff --git a/s3/s3.go b/s3/s3.go index e9eb01b..d1e3b4d 100644 --- a/s3/s3.go +++ b/s3/s3.go @@ -80,6 +80,9 @@ type Replica struct { // Time between syncs with the shadow WAL. SyncInterval time.Duration + // Frequency to create new snapshots. + SnapshotInterval time.Duration + // Time to keep snapshots and related WAL files. // Database is snapshotted after interval and older WAL files are discarded. Retention time.Duration @@ -427,9 +430,10 @@ func (r *Replica) Start(ctx context.Context) { ctx, r.cancel = context.WithCancel(ctx) // Start goroutines to manage replica data. - r.wg.Add(3) + r.wg.Add(4) go func() { defer r.wg.Done(); r.monitor(ctx) }() go func() { defer r.wg.Done(); r.retainer(ctx) }() + go func() { defer r.wg.Done(); r.snapshotter(ctx) }() go func() { defer r.wg.Done(); r.validator(ctx) }() } @@ -479,7 +483,18 @@ func (r *Replica) monitor(ctx context.Context) { // retainer runs in a separate goroutine and handles retention. func (r *Replica) retainer(ctx context.Context) { - ticker := time.NewTicker(r.RetentionCheckInterval) + // Disable retention enforcement if retention period is non-positive. + if r.Retention <= 0 { + return + } + + // Ensure check interval is not longer than retention period. + checkInterval := r.RetentionCheckInterval + if checkInterval > r.Retention { + checkInterval = r.Retention + } + + ticker := time.NewTicker(checkInterval) defer ticker.Stop() for { @@ -495,6 +510,28 @@ func (r *Replica) retainer(ctx context.Context) { } } +// snapshotter runs in a separate goroutine and handles snapshotting. +func (r *Replica) snapshotter(ctx context.Context) { + if r.SnapshotInterval <= 0 { + return + } + + ticker := time.NewTicker(r.SnapshotInterval) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + if err := r.Snapshot(ctx); err != nil && err != litestream.ErrNoGeneration { + log.Printf("%s(%s): snapshotter error: %s", r.db.Path(), r.Name(), err) + continue + } + } + } +} + // validator runs in a separate goroutine and handles periodic validation. func (r *Replica) validator(ctx context.Context) { // Initialize counters since validation occurs infrequently. @@ -572,6 +609,18 @@ func (r *Replica) CalcPos(ctx context.Context, generation string) (pos litestrea return pos, nil } +// Snapshot copies the entire database to the replica path. +func (r *Replica) Snapshot(ctx context.Context) error { + // Find current position of database. + pos, err := r.db.Pos() + if err != nil { + return fmt.Errorf("cannot determine current db generation: %w", err) + } else if pos.IsZero() { + return litestream.ErrNoGeneration + } + return r.snapshot(ctx, pos.Generation, pos.Index) +} + // snapshot copies the entire database to the replica path. func (r *Replica) snapshot(ctx context.Context, generation string, index int) error { // Acquire a read lock on the database during snapshot to prevent checkpoints. @@ -620,7 +669,7 @@ func (r *Replica) snapshot(ctx context.Context, generation string, index int) er r.putOperationTotalCounter.Inc() r.putOperationBytesCounter.Add(float64(fi.Size())) - log.Printf("%s(%s): snapshot: creating %s/%08x t=%s", r.db.Path(), r.Name(), generation, index, time.Since(startTime)) + log.Printf("%s(%s): snapshot: creating %s/%08x t=%s", r.db.Path(), r.Name(), generation, index, time.Since(startTime).Truncate(time.Millisecond)) return nil }