diff --git a/s3/s3.go b/s3/s3.go index 6379e28..5217e54 100644 --- a/s3/s3.go +++ b/s3/s3.go @@ -651,47 +651,48 @@ func (r *Replica) Sync(ctx context.Context) (err error) { return err } - // Ensure sync & retainer do not calculate position or snapshot at the same time. - if err := func() error { - r.snapshotMu.Lock() - defer r.snapshotMu.Unlock() + // Find current position of database. + dpos, err := r.db.Pos() + if err != nil { + return fmt.Errorf("cannot determine current generation: %w", err) + } else if dpos.IsZero() { + return fmt.Errorf("no generation, waiting for data") + } + generation := dpos.Generation - // Find current position of database. - dpos, err := r.db.Pos() - if err != nil { - return fmt.Errorf("cannot determine current generation: %w", err) - } else if dpos.IsZero() { - return fmt.Errorf("no generation, waiting for data") - } - generation := dpos.Generation + // Calculate position if we don't have a previous position or if the generation changes. + // Ensure sync & retainer do not snapshot at the same time. + if lastPos := r.LastPos(); lastPos.IsZero() || lastPos.Generation != generation { + if err := func() error { + r.snapshotMu.Lock() + defer r.snapshotMu.Unlock() - // Create snapshot if no snapshots exist for generation. - if n, err := r.snapshotN(generation); err != nil { - return err - } else if n == 0 { - if err := r.snapshot(ctx, generation, dpos.Index); err != nil { + // Create snapshot if no snapshots exist for generation. + if n, err := r.snapshotN(generation); err != nil { return err + } else if n == 0 { + if err := r.snapshot(ctx, generation, dpos.Index); err != nil { + return err + } + r.snapshotTotalGauge.Set(1.0) + } else { + r.snapshotTotalGauge.Set(float64(n)) } - r.snapshotTotalGauge.Set(1.0) - } else { - r.snapshotTotalGauge.Set(float64(n)) - } - // Determine position, if necessary. - if r.LastPos().IsZero() { + // Determine position, if necessary. pos, err := r.CalcPos(ctx, generation) if err != nil { return fmt.Errorf("cannot determine replica position: %s", err) } r.mu.Lock() + defer r.mu.Unlock() r.pos = pos - r.mu.Unlock() - } - return nil - }(); err != nil { - return err + return nil + }(); err != nil { + return err + } } // Read all WAL files since the last position. @@ -724,7 +725,8 @@ func (r *Replica) syncWAL(ctx context.Context) (err error) { var buf bytes.Buffer gw := gzip.NewWriter(&buf) - if _, err := gw.Write(b); err != nil { + n, err := gw.Write(b) + if err != nil { return err } else if err := gw.Close(); err != nil { return err @@ -745,7 +747,7 @@ func (r *Replica) syncWAL(ctx context.Context) (err error) { return err } r.putOperationTotalCounter.Inc() - r.putOperationBytesCounter.Add(float64(buf.Len())) // compressed bytes + r.putOperationBytesCounter.Add(float64(n)) // compressed bytes // Save last replicated position. r.mu.Lock()