Ignore WAL indices before snapshot that is being restored (#527)
This commit is contained in:
16
replica.go
16
replica.go
@@ -961,6 +961,7 @@ func (r *Replica) GenerationTimeBounds(ctx context.Context, generation string) (
|
|||||||
}
|
}
|
||||||
defer sitr.Close()
|
defer sitr.Close()
|
||||||
|
|
||||||
|
minIndex, maxIndex := -1, -1
|
||||||
for sitr.Next() {
|
for sitr.Next() {
|
||||||
info := sitr.Snapshot()
|
info := sitr.Snapshot()
|
||||||
if createdAt.IsZero() || info.CreatedAt.Before(createdAt) {
|
if createdAt.IsZero() || info.CreatedAt.Before(createdAt) {
|
||||||
@@ -969,6 +970,12 @@ func (r *Replica) GenerationTimeBounds(ctx context.Context, generation string) (
|
|||||||
if updatedAt.IsZero() || info.CreatedAt.After(updatedAt) {
|
if updatedAt.IsZero() || info.CreatedAt.After(updatedAt) {
|
||||||
updatedAt = info.CreatedAt
|
updatedAt = info.CreatedAt
|
||||||
}
|
}
|
||||||
|
if minIndex == -1 || info.Index < minIndex {
|
||||||
|
minIndex = info.Index
|
||||||
|
}
|
||||||
|
if info.Index > maxIndex {
|
||||||
|
maxIndex = info.Index
|
||||||
|
}
|
||||||
}
|
}
|
||||||
if err := sitr.Close(); err != nil {
|
if err := sitr.Close(); err != nil {
|
||||||
return createdAt, updatedAt, err
|
return createdAt, updatedAt, err
|
||||||
@@ -983,6 +990,9 @@ func (r *Replica) GenerationTimeBounds(ctx context.Context, generation string) (
|
|||||||
|
|
||||||
for witr.Next() {
|
for witr.Next() {
|
||||||
info := witr.WALSegment()
|
info := witr.WALSegment()
|
||||||
|
if info.Index < minIndex || info.Index > maxIndex {
|
||||||
|
continue
|
||||||
|
}
|
||||||
if createdAt.IsZero() || info.CreatedAt.Before(createdAt) {
|
if createdAt.IsZero() || info.CreatedAt.Before(createdAt) {
|
||||||
createdAt = info.CreatedAt
|
createdAt = info.CreatedAt
|
||||||
}
|
}
|
||||||
@@ -1077,7 +1087,7 @@ func (r *Replica) Restore(ctx context.Context, opt RestoreOptions) (err error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Compute list of offsets for each WAL index.
|
// Compute list of offsets for each WAL index.
|
||||||
walSegmentMap, err := r.walSegmentMap(ctx, opt.Generation, opt.Index, opt.Timestamp)
|
walSegmentMap, err := r.walSegmentMap(ctx, opt.Generation, minWALIndex, opt.Index, opt.Timestamp)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("cannot find max wal index for restore: %w", err)
|
return fmt.Errorf("cannot find max wal index for restore: %w", err)
|
||||||
}
|
}
|
||||||
@@ -1285,7 +1295,7 @@ func (r *Replica) SnapshotIndexByIndex(ctx context.Context, generation string, i
|
|||||||
|
|
||||||
// walSegmentMap returns a map of WAL indices to their segments.
|
// walSegmentMap returns a map of WAL indices to their segments.
|
||||||
// Filters by a max timestamp or a max index.
|
// Filters by a max timestamp or a max index.
|
||||||
func (r *Replica) walSegmentMap(ctx context.Context, generation string, maxIndex int, maxTimestamp time.Time) (map[int][]int64, error) {
|
func (r *Replica) walSegmentMap(ctx context.Context, generation string, minIndex, maxIndex int, maxTimestamp time.Time) (map[int][]int64, error) {
|
||||||
itr, err := r.Client.WALSegments(ctx, generation)
|
itr, err := r.Client.WALSegments(ctx, generation)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
@@ -1301,6 +1311,8 @@ func (r *Replica) walSegmentMap(ctx context.Context, generation string, maxIndex
|
|||||||
break // after max timestamp, skip
|
break // after max timestamp, skip
|
||||||
} else if info.Index > maxIndex {
|
} else if info.Index > maxIndex {
|
||||||
break // after max index, skip
|
break // after max index, skip
|
||||||
|
} else if info.Index < minIndex {
|
||||||
|
continue // before min index, continue
|
||||||
}
|
}
|
||||||
|
|
||||||
// Verify offsets are added in order.
|
// Verify offsets are added in order.
|
||||||
|
|||||||
Reference in New Issue
Block a user