From ad7bf7f974590dec9689e355cb0f82ba9479d384 Mon Sep 17 00:00:00 2001 From: Ben Johnson Date: Sun, 31 Jan 2021 08:12:18 -0700 Subject: [PATCH] Reduce logging output Previously, there were excessive log messages for checkpoints and retention. These have been removed or combined into a single log message where appropriate. --- cmd/litestream/restore.go | 2 ++ db.go | 16 ++++++++----- replica.go | 48 +++++++++++++++++++++++++++------------ s3/s3.go | 15 ++++++------ 4 files changed, 53 insertions(+), 28 deletions(-) diff --git a/cmd/litestream/restore.go b/cmd/litestream/restore.go index 8568471..70c3028 100644 --- a/cmd/litestream/restore.go +++ b/cmd/litestream/restore.go @@ -19,6 +19,8 @@ type RestoreCommand struct{} func (c *RestoreCommand) Run(ctx context.Context, args []string) (err error) { var configPath string opt := litestream.NewRestoreOptions() + opt.Verbose = true + fs := flag.NewFlagSet("litestream-restore", flag.ContinueOnError) registerConfigFlag(fs, &configPath) fs.StringVar(&opt.OutputPath, "o", "", "output path") diff --git a/db.go b/db.go index d1ecb4b..8277e5d 100644 --- a/db.go +++ b/db.go @@ -421,7 +421,7 @@ func (db *DB) init() (err error) { // If we have an existing shadow WAL, ensure the headers match. if err := db.verifyHeadersMatch(); err != nil { - log.Printf("%s: cannot determine last wal position, clearing generation (%s)", db.path, err) + log.Printf("%s: init: cannot determine last wal position, clearing generation (%s)", db.path, err) if err := os.Remove(db.GenerationNamePath()); err != nil && !os.IsNotExist(err) { return fmt.Errorf("remove generation name: %w", err) } @@ -736,7 +736,7 @@ func (db *DB) Sync() (err error) { if info.generation, err = db.createGeneration(); err != nil { return fmt.Errorf("create generation: %w", err) } - log.Printf("%s: new generation %q, %s", db.path, info.generation, info.reason) + log.Printf("%s: sync: new generation %q, %s", db.path, info.generation, info.reason) // Clear shadow wal info. info.shadowWALPath = db.ShadowWALPath(info.generation, 0) @@ -1264,7 +1264,7 @@ func (db *DB) checkpoint(mode string) (err error) { if err := db.db.QueryRow(rawsql).Scan(&row[0], &row[1], &row[2]); err != nil { return err } - log.Printf("%s: checkpoint: mode=%v (%d,%d,%d)", db.path, mode, row[0], row[1], row[2]) + Tracef("%s: checkpoint: mode=%v (%d,%d,%d)", db.path, mode, row[0], row[1], row[2]) // Reacquire the read lock immediately after the checkpoint. if err := db.acquireReadLock(); err != nil { @@ -1414,7 +1414,10 @@ func RestoreReplica(ctx context.Context, r Replica, opt RestoreOptions) error { return fmt.Errorf("cannot restore wal: %w", err) } } - logger.Printf("%s: restored wal %s/%08x", logPrefix, opt.Generation, index) + + if opt.Verbose { + logger.Printf("%s: restored wal %s/%08x", logPrefix, opt.Generation, index) + } } // Copy file to final location. @@ -1664,8 +1667,9 @@ type RestoreOptions struct { // Only equivalent log output for a regular restore. DryRun bool - // Logger used to print status to. - Logger *log.Logger + // Logging settings. + Logger *log.Logger + Verbose bool } // NewRestoreOptions returns a new instance of RestoreOptions with defaults. diff --git a/replica.go b/replica.go index 44b3033..1bcee27 100644 --- a/replica.go +++ b/replica.go @@ -417,7 +417,7 @@ func (r *FileReplica) Stop() { func (r *FileReplica) monitor(ctx context.Context) { // Clear old temporary files that my have been left from a crash. if err := removeTmpFiles(r.dst); err != nil { - log.Printf("%s(%s): cannot remove tmp files: %s", r.db.Path(), r.Name(), err) + log.Printf("%s(%s): monitor: cannot remove tmp files: %s", r.db.Path(), r.Name(), err) } // Continuously check for new data to replicate. @@ -437,7 +437,7 @@ func (r *FileReplica) monitor(ctx context.Context) { // Synchronize the shadow wal into the replication directory. if err := r.Sync(ctx); err != nil { - log.Printf("%s(%s): sync error: %s", r.db.Path(), r.Name(), err) + log.Printf("%s(%s): monitor error: %s", r.db.Path(), r.Name(), err) continue } } @@ -454,7 +454,7 @@ func (r *FileReplica) retainer(ctx context.Context) { return case <-ticker.C: if err := r.EnforceRetention(ctx); err != nil { - log.Printf("%s(%s): retain error: %s", r.db.Path(), r.Name(), err) + log.Printf("%s(%s): retainer error: %s", r.db.Path(), r.Name(), err) continue } } @@ -548,11 +548,16 @@ func (r *FileReplica) snapshot(ctx context.Context, generation string, index int return nil } + startTime := time.Now() + if err := mkdirAll(filepath.Dir(snapshotPath), r.db.dirmode, r.db.diruid, r.db.dirgid); err != nil { return err + } else if err := compressFile(r.db.Path(), snapshotPath, r.db.uid, r.db.gid); err != nil { + return err } - return compressFile(r.db.Path(), snapshotPath, r.db.uid, r.db.gid) + log.Printf("%s(%s): snapshot: creating %s/%08x t=%s", r.db.Path(), r.Name(), generation, index, time.Since(startTime)) + return nil } // snapshotN returns the number of snapshots for a generation. @@ -796,7 +801,6 @@ func (r *FileReplica) EnforceRetention(ctx context.Context) (err error) { // If no retained snapshots exist, create a new snapshot. if len(snapshots) == 0 { - log.Printf("%s(%s): snapshots exceeds retention, creating new snapshot", r.db.Path(), r.Name()) if err := r.snapshot(ctx, pos.Generation, pos.Index); err != nil { return fmt.Errorf("cannot snapshot: %w", err) } @@ -814,7 +818,7 @@ func (r *FileReplica) EnforceRetention(ctx context.Context) (err error) { // Delete generations if it has no snapshots being retained. if snapshot == nil { - log.Printf("%s(%s): generation %q has no retained snapshots, deleting", r.db.Path(), r.Name(), generation) + log.Printf("%s(%s): retainer: deleting generation %q has no retained snapshots, deleting", r.db.Path(), r.Name(), generation) if err := os.RemoveAll(r.GenerationDir(generation)); err != nil { return fmt.Errorf("cannot delete generation %q dir: %w", generation, err) } @@ -843,6 +847,7 @@ func (r *FileReplica) deleteGenerationSnapshotsBefore(ctx context.Context, gener return err } + var n int for _, fi := range fis { idx, _, err := ParseSnapshotPath(fi.Name()) if err != nil { @@ -851,10 +856,13 @@ func (r *FileReplica) deleteGenerationSnapshotsBefore(ctx context.Context, gener continue } - log.Printf("%s(%s): retention exceeded, deleting from generation %q: %s", r.db.Path(), r.Name(), generation, fi.Name()) if err := os.Remove(filepath.Join(dir, fi.Name())); err != nil { return err } + n++ + } + if n > 0 { + log.Printf("%s(%s): retainer: deleting snapshots before %s/%08x; n=%d", r.db.Path(), r.Name(), generation, index, n) } return nil @@ -871,6 +879,7 @@ func (r *FileReplica) deleteGenerationWALBefore(ctx context.Context, generation return err } + var n int for _, fi := range fis { idx, _, _, err := ParseWALPath(fi.Name()) if err != nil { @@ -879,10 +888,13 @@ func (r *FileReplica) deleteGenerationWALBefore(ctx context.Context, generation continue } - log.Printf("%s(%s): generation %q wal no longer retained, deleting %s", r.db.Path(), r.Name(), generation, fi.Name()) if err := os.Remove(filepath.Join(dir, fi.Name())); err != nil { return err } + n++ + } + if n > 0 { + log.Printf("%s(%s): retainer: deleting wal files before %s/%08x n=%d", r.db.Path(), r.Name(), generation, index, n) } return nil @@ -997,10 +1009,8 @@ func ValidateReplica(ctx context.Context, r Replica) error { if err != nil { return fmt.Errorf("cannot compute checksum: %w", err) } - log.Printf("%s(%s): primary checksum computed: %016x @ %s", db.Path(), r.Name(), chksum0, pos) // Wait until replica catches up to position. - log.Printf("%s(%s): waiting for replica", db.Path(), r.Name()) if err := waitForReplica(ctx, r, pos); err != nil { return fmt.Errorf("cannot wait for replica: %w", err) } @@ -1029,16 +1039,20 @@ func ValidateReplica(ctx context.Context, r Replica) error { return err } - log.Printf("%s(%s): restore complete, replica checksum=%016x", db.Path(), r.Name(), chksum1) + status := "ok" + mismatch := chksum0 != chksum1 + if mismatch { + status = "mismatch" + } + log.Printf("%s(%s): validator: status=%s db=%016x replica=%016x pos=%s", db.Path(), r.Name(), status, chksum0, chksum1, pos) // Validate checksums match. - if chksum0 != chksum1 { + if mismatch { internal.ReplicaValidationTotalCounterVec.WithLabelValues(db.Path(), r.Name(), "error").Inc() return ErrChecksumMismatch } internal.ReplicaValidationTotalCounterVec.WithLabelValues(db.Path(), r.Name(), "ok").Inc() - log.Printf("%s(%s): replica ok", db.Path(), r.Name()) return nil } @@ -1050,6 +1064,9 @@ func waitForReplica(ctx context.Context, r Replica, pos Pos) error { ticker := time.NewTicker(500 * time.Millisecond) defer ticker.Stop() + timer := time.NewTicker(10 * time.Second) + defer ticker.Stop() + once := make(chan struct{}, 1) once <- struct{}{} @@ -1057,6 +1074,8 @@ func waitForReplica(ctx context.Context, r Replica, pos Pos) error { select { case <-ctx.Done(): return ctx.Err() + case <-timer.C: + return fmt.Errorf("replica wait exceeded timeout") case <-ticker.C: case <-once: // immediate on first check } @@ -1064,7 +1083,7 @@ func waitForReplica(ctx context.Context, r Replica, pos Pos) error { // Obtain current position of replica, check if past target position. curr, err := r.CalcPos(ctx, pos.Generation) if err != nil { - log.Printf("%s(%s): cannot obtain replica position: %s", db.Path(), r.Name(), err) + log.Printf("%s(%s): validator: cannot obtain replica position: %s", db.Path(), r.Name(), err) continue } @@ -1083,7 +1102,6 @@ func waitForReplica(ctx context.Context, r Replica, pos Pos) error { // If not ready, restart loop. if !ready { - log.Printf("%s(%s): replica at %s, waiting for %s", db.Path(), r.Name(), curr, pos) continue } diff --git a/s3/s3.go b/s3/s3.go index 58ba280..5897320 100644 --- a/s3/s3.go +++ b/s3/s3.go @@ -467,7 +467,7 @@ func (r *Replica) monitor(ctx context.Context) { // Synchronize the shadow wal into the replication directory. if err := r.Sync(ctx); err != nil { - log.Printf("%s(%s): sync error: %s", r.db.Path(), r.Name(), err) + log.Printf("%s(%s): monitor error: %s", r.db.Path(), r.Name(), err) continue } } @@ -603,6 +603,7 @@ func (r *Replica) snapshot(ctx context.Context, generation string, index int) er }() snapshotPath := r.SnapshotPath(generation, index) + startTime := time.Now() if _, err := r.uploader.UploadWithContext(ctx, &s3manager.UploadInput{ Bucket: aws.String(r.Bucket), @@ -615,6 +616,8 @@ func (r *Replica) snapshot(ctx context.Context, generation string, index int) er r.putOperationTotalCounter.Inc() r.putOperationBytesCounter.Add(float64(fi.Size())) + log.Printf("%s(%s): snapshot: creating %s/%08x t=%s", r.db.Path(), r.Name(), generation, index, time.Since(startTime)) + return nil } @@ -935,7 +938,6 @@ func (r *Replica) EnforceRetention(ctx context.Context) (err error) { // If no retained snapshots exist, create a new snapshot. if len(snapshots) == 0 { - log.Printf("%s(%s): snapshots exceeds retention, creating new snapshot", r.db.Path(), r.Name()) if err := r.snapshot(ctx, pos.Generation, pos.Index); err != nil { return fmt.Errorf("cannot snapshot: %w", err) } @@ -958,7 +960,6 @@ func (r *Replica) EnforceRetention(ctx context.Context) (err error) { // Delete generations if it has no snapshots being retained. if snapshot == nil { - log.Printf("%s(%s): generation %q has no retained snapshots, deleting", r.db.Path(), r.Name(), generation) if err := r.deleteGenerationBefore(ctx, generation, -1); err != nil { return fmt.Errorf("cannot delete generation %q dir: %w", generation, err) } @@ -1001,16 +1002,13 @@ func (r *Replica) deleteGenerationBefore(ctx context.Context, generation string, } // Delete all files in batches. + var n int for i := 0; i < len(objIDs); i += MaxKeys { j := i + MaxKeys if j > len(objIDs) { j = len(objIDs) } - for _, objID := range objIDs[i:j] { - log.Printf("%s(%s): retention exceeded, deleting from generation %q: %s", r.db.Path(), r.Name(), generation, path.Base(*objID.Key)) - } - if _, err := r.s3.DeleteObjectsWithContext(ctx, &s3.DeleteObjectsInput{ Bucket: aws.String(r.Bucket), Delete: &s3.Delete{ @@ -1020,9 +1018,12 @@ func (r *Replica) deleteGenerationBefore(ctx context.Context, generation string, }); err != nil { return err } + n += len(objIDs[i:j]) r.deleteOperationTotalCounter.Inc() } + log.Printf("%s(%s): retainer: deleting wal files before %s/%08x n=%d", r.db.Path(), r.Name(), generation, index, n) + return nil }