From 84dc68c09cfeabef264b39a06830edddcf1a8bfc Mon Sep 17 00:00:00 2001 From: Ben Johnson Date: Sun, 23 May 2021 07:56:32 -0600 Subject: [PATCH] Add Azure Blob Storage replica type --- .github/workflows/test.yml | 11 +- abs/replica_client.go | 570 ++++++++++++++++++++++++++++++++++++ cmd/litestream/main.go | 51 ++++ cmd/litestream/replicate.go | 3 + go.mod | 2 + go.sum | 25 ++ replica_client_test.go | 25 +- 7 files changed, 684 insertions(+), 3 deletions(-) create mode 100644 abs/replica_client.go diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 82dc031..f7fff4a 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -26,7 +26,7 @@ jobs: - name: Run unit tests run: go test -v ./... - - name: Run s3 integration tests + - name: Run aws s3 tests run: go test -v -run=TestReplicaClient . -integration s3 env: LITESTREAM_S3_ACCESS_KEY_ID: ${{ secrets.LITESTREAM_S3_ACCESS_KEY_ID }} @@ -34,8 +34,15 @@ jobs: LITESTREAM_S3_REGION: ${{ secrets.LITESTREAM_S3_REGION }} LITESTREAM_S3_BUCKET: ${{ secrets.LITESTREAM_S3_BUCKET }} - - name: Run gcs integration tests + - name: Run google cloud storage (gcs) tests run: go test -v -run=TestReplicaClient . -integration gcs env: GOOGLE_APPLICATION_CREDENTIALS: /opt/gcp.json LITESTREAM_GCS_BUCKET: ${{ secrets.LITESTREAM_GCS_BUCKET }} + + - name: Run azure blob storage (abs) tests + run: go test -v -run=TestReplicaClient . -integration abs + env: + LITESTREAM_ABS_ACCOUNT_NAME: ${{ secrets.LITESTREAM_ABS_ACCOUNT_NAME }} + LITESTREAM_ABS_ACCOUNT_KEY: ${{ secrets.LITESTREAM_ABS_ACCOUNT_KEY }} + LITESTREAM_ABS_BUCKET: ${{ secrets.LITESTREAM_ABS_BUCKET }} diff --git a/abs/replica_client.go b/abs/replica_client.go new file mode 100644 index 0000000..91115e2 --- /dev/null +++ b/abs/replica_client.go @@ -0,0 +1,570 @@ +package abs + +import ( + "context" + "fmt" + "io" + "net/url" + "os" + "path" + "strings" + "sync" + "time" + + "github.com/Azure/azure-storage-blob-go/azblob" + "github.com/benbjohnson/litestream" + "github.com/benbjohnson/litestream/internal" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" + "golang.org/x/sync/errgroup" +) + +// ReplicaClientType is the client type for this package. +const ReplicaClientType = "abs" + +var _ litestream.ReplicaClient = (*ReplicaClient)(nil) + +// ReplicaClient is a client for writing snapshots & WAL segments to disk. +type ReplicaClient struct { + mu sync.Mutex + containerURL *azblob.ContainerURL + + // Azure credentials + AccountName string + AccountKey string + Endpoint string + + // Azure Blob Storage container information + Bucket string + Path string +} + +// NewReplicaClient returns a new instance of ReplicaClient. +func NewReplicaClient() *ReplicaClient { + return &ReplicaClient{} +} + +// Type returns "abs" as the client type. +func (c *ReplicaClient) Type() string { + return ReplicaClientType +} + +// Init initializes the connection to Azure. No-op if already initialized. +func (c *ReplicaClient) Init(ctx context.Context) (err error) { + c.mu.Lock() + defer c.mu.Unlock() + + if c.containerURL != nil { + return nil + } + + // Authenticate to ACS. + credential, err := azblob.NewSharedKeyCredential(c.AccountName, c.AccountKey) + if err != nil { + return err + } + + // Construct & parse endpoint unless already set. + endpoint := c.Endpoint + if endpoint == "" { + endpoint = fmt.Sprintf("https://%s.blob.core.windows.net", c.AccountName) + } + endpointURL, err := url.Parse(endpoint) + if err != nil { + return fmt.Errorf("cannot parse azure endpoint: %w", err) + } + + // Build pipeline and reference to container. + pipeline := azblob.NewPipeline(credential, azblob.PipelineOptions{}) + containerURL := azblob.NewServiceURL(*endpointURL, pipeline).NewContainerURL(c.Bucket) + c.containerURL = &containerURL + + return nil +} + +// Generations returns a list of available generation names. +func (c *ReplicaClient) Generations(ctx context.Context) ([]string, error) { + if err := c.Init(ctx); err != nil { + return nil, err + } + + var generations []string + var marker azblob.Marker + for marker.NotDone() { + operationTotalCounterVec.WithLabelValues("LIST").Inc() + + resp, err := c.containerURL.ListBlobsHierarchySegment(ctx, marker, "/", azblob.ListBlobsSegmentOptions{ + Prefix: litestream.GenerationsPath(c.Path) + "/", + }) + if err != nil { + return nil, err + } + marker = resp.NextMarker + + for _, prefix := range resp.Segment.BlobPrefixes { + name := path.Base(strings.TrimSuffix(prefix.Name, "/")) + if !litestream.IsGenerationName(name) { + continue + } + generations = append(generations, name) + } + } + + return generations, nil +} + +// DeleteGeneration deletes all snapshots & WAL segments within a generation. +func (c *ReplicaClient) DeleteGeneration(ctx context.Context, generation string) error { + if err := c.Init(ctx); err != nil { + return err + } + + dir, err := litestream.GenerationPath(c.Path, generation) + if err != nil { + return fmt.Errorf("cannot determine generation path: %w", err) + } + + var marker azblob.Marker + for marker.NotDone() { + operationTotalCounterVec.WithLabelValues("LIST").Inc() + + resp, err := c.containerURL.ListBlobsFlatSegment(ctx, marker, azblob.ListBlobsSegmentOptions{Prefix: dir + "/"}) + if err != nil { + return err + } + marker = resp.NextMarker + + for _, item := range resp.Segment.BlobItems { + operationTotalCounterVec.WithLabelValues("DELETE").Inc() + + blobURL := c.containerURL.NewBlobURL(item.Name) + if _, err := blobURL.Delete(ctx, azblob.DeleteSnapshotsOptionNone, azblob.BlobAccessConditions{}); isNotExists(err) { + continue + } else if err != nil { + return err + } + } + } + + // log.Printf("%s(%s): retainer: deleting generation: %s", r.db.Path(), r.Name(), generation) + + return nil +} + +// Snapshots returns an iterator over all available snapshots for a generation. +func (c *ReplicaClient) Snapshots(ctx context.Context, generation string) (litestream.SnapshotIterator, error) { + if err := c.Init(ctx); err != nil { + return nil, err + } + return newSnapshotIterator(ctx, generation, c), nil +} + +// WriteSnapshot writes LZ4 compressed data from rd to the object storage. +func (c *ReplicaClient) WriteSnapshot(ctx context.Context, generation string, index int, rd io.Reader) (info litestream.SnapshotInfo, err error) { + if err := c.Init(ctx); err != nil { + return info, err + } + + key, err := litestream.SnapshotPath(c.Path, generation, index) + if err != nil { + return info, fmt.Errorf("cannot determine snapshot path: %w", err) + } + startTime := time.Now() + + rc := internal.NewReadCounter(rd) + + blobURL := c.containerURL.NewBlockBlobURL(key) + if _, err := azblob.UploadStreamToBlockBlob(ctx, rc, blobURL, azblob.UploadStreamToBlockBlobOptions{ + BlobHTTPHeaders: azblob.BlobHTTPHeaders{ContentType: "application/octet-stream"}, + BlobAccessTier: azblob.DefaultAccessTier, + }); err != nil { + return info, err + } + + operationTotalCounterVec.WithLabelValues("PUT").Inc() + operationBytesCounterVec.WithLabelValues("PUT").Add(float64(rc.N())) + + // log.Printf("%s(%s): snapshot: creating %s/%08x t=%s", r.db.Path(), r.Name(), generation, index, time.Since(startTime).Truncate(time.Millisecond)) + + return litestream.SnapshotInfo{ + Generation: generation, + Index: index, + Size: rc.N(), + CreatedAt: startTime.UTC(), + }, nil +} + +// SnapshotReader returns a reader for snapshot data at the given generation/index. +func (c *ReplicaClient) SnapshotReader(ctx context.Context, generation string, index int) (io.ReadCloser, error) { + if err := c.Init(ctx); err != nil { + return nil, err + } + + key, err := litestream.SnapshotPath(c.Path, generation, index) + if err != nil { + return nil, fmt.Errorf("cannot determine snapshot path: %w", err) + } + + blobURL := c.containerURL.NewBlobURL(key) + resp, err := blobURL.Download(ctx, 0, 0, azblob.BlobAccessConditions{}, false, azblob.ClientProvidedKeyOptions{}) + if isNotExists(err) { + return nil, os.ErrNotExist + } else if err != nil { + return nil, fmt.Errorf("cannot start new reader for %q: %w", key, err) + } + + operationTotalCounterVec.WithLabelValues("GET").Inc() + operationBytesCounterVec.WithLabelValues("GET").Add(float64(resp.ContentLength())) + + return resp.Body(azblob.RetryReaderOptions{}), nil +} + +// DeleteSnapshot deletes a snapshot with the given generation & index. +func (c *ReplicaClient) DeleteSnapshot(ctx context.Context, generation string, index int) error { + if err := c.Init(ctx); err != nil { + return err + } + + key, err := litestream.SnapshotPath(c.Path, generation, index) + if err != nil { + return fmt.Errorf("cannot determine snapshot path: %w", err) + } + + operationTotalCounterVec.WithLabelValues("DELETE").Inc() + + blobURL := c.containerURL.NewBlobURL(key) + if _, err := blobURL.Delete(ctx, azblob.DeleteSnapshotsOptionNone, azblob.BlobAccessConditions{}); isNotExists(err) { + return nil + } else if err != nil { + return fmt.Errorf("cannot delete snapshot %q: %w", key, err) + } + return nil +} + +// WALSegments returns an iterator over all available WAL files for a generation. +func (c *ReplicaClient) WALSegments(ctx context.Context, generation string) (litestream.WALSegmentIterator, error) { + if err := c.Init(ctx); err != nil { + return nil, err + } + return newWALSegmentIterator(ctx, generation, c), nil +} + +// WriteWALSegment writes LZ4 compressed data from rd into a file on disk. +func (c *ReplicaClient) WriteWALSegment(ctx context.Context, pos litestream.Pos, rd io.Reader) (info litestream.WALSegmentInfo, err error) { + if err := c.Init(ctx); err != nil { + return info, err + } + + key, err := litestream.WALSegmentPath(c.Path, pos.Generation, pos.Index, pos.Offset) + if err != nil { + return info, fmt.Errorf("cannot determine wal segment path: %w", err) + } + startTime := time.Now() + + rc := internal.NewReadCounter(rd) + + blobURL := c.containerURL.NewBlockBlobURL(key) + if _, err := azblob.UploadStreamToBlockBlob(ctx, rc, blobURL, azblob.UploadStreamToBlockBlobOptions{ + BlobHTTPHeaders: azblob.BlobHTTPHeaders{ContentType: "application/octet-stream"}, + BlobAccessTier: azblob.DefaultAccessTier, + }); err != nil { + return info, err + } + + operationTotalCounterVec.WithLabelValues("PUT").Inc() + operationBytesCounterVec.WithLabelValues("PUT").Add(float64(rc.N())) + + return litestream.WALSegmentInfo{ + Generation: pos.Generation, + Index: pos.Index, + Offset: pos.Offset, + Size: rc.N(), + CreatedAt: startTime.UTC(), + }, nil +} + +// WALSegmentReader returns a reader for a section of WAL data at the given index. +// Returns os.ErrNotExist if no matching index/offset is found. +func (c *ReplicaClient) WALSegmentReader(ctx context.Context, pos litestream.Pos) (io.ReadCloser, error) { + if err := c.Init(ctx); err != nil { + return nil, err + } + + key, err := litestream.WALSegmentPath(c.Path, pos.Generation, pos.Index, pos.Offset) + if err != nil { + return nil, fmt.Errorf("cannot determine wal segment path: %w", err) + } + + blobURL := c.containerURL.NewBlobURL(key) + resp, err := blobURL.Download(ctx, 0, 0, azblob.BlobAccessConditions{}, false, azblob.ClientProvidedKeyOptions{}) + if isNotExists(err) { + return nil, os.ErrNotExist + } else if err != nil { + return nil, fmt.Errorf("cannot start new reader for %q: %w", key, err) + } + + operationTotalCounterVec.WithLabelValues("GET").Inc() + operationBytesCounterVec.WithLabelValues("GET").Add(float64(resp.ContentLength())) + + return resp.Body(azblob.RetryReaderOptions{}), nil +} + +// DeleteWALSegments deletes WAL segments with at the given positions. +func (c *ReplicaClient) DeleteWALSegments(ctx context.Context, a []litestream.Pos) error { + if err := c.Init(ctx); err != nil { + return err + } + + for _, pos := range a { + key, err := litestream.WALSegmentPath(c.Path, pos.Generation, pos.Index, pos.Offset) + if err != nil { + return fmt.Errorf("cannot determine wal segment path: %w", err) + } + + operationTotalCounterVec.WithLabelValues("DELETE").Inc() + + blobURL := c.containerURL.NewBlobURL(key) + if _, err := blobURL.Delete(ctx, azblob.DeleteSnapshotsOptionNone, azblob.BlobAccessConditions{}); isNotExists(err) { + continue + } else if err != nil { + return fmt.Errorf("cannot delete wal segment %q: %w", key, err) + } + } + + return nil +} + +type snapshotIterator struct { + client *ReplicaClient + generation string + + ch chan litestream.SnapshotInfo + g errgroup.Group + ctx context.Context + cancel func() + + info litestream.SnapshotInfo + err error +} + +func newSnapshotIterator(ctx context.Context, generation string, client *ReplicaClient) *snapshotIterator { + itr := &snapshotIterator{ + client: client, + generation: generation, + ch: make(chan litestream.SnapshotInfo), + } + + itr.ctx, itr.cancel = context.WithCancel(ctx) + itr.g.Go(itr.fetch) + + return itr +} + +// fetch runs in a separate goroutine to fetch pages of objects and stream them to a channel. +func (itr *snapshotIterator) fetch() error { + defer close(itr.ch) + + dir, err := litestream.SnapshotsPath(itr.client.Path, itr.generation) + if err != nil { + return fmt.Errorf("cannot determine snapshots path: %w", err) + } + + var marker azblob.Marker + for marker.NotDone() { + operationTotalCounterVec.WithLabelValues("LIST").Inc() + + resp, err := itr.client.containerURL.ListBlobsFlatSegment(itr.ctx, marker, azblob.ListBlobsSegmentOptions{Prefix: dir + "/"}) + if err != nil { + return err + } + marker = resp.NextMarker + + for _, item := range resp.Segment.BlobItems { + key := path.Base(item.Name) + index, err := litestream.ParseSnapshotPath(key) + if err != nil { + continue + } + + info := litestream.SnapshotInfo{ + Generation: itr.generation, + Index: index, + Size: *item.Properties.ContentLength, + CreatedAt: item.Properties.CreationTime.UTC(), + } + + select { + case <-itr.ctx.Done(): + case itr.ch <- info: + } + } + } + return nil +} + +func (itr *snapshotIterator) Close() (err error) { + err = itr.err + + // Cancel context and wait for error group to finish. + itr.cancel() + if e := itr.g.Wait(); e != nil && err == nil { + err = e + } + + return err +} + +func (itr *snapshotIterator) Next() bool { + // Exit if an error has already occurred. + if itr.err != nil { + return false + } + + // Return false if context was canceled or if there are no more snapshots. + // Otherwise fetch the next snapshot and store it on the iterator. + select { + case <-itr.ctx.Done(): + return false + case info, ok := <-itr.ch: + if !ok { + return false + } + itr.info = info + return true + } +} + +func (itr *snapshotIterator) Err() error { return itr.err } + +func (itr *snapshotIterator) Snapshot() litestream.SnapshotInfo { + return itr.info +} + +type walSegmentIterator struct { + client *ReplicaClient + generation string + + ch chan litestream.WALSegmentInfo + g errgroup.Group + ctx context.Context + cancel func() + + info litestream.WALSegmentInfo + err error +} + +func newWALSegmentIterator(ctx context.Context, generation string, client *ReplicaClient) *walSegmentIterator { + itr := &walSegmentIterator{ + client: client, + generation: generation, + ch: make(chan litestream.WALSegmentInfo), + } + + itr.ctx, itr.cancel = context.WithCancel(ctx) + itr.g.Go(itr.fetch) + + return itr +} + +// fetch runs in a separate goroutine to fetch pages of objects and stream them to a channel. +func (itr *walSegmentIterator) fetch() error { + defer close(itr.ch) + + dir, err := litestream.WALPath(itr.client.Path, itr.generation) + if err != nil { + return fmt.Errorf("cannot determine wal path: %w", err) + } + + var marker azblob.Marker + for marker.NotDone() { + operationTotalCounterVec.WithLabelValues("LIST").Inc() + + resp, err := itr.client.containerURL.ListBlobsFlatSegment(itr.ctx, marker, azblob.ListBlobsSegmentOptions{Prefix: dir + "/"}) + if err != nil { + return err + } + marker = resp.NextMarker + + for _, item := range resp.Segment.BlobItems { + key := path.Base(item.Name) + index, offset, err := litestream.ParseWALSegmentPath(key) + if err != nil { + continue + } + + info := litestream.WALSegmentInfo{ + Generation: itr.generation, + Index: index, + Offset: offset, + Size: *item.Properties.ContentLength, + CreatedAt: item.Properties.CreationTime.UTC(), + } + + select { + case <-itr.ctx.Done(): + case itr.ch <- info: + } + } + } + return nil +} + +func (itr *walSegmentIterator) Close() (err error) { + err = itr.err + + // Cancel context and wait for error group to finish. + itr.cancel() + if e := itr.g.Wait(); e != nil && err == nil { + err = e + } + + return err +} + +func (itr *walSegmentIterator) Next() bool { + // Exit if an error has already occurred. + if itr.err != nil { + return false + } + + // Return false if context was canceled or if there are no more segments. + // Otherwise fetch the next segment and store it on the iterator. + select { + case <-itr.ctx.Done(): + return false + case info, ok := <-itr.ch: + if !ok { + return false + } + itr.info = info + return true + } +} + +func (itr *walSegmentIterator) Err() error { return itr.err } + +func (itr *walSegmentIterator) WALSegment() litestream.WALSegmentInfo { + return itr.info +} + +func isNotExists(err error) bool { + switch err := err.(type) { + case azblob.StorageError: + return err.ServiceCode() == azblob.ServiceCodeBlobNotFound + default: + return false + } +} + +// Azure blob storage metrics. +var ( + operationTotalCounterVec = promauto.NewCounterVec(prometheus.CounterOpts{ + Name: "litestream_abs_operation_total", + Help: "The number of ABS operations performed", + }, []string{"type"}) + + operationBytesCounterVec = promauto.NewCounterVec(prometheus.CounterOpts{ + Name: "litestream_abs_operation_bytes", + Help: "The number of bytes used by ABS operations", + }, []string{"type"}) +) diff --git a/cmd/litestream/main.go b/cmd/litestream/main.go index fd3be22..31436c9 100644 --- a/cmd/litestream/main.go +++ b/cmd/litestream/main.go @@ -19,6 +19,7 @@ import ( "time" "github.com/benbjohnson/litestream" + "github.com/benbjohnson/litestream/abs" "github.com/benbjohnson/litestream/file" "github.com/benbjohnson/litestream/gcs" "github.com/benbjohnson/litestream/s3" @@ -292,6 +293,10 @@ type ReplicaConfig struct { Endpoint string `yaml:"endpoint"` ForcePathStyle *bool `yaml:"force-path-style"` SkipVerify bool `yaml:"skip-verify"` + + // ABS settings + AccountName string `yaml:"account-name"` + AccountKey string `yaml:"account-key"` } // NewReplicaFromConfig instantiates a replica for a DB based on a config. @@ -335,6 +340,10 @@ func NewReplicaFromConfig(c *ReplicaConfig, db *litestream.DB) (_ *litestream.Re if r.Client, err = newGCSReplicaClientFromConfig(c, r); err != nil { return nil, err } + case "abs": + if r.Client, err = newABSReplicaClientFromConfig(c, r); err != nil { + return nil, err + } default: return nil, fmt.Errorf("unknown replica type in config: %q", c.Type) } @@ -475,6 +484,48 @@ func newGCSReplicaClientFromConfig(c *ReplicaConfig, r *litestream.Replica) (_ * return client, nil } +// newABSReplicaClientFromConfig returns a new instance of abs.ReplicaClient built from config. +func newABSReplicaClientFromConfig(c *ReplicaConfig, r *litestream.Replica) (_ *abs.ReplicaClient, err error) { + // Ensure URL & constituent parts are not both specified. + if c.URL != "" && c.Path != "" { + return nil, fmt.Errorf("cannot specify url & path for abs replica") + } else if c.URL != "" && c.Bucket != "" { + return nil, fmt.Errorf("cannot specify url & bucket for abs replica") + } + + bucket, path := c.Bucket, c.Path + + // Apply settings from URL, if specified. + if c.URL != "" { + _, uhost, upath, err := ParseReplicaURL(c.URL) + if err != nil { + return nil, err + } + + // Only apply URL parts to field that have not been overridden. + if path == "" { + path = upath + } + if bucket == "" { + bucket = uhost + } + } + + // Ensure required settings are set. + if bucket == "" { + return nil, fmt.Errorf("bucket required for abs replica") + } + + // Build replica. + client := abs.NewReplicaClient() + client.AccountName = c.AccountName + client.AccountKey = c.AccountKey + client.Bucket = bucket + client.Path = path + client.Endpoint = c.Endpoint + return client, nil +} + // applyLitestreamEnv copies "LITESTREAM" prefixed environment variables to // their AWS counterparts as the "AWS" prefix can be confusing when using a // non-AWS S3-compatible service. diff --git a/cmd/litestream/replicate.go b/cmd/litestream/replicate.go index c3b9ca8..478f585 100644 --- a/cmd/litestream/replicate.go +++ b/cmd/litestream/replicate.go @@ -11,6 +11,7 @@ import ( "os" "github.com/benbjohnson/litestream" + "github.com/benbjohnson/litestream/abs" "github.com/benbjohnson/litestream/file" "github.com/benbjohnson/litestream/gcs" "github.com/benbjohnson/litestream/s3" @@ -111,6 +112,8 @@ func (c *ReplicateCommand) Run(ctx context.Context) (err error) { log.Printf("replicating to: name=%q type=%q bucket=%q path=%q region=%q endpoint=%q sync-interval=%s", r.Name(), client.Type(), client.Bucket, client.Path, client.Region, client.Endpoint, r.SyncInterval) case *gcs.ReplicaClient: log.Printf("replicating to: name=%q type=%q bucket=%q path=%q sync-interval=%s", r.Name(), client.Type(), client.Bucket, client.Path, r.SyncInterval) + case *abs.ReplicaClient: + log.Printf("replicating to: name=%q type=%q bucket=%q path=%q endpoint=%q sync-interval=%s", r.Name(), client.Type(), client.Bucket, client.Path, client.Endpoint, r.SyncInterval) default: log.Printf("replicating to: name=%q type=%q", r.Name(), client.Type()) } diff --git a/go.mod b/go.mod index 29c49da..d81acc2 100644 --- a/go.mod +++ b/go.mod @@ -4,6 +4,8 @@ go 1.16 require ( cloud.google.com/go/storage v1.15.0 + github.com/Azure/azure-storage-blob-go v0.13.0 // indirect + github.com/Azure/go-autorest/autorest v0.9.0 // indirect github.com/aws/aws-sdk-go v1.27.0 github.com/davecgh/go-spew v1.1.1 github.com/mattn/go-sqlite3 v1.14.5 diff --git a/go.sum b/go.sum index a46ee29..8e762c6 100644 --- a/go.sum +++ b/go.sum @@ -39,6 +39,22 @@ cloud.google.com/go/storage v1.10.0/go.mod h1:FLPqc6j+Ki4BU591ie1oL6qBQGu2Bl/tZ9 cloud.google.com/go/storage v1.15.0 h1:Ljj+ZXVEhCr/1+4ZhvtteN1ND7UUsNTlduGclLh8GO0= cloud.google.com/go/storage v1.15.0/go.mod h1:mjjQMoxxyGH7Jr8K5qrx6N2O0AHsczI61sMNn03GIZI= dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU= +github.com/Azure/azure-pipeline-go v0.2.3 h1:7U9HBg1JFK3jHl5qmo4CTZKFTVgMwdFHMVtCdfBE21U= +github.com/Azure/azure-pipeline-go v0.2.3/go.mod h1:x841ezTBIMG6O3lAcl8ATHnsOPVl2bqk7S3ta6S6u4k= +github.com/Azure/azure-storage-blob-go v0.13.0 h1:lgWHvFh+UYBNVQLFHXkvul2f6yOPA9PIH82RTG2cSwc= +github.com/Azure/azure-storage-blob-go v0.13.0/go.mod h1:pA9kNqtjUeQF2zOSu4s//nUdBD+e64lEuc4sVnuOfNs= +github.com/Azure/go-autorest v14.2.0+incompatible/go.mod h1:r+4oMnoxhatjLLJ6zxSWATqVooLgysK6ZNox3g/xq24= +github.com/Azure/go-autorest/autorest v0.9.0/go.mod h1:xyHB1BMZT0cuDHU7I0+g046+BFDTQ8rEZB0s4Yfa6bI= +github.com/Azure/go-autorest/autorest/adal v0.5.0/go.mod h1:8Z9fGy2MpX0PvDjB1pEgQTmVqjGhiHBW7RJJEciWzS0= +github.com/Azure/go-autorest/autorest/adal v0.9.2/go.mod h1:/3SMAM86bP6wC9Ev35peQDUeqFZBMH07vvUOmg4z/fE= +github.com/Azure/go-autorest/autorest/date v0.1.0/go.mod h1:plvfp3oPSKwf2DNjlBjWF/7vwR+cUD/ELuzDCXwHUVA= +github.com/Azure/go-autorest/autorest/date v0.3.0/go.mod h1:BI0uouVdmngYNUzGWeSYnokU+TrmwEsOqdt8Y6sso74= +github.com/Azure/go-autorest/autorest/mocks v0.1.0/go.mod h1:OTyCOPRA2IgIlWxVYxBee2F5Gr4kF2zd2J5cFRaIDN0= +github.com/Azure/go-autorest/autorest/mocks v0.2.0/go.mod h1:OTyCOPRA2IgIlWxVYxBee2F5Gr4kF2zd2J5cFRaIDN0= +github.com/Azure/go-autorest/autorest/mocks v0.4.1/go.mod h1:LTp+uSrOhSkaKrUy935gNZuuIPPVsHlr9DSOxSayd+k= +github.com/Azure/go-autorest/logger v0.1.0/go.mod h1:oExouG+K6PryycPJfVSxi/koC6LSNgds39diKLz7Vrc= +github.com/Azure/go-autorest/tracing v0.5.0/go.mod h1:r/s2XiOKccPW3HrqB+W0TQzfbtp2fGCgRFtBroKn4Dk= +github.com/Azure/go-autorest/tracing v0.6.0/go.mod h1:+vhtPC754Xsa23ID7GlGsrdKBpUA79WCAKPPZVC2DeU= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo= github.com/Knetic/govaluate v3.0.1-0.20171022003610-9aa49832a739+incompatible/go.mod h1:r7JcOSlj0wfOMncg0iLm8Leh48TZaKVeNIfJntJ2wa0= @@ -187,6 +203,8 @@ github.com/google/pprof v0.0.0-20210122040257-d980be63207e/go.mod h1:kpwsk12EmLe github.com/google/pprof v0.0.0-20210226084205-cbba55b83ad5/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE= github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI= github.com/google/uuid v1.0.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/google/uuid v1.1.2 h1:EVhdT+1Kseyi1/pUmXKaFxYsDNy9RQYkMWRH68J/W7Y= github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg= github.com/googleapis/gax-go/v2 v2.0.5 h1:sjZBwGj9Jlw33ImPtvFviGYvseOtDM7hkSKB7+Tv3SM= @@ -251,6 +269,8 @@ github.com/lightstep/lightstep-tracer-common/golang/gogo v0.0.0-20190605223551-b github.com/lightstep/lightstep-tracer-go v0.18.1/go.mod h1:jlF1pusYV4pidLvZ+XD0UBX0ZE6WURAspgAczcDHrL4= github.com/lyft/protoc-gen-validate v0.0.13/go.mod h1:XbGvPuh87YZc5TdIa2/I4pLk0QoUACkjt2znoq26NVQ= github.com/mattn/go-colorable v0.0.9/go.mod h1:9vuHe8Xs5qXnSaW/c/ABM9alt+Vo+STaOChaDxuIBZU= +github.com/mattn/go-ieproxy v0.0.1 h1:qiyop7gCflfhwCzGyeT0gro3sF9AIg9HU98JORTkqfI= +github.com/mattn/go-ieproxy v0.0.1/go.mod h1:pYabZ6IHcRpFh7vIaLfK7rdcWgFEb3SFJ6/gNWuh88E= github.com/mattn/go-isatty v0.0.3/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNxMWT7Zi4= github.com/mattn/go-isatty v0.0.4/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNxMWT7Zi4= github.com/mattn/go-runewidth v0.0.2/go.mod h1:LwmH8dsx7+W8Uxz3IHJYH5QSwggIsqBzpuz5H//U1FU= @@ -279,6 +299,7 @@ github.com/nats-io/nats.go v1.9.1/go.mod h1:ZjDU1L/7fJ09jvUSRVBR2e7+RnLiiIQyqyzE github.com/nats-io/nkeys v0.1.0/go.mod h1:xpnFELMwJABBLVhffcfd1MZx6VsNRFpEugbxziKVo7w= github.com/nats-io/nkeys v0.1.3/go.mod h1:xpnFELMwJABBLVhffcfd1MZx6VsNRFpEugbxziKVo7w= github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= +github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno= github.com/oklog/oklog v0.3.2/go.mod h1:FCV+B7mhrz4o+ueLpx+KqkyXRGMWOYEvfiXtdGtbWGs= github.com/oklog/run v1.0.0/go.mod h1:dlhp/R75TPv97u0XWUtDeV/lRKWPKSdTuV0TZvrmrQA= github.com/olekukonko/tablewriter v0.0.0-20170122224234-a0225b3f23b5/go.mod h1:vsDQFd/mU46D+Z4whnwzcISnGGzXWMclvtLoiIKAKIo= @@ -454,6 +475,7 @@ golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLL golang.org/x/net v0.0.0-20190628185345-da137c7871d7/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20190724013045-ca1201d0de80/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20190813141303-74dc4d7220e7/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20191112182307-2180aed22343/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20191209160850-c0dbc17a3553/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20200114155413-6afb5195e5aa/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20200202094626-16171245cfb2/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= @@ -520,6 +542,7 @@ golang.org/x/sys v0.0.0-20190624142023-c5567b49c5d0/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20190726091711-fc99dfbffb4e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190826190057-c7b8b68b1456/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191001151750-bb3f8db39f24/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20191112214154-59a1497f0cea/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191204072324-ce4227a45e2e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191220142924-d4481acd189f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191228213918-04cbcbbfeed8/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= @@ -539,6 +562,7 @@ golang.org/x/sys v0.0.0-20200523222454-059865788121/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20200615200032-f1bc736245b1/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200625212154-ddb9806d33ae/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200803210538-64077c9b5642/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200828194041-157a740278f4/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200905004654-be1d3432aa8f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= @@ -744,6 +768,7 @@ gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+ gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/cheggaaa/pb.v1 v1.0.25/go.mod h1:V/YB90LKu/1FcN3WVnfiiE5oMCibMjukxqG/qStrOgw= gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI= gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys= diff --git a/replica_client_test.go b/replica_client_test.go index 98213cf..07d68bf 100644 --- a/replica_client_test.go +++ b/replica_client_test.go @@ -15,6 +15,7 @@ import ( "time" "github.com/benbjohnson/litestream" + "github.com/benbjohnson/litestream/abs" "github.com/benbjohnson/litestream/file" "github.com/benbjohnson/litestream/gcs" "github.com/benbjohnson/litestream/s3" @@ -42,12 +43,20 @@ var ( s3SkipVerify = flag.Bool("s3-skip-verify", os.Getenv("LITESTREAM_S3_SKIP_VERIFY") == "true", "") ) -// GCS settings +// Google cloud storage settings var ( gcsBucket = flag.String("gcs-bucket", os.Getenv("LITESTREAM_GCS_BUCKET"), "") gcsPath = flag.String("gcs-path", os.Getenv("LITESTREAM_GCS_PATH"), "") ) +// Azure blob storage settings +var ( + absAccountName = flag.String("abs-account-name", os.Getenv("LITESTREAM_ABS_ACCOUNT_NAME"), "") + absAccountKey = flag.String("abs-account-key", os.Getenv("LITESTREAM_ABS_ACCOUNT_KEY"), "") + absBucket = flag.String("abs-bucket", os.Getenv("LITESTREAM_ABS_BUCKET"), "") + absPath = flag.String("abs-path", os.Getenv("LITESTREAM_ABS_PATH"), "") +) + func TestReplicaClient_Generations(t *testing.T) { RunWithReplicaClient(t, "OK", func(t *testing.T, c litestream.ReplicaClient) { t.Parallel() @@ -463,6 +472,8 @@ func NewReplicaClient(tb testing.TB, typ string) litestream.ReplicaClient { return NewS3ReplicaClient(tb) case gcs.ReplicaClientType: return NewGCSReplicaClient(tb) + case abs.ReplicaClientType: + return NewABSReplicaClient(tb) default: tb.Fatalf("invalid replica client type: %q", typ) return nil @@ -501,6 +512,18 @@ func NewGCSReplicaClient(tb testing.TB) *gcs.ReplicaClient { return c } +// NewABSReplicaClient returns a new client for integration testing. +func NewABSReplicaClient(tb testing.TB) *abs.ReplicaClient { + tb.Helper() + + c := abs.NewReplicaClient() + c.AccountName = *absAccountName + c.AccountKey = *absAccountKey + c.Bucket = *absBucket + c.Path = path.Join(*absPath, fmt.Sprintf("%016x", rand.Uint64())) + return c +} + // MustDeleteAll deletes all objects under the client's path. func MustDeleteAll(tb testing.TB, c litestream.ReplicaClient) { tb.Helper()