Add snapshot interval

This commit adds the ability to perform snapshots periodically, on
an interval that is separate from retention. For example, this lets
you retain backups for 24 hours while snapshotting your database
every six hours to improve recovery time.
Ben Johnson
2021-02-25 15:34:13 -07:00
parent ce2d54cc20
commit afb8731ead
4 changed files with 113 additions and 7 deletions
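
As a sketch of how the six-hour example above looks in litestream.yml: the retention, retention-check-interval, and snapshot-interval keys correspond to the ReplicaConfig fields in the diff below, while the surrounding dbs/path/replicas layout and the example paths and values are assumed for illustration and are not part of this change.

dbs:
  - path: /var/lib/app.db
    replicas:
      - path: /mnt/backup/app        # file replica (example path)
        retention: 24h               # keep snapshots and related WAL for a day
        retention-check-interval: 1h # enforce retention hourly
        snapshot-interval: 6h        # new: snapshot every six hours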


@@ -243,6 +243,7 @@ type ReplicaConfig struct {
Retention time.Duration `yaml:"retention"`
RetentionCheckInterval time.Duration `yaml:"retention-check-interval"`
SyncInterval time.Duration `yaml:"sync-interval"` // s3 only
SnapshotInterval time.Duration `yaml:"snapshot-interval"`
ValidationInterval time.Duration `yaml:"validation-interval"`
// S3 settings
@@ -304,6 +305,9 @@ func newFileReplicaFromConfig(c *ReplicaConfig, db *litestream.DB) (_ *litestrea
if v := c.RetentionCheckInterval; v > 0 {
r.RetentionCheckInterval = v
}
if v := c.SnapshotInterval; v > 0 {
r.SnapshotInterval = v
}
if v := c.ValidationInterval; v > 0 {
r.ValidationInterval = v
}
@@ -372,6 +376,9 @@ func newS3ReplicaFromConfig(c *ReplicaConfig, db *litestream.DB) (_ *s3.Replica,
if v := c.SyncInterval; v > 0 {
r.SyncInterval = v
}
if v := c.SnapshotInterval; v > 0 {
r.SnapshotInterval = v
}
if v := c.ValidationInterval; v > 0 {
r.ValidationInterval = v
}


@@ -36,6 +36,7 @@ const (
// Litestream errors.
var (
ErrNoGeneration = errors.New("no generation available")
ErrNoSnapshots = errors.New("no snapshots available")
ErrChecksumMismatch = errors.New("invalid replica, checksum mismatch")
)


@@ -98,8 +98,11 @@ type FileReplica struct {
walIndexGauge prometheus.Gauge
walOffsetGauge prometheus.Gauge
// Frequency to create new snapshots.
SnapshotInterval time.Duration
// Time to keep snapshots and related WAL files.
// Database is snapshotted after interval and older WAL files are discarded.
// Database is snapshotted after interval, if needed, and older WAL files are discarded.
Retention time.Duration
// Time between checks for retention.
@@ -402,9 +405,10 @@ func (r *FileReplica) Start(ctx context.Context) {
ctx, r.cancel = context.WithCancel(ctx)
// Start goroutine to replicate data.
r.wg.Add(3)
r.wg.Add(4)
go func() { defer r.wg.Done(); r.monitor(ctx) }()
go func() { defer r.wg.Done(); r.retainer(ctx) }()
go func() { defer r.wg.Done(); r.snapshotter(ctx) }()
go func() { defer r.wg.Done(); r.validator(ctx) }()
}
@@ -446,7 +450,18 @@ func (r *FileReplica) monitor(ctx context.Context) {
// retainer runs in a separate goroutine and handles retention.
func (r *FileReplica) retainer(ctx context.Context) {
ticker := time.NewTicker(r.RetentionCheckInterval)
// Disable retention enforcement if retention period is non-positive.
if r.Retention <= 0 {
return
}
// Ensure check interval is not longer than retention period.
checkInterval := r.RetentionCheckInterval
if checkInterval > r.Retention {
checkInterval = r.Retention
}
ticker := time.NewTicker(checkInterval)
defer ticker.Stop()
for {
@@ -462,6 +477,28 @@ func (r *FileReplica) retainer(ctx context.Context) {
}
}
// snapshotter runs in a separate goroutine and handles snapshotting.
func (r *FileReplica) snapshotter(ctx context.Context) {
if r.SnapshotInterval <= 0 {
return
}
ticker := time.NewTicker(r.SnapshotInterval)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
if err := r.Snapshot(ctx); err != nil && err != ErrNoGeneration {
log.Printf("%s(%s): snapshotter error: %s", r.db.Path(), r.Name(), err)
continue
}
}
}
}
// validator runs in a separate goroutine and handles periodic validation.
func (r *FileReplica) validator(ctx context.Context) {
// Initialize counters since validation occurs infrequently.
@@ -531,6 +568,18 @@ func (r *FileReplica) CalcPos(ctx context.Context, generation string) (pos Pos,
return pos, nil
}
// Snapshot copies the entire database to the replica path.
func (r *FileReplica) Snapshot(ctx context.Context) error {
// Find current position of database.
pos, err := r.db.Pos()
if err != nil {
return fmt.Errorf("cannot determine current db generation: %w", err)
} else if pos.IsZero() {
return ErrNoGeneration
}
return r.snapshot(ctx, pos.Generation, pos.Index)
}
// snapshot copies the entire database to the replica path.
func (r *FileReplica) snapshot(ctx context.Context, generation string, index int) error {
// Acquire a read lock on the database during snapshot to prevent checkpoints.
@@ -557,7 +606,7 @@ func (r *FileReplica) snapshot(ctx context.Context, generation string, index int
return err
}
log.Printf("%s(%s): snapshot: creating %s/%08x t=%s", r.db.Path(), r.Name(), generation, index, time.Since(startTime))
log.Printf("%s(%s): snapshot: creating %s/%08x t=%s", r.db.Path(), r.Name(), generation, index, time.Since(startTime).Truncate(time.Millisecond))
return nil
}
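
To show where the new field sits in the API, here is a rough Go sketch of wiring a file replica by hand. The SnapshotInterval, Retention, and RetentionCheckInterval fields and Start(ctx) appear in the diff above; the NewDB, Open, NewFileReplica, and Replicas names are assumed to exist with roughly these shapes and are not part of this change.

package main

import (
	"context"
	"log"
	"time"

	"github.com/benbjohnson/litestream"
)

func main() {
	// Open the source database (NewDB/Open signatures assumed).
	db := litestream.NewDB("/var/lib/app.db")
	if err := db.Open(); err != nil {
		log.Fatal(err)
	}

	// Attach a file replica (constructor assumed) and set the intervals.
	r := litestream.NewFileReplica(db, "backup", "/mnt/backup/app")
	r.Retention = 24 * time.Hour         // keep snapshots and WAL for a day
	r.RetentionCheckInterval = time.Hour // enforce retention hourly
	r.SnapshotInterval = 6 * time.Hour   // new: snapshot every six hours
	db.Replicas = append(db.Replicas, r) // Replicas field assumed

	// Start spawns the monitor, retainer, snapshotter, and validator goroutines.
	r.Start(context.Background())
	select {} // block forever for this sketch
}

With SnapshotInterval left at zero, the snapshotter goroutine returns immediately, mirroring the non-positive-retention check in retainer.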


@@ -80,6 +80,9 @@ type Replica struct {
// Time between syncs with the shadow WAL.
SyncInterval time.Duration
// Frequency to create new snapshots.
SnapshotInterval time.Duration
// Time to keep snapshots and related WAL files.
// Database is snapshotted after interval and older WAL files are discarded.
Retention time.Duration
@@ -427,9 +430,10 @@ func (r *Replica) Start(ctx context.Context) {
ctx, r.cancel = context.WithCancel(ctx)
// Start goroutines to manage replica data.
r.wg.Add(3)
r.wg.Add(4)
go func() { defer r.wg.Done(); r.monitor(ctx) }()
go func() { defer r.wg.Done(); r.retainer(ctx) }()
go func() { defer r.wg.Done(); r.snapshotter(ctx) }()
go func() { defer r.wg.Done(); r.validator(ctx) }()
}
@@ -479,7 +483,18 @@ func (r *Replica) monitor(ctx context.Context) {
// retainer runs in a separate goroutine and handles retention.
func (r *Replica) retainer(ctx context.Context) {
ticker := time.NewTicker(r.RetentionCheckInterval)
// Disable retention enforcement if retention period is non-positive.
if r.Retention <= 0 {
return
}
// Ensure check interval is not longer than retention period.
checkInterval := r.RetentionCheckInterval
if checkInterval > r.Retention {
checkInterval = r.Retention
}
ticker := time.NewTicker(checkInterval)
defer ticker.Stop()
for {
@@ -495,6 +510,28 @@ func (r *Replica) retainer(ctx context.Context) {
}
}
// snapshotter runs in a separate goroutine and handles snapshotting.
func (r *Replica) snapshotter(ctx context.Context) {
if r.SnapshotInterval <= 0 {
return
}
ticker := time.NewTicker(r.SnapshotInterval)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
if err := r.Snapshot(ctx); err != nil && err != litestream.ErrNoGeneration {
log.Printf("%s(%s): snapshotter error: %s", r.db.Path(), r.Name(), err)
continue
}
}
}
}
// validator runs in a separate goroutine and handles periodic validation.
func (r *Replica) validator(ctx context.Context) {
// Initialize counters since validation occurs infrequently.
@@ -572,6 +609,18 @@ func (r *Replica) CalcPos(ctx context.Context, generation string) (pos litestrea
return pos, nil
}
// Snapshot copies the entire database to the replica path.
func (r *Replica) Snapshot(ctx context.Context) error {
// Find current position of database.
pos, err := r.db.Pos()
if err != nil {
return fmt.Errorf("cannot determine current db generation: %w", err)
} else if pos.IsZero() {
return litestream.ErrNoGeneration
}
return r.snapshot(ctx, pos.Generation, pos.Index)
}
// snapshot copies the entire database to the replica path.
func (r *Replica) snapshot(ctx context.Context, generation string, index int) error {
// Acquire a read lock on the database during snapshot to prevent checkpoints.
@@ -620,7 +669,7 @@ func (r *Replica) snapshot(ctx context.Context, generation string, index int) er
r.putOperationTotalCounter.Inc()
r.putOperationBytesCounter.Add(float64(fi.Size()))
log.Printf("%s(%s): snapshot: creating %s/%08x t=%s", r.db.Path(), r.Name(), generation, index, time.Since(startTime))
log.Printf("%s(%s): snapshot: creating %s/%08x t=%s", r.db.Path(), r.Name(), generation, index, time.Since(startTime).Truncate(time.Millisecond))
return nil
}
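
The S3 replica picks up the same setting through the shared ReplicaConfig, so a corresponding litestream.yml entry might look like the sketch below. The url form of the replica entry and the example values are assumptions; sync-interval, retention, and snapshot-interval are the keys from the config diff above.

dbs:
  - path: /var/lib/app.db
    replicas:
      - url: s3://my-bucket/app   # s3 replica (url form assumed)
        sync-interval: 10s        # s3 only, per ReplicaConfig
        retention: 24h
        snapshot-interval: 6h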