Merge pull request #31 from benbjohnson/adjust-logging

Reduce logging output
This commit is contained in:
Ben Johnson
2021-01-31 08:14:57 -07:00
committed by GitHub
4 changed files with 53 additions and 28 deletions

View File

@@ -19,6 +19,8 @@ type RestoreCommand struct{}
func (c *RestoreCommand) Run(ctx context.Context, args []string) (err error) { func (c *RestoreCommand) Run(ctx context.Context, args []string) (err error) {
var configPath string var configPath string
opt := litestream.NewRestoreOptions() opt := litestream.NewRestoreOptions()
opt.Verbose = true
fs := flag.NewFlagSet("litestream-restore", flag.ContinueOnError) fs := flag.NewFlagSet("litestream-restore", flag.ContinueOnError)
registerConfigFlag(fs, &configPath) registerConfigFlag(fs, &configPath)
fs.StringVar(&opt.OutputPath, "o", "", "output path") fs.StringVar(&opt.OutputPath, "o", "", "output path")

16
db.go
View File

@@ -421,7 +421,7 @@ func (db *DB) init() (err error) {
// If we have an existing shadow WAL, ensure the headers match. // If we have an existing shadow WAL, ensure the headers match.
if err := db.verifyHeadersMatch(); err != nil { if err := db.verifyHeadersMatch(); err != nil {
log.Printf("%s: cannot determine last wal position, clearing generation (%s)", db.path, err) log.Printf("%s: init: cannot determine last wal position, clearing generation (%s)", db.path, err)
if err := os.Remove(db.GenerationNamePath()); err != nil && !os.IsNotExist(err) { if err := os.Remove(db.GenerationNamePath()); err != nil && !os.IsNotExist(err) {
return fmt.Errorf("remove generation name: %w", err) return fmt.Errorf("remove generation name: %w", err)
} }
@@ -736,7 +736,7 @@ func (db *DB) Sync() (err error) {
if info.generation, err = db.createGeneration(); err != nil { if info.generation, err = db.createGeneration(); err != nil {
return fmt.Errorf("create generation: %w", err) return fmt.Errorf("create generation: %w", err)
} }
log.Printf("%s: new generation %q, %s", db.path, info.generation, info.reason) log.Printf("%s: sync: new generation %q, %s", db.path, info.generation, info.reason)
// Clear shadow wal info. // Clear shadow wal info.
info.shadowWALPath = db.ShadowWALPath(info.generation, 0) info.shadowWALPath = db.ShadowWALPath(info.generation, 0)
@@ -1264,7 +1264,7 @@ func (db *DB) checkpoint(mode string) (err error) {
if err := db.db.QueryRow(rawsql).Scan(&row[0], &row[1], &row[2]); err != nil { if err := db.db.QueryRow(rawsql).Scan(&row[0], &row[1], &row[2]); err != nil {
return err return err
} }
log.Printf("%s: checkpoint: mode=%v (%d,%d,%d)", db.path, mode, row[0], row[1], row[2]) Tracef("%s: checkpoint: mode=%v (%d,%d,%d)", db.path, mode, row[0], row[1], row[2])
// Reacquire the read lock immediately after the checkpoint. // Reacquire the read lock immediately after the checkpoint.
if err := db.acquireReadLock(); err != nil { if err := db.acquireReadLock(); err != nil {
@@ -1414,7 +1414,10 @@ func RestoreReplica(ctx context.Context, r Replica, opt RestoreOptions) error {
return fmt.Errorf("cannot restore wal: %w", err) return fmt.Errorf("cannot restore wal: %w", err)
} }
} }
logger.Printf("%s: restored wal %s/%08x", logPrefix, opt.Generation, index)
if opt.Verbose {
logger.Printf("%s: restored wal %s/%08x", logPrefix, opt.Generation, index)
}
} }
// Copy file to final location. // Copy file to final location.
@@ -1664,8 +1667,9 @@ type RestoreOptions struct {
// Only equivalent log output for a regular restore. // Only equivalent log output for a regular restore.
DryRun bool DryRun bool
// Logger used to print status to. // Logging settings.
Logger *log.Logger Logger *log.Logger
Verbose bool
} }
// NewRestoreOptions returns a new instance of RestoreOptions with defaults. // NewRestoreOptions returns a new instance of RestoreOptions with defaults.

View File

@@ -417,7 +417,7 @@ func (r *FileReplica) Stop() {
func (r *FileReplica) monitor(ctx context.Context) { func (r *FileReplica) monitor(ctx context.Context) {
// Clear old temporary files that may have been left from a crash. // Clear old temporary files that may have been left from a crash.
if err := removeTmpFiles(r.dst); err != nil { if err := removeTmpFiles(r.dst); err != nil {
log.Printf("%s(%s): cannot remove tmp files: %s", r.db.Path(), r.Name(), err) log.Printf("%s(%s): monitor: cannot remove tmp files: %s", r.db.Path(), r.Name(), err)
} }
// Continuously check for new data to replicate. // Continuously check for new data to replicate.
@@ -437,7 +437,7 @@ func (r *FileReplica) monitor(ctx context.Context) {
// Synchronize the shadow wal into the replication directory. // Synchronize the shadow wal into the replication directory.
if err := r.Sync(ctx); err != nil { if err := r.Sync(ctx); err != nil {
log.Printf("%s(%s): sync error: %s", r.db.Path(), r.Name(), err) log.Printf("%s(%s): monitor error: %s", r.db.Path(), r.Name(), err)
continue continue
} }
} }
@@ -454,7 +454,7 @@ func (r *FileReplica) retainer(ctx context.Context) {
return return
case <-ticker.C: case <-ticker.C:
if err := r.EnforceRetention(ctx); err != nil { if err := r.EnforceRetention(ctx); err != nil {
log.Printf("%s(%s): retain error: %s", r.db.Path(), r.Name(), err) log.Printf("%s(%s): retainer error: %s", r.db.Path(), r.Name(), err)
continue continue
} }
} }
@@ -548,11 +548,16 @@ func (r *FileReplica) snapshot(ctx context.Context, generation string, index int
return nil return nil
} }
startTime := time.Now()
if err := mkdirAll(filepath.Dir(snapshotPath), r.db.dirmode, r.db.diruid, r.db.dirgid); err != nil { if err := mkdirAll(filepath.Dir(snapshotPath), r.db.dirmode, r.db.diruid, r.db.dirgid); err != nil {
return err return err
} else if err := compressFile(r.db.Path(), snapshotPath, r.db.uid, r.db.gid); err != nil {
return err
} }
return compressFile(r.db.Path(), snapshotPath, r.db.uid, r.db.gid) log.Printf("%s(%s): snapshot: creating %s/%08x t=%s", r.db.Path(), r.Name(), generation, index, time.Since(startTime))
return nil
} }
// snapshotN returns the number of snapshots for a generation. // snapshotN returns the number of snapshots for a generation.
@@ -796,7 +801,6 @@ func (r *FileReplica) EnforceRetention(ctx context.Context) (err error) {
// If no retained snapshots exist, create a new snapshot. // If no retained snapshots exist, create a new snapshot.
if len(snapshots) == 0 { if len(snapshots) == 0 {
log.Printf("%s(%s): snapshots exceeds retention, creating new snapshot", r.db.Path(), r.Name())
if err := r.snapshot(ctx, pos.Generation, pos.Index); err != nil { if err := r.snapshot(ctx, pos.Generation, pos.Index); err != nil {
return fmt.Errorf("cannot snapshot: %w", err) return fmt.Errorf("cannot snapshot: %w", err)
} }
@@ -814,7 +818,7 @@ func (r *FileReplica) EnforceRetention(ctx context.Context) (err error) {
// Delete the generation if it has no snapshots being retained. // Delete the generation if it has no snapshots being retained.
if snapshot == nil { if snapshot == nil {
log.Printf("%s(%s): generation %q has no retained snapshots, deleting", r.db.Path(), r.Name(), generation) log.Printf("%s(%s): retainer: deleting generation %q has no retained snapshots, deleting", r.db.Path(), r.Name(), generation)
if err := os.RemoveAll(r.GenerationDir(generation)); err != nil { if err := os.RemoveAll(r.GenerationDir(generation)); err != nil {
return fmt.Errorf("cannot delete generation %q dir: %w", generation, err) return fmt.Errorf("cannot delete generation %q dir: %w", generation, err)
} }
@@ -843,6 +847,7 @@ func (r *FileReplica) deleteGenerationSnapshotsBefore(ctx context.Context, gener
return err return err
} }
var n int
for _, fi := range fis { for _, fi := range fis {
idx, _, err := ParseSnapshotPath(fi.Name()) idx, _, err := ParseSnapshotPath(fi.Name())
if err != nil { if err != nil {
@@ -851,10 +856,13 @@ func (r *FileReplica) deleteGenerationSnapshotsBefore(ctx context.Context, gener
continue continue
} }
log.Printf("%s(%s): retention exceeded, deleting from generation %q: %s", r.db.Path(), r.Name(), generation, fi.Name())
if err := os.Remove(filepath.Join(dir, fi.Name())); err != nil { if err := os.Remove(filepath.Join(dir, fi.Name())); err != nil {
return err return err
} }
n++
}
if n > 0 {
log.Printf("%s(%s): retainer: deleting snapshots before %s/%08x; n=%d", r.db.Path(), r.Name(), generation, index, n)
} }
return nil return nil
@@ -871,6 +879,7 @@ func (r *FileReplica) deleteGenerationWALBefore(ctx context.Context, generation
return err return err
} }
var n int
for _, fi := range fis { for _, fi := range fis {
idx, _, _, err := ParseWALPath(fi.Name()) idx, _, _, err := ParseWALPath(fi.Name())
if err != nil { if err != nil {
@@ -879,10 +888,13 @@ func (r *FileReplica) deleteGenerationWALBefore(ctx context.Context, generation
continue continue
} }
log.Printf("%s(%s): generation %q wal no longer retained, deleting %s", r.db.Path(), r.Name(), generation, fi.Name())
if err := os.Remove(filepath.Join(dir, fi.Name())); err != nil { if err := os.Remove(filepath.Join(dir, fi.Name())); err != nil {
return err return err
} }
n++
}
if n > 0 {
log.Printf("%s(%s): retainer: deleting wal files before %s/%08x n=%d", r.db.Path(), r.Name(), generation, index, n)
} }
return nil return nil
@@ -997,10 +1009,8 @@ func ValidateReplica(ctx context.Context, r Replica) error {
if err != nil { if err != nil {
return fmt.Errorf("cannot compute checksum: %w", err) return fmt.Errorf("cannot compute checksum: %w", err)
} }
log.Printf("%s(%s): primary checksum computed: %016x @ %s", db.Path(), r.Name(), chksum0, pos)
// Wait until replica catches up to position. // Wait until replica catches up to position.
log.Printf("%s(%s): waiting for replica", db.Path(), r.Name())
if err := waitForReplica(ctx, r, pos); err != nil { if err := waitForReplica(ctx, r, pos); err != nil {
return fmt.Errorf("cannot wait for replica: %w", err) return fmt.Errorf("cannot wait for replica: %w", err)
} }
@@ -1029,16 +1039,20 @@ func ValidateReplica(ctx context.Context, r Replica) error {
return err return err
} }
log.Printf("%s(%s): restore complete, replica checksum=%016x", db.Path(), r.Name(), chksum1) status := "ok"
mismatch := chksum0 != chksum1
if mismatch {
status = "mismatch"
}
log.Printf("%s(%s): validator: status=%s db=%016x replica=%016x pos=%s", db.Path(), r.Name(), status, chksum0, chksum1, pos)
// Validate checksums match. // Validate checksums match.
if chksum0 != chksum1 { if mismatch {
internal.ReplicaValidationTotalCounterVec.WithLabelValues(db.Path(), r.Name(), "error").Inc() internal.ReplicaValidationTotalCounterVec.WithLabelValues(db.Path(), r.Name(), "error").Inc()
return ErrChecksumMismatch return ErrChecksumMismatch
} }
internal.ReplicaValidationTotalCounterVec.WithLabelValues(db.Path(), r.Name(), "ok").Inc() internal.ReplicaValidationTotalCounterVec.WithLabelValues(db.Path(), r.Name(), "ok").Inc()
log.Printf("%s(%s): replica ok", db.Path(), r.Name())
return nil return nil
} }
@@ -1050,6 +1064,9 @@ func waitForReplica(ctx context.Context, r Replica, pos Pos) error {
ticker := time.NewTicker(500 * time.Millisecond) ticker := time.NewTicker(500 * time.Millisecond)
defer ticker.Stop() defer ticker.Stop()
timer := time.NewTicker(10 * time.Second)
defer ticker.Stop()
once := make(chan struct{}, 1) once := make(chan struct{}, 1)
once <- struct{}{} once <- struct{}{}
@@ -1057,6 +1074,8 @@ func waitForReplica(ctx context.Context, r Replica, pos Pos) error {
select { select {
case <-ctx.Done(): case <-ctx.Done():
return ctx.Err() return ctx.Err()
case <-timer.C:
return fmt.Errorf("replica wait exceeded timeout")
case <-ticker.C: case <-ticker.C:
case <-once: // immediate on first check case <-once: // immediate on first check
} }
@@ -1064,7 +1083,7 @@ func waitForReplica(ctx context.Context, r Replica, pos Pos) error {
// Obtain current position of replica, check if past target position. // Obtain current position of replica, check if past target position.
curr, err := r.CalcPos(ctx, pos.Generation) curr, err := r.CalcPos(ctx, pos.Generation)
if err != nil { if err != nil {
log.Printf("%s(%s): cannot obtain replica position: %s", db.Path(), r.Name(), err) log.Printf("%s(%s): validator: cannot obtain replica position: %s", db.Path(), r.Name(), err)
continue continue
} }
@@ -1083,7 +1102,6 @@ func waitForReplica(ctx context.Context, r Replica, pos Pos) error {
// If not ready, restart loop. // If not ready, restart loop.
if !ready { if !ready {
log.Printf("%s(%s): replica at %s, waiting for %s", db.Path(), r.Name(), curr, pos)
continue continue
} }

View File

@@ -467,7 +467,7 @@ func (r *Replica) monitor(ctx context.Context) {
// Synchronize the shadow wal into the replication directory. // Synchronize the shadow wal into the replication directory.
if err := r.Sync(ctx); err != nil { if err := r.Sync(ctx); err != nil {
log.Printf("%s(%s): sync error: %s", r.db.Path(), r.Name(), err) log.Printf("%s(%s): monitor error: %s", r.db.Path(), r.Name(), err)
continue continue
} }
} }
@@ -603,6 +603,7 @@ func (r *Replica) snapshot(ctx context.Context, generation string, index int) er
}() }()
snapshotPath := r.SnapshotPath(generation, index) snapshotPath := r.SnapshotPath(generation, index)
startTime := time.Now()
if _, err := r.uploader.UploadWithContext(ctx, &s3manager.UploadInput{ if _, err := r.uploader.UploadWithContext(ctx, &s3manager.UploadInput{
Bucket: aws.String(r.Bucket), Bucket: aws.String(r.Bucket),
@@ -615,6 +616,8 @@ func (r *Replica) snapshot(ctx context.Context, generation string, index int) er
r.putOperationTotalCounter.Inc() r.putOperationTotalCounter.Inc()
r.putOperationBytesCounter.Add(float64(fi.Size())) r.putOperationBytesCounter.Add(float64(fi.Size()))
log.Printf("%s(%s): snapshot: creating %s/%08x t=%s", r.db.Path(), r.Name(), generation, index, time.Since(startTime))
return nil return nil
} }
@@ -935,7 +938,6 @@ func (r *Replica) EnforceRetention(ctx context.Context) (err error) {
// If no retained snapshots exist, create a new snapshot. // If no retained snapshots exist, create a new snapshot.
if len(snapshots) == 0 { if len(snapshots) == 0 {
log.Printf("%s(%s): snapshots exceeds retention, creating new snapshot", r.db.Path(), r.Name())
if err := r.snapshot(ctx, pos.Generation, pos.Index); err != nil { if err := r.snapshot(ctx, pos.Generation, pos.Index); err != nil {
return fmt.Errorf("cannot snapshot: %w", err) return fmt.Errorf("cannot snapshot: %w", err)
} }
@@ -958,7 +960,6 @@ func (r *Replica) EnforceRetention(ctx context.Context) (err error) {
// Delete the generation if it has no snapshots being retained. // Delete the generation if it has no snapshots being retained.
if snapshot == nil { if snapshot == nil {
log.Printf("%s(%s): generation %q has no retained snapshots, deleting", r.db.Path(), r.Name(), generation)
if err := r.deleteGenerationBefore(ctx, generation, -1); err != nil { if err := r.deleteGenerationBefore(ctx, generation, -1); err != nil {
return fmt.Errorf("cannot delete generation %q dir: %w", generation, err) return fmt.Errorf("cannot delete generation %q dir: %w", generation, err)
} }
@@ -1001,16 +1002,13 @@ func (r *Replica) deleteGenerationBefore(ctx context.Context, generation string,
} }
// Delete all files in batches. // Delete all files in batches.
var n int
for i := 0; i < len(objIDs); i += MaxKeys { for i := 0; i < len(objIDs); i += MaxKeys {
j := i + MaxKeys j := i + MaxKeys
if j > len(objIDs) { if j > len(objIDs) {
j = len(objIDs) j = len(objIDs)
} }
for _, objID := range objIDs[i:j] {
log.Printf("%s(%s): retention exceeded, deleting from generation %q: %s", r.db.Path(), r.Name(), generation, path.Base(*objID.Key))
}
if _, err := r.s3.DeleteObjectsWithContext(ctx, &s3.DeleteObjectsInput{ if _, err := r.s3.DeleteObjectsWithContext(ctx, &s3.DeleteObjectsInput{
Bucket: aws.String(r.Bucket), Bucket: aws.String(r.Bucket),
Delete: &s3.Delete{ Delete: &s3.Delete{
@@ -1020,9 +1018,12 @@ func (r *Replica) deleteGenerationBefore(ctx context.Context, generation string,
}); err != nil { }); err != nil {
return err return err
} }
n += len(objIDs[i:j])
r.deleteOperationTotalCounter.Inc() r.deleteOperationTotalCounter.Inc()
} }
log.Printf("%s(%s): retainer: deleting wal files before %s/%08x n=%d", r.db.Path(), r.Name(), generation, index, n)
return nil return nil
} }