Compare commits
2 Commits
v0.3.6-alp
...
v0.3.7
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
393317b6f8 | ||
|
|
1e6878998c |
@@ -207,11 +207,13 @@ func FilterSnapshotsAfter(a []SnapshotInfo, t time.Time) []SnapshotInfo {
|
|||||||
// FindMinSnapshotByGeneration finds the snapshot with the lowest index in a generation.
|
// FindMinSnapshotByGeneration finds the snapshot with the lowest index in a generation.
|
||||||
func FindMinSnapshotByGeneration(a []SnapshotInfo, generation string) *SnapshotInfo {
|
func FindMinSnapshotByGeneration(a []SnapshotInfo, generation string) *SnapshotInfo {
|
||||||
var min *SnapshotInfo
|
var min *SnapshotInfo
|
||||||
for _, snapshot := range a {
|
for i := range a {
|
||||||
|
snapshot := &a[i]
|
||||||
|
|
||||||
if snapshot.Generation != generation {
|
if snapshot.Generation != generation {
|
||||||
continue
|
continue
|
||||||
} else if min == nil || snapshot.Index < min.Index {
|
} else if min == nil || snapshot.Index < min.Index {
|
||||||
min = &snapshot
|
min = snapshot
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return min
|
return min
|
||||||
|
|||||||
@@ -128,6 +128,16 @@ func TestWALSegmentPath(t *testing.T) {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestFindMinSnapshotByGeneration(t *testing.T) {
|
||||||
|
infos := []litestream.SnapshotInfo{
|
||||||
|
{Generation: "29cf4bced74e92ab", Index: 0},
|
||||||
|
{Generation: "5dfeb4aa03232553", Index: 24},
|
||||||
|
}
|
||||||
|
if got, want := litestream.FindMinSnapshotByGeneration(infos, "29cf4bced74e92ab"), &infos[0]; got != want {
|
||||||
|
t.Fatalf("info=%#v, want %#v", got, want)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func MustDecodeHexString(s string) []byte {
|
func MustDecodeHexString(s string) []byte {
|
||||||
b, err := hex.DecodeString(s)
|
b, err := hex.DecodeString(s)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|||||||
36
replica.go
36
replica.go
@@ -159,22 +159,21 @@ func (r *Replica) Sync(ctx context.Context) (err error) {
|
|||||||
|
|
||||||
Tracef("%s(%s): replica sync: db.pos=%s", r.db.Path(), r.Name(), dpos)
|
Tracef("%s(%s): replica sync: db.pos=%s", r.db.Path(), r.Name(), dpos)
|
||||||
|
|
||||||
// Create snapshot if no snapshots exist for generation.
|
// Create a new snapshot and update the current replica position if
|
||||||
snapshotN, err := r.snapshotN(generation)
|
// the generation on the database has changed.
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
} else if snapshotN == 0 {
|
|
||||||
if info, err := r.Snapshot(ctx); err != nil {
|
|
||||||
return err
|
|
||||||
} else if info.Generation != generation {
|
|
||||||
return fmt.Errorf("generation changed during snapshot, exiting sync")
|
|
||||||
}
|
|
||||||
snapshotN = 1
|
|
||||||
}
|
|
||||||
replicaSnapshotTotalGaugeVec.WithLabelValues(r.db.Path(), r.Name()).Set(float64(snapshotN))
|
|
||||||
|
|
||||||
// Determine position, if necessary.
|
|
||||||
if r.Pos().Generation != generation {
|
if r.Pos().Generation != generation {
|
||||||
|
// Create snapshot if no snapshots exist for generation.
|
||||||
|
snapshotN, err := r.snapshotN(generation)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
} else if snapshotN == 0 {
|
||||||
|
if info, err := r.Snapshot(ctx); err != nil {
|
||||||
|
return err
|
||||||
|
} else if info.Generation != generation {
|
||||||
|
return fmt.Errorf("generation changed during snapshot, exiting sync")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
pos, err := r.calcPos(ctx, generation)
|
pos, err := r.calcPos(ctx, generation)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("cannot determine replica position: %s", err)
|
return fmt.Errorf("cannot determine replica position: %s", err)
|
||||||
@@ -1307,13 +1306,6 @@ func (r *Replica) downloadWAL(ctx context.Context, generation string, index int,
|
|||||||
|
|
||||||
// Replica metrics.
|
// Replica metrics.
|
||||||
var (
|
var (
|
||||||
replicaSnapshotTotalGaugeVec = promauto.NewGaugeVec(prometheus.GaugeOpts{
|
|
||||||
Namespace: "litestream",
|
|
||||||
Subsystem: "replica",
|
|
||||||
Name: "snapshot_total",
|
|
||||||
Help: "The current number of snapshots",
|
|
||||||
}, []string{"db", "name"})
|
|
||||||
|
|
||||||
replicaWALBytesCounterVec = promauto.NewCounterVec(prometheus.CounterOpts{
|
replicaWALBytesCounterVec = promauto.NewCounterVec(prometheus.CounterOpts{
|
||||||
Namespace: "litestream",
|
Namespace: "litestream",
|
||||||
Subsystem: "replica",
|
Subsystem: "replica",
|
||||||
|
|||||||
Reference in New Issue
Block a user