Sync replica snapshots to previous (#480)
This commit is contained in:
25
replica.go
25
replica.go
@@ -733,6 +733,31 @@ func (r *Replica) snapshotter(ctx context.Context) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
logger := r.Logger()
|
||||||
|
if pos, err := r.db.Pos(); err != nil {
|
||||||
|
logger.Error("snapshotter cannot determine generation", "error", err)
|
||||||
|
} else if !pos.IsZero() {
|
||||||
|
if snapshot, err := r.maxSnapshot(ctx, pos.Generation); err != nil {
|
||||||
|
logger.Error("snapshotter cannot determine latest snapshot", "error", err)
|
||||||
|
} else if snapshot != nil {
|
||||||
|
nextSnapshot := r.SnapshotInterval - time.Since(snapshot.CreatedAt)
|
||||||
|
if nextSnapshot < 0 {
|
||||||
|
nextSnapshot = 0
|
||||||
|
}
|
||||||
|
|
||||||
|
logger.Info("snapshot interval adjusted", "previous", snapshot.CreatedAt.Format(time.RFC3339), "next", nextSnapshot.String())
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return
|
||||||
|
case <-time.After(nextSnapshot):
|
||||||
|
if _, err := r.Snapshot(ctx); err != nil && err != ErrNoGeneration {
|
||||||
|
logger.Error("snapshotter error", "error", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
ticker := time.NewTicker(r.SnapshotInterval)
|
ticker := time.NewTicker(r.SnapshotInterval)
|
||||||
defer ticker.Stop()
|
defer ticker.Stop()
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user