Add support for S3-compatible object storage.
This commits adds support for non-AWS S3-compatible storage such as MinIO, Backblaze B2, & Google Cloud Storage (GCS). Other backends should also work but some code has been added to make URL-based configurations work more easily.
This commit is contained in:
73
s3/s3.go
73
s3/s3.go
@@ -7,8 +7,10 @@ import (
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"log"
|
||||
"net"
|
||||
"os"
|
||||
"path"
|
||||
"regexp"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
@@ -69,9 +71,11 @@ type Replica struct {
|
||||
SecretAccessKey string
|
||||
|
||||
// S3 bucket information
|
||||
Region string
|
||||
Bucket string
|
||||
Path string
|
||||
Region string
|
||||
Bucket string
|
||||
Path string
|
||||
Endpoint string
|
||||
ForcePathStyle bool
|
||||
|
||||
// Time between syncs with the shadow WAL.
|
||||
SyncInterval time.Duration
|
||||
@@ -646,9 +650,11 @@ func (r *Replica) Init(ctx context.Context) (err error) {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Look up region if not specified.
|
||||
// Look up region if not specified and no endpoint is used.
|
||||
// Endpoints are typically used for non-S3 object stores and do not
|
||||
// necessarily require a region.
|
||||
region := r.Region
|
||||
if region == "" {
|
||||
if region == "" && r.Endpoint == "" {
|
||||
if region, err = r.findBucketRegion(ctx, r.Bucket); err != nil {
|
||||
return fmt.Errorf("cannot lookup bucket region: %w", err)
|
||||
}
|
||||
@@ -656,7 +662,9 @@ func (r *Replica) Init(ctx context.Context) (err error) {
|
||||
|
||||
// Create new AWS session.
|
||||
config := r.config()
|
||||
config.Region = aws.String(region)
|
||||
if region != "" {
|
||||
config.Region = aws.String(region)
|
||||
}
|
||||
sess, err := session.NewSession(config)
|
||||
if err != nil {
|
||||
return fmt.Errorf("cannot create aws session: %w", err)
|
||||
@@ -673,6 +681,12 @@ func (r *Replica) config() *aws.Config {
|
||||
if r.AccessKeyID != "" || r.SecretAccessKey != "" {
|
||||
config.Credentials = credentials.NewStaticCredentials(r.AccessKeyID, r.SecretAccessKey, "")
|
||||
}
|
||||
if r.Endpoint != "" {
|
||||
config.Endpoint = aws.String(r.Endpoint)
|
||||
}
|
||||
if r.ForcePathStyle {
|
||||
config.S3ForcePathStyle = aws.Bool(r.ForcePathStyle)
|
||||
}
|
||||
return config
|
||||
}
|
||||
|
||||
@@ -1027,6 +1041,53 @@ func (r *Replica) deleteGenerationBefore(ctx context.Context, generation string,
|
||||
return nil
|
||||
}
|
||||
|
||||
// ParseHost extracts data from a hostname depending on the service provider.
|
||||
func ParseHost(s string) (bucket, region, endpoint string, forcePathStyle bool) {
|
||||
// Extract port if one is specified.
|
||||
host, port, err := net.SplitHostPort(s)
|
||||
if err != nil {
|
||||
host = s
|
||||
}
|
||||
|
||||
// Default to path-based URLs, except for with AWS S3 itself.
|
||||
forcePathStyle = true
|
||||
|
||||
// Extract fields from provider-specific host formats.
|
||||
scheme := "https"
|
||||
if a := localhostRegex.FindStringSubmatch(host); a != nil {
|
||||
bucket, region = a[1], "us-east-1"
|
||||
scheme, endpoint = "http", "localhost"
|
||||
} else if a := gcsRegex.FindStringSubmatch(host); a != nil {
|
||||
bucket, region = a[1], "us-east-1"
|
||||
endpoint = "storage.googleapis.com"
|
||||
} else if a := backblazeRegex.FindStringSubmatch(host); a != nil {
|
||||
bucket = a[1]
|
||||
region = a[2]
|
||||
endpoint = fmt.Sprintf("s3.%s.backblazeb2.com", a[2])
|
||||
} else {
|
||||
bucket = host
|
||||
forcePathStyle = false
|
||||
}
|
||||
|
||||
// Add port back to endpoint, if available.
|
||||
if endpoint != "" && port != "" {
|
||||
endpoint = net.JoinHostPort(endpoint, port)
|
||||
}
|
||||
|
||||
// Prepend scheme to endpoint.
|
||||
if endpoint != "" {
|
||||
endpoint = scheme + "://" + endpoint
|
||||
}
|
||||
|
||||
return bucket, region, endpoint, forcePathStyle
|
||||
}
|
||||
|
||||
var (
|
||||
localhostRegex = regexp.MustCompile(`^(?:(.+)\.)?localhost$`)
|
||||
backblazeRegex = regexp.MustCompile(`^(?:(.+)\.)?s3.([^.]+)\.backblazeb2.com$`)
|
||||
gcsRegex = regexp.MustCompile(`^(?:(.+)\.)?storage.googleapis.com$`)
|
||||
)
|
||||
|
||||
// S3 metrics.
|
||||
var (
|
||||
operationTotalCounterVec = promauto.NewCounterVec(prometheus.CounterOpts{
|
||||
|
||||
80
s3/s3_test.go
Normal file
80
s3/s3_test.go
Normal file
@@ -0,0 +1,80 @@
|
||||
package s3_test
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/benbjohnson/litestream/s3"
|
||||
)
|
||||
|
||||
func TestParseHost(t *testing.T) {
|
||||
// Ensure non-specific hosts return as buckets.
|
||||
t.Run("S3", func(t *testing.T) {
|
||||
bucket, region, endpoint, forcePathStyle := s3.ParseHost(`test.litestream.io`)
|
||||
if got, want := bucket, `test.litestream.io`; got != want {
|
||||
t.Fatalf("bucket=%q, want %q", got, want)
|
||||
} else if got, want := region, ``; got != want {
|
||||
t.Fatalf("region=%q, want %q", got, want)
|
||||
} else if got, want := endpoint, ``; got != want {
|
||||
t.Fatalf("endpoint=%q, want %q", got, want)
|
||||
} else if got, want := forcePathStyle, false; got != want {
|
||||
t.Fatalf("forcePathStyle=%v, want %v", got, want)
|
||||
}
|
||||
})
|
||||
|
||||
// Ensure localhosts use an HTTP endpoint and extract the bucket name.
|
||||
t.Run("Localhost", func(t *testing.T) {
|
||||
t.Run("WithPort", func(t *testing.T) {
|
||||
bucket, region, endpoint, forcePathStyle := s3.ParseHost(`test.localhost:9000`)
|
||||
if got, want := bucket, `test`; got != want {
|
||||
t.Fatalf("bucket=%q, want %q", got, want)
|
||||
} else if got, want := region, `us-east-1`; got != want {
|
||||
t.Fatalf("region=%q, want %q", got, want)
|
||||
} else if got, want := endpoint, `http://localhost:9000`; got != want {
|
||||
t.Fatalf("endpoint=%q, want %q", got, want)
|
||||
} else if got, want := forcePathStyle, true; got != want {
|
||||
t.Fatalf("forcePathStyle=%v, want %v", got, want)
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("WithoutPort", func(t *testing.T) {
|
||||
bucket, region, endpoint, forcePathStyle := s3.ParseHost(`test.localhost`)
|
||||
if got, want := bucket, `test`; got != want {
|
||||
t.Fatalf("bucket=%q, want %q", got, want)
|
||||
} else if got, want := region, `us-east-1`; got != want {
|
||||
t.Fatalf("region=%q, want %q", got, want)
|
||||
} else if got, want := endpoint, `http://localhost`; got != want {
|
||||
t.Fatalf("endpoint=%q, want %q", got, want)
|
||||
} else if got, want := forcePathStyle, true; got != want {
|
||||
t.Fatalf("forcePathStyle=%v, want %v", got, want)
|
||||
}
|
||||
})
|
||||
})
|
||||
|
||||
// Ensure backblaze B2 URLs extract bucket, region, & endpoint from host.
|
||||
t.Run("Backblaze", func(t *testing.T) {
|
||||
bucket, region, endpoint, forcePathStyle := s3.ParseHost(`test-123.s3.us-west-000.backblazeb2.com`)
|
||||
if got, want := bucket, `test-123`; got != want {
|
||||
t.Fatalf("bucket=%q, want %q", got, want)
|
||||
} else if got, want := region, `us-west-000`; got != want {
|
||||
t.Fatalf("region=%q, want %q", got, want)
|
||||
} else if got, want := endpoint, `https://s3.us-west-000.backblazeb2.com`; got != want {
|
||||
t.Fatalf("endpoint=%q, want %q", got, want)
|
||||
} else if got, want := forcePathStyle, true; got != want {
|
||||
t.Fatalf("forcePathStyle=%v, want %v", got, want)
|
||||
}
|
||||
})
|
||||
|
||||
// Ensure GCS URLs extract bucket & endpoint from host.
|
||||
t.Run("GCS", func(t *testing.T) {
|
||||
bucket, region, endpoint, forcePathStyle := s3.ParseHost(`litestream.io.storage.googleapis.com`)
|
||||
if got, want := bucket, `litestream.io`; got != want {
|
||||
t.Fatalf("bucket=%q, want %q", got, want)
|
||||
} else if got, want := region, `us-east-1`; got != want {
|
||||
t.Fatalf("region=%q, want %q", got, want)
|
||||
} else if got, want := endpoint, `https://storage.googleapis.com`; got != want {
|
||||
t.Fatalf("endpoint=%q, want %q", got, want)
|
||||
} else if got, want := forcePathStyle, true; got != want {
|
||||
t.Fatalf("forcePathStyle=%v, want %v", got, want)
|
||||
}
|
||||
})
|
||||
}
|
||||
Reference in New Issue
Block a user