Merge pull request #84 from benbjohnson/snapshot-interval

This commit is contained in:
Ben Johnson
2021-02-25 15:41:58 -07:00
committed by GitHub
4 changed files with 113 additions and 7 deletions

View File

@@ -243,6 +243,7 @@ type ReplicaConfig struct {
Retention time.Duration `yaml:"retention"` Retention time.Duration `yaml:"retention"`
RetentionCheckInterval time.Duration `yaml:"retention-check-interval"` RetentionCheckInterval time.Duration `yaml:"retention-check-interval"`
SyncInterval time.Duration `yaml:"sync-interval"` // s3 only SyncInterval time.Duration `yaml:"sync-interval"` // s3 only
SnapshotInterval time.Duration `yaml:"snapshot-interval"`
ValidationInterval time.Duration `yaml:"validation-interval"` ValidationInterval time.Duration `yaml:"validation-interval"`
// S3 settings // S3 settings
@@ -304,6 +305,9 @@ func newFileReplicaFromConfig(c *ReplicaConfig, db *litestream.DB) (_ *litestrea
if v := c.RetentionCheckInterval; v > 0 { if v := c.RetentionCheckInterval; v > 0 {
r.RetentionCheckInterval = v r.RetentionCheckInterval = v
} }
if v := c.SnapshotInterval; v > 0 {
r.SnapshotInterval = v
}
if v := c.ValidationInterval; v > 0 { if v := c.ValidationInterval; v > 0 {
r.ValidationInterval = v r.ValidationInterval = v
} }
@@ -372,6 +376,9 @@ func newS3ReplicaFromConfig(c *ReplicaConfig, db *litestream.DB) (_ *s3.Replica,
if v := c.SyncInterval; v > 0 { if v := c.SyncInterval; v > 0 {
r.SyncInterval = v r.SyncInterval = v
} }
if v := c.SnapshotInterval; v > 0 {
r.SnapshotInterval = v
}
if v := c.ValidationInterval; v > 0 { if v := c.ValidationInterval; v > 0 {
r.ValidationInterval = v r.ValidationInterval = v
} }

View File

@@ -36,6 +36,7 @@ const (
// Litestream errors. // Litestream errors.
var ( var (
ErrNoGeneration = errors.New("no generation available")
ErrNoSnapshots = errors.New("no snapshots available") ErrNoSnapshots = errors.New("no snapshots available")
ErrChecksumMismatch = errors.New("invalid replica, checksum mismatch") ErrChecksumMismatch = errors.New("invalid replica, checksum mismatch")
) )

View File

@@ -98,8 +98,11 @@ type FileReplica struct {
walIndexGauge prometheus.Gauge walIndexGauge prometheus.Gauge
walOffsetGauge prometheus.Gauge walOffsetGauge prometheus.Gauge
// Frequency to create new snapshots.
SnapshotInterval time.Duration
// Time to keep snapshots and related WAL files. // Time to keep snapshots and related WAL files.
// Database is snapshotted after interval and older WAL files are discarded. // Database is snapshotted after interval, if needed, and older WAL files are discarded.
Retention time.Duration Retention time.Duration
// Time between checks for retention. // Time between checks for retention.
@@ -402,9 +405,10 @@ func (r *FileReplica) Start(ctx context.Context) {
ctx, r.cancel = context.WithCancel(ctx) ctx, r.cancel = context.WithCancel(ctx)
// Start goroutine to replicate data. // Start goroutine to replicate data.
r.wg.Add(3) r.wg.Add(4)
go func() { defer r.wg.Done(); r.monitor(ctx) }() go func() { defer r.wg.Done(); r.monitor(ctx) }()
go func() { defer r.wg.Done(); r.retainer(ctx) }() go func() { defer r.wg.Done(); r.retainer(ctx) }()
go func() { defer r.wg.Done(); r.snapshotter(ctx) }()
go func() { defer r.wg.Done(); r.validator(ctx) }() go func() { defer r.wg.Done(); r.validator(ctx) }()
} }
@@ -446,7 +450,18 @@ func (r *FileReplica) monitor(ctx context.Context) {
// retainer runs in a separate goroutine and handles retention. // retainer runs in a separate goroutine and handles retention.
func (r *FileReplica) retainer(ctx context.Context) { func (r *FileReplica) retainer(ctx context.Context) {
ticker := time.NewTicker(r.RetentionCheckInterval) // Disable retention enforcement if retention period is non-positive.
if r.Retention <= 0 {
return
}
// Ensure check interval is not longer than retention period.
checkInterval := r.RetentionCheckInterval
if checkInterval > r.Retention {
checkInterval = r.Retention
}
ticker := time.NewTicker(checkInterval)
defer ticker.Stop() defer ticker.Stop()
for { for {
@@ -462,6 +477,28 @@ func (r *FileReplica) retainer(ctx context.Context) {
} }
} }
// snapshotter runs in a separate goroutine and handles snapshotting.
func (r *FileReplica) snapshotter(ctx context.Context) {
if r.SnapshotInterval <= 0 {
return
}
ticker := time.NewTicker(r.SnapshotInterval)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
if err := r.Snapshot(ctx); err != nil && err != ErrNoGeneration {
log.Printf("%s(%s): snapshotter error: %s", r.db.Path(), r.Name(), err)
continue
}
}
}
}
// validator runs in a separate goroutine and handles periodic validation. // validator runs in a separate goroutine and handles periodic validation.
func (r *FileReplica) validator(ctx context.Context) { func (r *FileReplica) validator(ctx context.Context) {
// Initialize counters since validation occurs infrequently. // Initialize counters since validation occurs infrequently.
@@ -531,6 +568,18 @@ func (r *FileReplica) CalcPos(ctx context.Context, generation string) (pos Pos,
return pos, nil return pos, nil
} }
// Snapshot copies the entire database to the replica path.
func (r *FileReplica) Snapshot(ctx context.Context) error {
// Find current position of database.
pos, err := r.db.Pos()
if err != nil {
return fmt.Errorf("cannot determine current db generation: %w", err)
} else if pos.IsZero() {
return ErrNoGeneration
}
return r.snapshot(ctx, pos.Generation, pos.Index)
}
// snapshot copies the entire database to the replica path. // snapshot copies the entire database to the replica path.
func (r *FileReplica) snapshot(ctx context.Context, generation string, index int) error { func (r *FileReplica) snapshot(ctx context.Context, generation string, index int) error {
// Acquire a read lock on the database during snapshot to prevent checkpoints. // Acquire a read lock on the database during snapshot to prevent checkpoints.
@@ -557,7 +606,7 @@ func (r *FileReplica) snapshot(ctx context.Context, generation string, index int
return err return err
} }
log.Printf("%s(%s): snapshot: creating %s/%08x t=%s", r.db.Path(), r.Name(), generation, index, time.Since(startTime)) log.Printf("%s(%s): snapshot: creating %s/%08x t=%s", r.db.Path(), r.Name(), generation, index, time.Since(startTime).Truncate(time.Millisecond))
return nil return nil
} }

View File

@@ -80,6 +80,9 @@ type Replica struct {
// Time between syncs with the shadow WAL. // Time between syncs with the shadow WAL.
SyncInterval time.Duration SyncInterval time.Duration
// Frequency to create new snapshots.
SnapshotInterval time.Duration
// Time to keep snapshots and related WAL files. // Time to keep snapshots and related WAL files.
// Database is snapshotted after interval and older WAL files are discarded. // Database is snapshotted after interval and older WAL files are discarded.
Retention time.Duration Retention time.Duration
@@ -427,9 +430,10 @@ func (r *Replica) Start(ctx context.Context) {
ctx, r.cancel = context.WithCancel(ctx) ctx, r.cancel = context.WithCancel(ctx)
// Start goroutines to manage replica data. // Start goroutines to manage replica data.
r.wg.Add(3) r.wg.Add(4)
go func() { defer r.wg.Done(); r.monitor(ctx) }() go func() { defer r.wg.Done(); r.monitor(ctx) }()
go func() { defer r.wg.Done(); r.retainer(ctx) }() go func() { defer r.wg.Done(); r.retainer(ctx) }()
go func() { defer r.wg.Done(); r.snapshotter(ctx) }()
go func() { defer r.wg.Done(); r.validator(ctx) }() go func() { defer r.wg.Done(); r.validator(ctx) }()
} }
@@ -479,7 +483,18 @@ func (r *Replica) monitor(ctx context.Context) {
// retainer runs in a separate goroutine and handles retention. // retainer runs in a separate goroutine and handles retention.
func (r *Replica) retainer(ctx context.Context) { func (r *Replica) retainer(ctx context.Context) {
ticker := time.NewTicker(r.RetentionCheckInterval) // Disable retention enforcement if retention period is non-positive.
if r.Retention <= 0 {
return
}
// Ensure check interval is not longer than retention period.
checkInterval := r.RetentionCheckInterval
if checkInterval > r.Retention {
checkInterval = r.Retention
}
ticker := time.NewTicker(checkInterval)
defer ticker.Stop() defer ticker.Stop()
for { for {
@@ -495,6 +510,28 @@ func (r *Replica) retainer(ctx context.Context) {
} }
} }
// snapshotter runs in a separate goroutine and handles snapshotting.
func (r *Replica) snapshotter(ctx context.Context) {
if r.SnapshotInterval <= 0 {
return
}
ticker := time.NewTicker(r.SnapshotInterval)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
if err := r.Snapshot(ctx); err != nil && err != litestream.ErrNoGeneration {
log.Printf("%s(%s): snapshotter error: %s", r.db.Path(), r.Name(), err)
continue
}
}
}
}
// validator runs in a separate goroutine and handles periodic validation. // validator runs in a separate goroutine and handles periodic validation.
func (r *Replica) validator(ctx context.Context) { func (r *Replica) validator(ctx context.Context) {
// Initialize counters since validation occurs infrequently. // Initialize counters since validation occurs infrequently.
@@ -572,6 +609,18 @@ func (r *Replica) CalcPos(ctx context.Context, generation string) (pos litestrea
return pos, nil return pos, nil
} }
// Snapshot copies the entire database to the replica path.
func (r *Replica) Snapshot(ctx context.Context) error {
// Find current position of database.
pos, err := r.db.Pos()
if err != nil {
return fmt.Errorf("cannot determine current db generation: %w", err)
} else if pos.IsZero() {
return litestream.ErrNoGeneration
}
return r.snapshot(ctx, pos.Generation, pos.Index)
}
// snapshot copies the entire database to the replica path. // snapshot copies the entire database to the replica path.
func (r *Replica) snapshot(ctx context.Context, generation string, index int) error { func (r *Replica) snapshot(ctx context.Context, generation string, index int) error {
// Acquire a read lock on the database during snapshot to prevent checkpoints. // Acquire a read lock on the database during snapshot to prevent checkpoints.
@@ -620,7 +669,7 @@ func (r *Replica) snapshot(ctx context.Context, generation string, index int) er
r.putOperationTotalCounter.Inc() r.putOperationTotalCounter.Inc()
r.putOperationBytesCounter.Add(float64(fi.Size())) r.putOperationBytesCounter.Add(float64(fi.Size()))
log.Printf("%s(%s): snapshot: creating %s/%08x t=%s", r.db.Path(), r.Name(), generation, index, time.Since(startTime)) log.Printf("%s(%s): snapshot: creating %s/%08x t=%s", r.db.Path(), r.Name(), generation, index, time.Since(startTime).Truncate(time.Millisecond))
return nil return nil
} }