Add file & s3 replica metrics

This commit is contained in:
Ben Johnson
2021-01-14 16:10:02 -07:00
parent daa74f87b4
commit 8c113cf260
3 changed files with 124 additions and 37 deletions

View File

@@ -20,6 +20,8 @@ import (
"github.com/aws/aws-sdk-go/service/s3/s3manager"
"github.com/benbjohnson/litestream"
"github.com/benbjohnson/litestream/internal"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)
// S3 replica default settings.
@@ -50,6 +52,17 @@ type Replica struct {
wg sync.WaitGroup
cancel func()
snapshotTotalGauge prometheus.Gauge
walBytesCounter prometheus.Counter
walIndexGauge prometheus.Gauge
walOffsetGauge prometheus.Gauge
putOperationTotalCounter prometheus.Counter
putOperationBytesCounter prometheus.Counter
getOperationTotalCounter prometheus.Counter
getOperationBytesCounter prometheus.Counter
listOperationTotalCounter prometheus.Counter
deleteOperationTotalCounter prometheus.Counter
// AWS authentication keys.
AccessKeyID string
SecretAccessKey string
@@ -76,7 +89,7 @@ type Replica struct {
// NewReplica returns a new instance of Replica.
func NewReplica(db *litestream.DB, name string) *Replica {
return &Replica{
r := &Replica{
db: db,
name: name,
cancel: func() {},
@@ -87,6 +100,19 @@ func NewReplica(db *litestream.DB, name string) *Replica {
MonitorEnabled: true,
}
r.snapshotTotalGauge = internal.ReplicaSnapshotTotalGaugeVec.WithLabelValues(db.Path(), r.Name())
r.walBytesCounter = internal.ReplicaWALBytesCounterVec.WithLabelValues(db.Path(), r.Name())
r.walIndexGauge = internal.ReplicaWALIndexGaugeVec.WithLabelValues(db.Path(), r.Name())
r.walOffsetGauge = internal.ReplicaWALOffsetGaugeVec.WithLabelValues(db.Path(), r.Name())
r.putOperationTotalCounter = operationTotalCounterVec.WithLabelValues(db.Path(), r.Name(), "PUT")
r.putOperationBytesCounter = operationBytesCounterVec.WithLabelValues(db.Path(), r.Name(), "PUT")
r.getOperationTotalCounter = operationTotalCounterVec.WithLabelValues(db.Path(), r.Name(), "GET")
r.getOperationBytesCounter = operationBytesCounterVec.WithLabelValues(db.Path(), r.Name(), "GET")
r.listOperationTotalCounter = operationTotalCounterVec.WithLabelValues(db.Path(), r.Name(), "LIST")
r.deleteOperationTotalCounter = operationTotalCounterVec.WithLabelValues(db.Path(), r.Name(), "DELETE")
return r
}
// Name returns the name of the replica. Returns the type if no name set.
@@ -162,6 +188,8 @@ func (r *Replica) Generations(ctx context.Context) ([]string, error) {
Prefix: aws.String(path.Join(r.Path, "generations") + "/"),
Delimiter: aws.String("/"),
}, func(page *s3.ListObjectsOutput, lastPage bool) bool {
r.listOperationTotalCounter.Inc()
for _, prefix := range page.CommonPrefixes {
name := path.Base(*prefix.Prefix)
if !litestream.IsGenerationName(name) {
@@ -214,6 +242,8 @@ func (r *Replica) snapshotStats(ctx context.Context, generation string) (n int,
Bucket: aws.String(r.Bucket),
Prefix: aws.String(r.SnapshotDir(generation) + "/"),
}, func(page *s3.ListObjectsOutput, lastPage bool) bool {
r.listOperationTotalCounter.Inc()
for _, obj := range page.Contents {
if !litestream.IsSnapshotPath(path.Base(*obj.Key)) {
continue
@@ -240,6 +270,8 @@ func (r *Replica) walStats(ctx context.Context, generation string) (n int, min,
Bucket: aws.String(r.Bucket),
Prefix: aws.String(r.WALDir(generation) + "/"),
}, func(page *s3.ListObjectsOutput, lastPage bool) bool {
r.listOperationTotalCounter.Inc()
for _, obj := range page.Contents {
if !litestream.IsWALPath(path.Base(*obj.Key)) {
continue
@@ -279,6 +311,8 @@ func (r *Replica) Snapshots(ctx context.Context) ([]*litestream.SnapshotInfo, er
Prefix: aws.String(r.SnapshotDir(generation) + "/"),
Delimiter: aws.String("/"),
}, func(page *s3.ListObjectsOutput, lastPage bool) bool {
r.listOperationTotalCounter.Inc()
for _, obj := range page.Contents {
key := path.Base(*obj.Key)
index, _, err := litestream.ParseSnapshotPath(key)
@@ -323,6 +357,8 @@ func (r *Replica) WALs(ctx context.Context) ([]*litestream.WALInfo, error) {
Prefix: aws.String(r.WALDir(generation) + "/"),
Delimiter: aws.String("/"),
}, func(page *s3.ListObjectsOutput, lastPage bool) bool {
r.listOperationTotalCounter.Inc()
for _, obj := range page.Contents {
key := path.Base(*obj.Key)
@@ -462,6 +498,8 @@ func (r *Replica) CalcPos(ctx context.Context, generation string) (pos litestrea
Prefix: aws.String(r.WALDir(generation) + "/"),
Delimiter: aws.String("/"),
}, func(page *s3.ListObjectsOutput, lastPage bool) bool {
r.listOperationTotalCounter.Inc()
for _, obj := range page.Contents {
key := path.Base(*obj.Key)
@@ -508,6 +546,11 @@ func (r *Replica) snapshot(ctx context.Context, generation string, index int) er
}
defer f.Close()
fi, err := f.Stat()
if err != nil {
return err
}
pr, pw := io.Pipe()
gw, _ := gzip.NewWriterLevel(pw, gzip.BestSpeed)
go func() {
@@ -528,6 +571,9 @@ func (r *Replica) snapshot(ctx context.Context, generation string, index int) er
return err
}
r.putOperationTotalCounter.Inc()
r.putOperationBytesCounter.Add(float64(fi.Size()))
return nil
}
@@ -594,6 +640,9 @@ func (r *Replica) Sync(ctx context.Context) (err error) {
if err := r.snapshot(ctx, generation, dpos.Index); err != nil {
return err
}
r.snapshotTotalGauge.Set(1.0)
} else {
r.snapshotTotalGauge.Set(float64(n))
}
// Determine position, if necessary.
@@ -661,12 +710,19 @@ func (r *Replica) syncWAL(ctx context.Context) (err error) {
}); err != nil {
return err
}
r.putOperationTotalCounter.Inc()
r.putOperationBytesCounter.Add(float64(buf.Len())) // compressed bytes
// Save last replicated position.
r.mu.Lock()
r.pos = rd.Pos()
r.mu.Unlock()
// Track raw bytes processed & current position.
r.walBytesCounter.Add(float64(len(b))) // raw bytes
r.walIndexGauge.Set(float64(rd.Pos().Index))
r.walOffsetGauge.Set(float64(rd.Pos().Offset))
return nil
}
@@ -684,6 +740,8 @@ func (r *Replica) SnapshotReader(ctx context.Context, generation string, index i
if err != nil {
return nil, err
}
r.getOperationTotalCounter.Inc()
r.getOperationTotalCounter.Add(float64(*out.ContentLength))
// Decompress the snapshot file.
gr, err := gzip.NewReader(out.Body)
@@ -709,6 +767,8 @@ func (r *Replica) WALReader(ctx context.Context, generation string, index int) (
Bucket: aws.String(r.Bucket),
Prefix: aws.String(path.Join(r.WALDir(generation), fmt.Sprintf("%08x_", index))),
}, func(page *s3.ListObjectsOutput, lastPage bool) bool {
r.listOperationTotalCounter.Inc()
for _, obj := range page.Contents {
// Read the offset & size from the filename. We need to check this
// against a running offset to ensure there are no gaps.
@@ -742,6 +802,8 @@ func (r *Replica) WALReader(ctx context.Context, generation string, index int) (
mrc.Close()
return nil, err
}
r.getOperationTotalCounter.Inc()
r.getOperationTotalCounter.Add(float64(*out.ContentLength))
// Decompress the snapshot file.
gr, err := gzip.NewReader(out.Body)
@@ -832,6 +894,8 @@ func (r *Replica) deleteGenerationBefore(ctx context.Context, generation string,
Bucket: aws.String(r.Bucket),
Prefix: aws.String(r.GenerationDir(generation)),
}, func(page *s3.ListObjectsOutput, lastPage bool) bool {
r.listOperationTotalCounter.Inc()
for _, obj := range page.Contents {
// Skip snapshots or WALs that are after the search index unless -1.
if index != -1 {
@@ -869,6 +933,7 @@ func (r *Replica) deleteGenerationBefore(ctx context.Context, generation string,
}); err != nil {
return err
}
r.deleteOperationTotalCounter.Inc()
}
return nil
@@ -907,3 +972,20 @@ func (mr *multiReadCloser) Close() (err error) {
}
return err
}
// S3 metrics.
var (
operationTotalCounterVec = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: "litestream",
Subsystem: "s3",
Name: "operation_total",
Help: "The number of S3 operations performed",
}, []string{"db", "name", "type"})
operationBytesCounterVec = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: "litestream",
Subsystem: "s3",
Name: "operation_bytes",
Help: "The number of bytes used by S3 operations",
}, []string{"db", "name", "type"})
)