Use structured logging with slog (#475)

This commit is contained in:
Toni Spets
2023-10-17 00:05:22 +03:00
committed by GitHub
parent fd892eef6d
commit b1abd6bd99
18 changed files with 132 additions and 123 deletions

View File

@@ -7,7 +7,7 @@ import (
"hash/crc64"
"io"
"io/ioutil"
"log"
"log/slog"
"math"
"os"
"path/filepath"
@@ -72,9 +72,6 @@ type Replica struct {
// Encryption identities and recipients
AgeIdentities []age.Identity
AgeRecipients []age.Recipient
// The logger to send logging messages to. Defaults to log.Default()
Logger *log.Logger
}
func NewReplica(db *DB, name string) *Replica {
@@ -87,7 +84,6 @@ func NewReplica(db *DB, name string) *Replica {
Retention: DefaultRetention,
RetentionCheckInterval: DefaultRetentionCheckInterval,
MonitorEnabled: true,
Logger: log.Default(),
}
return r
@@ -101,6 +97,11 @@ func (r *Replica) Name() string {
return r.name
}
// Logger returns a child logger derived from the parent database's
// logger, annotated with this replica's name under the "replica" key.
func (r *Replica) Logger() *slog.Logger {
	name := r.Name()
	return r.db.Logger.With("replica", name)
}
// DB returns a reference to the database the replica is attached to, if any.
func (r *Replica) DB() *DB {
	return r.db
}
@@ -166,7 +167,7 @@ func (r *Replica) Sync(ctx context.Context) (err error) {
}
generation := dpos.Generation
Tracef("%s(%s): replica sync: db.pos=%s", r.db.Path(), r.Name(), dpos)
r.Logger().Debug("replica sync", "position", dpos.String())
// Create a new snapshot and update the current replica position if
// the generation on the database has changed.
@@ -188,7 +189,7 @@ func (r *Replica) Sync(ctx context.Context) (err error) {
return fmt.Errorf("cannot determine replica position: %s", err)
}
Tracef("%s(%s): replica sync: calc new pos: %s", r.db.Path(), r.Name(), pos)
r.Logger().Debug("replica sync: calc new pos", "position", pos.String())
r.mu.Lock()
r.pos = pos
r.mu.Unlock()
@@ -222,6 +223,12 @@ func (r *Replica) syncWAL(ctx context.Context) (err error) {
// Obtain initial position from shadow reader.
// It may have moved to the next index if previous position was at the end.
pos := rd.Pos()
initialPos := pos
startTime := time.Now()
var bytesWritten int
logger := r.Logger()
logger.Info("write wal segment", "position", initialPos.String())
// Copy through pipe into client from the starting position.
var g errgroup.Group
@@ -263,6 +270,7 @@ func (r *Replica) syncWAL(ctx context.Context) (err error) {
return err
}
walBytesCounter.Add(float64(n))
bytesWritten += n
}
// Copy frames.
@@ -289,6 +297,7 @@ func (r *Replica) syncWAL(ctx context.Context) (err error) {
return err
}
walBytesCounter.Add(float64(n))
bytesWritten += n
}
// Flush LZ4 writer, encryption writer and close pipe.
@@ -314,6 +323,7 @@ func (r *Replica) syncWAL(ctx context.Context) (err error) {
replicaWALIndexGaugeVec.WithLabelValues(r.db.Path(), r.Name()).Set(float64(rd.Pos().Index))
replicaWALOffsetGaugeVec.WithLabelValues(r.db.Path(), r.Name()).Set(float64(rd.Pos().Offset))
logger.Info("wal segment written", "position", initialPos.String(), "elapsed", time.Since(startTime).String(), "sz", bytesWritten)
return nil
}
@@ -535,6 +545,10 @@ func (r *Replica) Snapshot(ctx context.Context) (info SnapshotInfo, err error) {
return wc.Close()
})
logger := r.Logger()
logger.Info("write snapshot", "position", pos.String())
startTime := time.Now()
// Delegate write to client & wait for writer goroutine to finish.
if info, err = r.Client.WriteSnapshot(ctx, pos.Generation, pos.Index, pr); err != nil {
return info, err
@@ -542,8 +556,7 @@ func (r *Replica) Snapshot(ctx context.Context) (info SnapshotInfo, err error) {
return info, err
}
r.Logger.Printf("%s(%s): snapshot written %s/%08x", r.db.Path(), r.Name(), pos.Generation, pos.Index)
logger.Info("snapshot written", "position", pos.String(), "elapsed", time.Since(startTime).String(), "sz", info.Size)
return info, nil
}
@@ -610,7 +623,7 @@ func (r *Replica) deleteSnapshotsBeforeIndex(ctx context.Context, generation str
if err := r.Client.DeleteSnapshot(ctx, info.Generation, info.Index); err != nil {
return fmt.Errorf("delete snapshot %s/%08x: %w", info.Generation, info.Index, err)
}
r.Logger.Printf("%s(%s): snapshot deleted %s/%08x", r.db.Path(), r.Name(), generation, index)
r.Logger().Info("snapshot deleted", "generation", generation, "index", index)
}
return itr.Close()
@@ -642,8 +655,8 @@ func (r *Replica) deleteWALSegmentsBeforeIndex(ctx context.Context, generation s
if err := r.Client.DeleteWALSegments(ctx, a); err != nil {
return fmt.Errorf("delete wal segments: %w", err)
}
r.Logger.Printf("%s(%s): wal segmented deleted before %s/%08x: n=%d", r.db.Path(), r.Name(), generation, index, len(a))
r.Logger().Info("wal segmented deleted before", "generation", generation, "index", index, "n", len(a))
return nil
}
@@ -679,7 +692,7 @@ func (r *Replica) monitor(ctx context.Context) {
// Synchronize the shadow wal into the replication directory.
if err := r.Sync(ctx); err != nil {
r.Logger.Printf("%s(%s): monitor error: %s", r.db.Path(), r.Name(), err)
r.Logger().Error("monitor error", "error", err)
continue
}
}
@@ -707,7 +720,7 @@ func (r *Replica) retainer(ctx context.Context) {
return
case <-ticker.C:
if err := r.EnforceRetention(ctx); err != nil {
r.Logger.Printf("%s(%s): retainer error: %s", r.db.Path(), r.Name(), err)
r.Logger().Error("retainer error", "error", err)
continue
}
}
@@ -729,7 +742,7 @@ func (r *Replica) snapshotter(ctx context.Context) {
return
case <-ticker.C:
if _, err := r.Snapshot(ctx); err != nil && err != ErrNoGeneration {
r.Logger.Printf("%s(%s): snapshotter error: %s", r.db.Path(), r.Name(), err)
r.Logger().Error("snapshotter error", "error", err)
continue
}
}
@@ -757,7 +770,7 @@ func (r *Replica) validator(ctx context.Context) {
return
case <-ticker.C:
if err := r.Validate(ctx); err != nil {
r.Logger.Printf("%s(%s): validation error: %s", r.db.Path(), r.Name(), err)
r.Logger().Error("validation error", "error", err)
continue
}
}
@@ -794,7 +807,6 @@ func (r *Replica) Validate(ctx context.Context) error {
ReplicaName: r.Name(),
Generation: pos.Generation,
Index: pos.Index - 1,
Logger: r.Logger,
}); err != nil {
return fmt.Errorf("cannot restore: %w", err)
}
@@ -819,7 +831,7 @@ func (r *Replica) Validate(ctx context.Context) error {
if mismatch {
status = "mismatch"
}
r.Logger.Printf("%s(%s): validator: status=%s db=%016x replica=%016x pos=%s", db.Path(), r.Name(), status, chksum0, chksum1, pos)
r.Logger().Info("validator", "status", status, "db", fmt.Sprintf("%016x", chksum0), "replica", fmt.Sprintf("%016x", chksum1), "position", pos.String())
// Validate checksums match.
if mismatch {
@@ -837,8 +849,6 @@ func (r *Replica) Validate(ctx context.Context) error {
// waitForReplica blocks until replica reaches at least the given position.
func (r *Replica) waitForReplica(ctx context.Context, pos Pos) error {
db := r.DB()
ticker := time.NewTicker(500 * time.Millisecond)
defer ticker.Stop()
@@ -861,7 +871,7 @@ func (r *Replica) waitForReplica(ctx context.Context, pos Pos) error {
// Obtain current position of replica, check if past target position.
curr := r.Pos()
if curr.IsZero() {
r.Logger.Printf("%s(%s): validator: no replica position available", db.Path(), r.Name())
r.Logger().Info("validator: no replica position available")
continue
}
@@ -1013,17 +1023,6 @@ func (r *Replica) Restore(ctx context.Context, opt RestoreOptions) (err error) {
return fmt.Errorf("cannot specify index & timestamp to restore")
}
// Ensure logger exists.
logger := opt.Logger
if logger == nil {
logger = r.Logger
}
logPrefix := r.Name()
if db := r.DB(); db != nil {
logPrefix = fmt.Sprintf("%s(%s)", db.Path(), r.Name())
}
// Ensure output path does not already exist.
if _, err := os.Stat(opt.OutputPath); err == nil {
return fmt.Errorf("cannot restore, output path already exists: %s", opt.OutputPath)
@@ -1070,19 +1069,19 @@ func (r *Replica) Restore(ctx context.Context, opt RestoreOptions) (err error) {
tmpPath := opt.OutputPath + ".tmp"
// Copy snapshot to output path.
logger.Printf("%s: restoring snapshot %s/%08x to %s", logPrefix, opt.Generation, minWALIndex, tmpPath)
r.Logger().Info("restoring snapshot", "generation", opt.Generation, "index", minWALIndex, "path", tmpPath)
if err := r.restoreSnapshot(ctx, pos.Generation, pos.Index, tmpPath); err != nil {
return fmt.Errorf("cannot restore snapshot: %w", err)
}
// If no WAL files available, move snapshot to final path & exit early.
if snapshotOnly {
logger.Printf("%s: snapshot only, finalizing database", logPrefix)
r.Logger().Info("snapshot only, finalizing database")
return os.Rename(tmpPath, opt.OutputPath)
}
// Begin processing WAL files.
logger.Printf("%s: restoring wal files: generation=%s index=[%08x,%08x]", logPrefix, opt.Generation, minWALIndex, maxWALIndex)
r.Logger().Info("restoring wal files", "generation", opt.Generation, "index_min", minWALIndex, "index_max", maxWALIndex)
// Fill input channel with all WAL indexes to be loaded in order.
// Verify every index has at least one offset.
@@ -1138,9 +1137,9 @@ func (r *Replica) Restore(ctx context.Context, opt RestoreOptions) (err error) {
return err
}
logger.Printf("%s: downloaded wal %s/%08x elapsed=%s",
logPrefix, opt.Generation, index,
time.Since(startTime).String(),
r.Logger().Info("downloaded wal",
"generation", opt.Generation, "index", index,
"elapsed", time.Since(startTime).String(),
)
}
}
@@ -1167,10 +1166,7 @@ func (r *Replica) Restore(ctx context.Context, opt RestoreOptions) (err error) {
if err = applyWAL(ctx, index, tmpPath); err != nil {
return fmt.Errorf("cannot apply wal: %w", err)
}
logger.Printf("%s: applied wal %s/%08x elapsed=%s",
logPrefix, opt.Generation, index,
time.Since(startTime).String(),
)
r.Logger().Info("applied wal", "generation", opt.Generation, "index", index, "elapsed", time.Since(startTime).String())
}
// Ensure all goroutines finish. All errors should have been handled during
@@ -1180,7 +1176,7 @@ func (r *Replica) Restore(ctx context.Context, opt RestoreOptions) (err error) {
}
// Copy file to final location.
logger.Printf("%s: renaming database from temporary location", logPrefix)
r.Logger().Info("renaming database from temporary location")
if err := os.Rename(tmpPath, opt.OutputPath); err != nil {
return err
}