diff --git a/internal/metrics.go b/internal/metrics.go new file mode 100644 index 0000000..2d14e6e --- /dev/null +++ b/internal/metrics.go @@ -0,0 +1,37 @@ +package internal + +import ( + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" +) + +// Shared replica metrics, labeled by database path ("db") and replica name ("name"). +var ( + ReplicaSnapshotTotalGaugeVec = promauto.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: "litestream", + Subsystem: "replica", + Name: "snapshot_total", + Help: "The current number of snapshots", + }, []string{"db", "name"}) + + ReplicaWALBytesCounterVec = promauto.NewCounterVec(prometheus.CounterOpts{ + Namespace: "litestream", + Subsystem: "replica", + Name: "wal_bytes", + Help: "The number of WAL bytes written", + }, []string{"db", "name"}) + + ReplicaWALIndexGaugeVec = promauto.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: "litestream", + Subsystem: "replica", + Name: "wal_index", + Help: "The current WAL index", + }, []string{"db", "name"}) + + ReplicaWALOffsetGaugeVec = promauto.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: "litestream", + Subsystem: "replica", + Name: "wal_offset", + Help: "The current WAL offset", + }, []string{"db", "name"}) +) diff --git a/replica.go b/replica.go index 38fa14f..a3e26a9 100644 --- a/replica.go +++ b/replica.go @@ -16,7 +16,6 @@ import ( "github.com/benbjohnson/litestream/internal" "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/promauto" ) // Replica represents a remote destination to replicate the database & WAL. 
@@ -121,10 +120,10 @@ func NewFileReplica(db *DB, name, dst string) *FileReplica { MonitorEnabled: true, } - r.snapshotTotalGauge = fileReplicaSnapshotTotalGaugeVec.WithLabelValues(db.path, r.Name()) - r.walBytesCounter = fileReplicaWALBytesCounterVec.WithLabelValues(db.path, r.Name()) - r.walIndexGauge = fileReplicaWALIndexGaugeVec.WithLabelValues(db.path, r.Name()) - r.walOffsetGauge = fileReplicaWALOffsetGaugeVec.WithLabelValues(db.path, r.Name()) + r.snapshotTotalGauge = internal.ReplicaSnapshotTotalGaugeVec.WithLabelValues(db.path, r.Name()) + r.walBytesCounter = internal.ReplicaWALBytesCounterVec.WithLabelValues(db.path, r.Name()) + r.walIndexGauge = internal.ReplicaWALIndexGaugeVec.WithLabelValues(db.path, r.Name()) + r.walOffsetGauge = internal.ReplicaWALOffsetGaugeVec.WithLabelValues(db.path, r.Name()) return r } @@ -928,34 +927,3 @@ func compressFile(src, dst string, uid, gid int) error { // Move compressed file to final location. return os.Rename(dst+".tmp", dst) } - -// Database metrics. 
-var ( - fileReplicaSnapshotTotalGaugeVec = promauto.NewGaugeVec(prometheus.GaugeOpts{ - Namespace: "litestream", - Subsystem: "file_replica", - Name: "snapshot_total", - Help: "The current number of snapshots", - }, []string{"db", "name"}) - - fileReplicaWALBytesCounterVec = promauto.NewCounterVec(prometheus.CounterOpts{ - Namespace: "litestream", - Subsystem: "file_replica", - Name: "wal_bytes", - Help: "The number wal bytes written", - }, []string{"db", "name"}) - - fileReplicaWALIndexGaugeVec = promauto.NewGaugeVec(prometheus.GaugeOpts{ - Namespace: "litestream", - Subsystem: "file_replica", - Name: "wal_index", - Help: "The current WAL index", - }, []string{"db", "name"}) - - fileReplicaWALOffsetGaugeVec = promauto.NewGaugeVec(prometheus.GaugeOpts{ - Namespace: "litestream", - Subsystem: "file_replica", - Name: "wal_offset", - Help: "The current WAL offset", - }, []string{"db", "name"}) -) diff --git a/s3/s3.go b/s3/s3.go index 9614221..e299f8a 100644 --- a/s3/s3.go +++ b/s3/s3.go @@ -20,6 +20,8 @@ import ( "github.com/aws/aws-sdk-go/service/s3/s3manager" "github.com/benbjohnson/litestream" "github.com/benbjohnson/litestream/internal" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" ) // S3 replica default settings. @@ -50,6 +52,17 @@ type Replica struct { wg sync.WaitGroup cancel func() + snapshotTotalGauge prometheus.Gauge + walBytesCounter prometheus.Counter + walIndexGauge prometheus.Gauge + walOffsetGauge prometheus.Gauge + putOperationTotalCounter prometheus.Counter + putOperationBytesCounter prometheus.Counter + getOperationTotalCounter prometheus.Counter + getOperationBytesCounter prometheus.Counter + listOperationTotalCounter prometheus.Counter + deleteOperationTotalCounter prometheus.Counter + // AWS authentication keys. AccessKeyID string SecretAccessKey string @@ -76,7 +89,7 @@ type Replica struct { // NewReplica returns a new instance of Replica. 
func NewReplica(db *litestream.DB, name string) *Replica { - return &Replica{ + r := &Replica{ db: db, name: name, cancel: func() {}, @@ -87,6 +100,19 @@ func NewReplica(db *litestream.DB, name string) *Replica { MonitorEnabled: true, } + + r.snapshotTotalGauge = internal.ReplicaSnapshotTotalGaugeVec.WithLabelValues(db.Path(), r.Name()) + r.walBytesCounter = internal.ReplicaWALBytesCounterVec.WithLabelValues(db.Path(), r.Name()) + r.walIndexGauge = internal.ReplicaWALIndexGaugeVec.WithLabelValues(db.Path(), r.Name()) + r.walOffsetGauge = internal.ReplicaWALOffsetGaugeVec.WithLabelValues(db.Path(), r.Name()) + r.putOperationTotalCounter = operationTotalCounterVec.WithLabelValues(db.Path(), r.Name(), "PUT") + r.putOperationBytesCounter = operationBytesCounterVec.WithLabelValues(db.Path(), r.Name(), "PUT") + r.getOperationTotalCounter = operationTotalCounterVec.WithLabelValues(db.Path(), r.Name(), "GET") + r.getOperationBytesCounter = operationBytesCounterVec.WithLabelValues(db.Path(), r.Name(), "GET") + r.listOperationTotalCounter = operationTotalCounterVec.WithLabelValues(db.Path(), r.Name(), "LIST") + r.deleteOperationTotalCounter = operationTotalCounterVec.WithLabelValues(db.Path(), r.Name(), "DELETE") + + return r } // Name returns the name of the replica. Returns the type if no name set. 
@@ -162,6 +188,8 @@ func (r *Replica) Generations(ctx context.Context) ([]string, error) { Prefix: aws.String(path.Join(r.Path, "generations") + "/"), Delimiter: aws.String("/"), }, func(page *s3.ListObjectsOutput, lastPage bool) bool { + r.listOperationTotalCounter.Inc() + for _, prefix := range page.CommonPrefixes { name := path.Base(*prefix.Prefix) if !litestream.IsGenerationName(name) { @@ -214,6 +242,8 @@ func (r *Replica) snapshotStats(ctx context.Context, generation string) (n int, Bucket: aws.String(r.Bucket), Prefix: aws.String(r.SnapshotDir(generation) + "/"), }, func(page *s3.ListObjectsOutput, lastPage bool) bool { + r.listOperationTotalCounter.Inc() + for _, obj := range page.Contents { if !litestream.IsSnapshotPath(path.Base(*obj.Key)) { continue @@ -240,6 +270,8 @@ func (r *Replica) walStats(ctx context.Context, generation string) (n int, min, Bucket: aws.String(r.Bucket), Prefix: aws.String(r.WALDir(generation) + "/"), }, func(page *s3.ListObjectsOutput, lastPage bool) bool { + r.listOperationTotalCounter.Inc() + for _, obj := range page.Contents { if !litestream.IsWALPath(path.Base(*obj.Key)) { continue @@ -279,6 +311,8 @@ func (r *Replica) Snapshots(ctx context.Context) ([]*litestream.SnapshotInfo, er Prefix: aws.String(r.SnapshotDir(generation) + "/"), Delimiter: aws.String("/"), }, func(page *s3.ListObjectsOutput, lastPage bool) bool { + r.listOperationTotalCounter.Inc() + for _, obj := range page.Contents { key := path.Base(*obj.Key) index, _, err := litestream.ParseSnapshotPath(key) @@ -323,6 +357,8 @@ func (r *Replica) WALs(ctx context.Context) ([]*litestream.WALInfo, error) { Prefix: aws.String(r.WALDir(generation) + "/"), Delimiter: aws.String("/"), }, func(page *s3.ListObjectsOutput, lastPage bool) bool { + r.listOperationTotalCounter.Inc() + for _, obj := range page.Contents { key := path.Base(*obj.Key) @@ -462,6 +498,8 @@ func (r *Replica) CalcPos(ctx context.Context, generation string) (pos litestrea Prefix: 
aws.String(r.WALDir(generation) + "/"), Delimiter: aws.String("/"), }, func(page *s3.ListObjectsOutput, lastPage bool) bool { + r.listOperationTotalCounter.Inc() + for _, obj := range page.Contents { key := path.Base(*obj.Key) @@ -508,6 +546,11 @@ func (r *Replica) snapshot(ctx context.Context, generation string, index int) er } defer f.Close() + fi, err := f.Stat() + if err != nil { + return err + } + pr, pw := io.Pipe() gw, _ := gzip.NewWriterLevel(pw, gzip.BestSpeed) go func() { @@ -528,6 +571,9 @@ func (r *Replica) snapshot(ctx context.Context, generation string, index int) er return err } + r.putOperationTotalCounter.Inc() + r.putOperationBytesCounter.Add(float64(fi.Size())) // NOTE(review): size of f before gzip, whereas syncWAL records compressed bytes; confirm intended + return nil } @@ -594,6 +640,9 @@ func (r *Replica) Sync(ctx context.Context) (err error) { if err := r.snapshot(ctx, generation, dpos.Index); err != nil { return err } + r.snapshotTotalGauge.Set(1.0) + } else { + r.snapshotTotalGauge.Set(float64(n)) } // Determine position, if necessary. @@ -661,12 +710,19 @@ func (r *Replica) syncWAL(ctx context.Context) (err error) { }); err != nil { return err } + r.putOperationTotalCounter.Inc() + r.putOperationBytesCounter.Add(float64(buf.Len())) // compressed bytes // Save last replicated position. r.mu.Lock() r.pos = rd.Pos() r.mu.Unlock() + // Track raw bytes processed & current position. + r.walBytesCounter.Add(float64(len(b))) // raw bytes + r.walIndexGauge.Set(float64(rd.Pos().Index)) + r.walOffsetGauge.Set(float64(rd.Pos().Offset)) + return nil } @@ -684,6 +740,8 @@ func (r *Replica) SnapshotReader(ctx context.Context, generation string, index i if err != nil { return nil, err } + r.getOperationTotalCounter.Inc() + r.getOperationBytesCounter.Add(float64(aws.Int64Value(out.ContentLength))) // bytes go to the GET bytes counter, not the operation count; nil-safe // Decompress the snapshot file. 
gr, err := gzip.NewReader(out.Body) @@ -709,6 +767,8 @@ func (r *Replica) WALReader(ctx context.Context, generation string, index int) ( Bucket: aws.String(r.Bucket), Prefix: aws.String(path.Join(r.WALDir(generation), fmt.Sprintf("%08x_", index))), }, func(page *s3.ListObjectsOutput, lastPage bool) bool { + r.listOperationTotalCounter.Inc() + for _, obj := range page.Contents { // Read the offset & size from the filename. We need to check this // against a running offset to ensure there are no gaps. @@ -742,6 +802,8 @@ func (r *Replica) WALReader(ctx context.Context, generation string, index int) ( mrc.Close() return nil, err } + r.getOperationTotalCounter.Inc() + r.getOperationBytesCounter.Add(float64(aws.Int64Value(out.ContentLength))) // bytes go to the GET bytes counter, not the operation count; nil-safe // Decompress the snapshot file. gr, err := gzip.NewReader(out.Body) @@ -832,6 +894,8 @@ func (r *Replica) deleteGenerationBefore(ctx context.Context, generation string, Bucket: aws.String(r.Bucket), Prefix: aws.String(r.GenerationDir(generation)), }, func(page *s3.ListObjectsOutput, lastPage bool) bool { + r.listOperationTotalCounter.Inc() + for _, obj := range page.Contents { // Skip snapshots or WALs that are after the search index unless -1. if index != -1 { @@ -869,6 +933,7 @@ func (r *Replica) deleteGenerationBefore(ctx context.Context, generation string, }); err != nil { return err } + r.deleteOperationTotalCounter.Inc() } return nil @@ -907,3 +972,20 @@ func (mr *multiReadCloser) Close() (err error) { } return err } + +// S3 metrics, labeled by database path ("db"), replica name ("name") & operation type ("type": PUT, GET, LIST or DELETE). +var ( + operationTotalCounterVec = promauto.NewCounterVec(prometheus.CounterOpts{ + Namespace: "litestream", + Subsystem: "s3", + Name: "operation_total", + Help: "The number of S3 operations performed", + }, []string{"db", "name", "type"}) + + operationBytesCounterVec = promauto.NewCounterVec(prometheus.CounterOpts{ + Namespace: "litestream", + Subsystem: "s3", + Name: "operation_bytes", + Help: "The number of bytes used by S3 operations", + }, []string{"db", "name", "type"}) +)