Plumb a custom logger through the core rather than only in Restore (#481)
This commit is contained in:
11
db.go
11
db.go
@@ -94,6 +94,9 @@ type DB struct {
|
|||||||
// List of replicas for the database.
|
// List of replicas for the database.
|
||||||
// Must be set before calling Open().
|
// Must be set before calling Open().
|
||||||
Replicas []*Replica
|
Replicas []*Replica
|
||||||
|
|
||||||
|
// Where to send log messages, defaults to log.Default()
|
||||||
|
Logger *log.Logger
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewDB returns a new instance of DB for a given path.
|
// NewDB returns a new instance of DB for a given path.
|
||||||
@@ -106,6 +109,8 @@ func NewDB(path string) *DB {
|
|||||||
MaxCheckpointPageN: DefaultMaxCheckpointPageN,
|
MaxCheckpointPageN: DefaultMaxCheckpointPageN,
|
||||||
CheckpointInterval: DefaultCheckpointInterval,
|
CheckpointInterval: DefaultCheckpointInterval,
|
||||||
MonitorInterval: DefaultMonitorInterval,
|
MonitorInterval: DefaultMonitorInterval,
|
||||||
|
|
||||||
|
Logger: log.Default(),
|
||||||
}
|
}
|
||||||
|
|
||||||
db.dbSizeGauge = dbSizeGaugeVec.WithLabelValues(db.path)
|
db.dbSizeGauge = dbSizeGaugeVec.WithLabelValues(db.path)
|
||||||
@@ -447,7 +452,7 @@ func (db *DB) init() (err error) {
|
|||||||
|
|
||||||
// If we have an existing shadow WAL, ensure the headers match.
|
// If we have an existing shadow WAL, ensure the headers match.
|
||||||
if err := db.verifyHeadersMatch(); err != nil {
|
if err := db.verifyHeadersMatch(); err != nil {
|
||||||
log.Printf("%s: init: cannot determine last wal position, clearing generation; %s", db.path, err)
|
db.Logger.Printf("%s: init: cannot determine last wal position, clearing generation; %s", db.path, err)
|
||||||
if err := os.Remove(db.GenerationNamePath()); err != nil && !os.IsNotExist(err) {
|
if err := os.Remove(db.GenerationNamePath()); err != nil && !os.IsNotExist(err) {
|
||||||
return fmt.Errorf("remove generation name: %w", err)
|
return fmt.Errorf("remove generation name: %w", err)
|
||||||
}
|
}
|
||||||
@@ -726,7 +731,7 @@ func (db *DB) Sync(ctx context.Context) (err error) {
|
|||||||
if info.generation, err = db.createGeneration(); err != nil {
|
if info.generation, err = db.createGeneration(); err != nil {
|
||||||
return fmt.Errorf("create generation: %w", err)
|
return fmt.Errorf("create generation: %w", err)
|
||||||
}
|
}
|
||||||
log.Printf("%s: sync: new generation %q, %s", db.path, info.generation, info.reason)
|
db.Logger.Printf("%s: sync: new generation %q, %s", db.path, info.generation, info.reason)
|
||||||
|
|
||||||
// Clear shadow wal info.
|
// Clear shadow wal info.
|
||||||
info.shadowWALPath = db.ShadowWALPath(info.generation, 0)
|
info.shadowWALPath = db.ShadowWALPath(info.generation, 0)
|
||||||
@@ -1365,7 +1370,7 @@ func (db *DB) monitor() {
|
|||||||
|
|
||||||
// Sync the database to the shadow WAL.
|
// Sync the database to the shadow WAL.
|
||||||
if err := db.Sync(db.ctx); err != nil && !errors.Is(err, context.Canceled) {
|
if err := db.Sync(db.ctx); err != nil && !errors.Is(err, context.Canceled) {
|
||||||
log.Printf("%s: sync error: %s", db.path, err)
|
db.Logger.Printf("%s: sync error: %s", db.path, err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
26
replica.go
26
replica.go
@@ -72,6 +72,9 @@ type Replica struct {
|
|||||||
// Encryption identities and recipients
|
// Encryption identities and recipients
|
||||||
AgeIdentities []age.Identity
|
AgeIdentities []age.Identity
|
||||||
AgeRecipients []age.Recipient
|
AgeRecipients []age.Recipient
|
||||||
|
|
||||||
|
// The logger to send logging messages to. Defaults to log.Default()
|
||||||
|
Logger *log.Logger
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewReplica(db *DB, name string) *Replica {
|
func NewReplica(db *DB, name string) *Replica {
|
||||||
@@ -84,6 +87,7 @@ func NewReplica(db *DB, name string) *Replica {
|
|||||||
Retention: DefaultRetention,
|
Retention: DefaultRetention,
|
||||||
RetentionCheckInterval: DefaultRetentionCheckInterval,
|
RetentionCheckInterval: DefaultRetentionCheckInterval,
|
||||||
MonitorEnabled: true,
|
MonitorEnabled: true,
|
||||||
|
Logger: log.Default(),
|
||||||
}
|
}
|
||||||
|
|
||||||
return r
|
return r
|
||||||
@@ -534,7 +538,7 @@ func (r *Replica) Snapshot(ctx context.Context) (info SnapshotInfo, err error) {
|
|||||||
return info, err
|
return info, err
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Printf("%s(%s): snapshot written %s/%08x", r.db.Path(), r.Name(), pos.Generation, pos.Index)
|
r.Logger.Printf("%s(%s): snapshot written %s/%08x", r.db.Path(), r.Name(), pos.Generation, pos.Index)
|
||||||
|
|
||||||
return info, nil
|
return info, nil
|
||||||
}
|
}
|
||||||
@@ -602,7 +606,7 @@ func (r *Replica) deleteSnapshotsBeforeIndex(ctx context.Context, generation str
|
|||||||
if err := r.Client.DeleteSnapshot(ctx, info.Generation, info.Index); err != nil {
|
if err := r.Client.DeleteSnapshot(ctx, info.Generation, info.Index); err != nil {
|
||||||
return fmt.Errorf("delete snapshot %s/%08x: %w", info.Generation, info.Index, err)
|
return fmt.Errorf("delete snapshot %s/%08x: %w", info.Generation, info.Index, err)
|
||||||
}
|
}
|
||||||
log.Printf("%s(%s): snapshot deleted %s/%08x", r.db.Path(), r.Name(), generation, index)
|
r.Logger.Printf("%s(%s): snapshot deleted %s/%08x", r.db.Path(), r.Name(), generation, index)
|
||||||
}
|
}
|
||||||
|
|
||||||
return itr.Close()
|
return itr.Close()
|
||||||
@@ -634,7 +638,7 @@ func (r *Replica) deleteWALSegmentsBeforeIndex(ctx context.Context, generation s
|
|||||||
if err := r.Client.DeleteWALSegments(ctx, a); err != nil {
|
if err := r.Client.DeleteWALSegments(ctx, a); err != nil {
|
||||||
return fmt.Errorf("delete wal segments: %w", err)
|
return fmt.Errorf("delete wal segments: %w", err)
|
||||||
}
|
}
|
||||||
log.Printf("%s(%s): wal segmented deleted before %s/%08x: n=%d", r.db.Path(), r.Name(), generation, index, len(a))
|
r.Logger.Printf("%s(%s): wal segmented deleted before %s/%08x: n=%d", r.db.Path(), r.Name(), generation, index, len(a))
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
@@ -671,7 +675,7 @@ func (r *Replica) monitor(ctx context.Context) {
|
|||||||
|
|
||||||
// Synchronize the shadow wal into the replication directory.
|
// Synchronize the shadow wal into the replication directory.
|
||||||
if err := r.Sync(ctx); err != nil {
|
if err := r.Sync(ctx); err != nil {
|
||||||
log.Printf("%s(%s): monitor error: %s", r.db.Path(), r.Name(), err)
|
r.Logger.Printf("%s(%s): monitor error: %s", r.db.Path(), r.Name(), err)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -699,7 +703,7 @@ func (r *Replica) retainer(ctx context.Context) {
|
|||||||
return
|
return
|
||||||
case <-ticker.C:
|
case <-ticker.C:
|
||||||
if err := r.EnforceRetention(ctx); err != nil {
|
if err := r.EnforceRetention(ctx); err != nil {
|
||||||
log.Printf("%s(%s): retainer error: %s", r.db.Path(), r.Name(), err)
|
r.Logger.Printf("%s(%s): retainer error: %s", r.db.Path(), r.Name(), err)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -721,7 +725,7 @@ func (r *Replica) snapshotter(ctx context.Context) {
|
|||||||
return
|
return
|
||||||
case <-ticker.C:
|
case <-ticker.C:
|
||||||
if _, err := r.Snapshot(ctx); err != nil && err != ErrNoGeneration {
|
if _, err := r.Snapshot(ctx); err != nil && err != ErrNoGeneration {
|
||||||
log.Printf("%s(%s): snapshotter error: %s", r.db.Path(), r.Name(), err)
|
r.Logger.Printf("%s(%s): snapshotter error: %s", r.db.Path(), r.Name(), err)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -749,7 +753,7 @@ func (r *Replica) validator(ctx context.Context) {
|
|||||||
return
|
return
|
||||||
case <-ticker.C:
|
case <-ticker.C:
|
||||||
if err := r.Validate(ctx); err != nil {
|
if err := r.Validate(ctx); err != nil {
|
||||||
log.Printf("%s(%s): validation error: %s", r.db.Path(), r.Name(), err)
|
r.Logger.Printf("%s(%s): validation error: %s", r.db.Path(), r.Name(), err)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -786,7 +790,7 @@ func (r *Replica) Validate(ctx context.Context) error {
|
|||||||
ReplicaName: r.Name(),
|
ReplicaName: r.Name(),
|
||||||
Generation: pos.Generation,
|
Generation: pos.Generation,
|
||||||
Index: pos.Index - 1,
|
Index: pos.Index - 1,
|
||||||
Logger: log.New(os.Stderr, "", 0),
|
Logger: r.Logger,
|
||||||
}); err != nil {
|
}); err != nil {
|
||||||
return fmt.Errorf("cannot restore: %w", err)
|
return fmt.Errorf("cannot restore: %w", err)
|
||||||
}
|
}
|
||||||
@@ -811,7 +815,7 @@ func (r *Replica) Validate(ctx context.Context) error {
|
|||||||
if mismatch {
|
if mismatch {
|
||||||
status = "mismatch"
|
status = "mismatch"
|
||||||
}
|
}
|
||||||
log.Printf("%s(%s): validator: status=%s db=%016x replica=%016x pos=%s", db.Path(), r.Name(), status, chksum0, chksum1, pos)
|
r.Logger.Printf("%s(%s): validator: status=%s db=%016x replica=%016x pos=%s", db.Path(), r.Name(), status, chksum0, chksum1, pos)
|
||||||
|
|
||||||
// Validate checksums match.
|
// Validate checksums match.
|
||||||
if mismatch {
|
if mismatch {
|
||||||
@@ -853,7 +857,7 @@ func (r *Replica) waitForReplica(ctx context.Context, pos Pos) error {
|
|||||||
// Obtain current position of replica, check if past target position.
|
// Obtain current position of replica, check if past target position.
|
||||||
curr := r.Pos()
|
curr := r.Pos()
|
||||||
if curr.IsZero() {
|
if curr.IsZero() {
|
||||||
log.Printf("%s(%s): validator: no replica position available", db.Path(), r.Name())
|
r.Logger.Printf("%s(%s): validator: no replica position available", db.Path(), r.Name())
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -1008,7 +1012,7 @@ func (r *Replica) Restore(ctx context.Context, opt RestoreOptions) (err error) {
|
|||||||
// Ensure logger exists.
|
// Ensure logger exists.
|
||||||
logger := opt.Logger
|
logger := opt.Logger
|
||||||
if logger == nil {
|
if logger == nil {
|
||||||
logger = log.New(ioutil.Discard, "", 0)
|
logger = r.Logger
|
||||||
}
|
}
|
||||||
|
|
||||||
logPrefix := r.Name()
|
logPrefix := r.Name()
|
||||||
|
|||||||
Reference in New Issue
Block a user