Prevent deadlocks with replicas (#524)
This commit is contained in:
12
replica.go
12
replica.go
@@ -176,7 +176,7 @@ func (r *Replica) Sync(ctx context.Context) (err error) {
|
||||
// the generation on the database has changed.
|
||||
if r.Pos().Generation != generation {
|
||||
// Create snapshot if no snapshots exist for generation.
|
||||
snapshotN, err := r.snapshotN(generation)
|
||||
snapshotN, err := r.snapshotN(ctx, generation)
|
||||
if err != nil {
|
||||
return err
|
||||
} else if snapshotN == 0 {
|
||||
@@ -237,6 +237,12 @@ func (r *Replica) syncWAL(ctx context.Context) (err error) {
|
||||
var g errgroup.Group
|
||||
g.Go(func() error {
|
||||
_, err := r.Client.WriteWALSegment(ctx, pos, pr)
|
||||
|
||||
// Always close pipe reader to signal writers.
|
||||
if e := pr.CloseWithError(err); err == nil {
|
||||
return e
|
||||
}
|
||||
|
||||
return err
|
||||
})
|
||||
|
||||
@@ -331,8 +337,8 @@ func (r *Replica) syncWAL(ctx context.Context) (err error) {
|
||||
}
|
||||
|
||||
// snapshotN returns the number of snapshots for a generation.
|
||||
func (r *Replica) snapshotN(generation string) (int, error) {
|
||||
itr, err := r.Client.Snapshots(context.Background(), generation)
|
||||
func (r *Replica) snapshotN(ctx context.Context, generation string) (int, error) {
|
||||
itr, err := r.Client.Snapshots(ctx, generation)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user