From 1c0c69a5ab9eb6a063126463b9020b34ec930d7f Mon Sep 17 00:00:00 2001 From: Ben Johnson Date: Tue, 1 Jun 2021 18:16:58 -0600 Subject: [PATCH] Unify replica client metrics --- abs/replica_client.go | 45 +++++++++++++------------------------- gcs/replica_client.go | 40 +++++++++++----------------------- internal/internal.go | 16 ++++++++++++++ s3/replica_client.go | 49 +++++++++++++++--------------------------- sftp/replica_client.go | 34 +++++++++-------------------- 5 files changed, 71 insertions(+), 113 deletions(-) diff --git a/abs/replica_client.go b/abs/replica_client.go index d1e1f45..3c180ba 100644 --- a/abs/replica_client.go +++ b/abs/replica_client.go @@ -14,8 +14,6 @@ import ( "github.com/Azure/azure-storage-blob-go/azblob" "github.com/benbjohnson/litestream" "github.com/benbjohnson/litestream/internal" - "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/promauto" "golang.org/x/sync/errgroup" ) @@ -95,7 +93,7 @@ func (c *ReplicaClient) Generations(ctx context.Context) ([]string, error) { var generations []string var marker azblob.Marker for marker.NotDone() { - operationTotalCounterVec.WithLabelValues("LIST").Inc() + internal.OperationTotalCounterVec.WithLabelValues(ReplicaClientType, "LIST").Inc() resp, err := c.containerURL.ListBlobsHierarchySegment(ctx, marker, "/", azblob.ListBlobsSegmentOptions{ Prefix: litestream.GenerationsPath(c.Path) + "/", @@ -130,7 +128,7 @@ func (c *ReplicaClient) DeleteGeneration(ctx context.Context, generation string) var marker azblob.Marker for marker.NotDone() { - operationTotalCounterVec.WithLabelValues("LIST").Inc() + internal.OperationTotalCounterVec.WithLabelValues(ReplicaClientType, "LIST").Inc() resp, err := c.containerURL.ListBlobsFlatSegment(ctx, marker, azblob.ListBlobsSegmentOptions{Prefix: dir + "/"}) if err != nil { @@ -139,7 +137,7 @@ func (c *ReplicaClient) DeleteGeneration(ctx context.Context, generation string) marker = resp.NextMarker for _, item := range resp.Segment.BlobItems { - operationTotalCounterVec.WithLabelValues("DELETE").Inc() + internal.OperationTotalCounterVec.WithLabelValues(ReplicaClientType, "DELETE").Inc() blobURL := c.containerURL.NewBlobURL(item.Name) if _, err := blobURL.Delete(ctx, azblob.DeleteSnapshotsOptionNone, azblob.BlobAccessConditions{}); isNotExists(err) { @@ -185,8 +183,8 @@ func (c *ReplicaClient) WriteSnapshot(ctx context.Context, generation string, in return info, err } - operationTotalCounterVec.WithLabelValues("PUT").Inc() - operationBytesCounterVec.WithLabelValues("PUT").Add(float64(rc.N())) + internal.OperationTotalCounterVec.WithLabelValues(ReplicaClientType, "PUT").Inc() + internal.OperationBytesCounterVec.WithLabelValues(ReplicaClientType, "PUT").Add(float64(rc.N())) // log.Printf("%s(%s): snapshot: creating %s/%08x t=%s", r.db.Path(), r.Name(), generation, index, time.Since(startTime).Truncate(time.Millisecond)) @@ -217,8 +215,8 @@ func (c *ReplicaClient) SnapshotReader(ctx context.Context, generation string, i return nil, fmt.Errorf("cannot start new reader for %q: %w", key, err) } - operationTotalCounterVec.WithLabelValues("GET").Inc() - operationBytesCounterVec.WithLabelValues("GET").Add(float64(resp.ContentLength())) + internal.OperationTotalCounterVec.WithLabelValues(ReplicaClientType, "GET").Inc() + internal.OperationBytesCounterVec.WithLabelValues(ReplicaClientType, "GET").Add(float64(resp.ContentLength())) return resp.Body(azblob.RetryReaderOptions{}), nil } @@ -234,7 +232,7 @@ func (c *ReplicaClient) DeleteSnapshot(ctx context.Context, generation string, i return fmt.Errorf("cannot determine snapshot path: %w", err) } - operationTotalCounterVec.WithLabelValues("DELETE").Inc() + internal.OperationTotalCounterVec.WithLabelValues(ReplicaClientType, "DELETE").Inc() blobURL := c.containerURL.NewBlobURL(key) if _, err := blobURL.Delete(ctx, azblob.DeleteSnapshotsOptionNone, azblob.BlobAccessConditions{}); isNotExists(err) { @@ -275,8 +273,8 @@ func (c *ReplicaClient) WriteWALSegment(ctx context.Context, pos litestream.Pos, return info, err } - operationTotalCounterVec.WithLabelValues("PUT").Inc() - operationBytesCounterVec.WithLabelValues("PUT").Add(float64(rc.N())) + internal.OperationTotalCounterVec.WithLabelValues(ReplicaClientType, "PUT").Inc() + internal.OperationBytesCounterVec.WithLabelValues(ReplicaClientType, "PUT").Add(float64(rc.N())) return litestream.WALSegmentInfo{ Generation: pos.Generation, @@ -307,8 +305,8 @@ func (c *ReplicaClient) WALSegmentReader(ctx context.Context, pos litestream.Pos return nil, fmt.Errorf("cannot start new reader for %q: %w", key, err) } - operationTotalCounterVec.WithLabelValues("GET").Inc() - operationBytesCounterVec.WithLabelValues("GET").Add(float64(resp.ContentLength())) + internal.OperationTotalCounterVec.WithLabelValues(ReplicaClientType, "GET").Inc() + internal.OperationBytesCounterVec.WithLabelValues(ReplicaClientType, "GET").Add(float64(resp.ContentLength())) return resp.Body(azblob.RetryReaderOptions{}), nil } @@ -325,7 +323,7 @@ func (c *ReplicaClient) DeleteWALSegments(ctx context.Context, a []litestream.Po return fmt.Errorf("cannot determine wal segment path: %w", err) } - operationTotalCounterVec.WithLabelValues("DELETE").Inc() + internal.OperationTotalCounterVec.WithLabelValues(ReplicaClientType, "DELETE").Inc() blobURL := c.containerURL.NewBlobURL(key) if _, err := blobURL.Delete(ctx, azblob.DeleteSnapshotsOptionNone, azblob.BlobAccessConditions{}); isNotExists(err) { @@ -375,7 +373,7 @@ func (itr *snapshotIterator) fetch() error { var marker azblob.Marker for marker.NotDone() { - operationTotalCounterVec.WithLabelValues("LIST").Inc() + internal.OperationTotalCounterVec.WithLabelValues(ReplicaClientType, "LIST").Inc() resp, err := itr.client.containerURL.ListBlobsFlatSegment(itr.ctx, marker, azblob.ListBlobsSegmentOptions{Prefix: dir + "/"}) if err != nil { @@ -481,7 +479,7 @@ func (itr *walSegmentIterator) fetch() error { var marker azblob.Marker for marker.NotDone() { - operationTotalCounterVec.WithLabelValues("LIST").Inc() + internal.OperationTotalCounterVec.WithLabelValues(ReplicaClientType, "LIST").Inc() resp, err := itr.client.containerURL.ListBlobsFlatSegment(itr.ctx, marker, azblob.ListBlobsSegmentOptions{Prefix: dir + "/"}) if err != nil { @@ -559,16 +557,3 @@ func isNotExists(err error) bool { return false } } - -// Azure blob storage metrics. -var ( - operationTotalCounterVec = promauto.NewCounterVec(prometheus.CounterOpts{ - Name: "litestream_abs_operation_total", - Help: "The number of ABS operations performed", - }, []string{"type"}) - - operationBytesCounterVec = promauto.NewCounterVec(prometheus.CounterOpts{ - Name: "litestream_abs_operation_bytes", - Help: "The number of bytes used by ABS operations", - }, []string{"type"}) -) diff --git a/gcs/replica_client.go b/gcs/replica_client.go index 19102cc..7b2b2c6 100644 --- a/gcs/replica_client.go +++ b/gcs/replica_client.go @@ -12,8 +12,7 @@ import ( "cloud.google.com/go/storage" "github.com/benbjohnson/litestream" - "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/promauto" + "github.com/benbjohnson/litestream/internal" "google.golang.org/api/iterator" ) @@ -105,7 +104,7 @@ func (c *ReplicaClient) DeleteGeneration(ctx context.Context, generation string) } // Iterate over every object in generation and delete it. - operationTotalCounterVec.WithLabelValues("LIST").Inc() + internal.OperationTotalCounterVec.WithLabelValues(ReplicaClientType, "LIST").Inc() for it := c.bkt.Objects(ctx, &storage.Query{Prefix: dir + "/"}); ; { attrs, err := it.Next() if err == iterator.Done { @@ -119,7 +118,7 @@ func (c *ReplicaClient) DeleteGeneration(ctx context.Context, generation string) } else if err != nil { return fmt.Errorf("cannot delete object %q: %w", attrs.Name, err) } - operationTotalCounterVec.WithLabelValues("DELETE").Inc() + internal.OperationTotalCounterVec.WithLabelValues(ReplicaClientType, "DELETE").Inc() } // log.Printf("%s(%s): retainer: deleting generation: %s", r.db.Path(), r.Name(), generation) @@ -161,8 +160,8 @@ func (c *ReplicaClient) WriteSnapshot(ctx context.Context, generation string, in return info, err } - operationTotalCounterVec.WithLabelValues("PUT").Inc() - operationBytesCounterVec.WithLabelValues("PUT").Add(float64(n)) + internal.OperationTotalCounterVec.WithLabelValues(ReplicaClientType, "PUT").Inc() + internal.OperationBytesCounterVec.WithLabelValues(ReplicaClientType, "PUT").Add(float64(n)) // log.Printf("%s(%s): snapshot: creating %s/%08x t=%s", r.db.Path(), r.Name(), generation, index, time.Since(startTime).Truncate(time.Millisecond)) @@ -192,8 +191,8 @@ func (c *ReplicaClient) SnapshotReader(ctx context.Context, generation string, i return nil, fmt.Errorf("cannot start new reader for %q: %w", key, err) } - operationTotalCounterVec.WithLabelValues("GET").Inc() - operationBytesCounterVec.WithLabelValues("GET").Add(float64(r.Attrs.Size)) + internal.OperationTotalCounterVec.WithLabelValues(ReplicaClientType, "GET").Inc() + internal.OperationBytesCounterVec.WithLabelValues(ReplicaClientType, "GET").Add(float64(r.Attrs.Size)) return r, nil } @@ -213,7 +212,7 @@ func (c *ReplicaClient) DeleteSnapshot(ctx context.Context, generation string, i return fmt.Errorf("cannot delete snapshot %q: %w", key, err) } - operationTotalCounterVec.WithLabelValues("DELETE").Inc() + internal.OperationTotalCounterVec.WithLabelValues(ReplicaClientType, "DELETE").Inc() return nil } @@ -251,8 +250,8 @@ func (c *ReplicaClient) WriteWALSegment(ctx context.Context, pos litestream.Pos, return info, err } - operationTotalCounterVec.WithLabelValues("PUT").Inc() - operationBytesCounterVec.WithLabelValues("PUT").Add(float64(n)) + internal.OperationTotalCounterVec.WithLabelValues(ReplicaClientType, "PUT").Inc() + internal.OperationBytesCounterVec.WithLabelValues(ReplicaClientType, "PUT").Add(float64(n)) return litestream.WALSegmentInfo{ Generation: pos.Generation, @@ -282,8 +281,8 @@ func (c *ReplicaClient) WALSegmentReader(ctx context.Context, pos litestream.Pos return nil, err } - operationTotalCounterVec.WithLabelValues("GET").Inc() - operationBytesCounterVec.WithLabelValues("GET").Add(float64(r.Attrs.Size)) + internal.OperationTotalCounterVec.WithLabelValues(ReplicaClientType, "GET").Inc() + internal.OperationBytesCounterVec.WithLabelValues(ReplicaClientType, "GET").Add(float64(r.Attrs.Size)) return r, nil } @@ -303,7 +302,7 @@ func (c *ReplicaClient) DeleteWALSegments(ctx context.Context, a []litestream.Po if err := c.bkt.Object(key).Delete(ctx); err != nil && !isNotExists(err) { return fmt.Errorf("cannot delete wal segment %q: %w", key, err) } - operationTotalCounterVec.WithLabelValues("DELETE").Inc() + internal.OperationTotalCounterVec.WithLabelValues(ReplicaClientType, "DELETE").Inc() } return nil @@ -427,16 +426,3 @@ func (itr *walSegmentIterator) WALSegment() litestream.WALSegmentInfo { func isNotExists(err error) bool { return err == storage.ErrObjectNotExist } - -// GCS metrics. -var ( - operationTotalCounterVec = promauto.NewCounterVec(prometheus.CounterOpts{ - Name: "litestream_gcs_operation_total", - Help: "The number of GCS operations performed", - }, []string{"type"}) - - operationBytesCounterVec = promauto.NewCounterVec(prometheus.CounterOpts{ - Name: "litestream_gcs_operation_bytes", - Help: "The number of bytes used by GCS operations", - }, []string{"type"}) -) diff --git a/internal/internal.go b/internal/internal.go index 769de55..b22399c 100644 --- a/internal/internal.go +++ b/internal/internal.go @@ -4,6 +4,9 @@ import ( "io" "os" "syscall" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" ) // ReadCloser wraps a reader to also attach a separate closer. @@ -123,3 +126,16 @@ func MkdirAll(path string, fi os.FileInfo) error { _ = os.Chown(path, uid, gid) return nil } + +// Shared replica metrics. +var ( + OperationTotalCounterVec = promauto.NewCounterVec(prometheus.CounterOpts{ + Name: "litestream_replica_operation_total", + Help: "The number of replica operations performed", + }, []string{"replica_type", "operation"}) + + OperationBytesCounterVec = promauto.NewCounterVec(prometheus.CounterOpts{ + Name: "litestream_replica_operation_bytes", + Help: "The number of bytes used by replica operations", + }, []string{"replica_type", "operation"}) +) diff --git a/s3/replica_client.go b/s3/replica_client.go index c7befcc..1dac44d 100644 --- a/s3/replica_client.go +++ b/s3/replica_client.go @@ -22,8 +22,6 @@ import ( "github.com/aws/aws-sdk-go/service/s3/s3manager" "github.com/benbjohnson/litestream" "github.com/benbjohnson/litestream/internal" - "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/promauto" "golang.org/x/sync/errgroup" ) @@ -159,7 +157,7 @@ func (c *ReplicaClient) Generations(ctx context.Context) ([]string, error) { Prefix: aws.String(litestream.GenerationsPath(c.Path) + "/"), Delimiter: aws.String("/"), }, func(page *s3.ListObjectsOutput, lastPage bool) bool { - operationTotalCounterVec.WithLabelValues("LIST").Inc() + internal.OperationTotalCounterVec.WithLabelValues(ReplicaClientType, "LIST").Inc() for _, prefix := range page.CommonPrefixes { name := path.Base(*prefix.Prefix) @@ -193,7 +191,7 @@ func (c *ReplicaClient) DeleteGeneration(ctx context.Context, generation string) Bucket: aws.String(c.Bucket), Prefix: aws.String(dir), }, func(page *s3.ListObjectsOutput, lastPage bool) bool { - operationTotalCounterVec.WithLabelValues("LIST").Inc() + internal.OperationTotalCounterVec.WithLabelValues(ReplicaClientType, "LIST").Inc() for _, obj := range page.Contents { objIDs = append(objIDs, &s3.ObjectIdentifier{Key: obj.Key}) @@ -216,7 +214,7 @@ func (c *ReplicaClient) DeleteGeneration(ctx context.Context, generation string) }); err != nil { return err } - operationTotalCounterVec.WithLabelValues("DELETE").Inc() + internal.OperationTotalCounterVec.WithLabelValues(ReplicaClientType, "DELETE").Inc() objIDs = objIDs[n:] } @@ -255,8 +253,8 @@ func (c *ReplicaClient) WriteSnapshot(ctx context.Context, generation string, in return info, err } - operationTotalCounterVec.WithLabelValues("PUT").Inc() - operationBytesCounterVec.WithLabelValues("PUT").Add(float64(rc.N())) + internal.OperationTotalCounterVec.WithLabelValues(ReplicaClientType, "PUT").Inc() + internal.OperationBytesCounterVec.WithLabelValues(ReplicaClientType, "PUT").Add(float64(rc.N())) // log.Printf("%s(%s): snapshot: creating %s/%08x t=%s", r.db.Path(), r.Name(), generation, index, time.Since(startTime).Truncate(time.Millisecond)) @@ -288,8 +286,8 @@ func (c *ReplicaClient) SnapshotReader(ctx context.Context, generation string, i } else if err != nil { return nil, err } - operationTotalCounterVec.WithLabelValues("GET").Inc() - operationBytesCounterVec.WithLabelValues("GET").Add(float64(*out.ContentLength)) + internal.OperationTotalCounterVec.WithLabelValues(ReplicaClientType, "GET").Inc() + internal.OperationBytesCounterVec.WithLabelValues(ReplicaClientType, "GET").Add(float64(*out.ContentLength)) return out.Body, nil } @@ -312,7 +310,7 @@ func (c *ReplicaClient) DeleteSnapshot(ctx context.Context, generation string, i return err } - operationTotalCounterVec.WithLabelValues("DELETE").Inc() + internal.OperationTotalCounterVec.WithLabelValues(ReplicaClientType, "DELETE").Inc() return nil } @@ -345,8 +343,8 @@ func (c *ReplicaClient) WriteWALSegment(ctx context.Context, pos litestream.Pos, return info, err } - operationTotalCounterVec.WithLabelValues("PUT").Inc() - operationBytesCounterVec.WithLabelValues("PUT").Add(float64(rc.N())) + internal.OperationTotalCounterVec.WithLabelValues(ReplicaClientType, "PUT").Inc() + internal.OperationBytesCounterVec.WithLabelValues(ReplicaClientType, "PUT").Add(float64(rc.N())) return litestream.WALSegmentInfo{ Generation: pos.Generation, @@ -378,8 +376,8 @@ func (c *ReplicaClient) WALSegmentReader(ctx context.Context, pos litestream.Pos } else if err != nil { return nil, err } - operationTotalCounterVec.WithLabelValues("GET").Inc() - operationBytesCounterVec.WithLabelValues("GET").Add(float64(*out.ContentLength)) + internal.OperationTotalCounterVec.WithLabelValues(ReplicaClientType, "GET").Inc() + internal.OperationBytesCounterVec.WithLabelValues(ReplicaClientType, "GET").Add(float64(*out.ContentLength)) return out.Body, nil } @@ -414,7 +412,7 @@ func (c *ReplicaClient) DeleteWALSegments(ctx context.Context, a []litestream.Po return err } - operationTotalCounterVec.WithLabelValues("DELETE").Inc() + internal.OperationTotalCounterVec.WithLabelValues(ReplicaClientType, "DELETE").Inc() a = a[n:] } @@ -439,7 +437,7 @@ func (c *ReplicaClient) DeleteAll(ctx context.Context) error { Bucket: aws.String(c.Bucket), Prefix: aws.String(prefix), }, func(page *s3.ListObjectsOutput, lastPage bool) bool { - operationTotalCounterVec.WithLabelValues("LIST").Inc() + internal.OperationTotalCounterVec.WithLabelValues(ReplicaClientType, "LIST").Inc() for _, obj := range page.Contents { objIDs = append(objIDs, &s3.ObjectIdentifier{Key: obj.Key}) @@ -462,7 +460,7 @@ func (c *ReplicaClient) DeleteAll(ctx context.Context) error { }); err != nil { return err } - operationTotalCounterVec.WithLabelValues("DELETE").Inc() + internal.OperationTotalCounterVec.WithLabelValues(ReplicaClientType, "DELETE").Inc() objIDs = objIDs[n:] } @@ -510,7 +508,7 @@ func (itr *snapshotIterator) fetch() error { Prefix: aws.String(dir + "/"), Delimiter: aws.String("/"), }, func(page *s3.ListObjectsOutput, lastPage bool) bool { - operationTotalCounterVec.WithLabelValues("LIST").Inc() + internal.OperationTotalCounterVec.WithLabelValues(ReplicaClientType, "LIST").Inc() for _, obj := range page.Contents { key := path.Base(*obj.Key) @@ -613,7 +611,7 @@ func (itr *walSegmentIterator) fetch() error { Prefix: aws.String(dir + "/"), Delimiter: aws.String("/"), }, func(page *s3.ListObjectsOutput, lastPage bool) bool { - operationTotalCounterVec.WithLabelValues("LIST").Inc() + internal.OperationTotalCounterVec.WithLabelValues(ReplicaClientType, "LIST").Inc() for _, obj := range page.Contents { key := path.Base(*obj.Key) @@ -736,16 +734,3 @@ func isNotExists(err error) bool { return false } } - -// S3 metrics. -var ( - operationTotalCounterVec = promauto.NewCounterVec(prometheus.CounterOpts{ - Name: "litestream_s3_operation_total", - Help: "The number of S3 operations performed", - }, []string{"type"}) - - operationBytesCounterVec = promauto.NewCounterVec(prometheus.CounterOpts{ - Name: "litestream_s3_operation_bytes", - Help: "The number of bytes used by S3 operations", - }, []string{"type"}) -) diff --git a/sftp/replica_client.go b/sftp/replica_client.go index 19290b4..2d5eef0 100644 --- a/sftp/replica_client.go +++ b/sftp/replica_client.go @@ -13,9 +13,8 @@ import ( "time" "github.com/benbjohnson/litestream" + "github.com/benbjohnson/litestream/internal" "github.com/pkg/sftp" - "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/promauto" "golang.org/x/crypto/ssh" ) @@ -178,7 +177,7 @@ func (c *ReplicaClient) DeleteGeneration(ctx context.Context, generation string) return fmt.Errorf("cannot delete file %q: %w", walker.Path(), err) } - operationTotalCounterVec.WithLabelValues("DELETE").Inc() + internal.OperationTotalCounterVec.WithLabelValues(ReplicaClientType, "DELETE").Inc() } // Remove directories in reverse order after they have been emptied. @@ -269,8 +268,8 @@ func (c *ReplicaClient) WriteSnapshot(ctx context.Context, generation string, in return info, err } - operationTotalCounterVec.WithLabelValues("PUT").Inc() - operationBytesCounterVec.WithLabelValues("PUT").Add(float64(n)) + internal.OperationTotalCounterVec.WithLabelValues(ReplicaClientType, "PUT").Inc() + internal.OperationBytesCounterVec.WithLabelValues(ReplicaClientType, "PUT").Add(float64(n)) // log.Printf("%s(%s): snapshot: creating %s/%08x t=%s", r.db.Path(), r.Name(), generation, index, time.Since(startTime).Truncate(time.Millisecond)) @@ -301,7 +300,7 @@ func (c *ReplicaClient) SnapshotReader(ctx context.Context, generation string, i return nil, err } - operationTotalCounterVec.WithLabelValues("GET").Inc() + internal.OperationTotalCounterVec.WithLabelValues(ReplicaClientType, "GET").Inc() return f, nil } @@ -324,7 +323,7 @@ func (c *ReplicaClient) DeleteSnapshot(ctx context.Context, generation string, i return fmt.Errorf("cannot delete snapshot %q: %w", filename, err) } - operationTotalCounterVec.WithLabelValues("DELETE").Inc() + internal.OperationTotalCounterVec.WithLabelValues(ReplicaClientType, "DELETE").Inc() return nil } @@ -403,8 +402,8 @@ func (c *ReplicaClient) WriteWALSegment(ctx context.Context, pos litestream.Pos, return info, err } - operationTotalCounterVec.WithLabelValues("PUT").Inc() - operationBytesCounterVec.WithLabelValues("PUT").Add(float64(n)) + internal.OperationTotalCounterVec.WithLabelValues(ReplicaClientType, "PUT").Inc() + internal.OperationBytesCounterVec.WithLabelValues(ReplicaClientType, "PUT").Add(float64(n)) return litestream.WALSegmentInfo{ Generation: pos.Generation, @@ -435,7 +434,7 @@ func (c *ReplicaClient) WALSegmentReader(ctx context.Context, pos litestream.Pos return nil, err } - operationTotalCounterVec.WithLabelValues("GET").Inc() + internal.OperationTotalCounterVec.WithLabelValues(ReplicaClientType, "GET").Inc() return f, nil } @@ -458,7 +457,7 @@ func (c *ReplicaClient) DeleteWALSegments(ctx context.Context, a []litestream.Po if err := sftpClient.Remove(filename); err != nil && !os.IsNotExist(err) { return fmt.Errorf("cannot delete wal segment %q: %w", filename, err) } - operationTotalCounterVec.WithLabelValues("DELETE").Inc() + internal.OperationTotalCounterVec.WithLabelValues(ReplicaClientType, "DELETE").Inc() } return nil @@ -496,16 +495,3 @@ func (c *ReplicaClient) resetOnConnError(err error) { c.sshClient = nil } } - -// SFTP metrics. -var ( - operationTotalCounterVec = promauto.NewCounterVec(prometheus.CounterOpts{ - Name: "litestream_sftp_operation_total", - Help: "The number of SFTP operations performed", - }, []string{"type"}) - - operationBytesCounterVec = promauto.NewCounterVec(prometheus.CounterOpts{ - Name: "litestream_sftp_operation_bytes", - Help: "The number of bytes used by SFTP operations", - }, []string{"type"}) -)