diff --git a/replica.go b/replica.go index 9f9932d..38fa14f 100644 --- a/replica.go +++ b/replica.go @@ -15,6 +15,8 @@ import ( "time" "github.com/benbjohnson/litestream/internal" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" ) // Replica represents a remote destination to replicate the database & WAL. @@ -89,6 +91,11 @@ type FileReplica struct { ctx context.Context cancel func() + snapshotTotalGauge prometheus.Gauge + walBytesCounter prometheus.Counter + walIndexGauge prometheus.Gauge + walOffsetGauge prometheus.Gauge + // Time to keep snapshots and related WAL files. // Database is snapshotted after interval and older WAL files are discarded. Retention time.Duration @@ -103,7 +110,7 @@ type FileReplica struct { // NewFileReplica returns a new instance of FileReplica. func NewFileReplica(db *DB, name, dst string) *FileReplica { - return &FileReplica{ + r := &FileReplica{ db: db, name: name, dst: dst, @@ -113,6 +120,13 @@ func NewFileReplica(db *DB, name, dst string) *FileReplica { RetentionCheckInterval: DefaultRetentionCheckInterval, MonitorEnabled: true, } + + r.snapshotTotalGauge = fileReplicaSnapshotTotalGaugeVec.WithLabelValues(db.path, r.Name()) + r.walBytesCounter = fileReplicaWALBytesCounterVec.WithLabelValues(db.path, r.Name()) + r.walIndexGauge = fileReplicaWALIndexGaugeVec.WithLabelValues(db.path, r.Name()) + r.walOffsetGauge = fileReplicaWALOffsetGaugeVec.WithLabelValues(db.path, r.Name()) + + return r } // Name returns the name of the replica. Returns the type if no name set. @@ -302,8 +316,6 @@ func (r *FileReplica) Snapshots(ctx context.Context) ([]*SnapshotInfo, error) { continue } - // TODO: Add schedule name to snapshot info. - infos = append(infos, &SnapshotInfo{ Name: fi.Name(), Replica: r.Name(), @@ -531,6 +543,9 @@ func (r *FileReplica) Sync(ctx context.Context) (err error) { if err := r.snapshot(ctx, generation, dpos.Index); err != nil { return err } + r.snapshotTotalGauge.Set(1.0) + } else { + r.snapshotTotalGauge.Set(float64(n)) } // Determine position, if necessary. @@ -589,9 +604,15 @@ func (r *FileReplica) syncWAL(ctx context.Context) (err error) { // Seek, copy & sync WAL contents. if _, err := w.Seek(rd.Pos().Offset, io.SeekStart); err != nil { return err - } else if _, err := io.Copy(w, rd); err != nil { + } + + n, err := io.Copy(w, rd) + r.walBytesCounter.Add(float64(n)) + if err != nil { return err - } else if err := w.Sync(); err != nil { + } + + if err := w.Sync(); err != nil { return err } else if err := w.Close(); err != nil { return err @@ -602,6 +623,10 @@ func (r *FileReplica) syncWAL(ctx context.Context) (err error) { r.pos = rd.Pos() r.mu.Unlock() + // Track current position + r.walIndexGauge.Set(float64(rd.Pos().Index)) + r.walOffsetGauge.Set(float64(rd.Pos().Offset)) + return nil } @@ -903,3 +928,34 @@ func compressFile(src, dst string, uid, gid int) error { // Move compressed file to final location. return os.Rename(dst+".tmp", dst) } + +// Database metrics. +var ( + fileReplicaSnapshotTotalGaugeVec = promauto.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: "litestream", + Subsystem: "file_replica", + Name: "snapshot_total", + Help: "The current number of snapshots", + }, []string{"db", "name"}) + + fileReplicaWALBytesCounterVec = promauto.NewCounterVec(prometheus.CounterOpts{ + Namespace: "litestream", + Subsystem: "file_replica", + Name: "wal_bytes", + Help: "The number wal bytes written", + }, []string{"db", "name"}) + + fileReplicaWALIndexGaugeVec = promauto.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: "litestream", + Subsystem: "file_replica", + Name: "wal_index", + Help: "The current WAL index", + }, []string{"db", "name"}) + + fileReplicaWALOffsetGaugeVec = promauto.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: "litestream", + Subsystem: "file_replica", + Name: "wal_offset", + Help: "The current WAL offset", + }, []string{"db", "name"}) +)