Add Azure Blob Storage replica type
This commit is contained in:
11
.github/workflows/test.yml
vendored
11
.github/workflows/test.yml
vendored
@@ -26,7 +26,7 @@ jobs:
|
||||
- name: Run unit tests
|
||||
run: go test -v ./...
|
||||
|
||||
- name: Run s3 integration tests
|
||||
- name: Run aws s3 tests
|
||||
run: go test -v -run=TestReplicaClient . -integration s3
|
||||
env:
|
||||
LITESTREAM_S3_ACCESS_KEY_ID: ${{ secrets.LITESTREAM_S3_ACCESS_KEY_ID }}
|
||||
@@ -34,8 +34,15 @@ jobs:
|
||||
LITESTREAM_S3_REGION: ${{ secrets.LITESTREAM_S3_REGION }}
|
||||
LITESTREAM_S3_BUCKET: ${{ secrets.LITESTREAM_S3_BUCKET }}
|
||||
|
||||
- name: Run gcs integration tests
|
||||
- name: Run google cloud storage (gcs) tests
|
||||
run: go test -v -run=TestReplicaClient . -integration gcs
|
||||
env:
|
||||
GOOGLE_APPLICATION_CREDENTIALS: /opt/gcp.json
|
||||
LITESTREAM_GCS_BUCKET: ${{ secrets.LITESTREAM_GCS_BUCKET }}
|
||||
|
||||
- name: Run azure blob storage (abs) tests
|
||||
run: go test -v -run=TestReplicaClient . -integration abs
|
||||
env:
|
||||
LITESTREAM_ABS_ACCOUNT_NAME: ${{ secrets.LITESTREAM_ABS_ACCOUNT_NAME }}
|
||||
LITESTREAM_ABS_ACCOUNT_KEY: ${{ secrets.LITESTREAM_ABS_ACCOUNT_KEY }}
|
||||
LITESTREAM_ABS_BUCKET: ${{ secrets.LITESTREAM_ABS_BUCKET }}
|
||||
|
||||
570
abs/replica_client.go
Normal file
570
abs/replica_client.go
Normal file
@@ -0,0 +1,570 @@
|
||||
package abs
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/url"
|
||||
"os"
|
||||
"path"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/Azure/azure-storage-blob-go/azblob"
|
||||
"github.com/benbjohnson/litestream"
|
||||
"github.com/benbjohnson/litestream/internal"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
"github.com/prometheus/client_golang/prometheus/promauto"
|
||||
"golang.org/x/sync/errgroup"
|
||||
)
|
||||
|
||||
// ReplicaClientType is the client type for this package.
|
||||
const ReplicaClientType = "abs"
|
||||
|
||||
var _ litestream.ReplicaClient = (*ReplicaClient)(nil)
|
||||
|
||||
// ReplicaClient is a client for writing snapshots & WAL segments to disk.
|
||||
type ReplicaClient struct {
|
||||
mu sync.Mutex
|
||||
containerURL *azblob.ContainerURL
|
||||
|
||||
// Azure credentials
|
||||
AccountName string
|
||||
AccountKey string
|
||||
Endpoint string
|
||||
|
||||
// Azure Blob Storage container information
|
||||
Bucket string
|
||||
Path string
|
||||
}
|
||||
|
||||
// NewReplicaClient returns a new instance of ReplicaClient.
|
||||
func NewReplicaClient() *ReplicaClient {
|
||||
return &ReplicaClient{}
|
||||
}
|
||||
|
||||
// Type returns "abs" as the client type.
|
||||
func (c *ReplicaClient) Type() string {
|
||||
return ReplicaClientType
|
||||
}
|
||||
|
||||
// Init initializes the connection to Azure. No-op if already initialized.
|
||||
func (c *ReplicaClient) Init(ctx context.Context) (err error) {
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
|
||||
if c.containerURL != nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Authenticate to ACS.
|
||||
credential, err := azblob.NewSharedKeyCredential(c.AccountName, c.AccountKey)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Construct & parse endpoint unless already set.
|
||||
endpoint := c.Endpoint
|
||||
if endpoint == "" {
|
||||
endpoint = fmt.Sprintf("https://%s.blob.core.windows.net", c.AccountName)
|
||||
}
|
||||
endpointURL, err := url.Parse(endpoint)
|
||||
if err != nil {
|
||||
return fmt.Errorf("cannot parse azure endpoint: %w", err)
|
||||
}
|
||||
|
||||
// Build pipeline and reference to container.
|
||||
pipeline := azblob.NewPipeline(credential, azblob.PipelineOptions{})
|
||||
containerURL := azblob.NewServiceURL(*endpointURL, pipeline).NewContainerURL(c.Bucket)
|
||||
c.containerURL = &containerURL
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Generations returns a list of available generation names.
|
||||
func (c *ReplicaClient) Generations(ctx context.Context) ([]string, error) {
|
||||
if err := c.Init(ctx); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var generations []string
|
||||
var marker azblob.Marker
|
||||
for marker.NotDone() {
|
||||
operationTotalCounterVec.WithLabelValues("LIST").Inc()
|
||||
|
||||
resp, err := c.containerURL.ListBlobsHierarchySegment(ctx, marker, "/", azblob.ListBlobsSegmentOptions{
|
||||
Prefix: litestream.GenerationsPath(c.Path) + "/",
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
marker = resp.NextMarker
|
||||
|
||||
for _, prefix := range resp.Segment.BlobPrefixes {
|
||||
name := path.Base(strings.TrimSuffix(prefix.Name, "/"))
|
||||
if !litestream.IsGenerationName(name) {
|
||||
continue
|
||||
}
|
||||
generations = append(generations, name)
|
||||
}
|
||||
}
|
||||
|
||||
return generations, nil
|
||||
}
|
||||
|
||||
// DeleteGeneration deletes all snapshots & WAL segments within a generation.
|
||||
func (c *ReplicaClient) DeleteGeneration(ctx context.Context, generation string) error {
|
||||
if err := c.Init(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
dir, err := litestream.GenerationPath(c.Path, generation)
|
||||
if err != nil {
|
||||
return fmt.Errorf("cannot determine generation path: %w", err)
|
||||
}
|
||||
|
||||
var marker azblob.Marker
|
||||
for marker.NotDone() {
|
||||
operationTotalCounterVec.WithLabelValues("LIST").Inc()
|
||||
|
||||
resp, err := c.containerURL.ListBlobsFlatSegment(ctx, marker, azblob.ListBlobsSegmentOptions{Prefix: dir + "/"})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
marker = resp.NextMarker
|
||||
|
||||
for _, item := range resp.Segment.BlobItems {
|
||||
operationTotalCounterVec.WithLabelValues("DELETE").Inc()
|
||||
|
||||
blobURL := c.containerURL.NewBlobURL(item.Name)
|
||||
if _, err := blobURL.Delete(ctx, azblob.DeleteSnapshotsOptionNone, azblob.BlobAccessConditions{}); isNotExists(err) {
|
||||
continue
|
||||
} else if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// log.Printf("%s(%s): retainer: deleting generation: %s", r.db.Path(), r.Name(), generation)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Snapshots returns an iterator over all available snapshots for a generation.
|
||||
func (c *ReplicaClient) Snapshots(ctx context.Context, generation string) (litestream.SnapshotIterator, error) {
|
||||
if err := c.Init(ctx); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return newSnapshotIterator(ctx, generation, c), nil
|
||||
}
|
||||
|
||||
// WriteSnapshot writes LZ4 compressed data from rd to the object storage.
|
||||
func (c *ReplicaClient) WriteSnapshot(ctx context.Context, generation string, index int, rd io.Reader) (info litestream.SnapshotInfo, err error) {
|
||||
if err := c.Init(ctx); err != nil {
|
||||
return info, err
|
||||
}
|
||||
|
||||
key, err := litestream.SnapshotPath(c.Path, generation, index)
|
||||
if err != nil {
|
||||
return info, fmt.Errorf("cannot determine snapshot path: %w", err)
|
||||
}
|
||||
startTime := time.Now()
|
||||
|
||||
rc := internal.NewReadCounter(rd)
|
||||
|
||||
blobURL := c.containerURL.NewBlockBlobURL(key)
|
||||
if _, err := azblob.UploadStreamToBlockBlob(ctx, rc, blobURL, azblob.UploadStreamToBlockBlobOptions{
|
||||
BlobHTTPHeaders: azblob.BlobHTTPHeaders{ContentType: "application/octet-stream"},
|
||||
BlobAccessTier: azblob.DefaultAccessTier,
|
||||
}); err != nil {
|
||||
return info, err
|
||||
}
|
||||
|
||||
operationTotalCounterVec.WithLabelValues("PUT").Inc()
|
||||
operationBytesCounterVec.WithLabelValues("PUT").Add(float64(rc.N()))
|
||||
|
||||
// log.Printf("%s(%s): snapshot: creating %s/%08x t=%s", r.db.Path(), r.Name(), generation, index, time.Since(startTime).Truncate(time.Millisecond))
|
||||
|
||||
return litestream.SnapshotInfo{
|
||||
Generation: generation,
|
||||
Index: index,
|
||||
Size: rc.N(),
|
||||
CreatedAt: startTime.UTC(),
|
||||
}, nil
|
||||
}
|
||||
|
||||
// SnapshotReader returns a reader for snapshot data at the given generation/index.
|
||||
func (c *ReplicaClient) SnapshotReader(ctx context.Context, generation string, index int) (io.ReadCloser, error) {
|
||||
if err := c.Init(ctx); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
key, err := litestream.SnapshotPath(c.Path, generation, index)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cannot determine snapshot path: %w", err)
|
||||
}
|
||||
|
||||
blobURL := c.containerURL.NewBlobURL(key)
|
||||
resp, err := blobURL.Download(ctx, 0, 0, azblob.BlobAccessConditions{}, false, azblob.ClientProvidedKeyOptions{})
|
||||
if isNotExists(err) {
|
||||
return nil, os.ErrNotExist
|
||||
} else if err != nil {
|
||||
return nil, fmt.Errorf("cannot start new reader for %q: %w", key, err)
|
||||
}
|
||||
|
||||
operationTotalCounterVec.WithLabelValues("GET").Inc()
|
||||
operationBytesCounterVec.WithLabelValues("GET").Add(float64(resp.ContentLength()))
|
||||
|
||||
return resp.Body(azblob.RetryReaderOptions{}), nil
|
||||
}
|
||||
|
||||
// DeleteSnapshot deletes a snapshot with the given generation & index.
|
||||
func (c *ReplicaClient) DeleteSnapshot(ctx context.Context, generation string, index int) error {
|
||||
if err := c.Init(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
key, err := litestream.SnapshotPath(c.Path, generation, index)
|
||||
if err != nil {
|
||||
return fmt.Errorf("cannot determine snapshot path: %w", err)
|
||||
}
|
||||
|
||||
operationTotalCounterVec.WithLabelValues("DELETE").Inc()
|
||||
|
||||
blobURL := c.containerURL.NewBlobURL(key)
|
||||
if _, err := blobURL.Delete(ctx, azblob.DeleteSnapshotsOptionNone, azblob.BlobAccessConditions{}); isNotExists(err) {
|
||||
return nil
|
||||
} else if err != nil {
|
||||
return fmt.Errorf("cannot delete snapshot %q: %w", key, err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// WALSegments returns an iterator over all available WAL files for a generation.
|
||||
func (c *ReplicaClient) WALSegments(ctx context.Context, generation string) (litestream.WALSegmentIterator, error) {
|
||||
if err := c.Init(ctx); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return newWALSegmentIterator(ctx, generation, c), nil
|
||||
}
|
||||
|
||||
// WriteWALSegment writes LZ4 compressed data from rd into a file on disk.
|
||||
func (c *ReplicaClient) WriteWALSegment(ctx context.Context, pos litestream.Pos, rd io.Reader) (info litestream.WALSegmentInfo, err error) {
|
||||
if err := c.Init(ctx); err != nil {
|
||||
return info, err
|
||||
}
|
||||
|
||||
key, err := litestream.WALSegmentPath(c.Path, pos.Generation, pos.Index, pos.Offset)
|
||||
if err != nil {
|
||||
return info, fmt.Errorf("cannot determine wal segment path: %w", err)
|
||||
}
|
||||
startTime := time.Now()
|
||||
|
||||
rc := internal.NewReadCounter(rd)
|
||||
|
||||
blobURL := c.containerURL.NewBlockBlobURL(key)
|
||||
if _, err := azblob.UploadStreamToBlockBlob(ctx, rc, blobURL, azblob.UploadStreamToBlockBlobOptions{
|
||||
BlobHTTPHeaders: azblob.BlobHTTPHeaders{ContentType: "application/octet-stream"},
|
||||
BlobAccessTier: azblob.DefaultAccessTier,
|
||||
}); err != nil {
|
||||
return info, err
|
||||
}
|
||||
|
||||
operationTotalCounterVec.WithLabelValues("PUT").Inc()
|
||||
operationBytesCounterVec.WithLabelValues("PUT").Add(float64(rc.N()))
|
||||
|
||||
return litestream.WALSegmentInfo{
|
||||
Generation: pos.Generation,
|
||||
Index: pos.Index,
|
||||
Offset: pos.Offset,
|
||||
Size: rc.N(),
|
||||
CreatedAt: startTime.UTC(),
|
||||
}, nil
|
||||
}
|
||||
|
||||
// WALSegmentReader returns a reader for a section of WAL data at the given index.
|
||||
// Returns os.ErrNotExist if no matching index/offset is found.
|
||||
func (c *ReplicaClient) WALSegmentReader(ctx context.Context, pos litestream.Pos) (io.ReadCloser, error) {
|
||||
if err := c.Init(ctx); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
key, err := litestream.WALSegmentPath(c.Path, pos.Generation, pos.Index, pos.Offset)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cannot determine wal segment path: %w", err)
|
||||
}
|
||||
|
||||
blobURL := c.containerURL.NewBlobURL(key)
|
||||
resp, err := blobURL.Download(ctx, 0, 0, azblob.BlobAccessConditions{}, false, azblob.ClientProvidedKeyOptions{})
|
||||
if isNotExists(err) {
|
||||
return nil, os.ErrNotExist
|
||||
} else if err != nil {
|
||||
return nil, fmt.Errorf("cannot start new reader for %q: %w", key, err)
|
||||
}
|
||||
|
||||
operationTotalCounterVec.WithLabelValues("GET").Inc()
|
||||
operationBytesCounterVec.WithLabelValues("GET").Add(float64(resp.ContentLength()))
|
||||
|
||||
return resp.Body(azblob.RetryReaderOptions{}), nil
|
||||
}
|
||||
|
||||
// DeleteWALSegments deletes WAL segments with at the given positions.
|
||||
func (c *ReplicaClient) DeleteWALSegments(ctx context.Context, a []litestream.Pos) error {
|
||||
if err := c.Init(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for _, pos := range a {
|
||||
key, err := litestream.WALSegmentPath(c.Path, pos.Generation, pos.Index, pos.Offset)
|
||||
if err != nil {
|
||||
return fmt.Errorf("cannot determine wal segment path: %w", err)
|
||||
}
|
||||
|
||||
operationTotalCounterVec.WithLabelValues("DELETE").Inc()
|
||||
|
||||
blobURL := c.containerURL.NewBlobURL(key)
|
||||
if _, err := blobURL.Delete(ctx, azblob.DeleteSnapshotsOptionNone, azblob.BlobAccessConditions{}); isNotExists(err) {
|
||||
continue
|
||||
} else if err != nil {
|
||||
return fmt.Errorf("cannot delete wal segment %q: %w", key, err)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
type snapshotIterator struct {
|
||||
client *ReplicaClient
|
||||
generation string
|
||||
|
||||
ch chan litestream.SnapshotInfo
|
||||
g errgroup.Group
|
||||
ctx context.Context
|
||||
cancel func()
|
||||
|
||||
info litestream.SnapshotInfo
|
||||
err error
|
||||
}
|
||||
|
||||
func newSnapshotIterator(ctx context.Context, generation string, client *ReplicaClient) *snapshotIterator {
|
||||
itr := &snapshotIterator{
|
||||
client: client,
|
||||
generation: generation,
|
||||
ch: make(chan litestream.SnapshotInfo),
|
||||
}
|
||||
|
||||
itr.ctx, itr.cancel = context.WithCancel(ctx)
|
||||
itr.g.Go(itr.fetch)
|
||||
|
||||
return itr
|
||||
}
|
||||
|
||||
// fetch runs in a separate goroutine to fetch pages of objects and stream them to a channel.
|
||||
func (itr *snapshotIterator) fetch() error {
|
||||
defer close(itr.ch)
|
||||
|
||||
dir, err := litestream.SnapshotsPath(itr.client.Path, itr.generation)
|
||||
if err != nil {
|
||||
return fmt.Errorf("cannot determine snapshots path: %w", err)
|
||||
}
|
||||
|
||||
var marker azblob.Marker
|
||||
for marker.NotDone() {
|
||||
operationTotalCounterVec.WithLabelValues("LIST").Inc()
|
||||
|
||||
resp, err := itr.client.containerURL.ListBlobsFlatSegment(itr.ctx, marker, azblob.ListBlobsSegmentOptions{Prefix: dir + "/"})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
marker = resp.NextMarker
|
||||
|
||||
for _, item := range resp.Segment.BlobItems {
|
||||
key := path.Base(item.Name)
|
||||
index, err := litestream.ParseSnapshotPath(key)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
|
||||
info := litestream.SnapshotInfo{
|
||||
Generation: itr.generation,
|
||||
Index: index,
|
||||
Size: *item.Properties.ContentLength,
|
||||
CreatedAt: item.Properties.CreationTime.UTC(),
|
||||
}
|
||||
|
||||
select {
|
||||
case <-itr.ctx.Done():
|
||||
case itr.ch <- info:
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (itr *snapshotIterator) Close() (err error) {
|
||||
err = itr.err
|
||||
|
||||
// Cancel context and wait for error group to finish.
|
||||
itr.cancel()
|
||||
if e := itr.g.Wait(); e != nil && err == nil {
|
||||
err = e
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
func (itr *snapshotIterator) Next() bool {
|
||||
// Exit if an error has already occurred.
|
||||
if itr.err != nil {
|
||||
return false
|
||||
}
|
||||
|
||||
// Return false if context was canceled or if there are no more snapshots.
|
||||
// Otherwise fetch the next snapshot and store it on the iterator.
|
||||
select {
|
||||
case <-itr.ctx.Done():
|
||||
return false
|
||||
case info, ok := <-itr.ch:
|
||||
if !ok {
|
||||
return false
|
||||
}
|
||||
itr.info = info
|
||||
return true
|
||||
}
|
||||
}
|
||||
|
||||
func (itr *snapshotIterator) Err() error { return itr.err }
|
||||
|
||||
func (itr *snapshotIterator) Snapshot() litestream.SnapshotInfo {
|
||||
return itr.info
|
||||
}
|
||||
|
||||
type walSegmentIterator struct {
|
||||
client *ReplicaClient
|
||||
generation string
|
||||
|
||||
ch chan litestream.WALSegmentInfo
|
||||
g errgroup.Group
|
||||
ctx context.Context
|
||||
cancel func()
|
||||
|
||||
info litestream.WALSegmentInfo
|
||||
err error
|
||||
}
|
||||
|
||||
func newWALSegmentIterator(ctx context.Context, generation string, client *ReplicaClient) *walSegmentIterator {
|
||||
itr := &walSegmentIterator{
|
||||
client: client,
|
||||
generation: generation,
|
||||
ch: make(chan litestream.WALSegmentInfo),
|
||||
}
|
||||
|
||||
itr.ctx, itr.cancel = context.WithCancel(ctx)
|
||||
itr.g.Go(itr.fetch)
|
||||
|
||||
return itr
|
||||
}
|
||||
|
||||
// fetch runs in a separate goroutine to fetch pages of objects and stream them to a channel.
|
||||
func (itr *walSegmentIterator) fetch() error {
|
||||
defer close(itr.ch)
|
||||
|
||||
dir, err := litestream.WALPath(itr.client.Path, itr.generation)
|
||||
if err != nil {
|
||||
return fmt.Errorf("cannot determine wal path: %w", err)
|
||||
}
|
||||
|
||||
var marker azblob.Marker
|
||||
for marker.NotDone() {
|
||||
operationTotalCounterVec.WithLabelValues("LIST").Inc()
|
||||
|
||||
resp, err := itr.client.containerURL.ListBlobsFlatSegment(itr.ctx, marker, azblob.ListBlobsSegmentOptions{Prefix: dir + "/"})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
marker = resp.NextMarker
|
||||
|
||||
for _, item := range resp.Segment.BlobItems {
|
||||
key := path.Base(item.Name)
|
||||
index, offset, err := litestream.ParseWALSegmentPath(key)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
|
||||
info := litestream.WALSegmentInfo{
|
||||
Generation: itr.generation,
|
||||
Index: index,
|
||||
Offset: offset,
|
||||
Size: *item.Properties.ContentLength,
|
||||
CreatedAt: item.Properties.CreationTime.UTC(),
|
||||
}
|
||||
|
||||
select {
|
||||
case <-itr.ctx.Done():
|
||||
case itr.ch <- info:
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (itr *walSegmentIterator) Close() (err error) {
|
||||
err = itr.err
|
||||
|
||||
// Cancel context and wait for error group to finish.
|
||||
itr.cancel()
|
||||
if e := itr.g.Wait(); e != nil && err == nil {
|
||||
err = e
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
func (itr *walSegmentIterator) Next() bool {
|
||||
// Exit if an error has already occurred.
|
||||
if itr.err != nil {
|
||||
return false
|
||||
}
|
||||
|
||||
// Return false if context was canceled or if there are no more segments.
|
||||
// Otherwise fetch the next segment and store it on the iterator.
|
||||
select {
|
||||
case <-itr.ctx.Done():
|
||||
return false
|
||||
case info, ok := <-itr.ch:
|
||||
if !ok {
|
||||
return false
|
||||
}
|
||||
itr.info = info
|
||||
return true
|
||||
}
|
||||
}
|
||||
|
||||
func (itr *walSegmentIterator) Err() error { return itr.err }
|
||||
|
||||
func (itr *walSegmentIterator) WALSegment() litestream.WALSegmentInfo {
|
||||
return itr.info
|
||||
}
|
||||
|
||||
func isNotExists(err error) bool {
|
||||
switch err := err.(type) {
|
||||
case azblob.StorageError:
|
||||
return err.ServiceCode() == azblob.ServiceCodeBlobNotFound
|
||||
default:
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
// Azure blob storage metrics.
|
||||
var (
|
||||
operationTotalCounterVec = promauto.NewCounterVec(prometheus.CounterOpts{
|
||||
Name: "litestream_abs_operation_total",
|
||||
Help: "The number of ABS operations performed",
|
||||
}, []string{"type"})
|
||||
|
||||
operationBytesCounterVec = promauto.NewCounterVec(prometheus.CounterOpts{
|
||||
Name: "litestream_abs_operation_bytes",
|
||||
Help: "The number of bytes used by ABS operations",
|
||||
}, []string{"type"})
|
||||
)
|
||||
@@ -19,6 +19,7 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/benbjohnson/litestream"
|
||||
"github.com/benbjohnson/litestream/abs"
|
||||
"github.com/benbjohnson/litestream/file"
|
||||
"github.com/benbjohnson/litestream/gcs"
|
||||
"github.com/benbjohnson/litestream/s3"
|
||||
@@ -292,6 +293,10 @@ type ReplicaConfig struct {
|
||||
Endpoint string `yaml:"endpoint"`
|
||||
ForcePathStyle *bool `yaml:"force-path-style"`
|
||||
SkipVerify bool `yaml:"skip-verify"`
|
||||
|
||||
// ABS settings
|
||||
AccountName string `yaml:"account-name"`
|
||||
AccountKey string `yaml:"account-key"`
|
||||
}
|
||||
|
||||
// NewReplicaFromConfig instantiates a replica for a DB based on a config.
|
||||
@@ -335,6 +340,10 @@ func NewReplicaFromConfig(c *ReplicaConfig, db *litestream.DB) (_ *litestream.Re
|
||||
if r.Client, err = newGCSReplicaClientFromConfig(c, r); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
case "abs":
|
||||
if r.Client, err = newABSReplicaClientFromConfig(c, r); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
default:
|
||||
return nil, fmt.Errorf("unknown replica type in config: %q", c.Type)
|
||||
}
|
||||
@@ -475,6 +484,48 @@ func newGCSReplicaClientFromConfig(c *ReplicaConfig, r *litestream.Replica) (_ *
|
||||
return client, nil
|
||||
}
|
||||
|
||||
// newABSReplicaClientFromConfig returns a new instance of abs.ReplicaClient built from config.
|
||||
func newABSReplicaClientFromConfig(c *ReplicaConfig, r *litestream.Replica) (_ *abs.ReplicaClient, err error) {
|
||||
// Ensure URL & constituent parts are not both specified.
|
||||
if c.URL != "" && c.Path != "" {
|
||||
return nil, fmt.Errorf("cannot specify url & path for abs replica")
|
||||
} else if c.URL != "" && c.Bucket != "" {
|
||||
return nil, fmt.Errorf("cannot specify url & bucket for abs replica")
|
||||
}
|
||||
|
||||
bucket, path := c.Bucket, c.Path
|
||||
|
||||
// Apply settings from URL, if specified.
|
||||
if c.URL != "" {
|
||||
_, uhost, upath, err := ParseReplicaURL(c.URL)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Only apply URL parts to field that have not been overridden.
|
||||
if path == "" {
|
||||
path = upath
|
||||
}
|
||||
if bucket == "" {
|
||||
bucket = uhost
|
||||
}
|
||||
}
|
||||
|
||||
// Ensure required settings are set.
|
||||
if bucket == "" {
|
||||
return nil, fmt.Errorf("bucket required for abs replica")
|
||||
}
|
||||
|
||||
// Build replica.
|
||||
client := abs.NewReplicaClient()
|
||||
client.AccountName = c.AccountName
|
||||
client.AccountKey = c.AccountKey
|
||||
client.Bucket = bucket
|
||||
client.Path = path
|
||||
client.Endpoint = c.Endpoint
|
||||
return client, nil
|
||||
}
|
||||
|
||||
// applyLitestreamEnv copies "LITESTREAM" prefixed environment variables to
|
||||
// their AWS counterparts as the "AWS" prefix can be confusing when using a
|
||||
// non-AWS S3-compatible service.
|
||||
|
||||
@@ -11,6 +11,7 @@ import (
|
||||
"os"
|
||||
|
||||
"github.com/benbjohnson/litestream"
|
||||
"github.com/benbjohnson/litestream/abs"
|
||||
"github.com/benbjohnson/litestream/file"
|
||||
"github.com/benbjohnson/litestream/gcs"
|
||||
"github.com/benbjohnson/litestream/s3"
|
||||
@@ -111,6 +112,8 @@ func (c *ReplicateCommand) Run(ctx context.Context) (err error) {
|
||||
log.Printf("replicating to: name=%q type=%q bucket=%q path=%q region=%q endpoint=%q sync-interval=%s", r.Name(), client.Type(), client.Bucket, client.Path, client.Region, client.Endpoint, r.SyncInterval)
|
||||
case *gcs.ReplicaClient:
|
||||
log.Printf("replicating to: name=%q type=%q bucket=%q path=%q sync-interval=%s", r.Name(), client.Type(), client.Bucket, client.Path, r.SyncInterval)
|
||||
case *abs.ReplicaClient:
|
||||
log.Printf("replicating to: name=%q type=%q bucket=%q path=%q endpoint=%q sync-interval=%s", r.Name(), client.Type(), client.Bucket, client.Path, client.Endpoint, r.SyncInterval)
|
||||
default:
|
||||
log.Printf("replicating to: name=%q type=%q", r.Name(), client.Type())
|
||||
}
|
||||
|
||||
2
go.mod
2
go.mod
@@ -4,6 +4,8 @@ go 1.16
|
||||
|
||||
require (
|
||||
cloud.google.com/go/storage v1.15.0
|
||||
github.com/Azure/azure-storage-blob-go v0.13.0 // indirect
|
||||
github.com/Azure/go-autorest/autorest v0.9.0 // indirect
|
||||
github.com/aws/aws-sdk-go v1.27.0
|
||||
github.com/davecgh/go-spew v1.1.1
|
||||
github.com/mattn/go-sqlite3 v1.14.5
|
||||
|
||||
25
go.sum
25
go.sum
@@ -39,6 +39,22 @@ cloud.google.com/go/storage v1.10.0/go.mod h1:FLPqc6j+Ki4BU591ie1oL6qBQGu2Bl/tZ9
|
||||
cloud.google.com/go/storage v1.15.0 h1:Ljj+ZXVEhCr/1+4ZhvtteN1ND7UUsNTlduGclLh8GO0=
|
||||
cloud.google.com/go/storage v1.15.0/go.mod h1:mjjQMoxxyGH7Jr8K5qrx6N2O0AHsczI61sMNn03GIZI=
|
||||
dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
|
||||
github.com/Azure/azure-pipeline-go v0.2.3 h1:7U9HBg1JFK3jHl5qmo4CTZKFTVgMwdFHMVtCdfBE21U=
|
||||
github.com/Azure/azure-pipeline-go v0.2.3/go.mod h1:x841ezTBIMG6O3lAcl8ATHnsOPVl2bqk7S3ta6S6u4k=
|
||||
github.com/Azure/azure-storage-blob-go v0.13.0 h1:lgWHvFh+UYBNVQLFHXkvul2f6yOPA9PIH82RTG2cSwc=
|
||||
github.com/Azure/azure-storage-blob-go v0.13.0/go.mod h1:pA9kNqtjUeQF2zOSu4s//nUdBD+e64lEuc4sVnuOfNs=
|
||||
github.com/Azure/go-autorest v14.2.0+incompatible/go.mod h1:r+4oMnoxhatjLLJ6zxSWATqVooLgysK6ZNox3g/xq24=
|
||||
github.com/Azure/go-autorest/autorest v0.9.0/go.mod h1:xyHB1BMZT0cuDHU7I0+g046+BFDTQ8rEZB0s4Yfa6bI=
|
||||
github.com/Azure/go-autorest/autorest/adal v0.5.0/go.mod h1:8Z9fGy2MpX0PvDjB1pEgQTmVqjGhiHBW7RJJEciWzS0=
|
||||
github.com/Azure/go-autorest/autorest/adal v0.9.2/go.mod h1:/3SMAM86bP6wC9Ev35peQDUeqFZBMH07vvUOmg4z/fE=
|
||||
github.com/Azure/go-autorest/autorest/date v0.1.0/go.mod h1:plvfp3oPSKwf2DNjlBjWF/7vwR+cUD/ELuzDCXwHUVA=
|
||||
github.com/Azure/go-autorest/autorest/date v0.3.0/go.mod h1:BI0uouVdmngYNUzGWeSYnokU+TrmwEsOqdt8Y6sso74=
|
||||
github.com/Azure/go-autorest/autorest/mocks v0.1.0/go.mod h1:OTyCOPRA2IgIlWxVYxBee2F5Gr4kF2zd2J5cFRaIDN0=
|
||||
github.com/Azure/go-autorest/autorest/mocks v0.2.0/go.mod h1:OTyCOPRA2IgIlWxVYxBee2F5Gr4kF2zd2J5cFRaIDN0=
|
||||
github.com/Azure/go-autorest/autorest/mocks v0.4.1/go.mod h1:LTp+uSrOhSkaKrUy935gNZuuIPPVsHlr9DSOxSayd+k=
|
||||
github.com/Azure/go-autorest/logger v0.1.0/go.mod h1:oExouG+K6PryycPJfVSxi/koC6LSNgds39diKLz7Vrc=
|
||||
github.com/Azure/go-autorest/tracing v0.5.0/go.mod h1:r/s2XiOKccPW3HrqB+W0TQzfbtp2fGCgRFtBroKn4Dk=
|
||||
github.com/Azure/go-autorest/tracing v0.6.0/go.mod h1:+vhtPC754Xsa23ID7GlGsrdKBpUA79WCAKPPZVC2DeU=
|
||||
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
|
||||
github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo=
|
||||
github.com/Knetic/govaluate v3.0.1-0.20171022003610-9aa49832a739+incompatible/go.mod h1:r7JcOSlj0wfOMncg0iLm8Leh48TZaKVeNIfJntJ2wa0=
|
||||
@@ -187,6 +203,8 @@ github.com/google/pprof v0.0.0-20210122040257-d980be63207e/go.mod h1:kpwsk12EmLe
|
||||
github.com/google/pprof v0.0.0-20210226084205-cbba55b83ad5/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE=
|
||||
github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI=
|
||||
github.com/google/uuid v1.0.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
|
||||
github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
|
||||
github.com/google/uuid v1.1.2 h1:EVhdT+1Kseyi1/pUmXKaFxYsDNy9RQYkMWRH68J/W7Y=
|
||||
github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
|
||||
github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg=
|
||||
github.com/googleapis/gax-go/v2 v2.0.5 h1:sjZBwGj9Jlw33ImPtvFviGYvseOtDM7hkSKB7+Tv3SM=
|
||||
@@ -251,6 +269,8 @@ github.com/lightstep/lightstep-tracer-common/golang/gogo v0.0.0-20190605223551-b
|
||||
github.com/lightstep/lightstep-tracer-go v0.18.1/go.mod h1:jlF1pusYV4pidLvZ+XD0UBX0ZE6WURAspgAczcDHrL4=
|
||||
github.com/lyft/protoc-gen-validate v0.0.13/go.mod h1:XbGvPuh87YZc5TdIa2/I4pLk0QoUACkjt2znoq26NVQ=
|
||||
github.com/mattn/go-colorable v0.0.9/go.mod h1:9vuHe8Xs5qXnSaW/c/ABM9alt+Vo+STaOChaDxuIBZU=
|
||||
github.com/mattn/go-ieproxy v0.0.1 h1:qiyop7gCflfhwCzGyeT0gro3sF9AIg9HU98JORTkqfI=
|
||||
github.com/mattn/go-ieproxy v0.0.1/go.mod h1:pYabZ6IHcRpFh7vIaLfK7rdcWgFEb3SFJ6/gNWuh88E=
|
||||
github.com/mattn/go-isatty v0.0.3/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNxMWT7Zi4=
|
||||
github.com/mattn/go-isatty v0.0.4/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNxMWT7Zi4=
|
||||
github.com/mattn/go-runewidth v0.0.2/go.mod h1:LwmH8dsx7+W8Uxz3IHJYH5QSwggIsqBzpuz5H//U1FU=
|
||||
@@ -279,6 +299,7 @@ github.com/nats-io/nats.go v1.9.1/go.mod h1:ZjDU1L/7fJ09jvUSRVBR2e7+RnLiiIQyqyzE
|
||||
github.com/nats-io/nkeys v0.1.0/go.mod h1:xpnFELMwJABBLVhffcfd1MZx6VsNRFpEugbxziKVo7w=
|
||||
github.com/nats-io/nkeys v0.1.3/go.mod h1:xpnFELMwJABBLVhffcfd1MZx6VsNRFpEugbxziKVo7w=
|
||||
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
|
||||
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno=
|
||||
github.com/oklog/oklog v0.3.2/go.mod h1:FCV+B7mhrz4o+ueLpx+KqkyXRGMWOYEvfiXtdGtbWGs=
|
||||
github.com/oklog/run v1.0.0/go.mod h1:dlhp/R75TPv97u0XWUtDeV/lRKWPKSdTuV0TZvrmrQA=
|
||||
github.com/olekukonko/tablewriter v0.0.0-20170122224234-a0225b3f23b5/go.mod h1:vsDQFd/mU46D+Z4whnwzcISnGGzXWMclvtLoiIKAKIo=
|
||||
@@ -454,6 +475,7 @@ golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLL
|
||||
golang.org/x/net v0.0.0-20190628185345-da137c7871d7/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
|
||||
golang.org/x/net v0.0.0-20190724013045-ca1201d0de80/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
|
||||
golang.org/x/net v0.0.0-20190813141303-74dc4d7220e7/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
|
||||
golang.org/x/net v0.0.0-20191112182307-2180aed22343/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
|
||||
golang.org/x/net v0.0.0-20191209160850-c0dbc17a3553/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
|
||||
golang.org/x/net v0.0.0-20200114155413-6afb5195e5aa/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
|
||||
golang.org/x/net v0.0.0-20200202094626-16171245cfb2/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
|
||||
@@ -520,6 +542,7 @@ golang.org/x/sys v0.0.0-20190624142023-c5567b49c5d0/go.mod h1:h1NjWce9XRLGQEsW7w
|
||||
golang.org/x/sys v0.0.0-20190726091711-fc99dfbffb4e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20190826190057-c7b8b68b1456/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20191001151750-bb3f8db39f24/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20191112214154-59a1497f0cea/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20191204072324-ce4227a45e2e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20191220142924-d4481acd189f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20191228213918-04cbcbbfeed8/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
@@ -539,6 +562,7 @@ golang.org/x/sys v0.0.0-20200523222454-059865788121/go.mod h1:h1NjWce9XRLGQEsW7w
|
||||
golang.org/x/sys v0.0.0-20200615200032-f1bc736245b1/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20200625212154-ddb9806d33ae/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20200803210538-64077c9b5642/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20200828194041-157a740278f4/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20200905004654-be1d3432aa8f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
@@ -744,6 +768,7 @@ gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+
|
||||
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
||||
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
||||
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
||||
gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
||||
gopkg.in/cheggaaa/pb.v1 v1.0.25/go.mod h1:V/YB90LKu/1FcN3WVnfiiE5oMCibMjukxqG/qStrOgw=
|
||||
gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI=
|
||||
gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys=
|
||||
|
||||
@@ -15,6 +15,7 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/benbjohnson/litestream"
|
||||
"github.com/benbjohnson/litestream/abs"
|
||||
"github.com/benbjohnson/litestream/file"
|
||||
"github.com/benbjohnson/litestream/gcs"
|
||||
"github.com/benbjohnson/litestream/s3"
|
||||
@@ -42,12 +43,20 @@ var (
|
||||
s3SkipVerify = flag.Bool("s3-skip-verify", os.Getenv("LITESTREAM_S3_SKIP_VERIFY") == "true", "")
|
||||
)
|
||||
|
||||
// GCS settings
|
||||
// Google cloud storage settings
|
||||
var (
|
||||
gcsBucket = flag.String("gcs-bucket", os.Getenv("LITESTREAM_GCS_BUCKET"), "")
|
||||
gcsPath = flag.String("gcs-path", os.Getenv("LITESTREAM_GCS_PATH"), "")
|
||||
)
|
||||
|
||||
// Azure blob storage settings
|
||||
var (
|
||||
absAccountName = flag.String("abs-account-name", os.Getenv("LITESTREAM_ABS_ACCOUNT_NAME"), "")
|
||||
absAccountKey = flag.String("abs-account-key", os.Getenv("LITESTREAM_ABS_ACCOUNT_KEY"), "")
|
||||
absBucket = flag.String("abs-bucket", os.Getenv("LITESTREAM_ABS_BUCKET"), "")
|
||||
absPath = flag.String("abs-path", os.Getenv("LITESTREAM_ABS_PATH"), "")
|
||||
)
|
||||
|
||||
func TestReplicaClient_Generations(t *testing.T) {
|
||||
RunWithReplicaClient(t, "OK", func(t *testing.T, c litestream.ReplicaClient) {
|
||||
t.Parallel()
|
||||
@@ -463,6 +472,8 @@ func NewReplicaClient(tb testing.TB, typ string) litestream.ReplicaClient {
|
||||
return NewS3ReplicaClient(tb)
|
||||
case gcs.ReplicaClientType:
|
||||
return NewGCSReplicaClient(tb)
|
||||
case abs.ReplicaClientType:
|
||||
return NewABSReplicaClient(tb)
|
||||
default:
|
||||
tb.Fatalf("invalid replica client type: %q", typ)
|
||||
return nil
|
||||
@@ -501,6 +512,18 @@ func NewGCSReplicaClient(tb testing.TB) *gcs.ReplicaClient {
|
||||
return c
|
||||
}
|
||||
|
||||
// NewABSReplicaClient returns a new client for integration testing.
|
||||
func NewABSReplicaClient(tb testing.TB) *abs.ReplicaClient {
|
||||
tb.Helper()
|
||||
|
||||
c := abs.NewReplicaClient()
|
||||
c.AccountName = *absAccountName
|
||||
c.AccountKey = *absAccountKey
|
||||
c.Bucket = *absBucket
|
||||
c.Path = path.Join(*absPath, fmt.Sprintf("%016x", rand.Uint64()))
|
||||
return c
|
||||
}
|
||||
|
||||
// MustDeleteAll deletes all objects under the client's path.
|
||||
func MustDeleteAll(tb testing.TB, c litestream.ReplicaClient) {
|
||||
tb.Helper()
|
||||
|
||||
Reference in New Issue
Block a user