Compare commits

..

2 Commits

Author SHA1 Message Date
Ben Johnson
393317b6f8 Fix FindMinSnapshotByGeneration() loop ref bug 2021-12-05 09:42:49 -07:00
Ben Johnson
1e6878998c Reduce snapshot check frequency
Previously, a bug was introduced that added a `LIST` operation
on every replica sync which significantly increased the cost of
running Litestream against S3. This changes the behavior to only
issue the `LIST` operation when the generation has changed.
2021-10-12 09:47:29 -06:00
3 changed files with 28 additions and 24 deletions

View File

@@ -207,11 +207,13 @@ func FilterSnapshotsAfter(a []SnapshotInfo, t time.Time) []SnapshotInfo {
// FindMinSnapshotByGeneration finds the snapshot with the lowest index in a generation.
func FindMinSnapshotByGeneration(a []SnapshotInfo, generation string) *SnapshotInfo {
var min *SnapshotInfo
for _, snapshot := range a {
for i := range a {
snapshot := &a[i]
if snapshot.Generation != generation {
continue
} else if min == nil || snapshot.Index < min.Index {
min = &snapshot
min = snapshot
}
}
return min

View File

@@ -128,6 +128,16 @@ func TestWALSegmentPath(t *testing.T) {
})
}
func TestFindMinSnapshotByGeneration(t *testing.T) {
infos := []litestream.SnapshotInfo{
{Generation: "29cf4bced74e92ab", Index: 0},
{Generation: "5dfeb4aa03232553", Index: 24},
}
if got, want := litestream.FindMinSnapshotByGeneration(infos, "29cf4bced74e92ab"), &infos[0]; got != want {
t.Fatalf("info=%#v, want %#v", got, want)
}
}
func MustDecodeHexString(s string) []byte {
b, err := hex.DecodeString(s)
if err != nil {

View File

@@ -159,22 +159,21 @@ func (r *Replica) Sync(ctx context.Context) (err error) {
Tracef("%s(%s): replica sync: db.pos=%s", r.db.Path(), r.Name(), dpos)
// Create snapshot if no snapshots exist for generation.
snapshotN, err := r.snapshotN(generation)
if err != nil {
return err
} else if snapshotN == 0 {
if info, err := r.Snapshot(ctx); err != nil {
return err
} else if info.Generation != generation {
return fmt.Errorf("generation changed during snapshot, exiting sync")
}
snapshotN = 1
}
replicaSnapshotTotalGaugeVec.WithLabelValues(r.db.Path(), r.Name()).Set(float64(snapshotN))
// Determine position, if necessary.
// Create a new snapshot and update the current replica position if
// the generation on the database has changed.
if r.Pos().Generation != generation {
// Create snapshot if no snapshots exist for generation.
snapshotN, err := r.snapshotN(generation)
if err != nil {
return err
} else if snapshotN == 0 {
if info, err := r.Snapshot(ctx); err != nil {
return err
} else if info.Generation != generation {
return fmt.Errorf("generation changed during snapshot, exiting sync")
}
}
pos, err := r.calcPos(ctx, generation)
if err != nil {
return fmt.Errorf("cannot determine replica position: %s", err)
@@ -1307,13 +1306,6 @@ func (r *Replica) downloadWAL(ctx context.Context, generation string, index int,
// Replica metrics.
var (
replicaSnapshotTotalGaugeVec = promauto.NewGaugeVec(prometheus.GaugeOpts{
Namespace: "litestream",
Subsystem: "replica",
Name: "snapshot_total",
Help: "The current number of snapshots",
}, []string{"db", "name"})
replicaWALBytesCounterVec = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: "litestream",
Subsystem: "replica",