From 6c865e37f1ce00bc2022dd8bfc61c0023900069a Mon Sep 17 00:00:00 2001 From: Ben Johnson Date: Fri, 21 May 2021 10:37:34 -0600 Subject: [PATCH] Move path functions to litestream package --- file/replica_client.go | 16 ++-- litestream.go | 51 +++++++++++ litestream_test.go | 88 +++++++++++++++++++ s3/replica_client.go | 126 ++++++++++++++------------- s3/replica_client_test.go | 173 ++++++++++++++++---------------------- s3/s3.go | 61 -------------- s3/s3_test.go | 80 ------------------ 7 files changed, 285 insertions(+), 310 deletions(-) delete mode 100644 s3/s3.go delete mode 100644 s3/s3_test.go diff --git a/file/replica_client.go b/file/replica_client.go index 4254c46..12cbe0b 100644 --- a/file/replica_client.go +++ b/file/replica_client.go @@ -30,7 +30,7 @@ func NewReplicaClient(path string) *ReplicaClient { } // db returns the database, if available. -func (c *ReplicaClient) db() *litestream.DB{ +func (c *ReplicaClient) db() *litestream.DB { if c.Replica == nil { return nil } @@ -191,10 +191,9 @@ func (c *ReplicaClient) WriteSnapshot(ctx context.Context, generation string, in } var fileInfo, dirInfo os.FileInfo - if db := c.db(); db != nil { - fileInfo, dirInfo = db.FileInfo(), db.DirInfo() - } - + if db := c.db(); db != nil { + fileInfo, dirInfo = db.FileInfo(), db.DirInfo() + } // Ensure parent directory exists. if err := internal.MkdirAll(filepath.Dir(filename), dirInfo); err != nil { @@ -309,9 +308,9 @@ func (c *ReplicaClient) WriteWALSegment(ctx context.Context, pos litestream.Pos, } var fileInfo, dirInfo os.FileInfo - if db := c.db(); db != nil { - fileInfo, dirInfo = db.FileInfo(), db.DirInfo() - } + if db := c.db(); db != nil { + fileInfo, dirInfo = db.FileInfo(), db.DirInfo() + } // Ensure parent directory exists. if err := internal.MkdirAll(filepath.Dir(filename), dirInfo); err != nil { @@ -377,4 +376,3 @@ func (c *ReplicaClient) DeleteWALSegments(ctx context.Context, a []litestream.Po } return nil } - diff --git a/litestream.go b/litestream.go index 8e39cbc..f31985b 100644 --- a/litestream.go +++ b/litestream.go @@ -7,6 +7,7 @@ import ( "fmt" "io" "os" + "path" "path/filepath" "regexp" "strconv" @@ -383,6 +384,56 @@ func IsGenerationName(s string) bool { return true } +// GenerationsPath returns the path to a generation root directory. +func GenerationsPath(root string) string { + return path.Join(root, "generations") +} + +// GenerationPath returns the path to a generation's root directory. +func GenerationPath(root, generation string) (string, error) { + dir := GenerationsPath(root) + if generation == "" { + return "", fmt.Errorf("generation required") + } + return path.Join(dir, generation), nil +} + +// SnapshotsPath returns the path to a generation's snapshot directory. +func SnapshotsPath(root, generation string) (string, error) { + dir, err := GenerationPath(root, generation) + if err != nil { + return "", err + } + return path.Join(dir, "snapshots"), nil +} + +// SnapshotPath returns the path to an uncompressed snapshot file. +func SnapshotPath(root, generation string, index int) (string, error) { + dir, err := SnapshotsPath(root, generation) + if err != nil { + return "", err + } + return path.Join(dir, FormatSnapshotPath(index)), nil +} + +// WALPath returns the path to a generation's WAL directory +func WALPath(root, generation string) (string, error) { + dir, err := GenerationPath(root, generation) + if err != nil { + return "", err + } + return path.Join(dir, "wal"), nil +} + +// WALSegmentPath returns the path to a WAL segment file. +func WALSegmentPath(root, generation string, index int, offset int64) (string, error) { + dir, err := WALPath(root, generation) + if err != nil { + return "", err + } + return path.Join(dir, FormatWALSegmentPath(index, offset)), nil +} + // IsSnapshotPath returns true if s is a path to a snapshot file. func IsSnapshotPath(s string) bool { return snapshotPathRegex.MatchString(s) diff --git a/litestream_test.go b/litestream_test.go index a03a748..0f1bb85 100644 --- a/litestream_test.go +++ b/litestream_test.go @@ -40,6 +40,94 @@ func TestChecksum(t *testing.T) { }) } +func TestGenerationsPath(t *testing.T) { + t.Run("OK", func(t *testing.T) { + if got, want := litestream.GenerationsPath("foo"), "foo/generations"; got != want { + t.Fatalf("GenerationsPath()=%v, want %v", got, want) + } + }) + t.Run("NoPath", func(t *testing.T) { + if got, want := litestream.GenerationsPath(""), "generations"; got != want { + t.Fatalf("GenerationsPath()=%v, want %v", got, want) + } + }) +} + +func TestGenerationPath(t *testing.T) { + t.Run("OK", func(t *testing.T) { + if got, err := litestream.GenerationPath("foo", "0123456701234567"); err != nil { + t.Fatal(err) + } else if want := "foo/generations/0123456701234567"; got != want { + t.Fatalf("GenerationPath()=%v, want %v", got, want) + } + }) + t.Run("ErrNoGeneration", func(t *testing.T) { + if _, err := litestream.GenerationPath("foo", ""); err == nil || err.Error() != `generation required` { + t.Fatalf("expected error: %v", err) + } + }) +} + +func TestSnapshotsPath(t *testing.T) { + t.Run("OK", func(t *testing.T) { + if got, err := litestream.SnapshotsPath("foo", "0123456701234567"); err != nil { + t.Fatal(err) + } else if want := "foo/generations/0123456701234567/snapshots"; got != want { + t.Fatalf("SnapshotsPath()=%v, want %v", got, want) + } + }) + t.Run("ErrNoGeneration", func(t *testing.T) { + if _, err := litestream.SnapshotsPath("foo", ""); err == nil || err.Error() != `generation required` { + t.Fatalf("unexpected error: %v", err) + } + }) +} + +func TestSnapshotPath(t *testing.T) { + t.Run("OK", func(t *testing.T) { + if got, err := litestream.SnapshotPath("foo", "0123456701234567", 1000); err != nil { + t.Fatal(err) + } else if want := "foo/generations/0123456701234567/snapshots/000003e8.snapshot.lz4"; got != want { + t.Fatalf("SnapshotPath()=%v, want %v", got, want) + } + }) + t.Run("ErrNoGeneration", func(t *testing.T) { + if _, err := litestream.SnapshotPath("foo", "", 1000); err == nil || err.Error() != `generation required` { + t.Fatalf("unexpected error: %v", err) + } + }) +} + +func TestWALPath(t *testing.T) { + t.Run("OK", func(t *testing.T) { + if got, err := litestream.WALPath("foo", "0123456701234567"); err != nil { + t.Fatal(err) + } else if want := "foo/generations/0123456701234567/wal"; got != want { + t.Fatalf("WALPath()=%v, want %v", got, want) + } + }) + t.Run("ErrNoGeneration", func(t *testing.T) { + if _, err := litestream.WALPath("foo", ""); err == nil || err.Error() != `generation required` { + t.Fatalf("unexpected error: %v", err) + } + }) +} + +func TestWALSegmentPath(t *testing.T) { + t.Run("OK", func(t *testing.T) { + if got, err := litestream.WALSegmentPath("foo", "0123456701234567", 1000, 1001); err != nil { + t.Fatal(err) + } else if want := "foo/generations/0123456701234567/wal/000003e8_000003e9.wal.lz4"; got != want { + t.Fatalf("WALPath()=%v, want %v", got, want) + } + }) + t.Run("ErrNoGeneration", func(t *testing.T) { + if _, err := litestream.WALSegmentPath("foo", "", 1000, 0); err == nil || err.Error() != `generation required` { + t.Fatalf("unexpected error: %v", err) + } + }) +} + func MustDecodeHexString(s string) []byte { b, err := hex.DecodeString(s) if err != nil { diff --git a/s3/replica_client.go b/s3/replica_client.go index 1d8f486..1bc16f5 100644 --- a/s3/replica_client.go +++ b/s3/replica_client.go @@ -5,9 +5,11 @@ import ( "crypto/tls" "fmt" "io" + "net" "net/http" "os" "path" + "regexp" "sync" "time" @@ -62,56 +64,6 @@ func (c *ReplicaClient) Type() string { return "s3" } -// GenerationsDir returns the path to a generation root directory. -func (c *ReplicaClient) GenerationsDir() string { - return path.Join(c.Path, "generations") -} - -// GenerationDir returns the path to a generation's root directory. -func (c *ReplicaClient) GenerationDir(generation string) (string, error) { - dir := c.GenerationsDir() - if generation == "" { - return "", fmt.Errorf("generation required") - } - return path.Join(dir, generation), nil -} - -// SnapshotsDir returns the path to a generation's snapshot directory. -func (c *ReplicaClient) SnapshotsDir(generation string) (string, error) { - dir, err := c.GenerationDir(generation) - if err != nil { - return "", err - } - return path.Join(dir, "snapshots"), nil -} - -// SnapshotPath returns the path to an uncompressed snapshot file. -func (c *ReplicaClient) SnapshotPath(generation string, index int) (string, error) { - dir, err := c.SnapshotsDir(generation) - if err != nil { - return "", err - } - return path.Join(dir, litestream.FormatSnapshotPath(index)), nil -} - -// WALDir returns the path to a generation's WAL directory -func (c *ReplicaClient) WALDir(generation string) (string, error) { - dir, err := c.GenerationDir(generation) - if err != nil { - return "", err - } - return path.Join(dir, "wal"), nil -} - -// WALSegmentPath returns the path to a WAL segment file. -func (c *ReplicaClient) WALSegmentPath(generation string, index int, offset int64) (string, error) { - dir, err := c.WALDir(generation) - if err != nil { - return "", err - } - return path.Join(dir, litestream.FormatWALSegmentPath(index, offset)), nil -} - // Init initializes the connection to S3. No-op if already initialized. func (c *ReplicaClient) Init(ctx context.Context) (err error) { c.mu.Lock() @@ -201,7 +153,7 @@ func (c *ReplicaClient) Generations(ctx context.Context) ([]string, error) { var generations []string if err := c.s3.ListObjectsPagesWithContext(ctx, &s3.ListObjectsInput{ Bucket: aws.String(c.Bucket), - Prefix: aws.String(c.GenerationsDir() + "/"), + Prefix: aws.String(litestream.GenerationsPath(c.Path) + "/"), Delimiter: aws.String("/"), }, func(page *s3.ListObjectsOutput, lastPage bool) bool { operationTotalCounterVec.WithLabelValues("LIST").Inc() @@ -227,7 +179,7 @@ func (c *ReplicaClient) DeleteGeneration(ctx context.Context, generation string) return err } - dir, err := c.GenerationDir(generation) + dir, err := litestream.GenerationPath(c.Path, generation) if err != nil { return fmt.Errorf("cannot determine generation directory path: %w", err) } @@ -285,7 +237,7 @@ func (c *ReplicaClient) WriteSnapshot(ctx context.Context, generation string, in return info, err } - key, err := c.SnapshotPath(generation, index) + key, err := litestream.SnapshotPath(c.Path, generation, index) if err != nil { return info, fmt.Errorf("cannot determine snapshot path: %w", err) } @@ -319,7 +271,7 @@ func (c *ReplicaClient) SnapshotReader(ctx context.Context, generation string, i return nil, err } - key, err := c.SnapshotPath(generation, index) + key, err := litestream.SnapshotPath(c.Path, generation, index) if err != nil { return nil, fmt.Errorf("cannot determine snapshot path: %w", err) } @@ -345,7 +297,7 @@ func (c *ReplicaClient) DeleteSnapshot(ctx context.Context, generation string, i return err } - key, err := c.SnapshotPath(generation, index) + key, err := litestream.SnapshotPath(c.Path, generation, index) if err != nil { return fmt.Errorf("cannot determine snapshot path: %w", err) } @@ -375,7 +327,7 @@ func (c *ReplicaClient) WriteWALSegment(ctx context.Context, pos litestream.Pos, return info, err } - key, err := c.WALSegmentPath(pos.Generation, pos.Index, pos.Offset) + key, err := litestream.WALSegmentPath(c.Path, pos.Generation, pos.Index, pos.Offset) if err != nil { return info, fmt.Errorf("cannot determine wal segment path: %w", err) } @@ -409,7 +361,7 @@ func (c *ReplicaClient) WALSegmentReader(ctx context.Context, pos litestream.Pos return nil, err } - key, err := c.WALSegmentPath(pos.Generation, pos.Index, pos.Offset) + key, err := litestream.WALSegmentPath(c.Path, pos.Generation, pos.Index, pos.Offset) if err != nil { return nil, fmt.Errorf("cannot determine wal segment path: %w", err) } @@ -444,7 +396,7 @@ func (c *ReplicaClient) DeleteWALSegments(ctx context.Context, a []litestream.Po // Generate a batch of object IDs for deleting the WAL segments. for i, pos := range a[:n] { - key, err := c.WALSegmentPath(pos.Generation, pos.Index, pos.Offset) + key, err := litestream.WALSegmentPath(c.Path, pos.Generation, pos.Index, pos.Offset) if err != nil { return fmt.Errorf("cannot determine wal segment path: %w", err) } @@ -545,7 +497,7 @@ func newSnapshotIterator(ctx context.Context, client *ReplicaClient, generation func (itr *snapshotIterator) fetch() error { defer close(itr.ch) - dir, err := itr.client.SnapshotsDir(itr.generation) + dir, err := litestream.SnapshotsPath(itr.client.Path, itr.generation) if err != nil { return fmt.Errorf("cannot determine snapshot directory path: %w", err) } @@ -648,7 +600,7 @@ func newWALSegmentIterator(ctx context.Context, client *ReplicaClient, generatio func (itr *walSegmentIterator) fetch() error { defer close(itr.ch) - dir, err := itr.client.WALDir(itr.generation) + dir, err := litestream.WALPath(itr.client.Path, itr.generation) if err != nil { return fmt.Errorf("cannot determine wal directory path: %w", err) } @@ -723,6 +675,60 @@ func (itr *walSegmentIterator) WALSegment() litestream.WALSegmentInfo { return itr.info } +// ParseHost extracts data from a hostname depending on the service provider. +func ParseHost(s string) (bucket, region, endpoint string, forcePathStyle bool) { + // Extract port if one is specified. + host, port, err := net.SplitHostPort(s) + if err != nil { + host = s + } + + // Default to path-based URLs, except for with AWS S3 itself. + forcePathStyle = true + + // Extract fields from provider-specific host formats. + scheme := "https" + if a := localhostRegex.FindStringSubmatch(host); a != nil { + bucket, region = a[1], "us-east-1" + scheme, endpoint = "http", "localhost" + } else if a := gcsRegex.FindStringSubmatch(host); a != nil { + bucket, region = a[1], "us-east-1" + endpoint = "storage.googleapis.com" + } else if a := digitalOceanRegex.FindStringSubmatch(host); a != nil { + bucket, region = a[1], a[2] + endpoint = fmt.Sprintf("%s.digitaloceanspaces.com", region) + } else if a := linodeRegex.FindStringSubmatch(host); a != nil { + bucket, region = a[1], a[2] + endpoint = fmt.Sprintf("%s.linodeobjects.com", region) + } else if a := backblazeRegex.FindStringSubmatch(host); a != nil { + bucket, region = a[1], a[2] + endpoint = fmt.Sprintf("s3.%s.backblazeb2.com", region) + } else { + bucket = host + forcePathStyle = false + } + + // Add port back to endpoint, if available. + if endpoint != "" && port != "" { + endpoint = net.JoinHostPort(endpoint, port) + } + + // Prepend scheme to endpoint. + if endpoint != "" { + endpoint = scheme + "://" + endpoint + } + + return bucket, region, endpoint, forcePathStyle +} + +var ( + localhostRegex = regexp.MustCompile(`^(?:(.+)\.)?localhost$`) + digitalOceanRegex = regexp.MustCompile(`^(?:(.+)\.)?([^.]+)\.digitaloceanspaces.com$`) + linodeRegex = regexp.MustCompile(`^(?:(.+)\.)?([^.]+)\.linodeobjects.com$`) + backblazeRegex = regexp.MustCompile(`^(?:(.+)\.)?s3.([^.]+)\.backblazeb2.com$`) + gcsRegex = regexp.MustCompile(`^(?:(.+)\.)?storage.googleapis.com$`) +) + func isNotExists(err error) bool { switch err := err.(type) { case awserr.Error: diff --git a/s3/replica_client_test.go b/s3/replica_client_test.go index f80947c..913d985 100644 --- a/s3/replica_client_test.go +++ b/s3/replica_client_test.go @@ -38,106 +38,6 @@ func TestReplicaClient_Type(t *testing.T) { } } -func TestReplicaClient_GenerationsDir(t *testing.T) { - t.Run("OK", func(t *testing.T) { - c := s3.NewReplicaClient() - c.Path = "foo" - if got, want := c.GenerationsDir(), "foo/generations"; got != want { - t.Fatalf("GenerationsDir()=%v, want %v", got, want) - } - }) - t.Run("NoPath", func(t *testing.T) { - if got, want := s3.NewReplicaClient().GenerationsDir(), "generations"; got != want { - t.Fatalf("GenerationsDir()=%v, want %v", got, want) - } - }) -} - -func TestReplicaClient_GenerationDir(t *testing.T) { - t.Run("OK", func(t *testing.T) { - c := s3.NewReplicaClient() - c.Path = "foo" - if got, err := c.GenerationDir("0123456701234567"); err != nil { - t.Fatal(err) - } else if want := "foo/generations/0123456701234567"; got != want { - t.Fatalf("GenerationDir()=%v, want %v", got, want) - } - }) - t.Run("ErrNoGeneration", func(t *testing.T) { - if _, err := s3.NewReplicaClient().GenerationDir(""); err == nil || err.Error() != `generation required` { - t.Fatalf("expected error: %v", err) - } - }) -} - -func TestReplicaClient_SnapshotsDir(t *testing.T) { - t.Run("OK", func(t *testing.T) { - c := s3.NewReplicaClient() - c.Path = "foo" - if got, err := c.SnapshotsDir("0123456701234567"); err != nil { - t.Fatal(err) - } else if want := "foo/generations/0123456701234567/snapshots"; got != want { - t.Fatalf("SnapshotsDir()=%v, want %v", got, want) - } - }) - t.Run("ErrNoGeneration", func(t *testing.T) { - if _, err := s3.NewReplicaClient().SnapshotsDir(""); err == nil || err.Error() != `generation required` { - t.Fatalf("unexpected error: %v", err) - } - }) -} - -func TestReplicaClient_SnapshotPath(t *testing.T) { - t.Run("OK", func(t *testing.T) { - c := s3.NewReplicaClient() - c.Path = "foo" - if got, err := c.SnapshotPath("0123456701234567", 1000); err != nil { - t.Fatal(err) - } else if want := "foo/generations/0123456701234567/snapshots/000003e8.snapshot.lz4"; got != want { - t.Fatalf("SnapshotPath()=%v, want %v", got, want) - } - }) - t.Run("ErrNoGeneration", func(t *testing.T) { - if _, err := s3.NewReplicaClient().SnapshotPath("", 1000); err == nil || err.Error() != `generation required` { - t.Fatalf("unexpected error: %v", err) - } - }) -} - -func TestReplicaClient_WALDir(t *testing.T) { - t.Run("OK", func(t *testing.T) { - c := s3.NewReplicaClient() - c.Path = "foo" - if got, err := c.WALDir("0123456701234567"); err != nil { - t.Fatal(err) - } else if want := "foo/generations/0123456701234567/wal"; got != want { - t.Fatalf("WALDir()=%v, want %v", got, want) - } - }) - t.Run("ErrNoGeneration", func(t *testing.T) { - if _, err := s3.NewReplicaClient().WALDir(""); err == nil || err.Error() != `generation required` { - t.Fatalf("unexpected error: %v", err) - } - }) -} - -func TestReplicaClient_WALSegmentPath(t *testing.T) { - t.Run("OK", func(t *testing.T) { - c := s3.NewReplicaClient() - c.Path = "foo" - if got, err := c.WALSegmentPath("0123456701234567", 1000, 1001); err != nil { - t.Fatal(err) - } else if want := "foo/generations/0123456701234567/wal/000003e8_000003e9.wal.lz4"; got != want { - t.Fatalf("WALPath()=%v, want %v", got, want) - } - }) - t.Run("ErrNoGeneration", func(t *testing.T) { - if _, err := s3.NewReplicaClient().WALSegmentPath("", 1000, 0); err == nil || err.Error() != `generation required` { - t.Fatalf("unexpected error: %v", err) - } - }) -} - func TestReplicaClient_Generations(t *testing.T) { t.Run("OK", func(t *testing.T) { t.Parallel() @@ -574,6 +474,79 @@ func TestReplicaClient_DeleteWALSegments(t *testing.T) { }) } +func TestParseHost(t *testing.T) { + // Ensure non-specific hosts return as buckets. + t.Run("S3", func(t *testing.T) { + bucket, region, endpoint, forcePathStyle := s3.ParseHost(`test.litestream.io`) + if got, want := bucket, `test.litestream.io`; got != want { + t.Fatalf("bucket=%q, want %q", got, want) + } else if got, want := region, ``; got != want { + t.Fatalf("region=%q, want %q", got, want) + } else if got, want := endpoint, ``; got != want { + t.Fatalf("endpoint=%q, want %q", got, want) + } else if got, want := forcePathStyle, false; got != want { + t.Fatalf("forcePathStyle=%v, want %v", got, want) + } + }) + + // Ensure localhosts use an HTTP endpoint and extract the bucket name. + t.Run("Localhost", func(t *testing.T) { + t.Run("WithPort", func(t *testing.T) { + bucket, region, endpoint, forcePathStyle := s3.ParseHost(`test.localhost:9000`) + if got, want := bucket, `test`; got != want { + t.Fatalf("bucket=%q, want %q", got, want) + } else if got, want := region, `us-east-1`; got != want { + t.Fatalf("region=%q, want %q", got, want) + } else if got, want := endpoint, `http://localhost:9000`; got != want { + t.Fatalf("endpoint=%q, want %q", got, want) + } else if got, want := forcePathStyle, true; got != want { + t.Fatalf("forcePathStyle=%v, want %v", got, want) + } + }) + + t.Run("WithoutPort", func(t *testing.T) { + bucket, region, endpoint, forcePathStyle := s3.ParseHost(`test.localhost`) + if got, want := bucket, `test`; got != want { + t.Fatalf("bucket=%q, want %q", got, want) + } else if got, want := region, `us-east-1`; got != want { + t.Fatalf("region=%q, want %q", got, want) + } else if got, want := endpoint, `http://localhost`; got != want { + t.Fatalf("endpoint=%q, want %q", got, want) + } else if got, want := forcePathStyle, true; got != want { + t.Fatalf("forcePathStyle=%v, want %v", got, want) + } + }) + }) + + // Ensure backblaze B2 URLs extract bucket, region, & endpoint from host. + t.Run("Backblaze", func(t *testing.T) { + bucket, region, endpoint, forcePathStyle := s3.ParseHost(`test-123.s3.us-west-000.backblazeb2.com`) + if got, want := bucket, `test-123`; got != want { + t.Fatalf("bucket=%q, want %q", got, want) + } else if got, want := region, `us-west-000`; got != want { + t.Fatalf("region=%q, want %q", got, want) + } else if got, want := endpoint, `https://s3.us-west-000.backblazeb2.com`; got != want { + t.Fatalf("endpoint=%q, want %q", got, want) + } else if got, want := forcePathStyle, true; got != want { + t.Fatalf("forcePathStyle=%v, want %v", got, want) + } + }) + + // Ensure GCS URLs extract bucket & endpoint from host. + t.Run("GCS", func(t *testing.T) { + bucket, region, endpoint, forcePathStyle := s3.ParseHost(`litestream.io.storage.googleapis.com`) + if got, want := bucket, `litestream.io`; got != want { + t.Fatalf("bucket=%q, want %q", got, want) + } else if got, want := region, `us-east-1`; got != want { + t.Fatalf("region=%q, want %q", got, want) + } else if got, want := endpoint, `https://storage.googleapis.com`; got != want { + t.Fatalf("endpoint=%q, want %q", got, want) + } else if got, want := forcePathStyle, true; got != want { + t.Fatalf("forcePathStyle=%v, want %v", got, want) + } + }) +} + // NewIntegrationReplicaClient returns a new client for integration testing. // If integration flag is not set then test/benchmark is skipped. func NewIntegrationReplicaClient(tb testing.TB) *s3.ReplicaClient { diff --git a/s3/s3.go b/s3/s3.go deleted file mode 100644 index 33d1fbb..0000000 --- a/s3/s3.go +++ /dev/null @@ -1,61 +0,0 @@ -package s3 - -import ( - "fmt" - "net" - "regexp" -) - -// ParseHost extracts data from a hostname depending on the service provider. -func ParseHost(s string) (bucket, region, endpoint string, forcePathStyle bool) { - // Extract port if one is specified. - host, port, err := net.SplitHostPort(s) - if err != nil { - host = s - } - - // Default to path-based URLs, except for with AWS S3 itself. - forcePathStyle = true - - // Extract fields from provider-specific host formats. - scheme := "https" - if a := localhostRegex.FindStringSubmatch(host); a != nil { - bucket, region = a[1], "us-east-1" - scheme, endpoint = "http", "localhost" - } else if a := gcsRegex.FindStringSubmatch(host); a != nil { - bucket, region = a[1], "us-east-1" - endpoint = "storage.googleapis.com" - } else if a := digitalOceanRegex.FindStringSubmatch(host); a != nil { - bucket, region = a[1], a[2] - endpoint = fmt.Sprintf("%s.digitaloceanspaces.com", region) - } else if a := linodeRegex.FindStringSubmatch(host); a != nil { - bucket, region = a[1], a[2] - endpoint = fmt.Sprintf("%s.linodeobjects.com", region) - } else if a := backblazeRegex.FindStringSubmatch(host); a != nil { - bucket, region = a[1], a[2] - endpoint = fmt.Sprintf("s3.%s.backblazeb2.com", region) - } else { - bucket = host - forcePathStyle = false - } - - // Add port back to endpoint, if available. - if endpoint != "" && port != "" { - endpoint = net.JoinHostPort(endpoint, port) - } - - // Prepend scheme to endpoint. - if endpoint != "" { - endpoint = scheme + "://" + endpoint - } - - return bucket, region, endpoint, forcePathStyle -} - -var ( - localhostRegex = regexp.MustCompile(`^(?:(.+)\.)?localhost$`) - digitalOceanRegex = regexp.MustCompile(`^(?:(.+)\.)?([^.]+)\.digitaloceanspaces.com$`) - linodeRegex = regexp.MustCompile(`^(?:(.+)\.)?([^.]+)\.linodeobjects.com$`) - backblazeRegex = regexp.MustCompile(`^(?:(.+)\.)?s3.([^.]+)\.backblazeb2.com$`) - gcsRegex = regexp.MustCompile(`^(?:(.+)\.)?storage.googleapis.com$`) -) diff --git a/s3/s3_test.go b/s3/s3_test.go deleted file mode 100644 index f39e10d..0000000 --- a/s3/s3_test.go +++ /dev/null @@ -1,80 +0,0 @@ -package s3_test - -import ( - "testing" - - "github.com/benbjohnson/litestream/s3" -) - -func TestParseHost(t *testing.T) { - // Ensure non-specific hosts return as buckets. - t.Run("S3", func(t *testing.T) { - bucket, region, endpoint, forcePathStyle := s3.ParseHost(`test.litestream.io`) - if got, want := bucket, `test.litestream.io`; got != want { - t.Fatalf("bucket=%q, want %q", got, want) - } else if got, want := region, ``; got != want { - t.Fatalf("region=%q, want %q", got, want) - } else if got, want := endpoint, ``; got != want { - t.Fatalf("endpoint=%q, want %q", got, want) - } else if got, want := forcePathStyle, false; got != want { - t.Fatalf("forcePathStyle=%v, want %v", got, want) - } - }) - - // Ensure localhosts use an HTTP endpoint and extract the bucket name. - t.Run("Localhost", func(t *testing.T) { - t.Run("WithPort", func(t *testing.T) { - bucket, region, endpoint, forcePathStyle := s3.ParseHost(`test.localhost:9000`) - if got, want := bucket, `test`; got != want { - t.Fatalf("bucket=%q, want %q", got, want) - } else if got, want := region, `us-east-1`; got != want { - t.Fatalf("region=%q, want %q", got, want) - } else if got, want := endpoint, `http://localhost:9000`; got != want { - t.Fatalf("endpoint=%q, want %q", got, want) - } else if got, want := forcePathStyle, true; got != want { - t.Fatalf("forcePathStyle=%v, want %v", got, want) - } - }) - - t.Run("WithoutPort", func(t *testing.T) { - bucket, region, endpoint, forcePathStyle := s3.ParseHost(`test.localhost`) - if got, want := bucket, `test`; got != want { - t.Fatalf("bucket=%q, want %q", got, want) - } else if got, want := region, `us-east-1`; got != want { - t.Fatalf("region=%q, want %q", got, want) - } else if got, want := endpoint, `http://localhost`; got != want { - t.Fatalf("endpoint=%q, want %q", got, want) - } else if got, want := forcePathStyle, true; got != want { - t.Fatalf("forcePathStyle=%v, want %v", got, want) - } - }) - }) - - // Ensure backblaze B2 URLs extract bucket, region, & endpoint from host. - t.Run("Backblaze", func(t *testing.T) { - bucket, region, endpoint, forcePathStyle := s3.ParseHost(`test-123.s3.us-west-000.backblazeb2.com`) - if got, want := bucket, `test-123`; got != want { - t.Fatalf("bucket=%q, want %q", got, want) - } else if got, want := region, `us-west-000`; got != want { - t.Fatalf("region=%q, want %q", got, want) - } else if got, want := endpoint, `https://s3.us-west-000.backblazeb2.com`; got != want { - t.Fatalf("endpoint=%q, want %q", got, want) - } else if got, want := forcePathStyle, true; got != want { - t.Fatalf("forcePathStyle=%v, want %v", got, want) - } - }) - - // Ensure GCS URLs extract bucket & endpoint from host. - t.Run("GCS", func(t *testing.T) { - bucket, region, endpoint, forcePathStyle := s3.ParseHost(`litestream.io.storage.googleapis.com`) - if got, want := bucket, `litestream.io`; got != want { - t.Fatalf("bucket=%q, want %q", got, want) - } else if got, want := region, `us-east-1`; got != want { - t.Fatalf("region=%q, want %q", got, want) - } else if got, want := endpoint, `https://storage.googleapis.com`; got != want { - t.Fatalf("endpoint=%q, want %q", got, want) - } else if got, want := forcePathStyle, true; got != want { - t.Fatalf("forcePathStyle=%v, want %v", got, want) - } - }) -}