Reduce logging output
Previously, there were excessive log messages for checkpoints and retention. These have been removed or combined into a single log message where appropriate.
This commit is contained in:
48
replica.go
48
replica.go
@@ -417,7 +417,7 @@ func (r *FileReplica) Stop() {
|
||||
func (r *FileReplica) monitor(ctx context.Context) {
|
||||
// Clear old temporary files that my have been left from a crash.
|
||||
if err := removeTmpFiles(r.dst); err != nil {
|
||||
log.Printf("%s(%s): cannot remove tmp files: %s", r.db.Path(), r.Name(), err)
|
||||
log.Printf("%s(%s): monitor: cannot remove tmp files: %s", r.db.Path(), r.Name(), err)
|
||||
}
|
||||
|
||||
// Continuously check for new data to replicate.
|
||||
@@ -437,7 +437,7 @@ func (r *FileReplica) monitor(ctx context.Context) {
|
||||
|
||||
// Synchronize the shadow wal into the replication directory.
|
||||
if err := r.Sync(ctx); err != nil {
|
||||
log.Printf("%s(%s): sync error: %s", r.db.Path(), r.Name(), err)
|
||||
log.Printf("%s(%s): monitor error: %s", r.db.Path(), r.Name(), err)
|
||||
continue
|
||||
}
|
||||
}
|
||||
@@ -454,7 +454,7 @@ func (r *FileReplica) retainer(ctx context.Context) {
|
||||
return
|
||||
case <-ticker.C:
|
||||
if err := r.EnforceRetention(ctx); err != nil {
|
||||
log.Printf("%s(%s): retain error: %s", r.db.Path(), r.Name(), err)
|
||||
log.Printf("%s(%s): retainer error: %s", r.db.Path(), r.Name(), err)
|
||||
continue
|
||||
}
|
||||
}
|
||||
@@ -548,11 +548,16 @@ func (r *FileReplica) snapshot(ctx context.Context, generation string, index int
|
||||
return nil
|
||||
}
|
||||
|
||||
startTime := time.Now()
|
||||
|
||||
if err := mkdirAll(filepath.Dir(snapshotPath), r.db.dirmode, r.db.diruid, r.db.dirgid); err != nil {
|
||||
return err
|
||||
} else if err := compressFile(r.db.Path(), snapshotPath, r.db.uid, r.db.gid); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return compressFile(r.db.Path(), snapshotPath, r.db.uid, r.db.gid)
|
||||
log.Printf("%s(%s): snapshot: creating %s/%08x t=%s", r.db.Path(), r.Name(), generation, index, time.Since(startTime))
|
||||
return nil
|
||||
}
|
||||
|
||||
// snapshotN returns the number of snapshots for a generation.
|
||||
@@ -796,7 +801,6 @@ func (r *FileReplica) EnforceRetention(ctx context.Context) (err error) {
|
||||
|
||||
// If no retained snapshots exist, create a new snapshot.
|
||||
if len(snapshots) == 0 {
|
||||
log.Printf("%s(%s): snapshots exceeds retention, creating new snapshot", r.db.Path(), r.Name())
|
||||
if err := r.snapshot(ctx, pos.Generation, pos.Index); err != nil {
|
||||
return fmt.Errorf("cannot snapshot: %w", err)
|
||||
}
|
||||
@@ -814,7 +818,7 @@ func (r *FileReplica) EnforceRetention(ctx context.Context) (err error) {
|
||||
|
||||
// Delete generations if it has no snapshots being retained.
|
||||
if snapshot == nil {
|
||||
log.Printf("%s(%s): generation %q has no retained snapshots, deleting", r.db.Path(), r.Name(), generation)
|
||||
log.Printf("%s(%s): retainer: deleting generation %q has no retained snapshots, deleting", r.db.Path(), r.Name(), generation)
|
||||
if err := os.RemoveAll(r.GenerationDir(generation)); err != nil {
|
||||
return fmt.Errorf("cannot delete generation %q dir: %w", generation, err)
|
||||
}
|
||||
@@ -843,6 +847,7 @@ func (r *FileReplica) deleteGenerationSnapshotsBefore(ctx context.Context, gener
|
||||
return err
|
||||
}
|
||||
|
||||
var n int
|
||||
for _, fi := range fis {
|
||||
idx, _, err := ParseSnapshotPath(fi.Name())
|
||||
if err != nil {
|
||||
@@ -851,10 +856,13 @@ func (r *FileReplica) deleteGenerationSnapshotsBefore(ctx context.Context, gener
|
||||
continue
|
||||
}
|
||||
|
||||
log.Printf("%s(%s): retention exceeded, deleting from generation %q: %s", r.db.Path(), r.Name(), generation, fi.Name())
|
||||
if err := os.Remove(filepath.Join(dir, fi.Name())); err != nil {
|
||||
return err
|
||||
}
|
||||
n++
|
||||
}
|
||||
if n > 0 {
|
||||
log.Printf("%s(%s): retainer: deleting snapshots before %s/%08x; n=%d", r.db.Path(), r.Name(), generation, index, n)
|
||||
}
|
||||
|
||||
return nil
|
||||
@@ -871,6 +879,7 @@ func (r *FileReplica) deleteGenerationWALBefore(ctx context.Context, generation
|
||||
return err
|
||||
}
|
||||
|
||||
var n int
|
||||
for _, fi := range fis {
|
||||
idx, _, _, err := ParseWALPath(fi.Name())
|
||||
if err != nil {
|
||||
@@ -879,10 +888,13 @@ func (r *FileReplica) deleteGenerationWALBefore(ctx context.Context, generation
|
||||
continue
|
||||
}
|
||||
|
||||
log.Printf("%s(%s): generation %q wal no longer retained, deleting %s", r.db.Path(), r.Name(), generation, fi.Name())
|
||||
if err := os.Remove(filepath.Join(dir, fi.Name())); err != nil {
|
||||
return err
|
||||
}
|
||||
n++
|
||||
}
|
||||
if n > 0 {
|
||||
log.Printf("%s(%s): retainer: deleting wal files before %s/%08x n=%d", r.db.Path(), r.Name(), generation, index, n)
|
||||
}
|
||||
|
||||
return nil
|
||||
@@ -997,10 +1009,8 @@ func ValidateReplica(ctx context.Context, r Replica) error {
|
||||
if err != nil {
|
||||
return fmt.Errorf("cannot compute checksum: %w", err)
|
||||
}
|
||||
log.Printf("%s(%s): primary checksum computed: %016x @ %s", db.Path(), r.Name(), chksum0, pos)
|
||||
|
||||
// Wait until replica catches up to position.
|
||||
log.Printf("%s(%s): waiting for replica", db.Path(), r.Name())
|
||||
if err := waitForReplica(ctx, r, pos); err != nil {
|
||||
return fmt.Errorf("cannot wait for replica: %w", err)
|
||||
}
|
||||
@@ -1029,16 +1039,20 @@ func ValidateReplica(ctx context.Context, r Replica) error {
|
||||
return err
|
||||
}
|
||||
|
||||
log.Printf("%s(%s): restore complete, replica checksum=%016x", db.Path(), r.Name(), chksum1)
|
||||
status := "ok"
|
||||
mismatch := chksum0 != chksum1
|
||||
if mismatch {
|
||||
status = "mismatch"
|
||||
}
|
||||
log.Printf("%s(%s): validator: status=%s db=%016x replica=%016x pos=%s", db.Path(), r.Name(), status, chksum0, chksum1, pos)
|
||||
|
||||
// Validate checksums match.
|
||||
if chksum0 != chksum1 {
|
||||
if mismatch {
|
||||
internal.ReplicaValidationTotalCounterVec.WithLabelValues(db.Path(), r.Name(), "error").Inc()
|
||||
return ErrChecksumMismatch
|
||||
}
|
||||
|
||||
internal.ReplicaValidationTotalCounterVec.WithLabelValues(db.Path(), r.Name(), "ok").Inc()
|
||||
log.Printf("%s(%s): replica ok", db.Path(), r.Name())
|
||||
|
||||
return nil
|
||||
}
|
||||
@@ -1050,6 +1064,9 @@ func waitForReplica(ctx context.Context, r Replica, pos Pos) error {
|
||||
ticker := time.NewTicker(500 * time.Millisecond)
|
||||
defer ticker.Stop()
|
||||
|
||||
timer := time.NewTicker(10 * time.Second)
|
||||
defer ticker.Stop()
|
||||
|
||||
once := make(chan struct{}, 1)
|
||||
once <- struct{}{}
|
||||
|
||||
@@ -1057,6 +1074,8 @@ func waitForReplica(ctx context.Context, r Replica, pos Pos) error {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
case <-timer.C:
|
||||
return fmt.Errorf("replica wait exceeded timeout")
|
||||
case <-ticker.C:
|
||||
case <-once: // immediate on first check
|
||||
}
|
||||
@@ -1064,7 +1083,7 @@ func waitForReplica(ctx context.Context, r Replica, pos Pos) error {
|
||||
// Obtain current position of replica, check if past target position.
|
||||
curr, err := r.CalcPos(ctx, pos.Generation)
|
||||
if err != nil {
|
||||
log.Printf("%s(%s): cannot obtain replica position: %s", db.Path(), r.Name(), err)
|
||||
log.Printf("%s(%s): validator: cannot obtain replica position: %s", db.Path(), r.Name(), err)
|
||||
continue
|
||||
}
|
||||
|
||||
@@ -1083,7 +1102,6 @@ func waitForReplica(ctx context.Context, r Replica, pos Pos) error {
|
||||
|
||||
// If not ready, restart loop.
|
||||
if !ready {
|
||||
log.Printf("%s(%s): replica at %s, waiting for %s", db.Path(), r.Name(), curr, pos)
|
||||
continue
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user