From ad9ce431275a36ad1df0ebdb4ad6de3562b9c3c7 Mon Sep 17 00:00:00 2001 From: Ben Johnson Date: Sun, 21 Feb 2021 09:35:16 -0700 Subject: [PATCH] Add support for S3-compatible object storage. This commit adds support for non-AWS S3-compatible storage such as MinIO, Backblaze B2, & Google Cloud Storage (GCS). Other backends should also work but some code has been added to make URL-based configurations work more easily. --- cmd/litestream/databases.go | 4 +- cmd/litestream/generations.go | 4 +- cmd/litestream/main.go | 308 ++++++++++++++++++---------------- cmd/litestream/main_test.go | 98 +++++++++++ cmd/litestream/replicate.go | 4 +- cmd/litestream/restore.go | 4 +- cmd/litestream/snapshots.go | 4 +- cmd/litestream/wal.go | 4 +- s3/s3.go | 73 +++++++- s3/s3_test.go | 80 +++++++++ 10 files changed, 419 insertions(+), 164 deletions(-) create mode 100644 cmd/litestream/main_test.go create mode 100644 s3/s3_test.go diff --git a/cmd/litestream/databases.go b/cmd/litestream/databases.go index eb7916d..86452c6 100644 --- a/cmd/litestream/databases.go +++ b/cmd/litestream/databases.go @@ -22,7 +22,7 @@ func (c *DatabasesCommand) Run(ctx context.Context, args []string) (err error) { if err := fs.Parse(args); err != nil { return err } else if fs.NArg() != 0 { - return fmt.Errorf("too many argument") + return fmt.Errorf("too many arguments") } // Load configuration. 
@@ -40,7 +40,7 @@ func (c *DatabasesCommand) Run(ctx context.Context, args []string) (err error) { fmt.Fprintln(w, "path\treplicas") for _, dbConfig := range config.DBs { - db, err := newDBFromConfig(&config, dbConfig) + db, err := NewDBFromConfig(dbConfig) if err != nil { return err } diff --git a/cmd/litestream/generations.go b/cmd/litestream/generations.go index ca9481f..7df2af7 100644 --- a/cmd/litestream/generations.go +++ b/cmd/litestream/generations.go @@ -35,7 +35,7 @@ func (c *GenerationsCommand) Run(ctx context.Context, args []string) (err error) var r litestream.Replica updatedAt := time.Now() if isURL(fs.Arg(0)) { - if r, err = NewReplicaFromURL(fs.Arg(0)); err != nil { + if r, err = NewReplicaFromConfig(&ReplicaConfig{URL: fs.Arg(0)}, nil); err != nil { return err } } else if configPath != "" { @@ -50,7 +50,7 @@ func (c *GenerationsCommand) Run(ctx context.Context, args []string) (err error) return err } else if dbc := config.DBConfig(path); dbc == nil { return fmt.Errorf("database not found in config: %s", path) - } else if db, err = newDBFromConfig(&config, dbc); err != nil { + } else if db, err = NewDBFromConfig(dbc); err != nil { return err } diff --git a/cmd/litestream/main.go b/cmd/litestream/main.go index 7762732..0c02c52 100644 --- a/cmd/litestream/main.go +++ b/cmd/litestream/main.go @@ -143,8 +143,20 @@ type Config struct { // Global S3 settings AccessKeyID string `yaml:"access-key-id"` SecretAccessKey string `yaml:"secret-access-key"` - Region string `yaml:"region"` - Bucket string `yaml:"bucket"` +} + +// propagateGlobalSettings copies the global S3 credentials into each replica config that leaves them empty; replica-level values take precedence. +func (c *Config) propagateGlobalSettings() { + for _, dbc := range c.DBs { + for _, rc := range dbc.Replicas { + if rc.AccessKeyID == "" { + rc.AccessKeyID = c.AccessKeyID + } + if rc.SecretAccessKey == "" { + rc.SecretAccessKey = c.SecretAccessKey + } + } + } } // DefaultConfig returns a new instance of Config with defaults set. 
@@ -188,6 +200,9 @@ func ReadConfigFile(filename string) (_ Config, err error) { } } + // Propagate settings from global config to replica configs. + config.propagateGlobalSettings() + return config, nil } @@ -197,6 +212,28 @@ type DBConfig struct { Replicas []*ReplicaConfig `yaml:"replicas"` } +// NewDBFromConfig instantiates a DB based on a configuration. +func NewDBFromConfig(dbc *DBConfig) (*litestream.DB, error) { + path, err := expand(dbc.Path) + if err != nil { + return nil, err + } + + // Initialize database with given path. + db := litestream.NewDB(path) + + // Instantiate and attach replicas. + for _, rc := range dbc.Replicas { + r, err := NewReplicaFromConfig(rc, db) + if err != nil { + return nil, err + } + db.Replicas = append(db.Replicas, r) + } + + return db, nil +} + // ReplicaConfig represents the configuration for a single replica in a database. type ReplicaConfig struct { Type string `yaml:"type"` // "file", "s3" @@ -213,26 +250,132 @@ type ReplicaConfig struct { SecretAccessKey string `yaml:"secret-access-key"` Region string `yaml:"region"` Bucket string `yaml:"bucket"` + Endpoint string `yaml:"endpoint"` + ForcePathStyle bool `yaml:"force-path-style"` } -// NewReplicaFromURL returns a new Replica instance configured from a URL. -// The replica's database is not set. -func NewReplicaFromURL(s string) (litestream.Replica, error) { - scheme, host, path, err := ParseReplicaURL(s) - if err != nil { +// NewReplicaFromConfig instantiates a replica for a DB based on a config. +func NewReplicaFromConfig(c *ReplicaConfig, db *litestream.DB) (litestream.Replica, error) { + // Ensure user did not specify URL in path. 
+ if isURL(c.Path) { + return nil, fmt.Errorf("replica path cannot be a url, please use the 'url' field instead: %s", c.Path) + } + + switch c.ReplicaType() { + case "file": + return newFileReplicaFromConfig(c, db) + case "s3": + return newS3ReplicaFromConfig(c, db) + default: + return nil, fmt.Errorf("unknown replica type in config: %q", c.Type) + } +} + +// newFileReplicaFromConfig returns a new instance of FileReplica build from config. +func newFileReplicaFromConfig(c *ReplicaConfig, db *litestream.DB) (_ *litestream.FileReplica, err error) { + // Ensure URL & path are not both specified. + if c.URL != "" && c.Path != "" { + return nil, fmt.Errorf("cannot specify url & path for file replica") + } + + // Parse path from URL, if specified. + path := c.Path + if c.URL != "" { + if _, _, path, err = ParseReplicaURL(c.URL); err != nil { + return nil, err + } + } + + // Ensure path is set explicitly or derived from URL field. + if path == "" { + return nil, fmt.Errorf("file replica path required") + } + + // Expand home prefix and return absolute path. + if path, err = expand(path); err != nil { return nil, err } - switch scheme { - case "file": - return litestream.NewFileReplica(nil, "", path), nil - case "s3": - r := s3.NewReplica(nil, "") - r.Bucket, r.Path = host, path - return r, nil - default: - return nil, fmt.Errorf("invalid replica url type: %s", s) + // Instantiate replica and apply time fields, if set. + r := litestream.NewFileReplica(db, c.Name, path) + if v := c.Retention; v > 0 { + r.Retention = v } + if v := c.RetentionCheckInterval; v > 0 { + r.RetentionCheckInterval = v + } + if v := c.ValidationInterval; v > 0 { + r.ValidationInterval = v + } + return r, nil +} + +// newS3ReplicaFromConfig returns a new instance of S3Replica build from config. +func newS3ReplicaFromConfig(c *ReplicaConfig, db *litestream.DB) (_ *s3.Replica, err error) { + // Ensure URL & constituent parts are not both specified. 
+ if c.URL != "" && c.Path != "" { + return nil, fmt.Errorf("cannot specify url & path for s3 replica") + } else if c.URL != "" && c.Bucket != "" { + return nil, fmt.Errorf("cannot specify url & bucket for s3 replica") + } + + bucket, path := c.Bucket, c.Path + region, endpoint, forcePathStyle := c.Region, c.Endpoint, c.ForcePathStyle + + // Apply settings from URL, if specified. + if c.URL != "" { + _, host, upath, err := ParseReplicaURL(c.URL) + if err != nil { + return nil, err + } + ubucket, uregion, uendpoint, uforcePathStyle := s3.ParseHost(host) + + // Only apply URL parts to field that have not been overridden. + if path == "" { + path = upath + } + if bucket == "" { + bucket = ubucket + } + if region == "" { + region = uregion + } + if endpoint == "" { + endpoint = uendpoint + } + if !forcePathStyle { + forcePathStyle = uforcePathStyle + } + } + + // Ensure required settings are set. + if bucket == "" { + return nil, fmt.Errorf("bucket required for s3 replica") + } + + // Build replica. + r := s3.NewReplica(db, c.Name) + r.AccessKeyID = c.AccessKeyID + r.SecretAccessKey = c.SecretAccessKey + r.Bucket = bucket + r.Path = path + r.Region = region + r.Endpoint = endpoint + r.ForcePathStyle = forcePathStyle + + if v := c.Retention; v > 0 { + r.Retention = v + } + if v := c.RetentionCheckInterval; v > 0 { + r.RetentionCheckInterval = v + } + if v := c.SyncInterval; v > 0 { + r.SyncInterval = v + } + if v := c.ValidationInterval; v > 0 { + r.ValidationInterval = v + } + return r, nil } // ParseReplicaURL parses a replica URL. @@ -262,9 +405,9 @@ func isURL(s string) bool { // ReplicaType returns the type based on the type field or extracted from the URL. 
func (c *ReplicaConfig) ReplicaType() string { - typ, _, _, _ := ParseReplicaURL(c.URL) - if typ != "" { - return typ + scheme, _, _, _ := ParseReplicaURL(c.URL) + if scheme != "" { + return scheme } else if c.Type != "" { return c.Type } @@ -283,133 +426,6 @@ func registerConfigFlag(fs *flag.FlagSet, p *string) { fs.StringVar(p, "config", DefaultConfigPath(), "config path") } -// newDBFromConfig instantiates a DB based on a configuration. -func newDBFromConfig(c *Config, dbc *DBConfig) (*litestream.DB, error) { - path, err := expand(dbc.Path) - if err != nil { - return nil, err - } - - // Initialize database with given path. - db := litestream.NewDB(path) - - // Instantiate and attach replicas. - for _, rc := range dbc.Replicas { - r, err := newReplicaFromConfig(db, c, dbc, rc) - if err != nil { - return nil, err - } - db.Replicas = append(db.Replicas, r) - } - - return db, nil -} - -// newReplicaFromConfig instantiates a replica for a DB based on a config. -func newReplicaFromConfig(db *litestream.DB, c *Config, dbc *DBConfig, rc *ReplicaConfig) (litestream.Replica, error) { - // Ensure user did not specify URL in path. - if isURL(rc.Path) { - return nil, fmt.Errorf("replica path cannot be a url, please use the 'url' field instead: %s", rc.Path) - } - - switch rc.ReplicaType() { - case "file": - return newFileReplicaFromConfig(db, c, dbc, rc) - case "s3": - return newS3ReplicaFromConfig(db, c, dbc, rc) - default: - return nil, fmt.Errorf("unknown replica type in config: %q", rc.Type) - } -} - -// newFileReplicaFromConfig returns a new instance of FileReplica build from config. 
-func newFileReplicaFromConfig(db *litestream.DB, c *Config, dbc *DBConfig, rc *ReplicaConfig) (_ *litestream.FileReplica, err error) { - path := rc.Path - if rc.URL != "" { - _, _, path, err = ParseReplicaURL(rc.URL) - if err != nil { - return nil, err - } - } - - if path == "" { - return nil, fmt.Errorf("%s: file replica path required", db.Path()) - } - - if path, err = expand(path); err != nil { - return nil, err - } - - r := litestream.NewFileReplica(db, rc.Name, path) - if v := rc.Retention; v > 0 { - r.Retention = v - } - if v := rc.RetentionCheckInterval; v > 0 { - r.RetentionCheckInterval = v - } - if v := rc.ValidationInterval; v > 0 { - r.ValidationInterval = v - } - return r, nil -} - -// newS3ReplicaFromConfig returns a new instance of S3Replica build from config. -func newS3ReplicaFromConfig(db *litestream.DB, c *Config, dbc *DBConfig, rc *ReplicaConfig) (_ *s3.Replica, err error) { - bucket := c.Bucket - if v := rc.Bucket; v != "" { - bucket = v - } - - path := rc.Path - if rc.URL != "" { - _, bucket, path, err = ParseReplicaURL(rc.URL) - if err != nil { - return nil, err - } - } - - // Use global or replica-specific S3 settings. - accessKeyID := c.AccessKeyID - if v := rc.AccessKeyID; v != "" { - accessKeyID = v - } - secretAccessKey := c.SecretAccessKey - if v := rc.SecretAccessKey; v != "" { - secretAccessKey = v - } - region := c.Region - if v := rc.Region; v != "" { - region = v - } - - // Ensure required settings are set. - if bucket == "" { - return nil, fmt.Errorf("%s: s3 bucket required", db.Path()) - } - - // Build replica. 
- r := s3.NewReplica(db, rc.Name) - r.AccessKeyID = accessKeyID - r.SecretAccessKey = secretAccessKey - r.Region = region - r.Bucket = bucket - r.Path = path - - if v := rc.Retention; v > 0 { - r.Retention = v - } - if v := rc.RetentionCheckInterval; v > 0 { - r.RetentionCheckInterval = v - } - if v := rc.SyncInterval; v > 0 { - r.SyncInterval = v - } - if v := rc.ValidationInterval; v > 0 { - r.ValidationInterval = v - } - return r, nil -} - // expand returns an absolute path for s. func expand(s string) (string, error) { // Just expand to absolute path if there is no home directory prefix. diff --git a/cmd/litestream/main_test.go b/cmd/litestream/main_test.go new file mode 100644 index 0000000..b34b8f1 --- /dev/null +++ b/cmd/litestream/main_test.go @@ -0,0 +1,98 @@ +package main_test + +import ( + "testing" + + "github.com/benbjohnson/litestream" + main "github.com/benbjohnson/litestream/cmd/litestream" + "github.com/benbjohnson/litestream/s3" +) + +func TestNewFileReplicaFromConfig(t *testing.T) { + r, err := main.NewReplicaFromConfig(&main.ReplicaConfig{Path: "/foo"}, nil) + if err != nil { + t.Fatal(err) + } else if r, ok := r.(*litestream.FileReplica); !ok { + t.Fatal("unexpected replica type") + } else if got, want := r.Path(), "/foo"; got != want { + t.Fatalf("Path=%s, want %s", got, want) + } +} + +func TestNewS3ReplicaFromConfig(t *testing.T) { + t.Run("URL", func(t *testing.T) { + r, err := main.NewReplicaFromConfig(&main.ReplicaConfig{URL: "s3://foo/bar"}, nil) + if err != nil { + t.Fatal(err) + } else if r, ok := r.(*s3.Replica); !ok { + t.Fatal("unexpected replica type") + } else if got, want := r.Bucket, "foo"; got != want { + t.Fatalf("Bucket=%s, want %s", got, want) + } else if got, want := r.Path, "bar"; got != want { + t.Fatalf("Path=%s, want %s", got, want) + } else if got, want := r.Region, ""; got != want { + t.Fatalf("Region=%s, want %s", got, want) + } else if got, want := r.Endpoint, ""; got != want { + t.Fatalf("Endpoint=%s, want %s", 
got, want) + } else if got, want := r.ForcePathStyle, false; got != want { + t.Fatalf("ForcePathStyle=%v, want %v", got, want) + } + }) + + t.Run("MinIO", func(t *testing.T) { + r, err := main.NewReplicaFromConfig(&main.ReplicaConfig{URL: "s3://foo.localhost:9000/bar"}, nil) + if err != nil { + t.Fatal(err) + } else if r, ok := r.(*s3.Replica); !ok { + t.Fatal("unexpected replica type") + } else if got, want := r.Bucket, "foo"; got != want { + t.Fatalf("Bucket=%s, want %s", got, want) + } else if got, want := r.Path, "bar"; got != want { + t.Fatalf("Path=%s, want %s", got, want) + } else if got, want := r.Region, "us-east-1"; got != want { + t.Fatalf("Region=%s, want %s", got, want) + } else if got, want := r.Endpoint, "http://localhost:9000"; got != want { + t.Fatalf("Endpoint=%s, want %s", got, want) + } else if got, want := r.ForcePathStyle, true; got != want { + t.Fatalf("ForcePathStyle=%v, want %v", got, want) + } + }) + + t.Run("Backblaze", func(t *testing.T) { + r, err := main.NewReplicaFromConfig(&main.ReplicaConfig{URL: "s3://foo.s3.us-west-000.backblazeb2.com/bar"}, nil) + if err != nil { + t.Fatal(err) + } else if r, ok := r.(*s3.Replica); !ok { + t.Fatal("unexpected replica type") + } else if got, want := r.Bucket, "foo"; got != want { + t.Fatalf("Bucket=%s, want %s", got, want) + } else if got, want := r.Path, "bar"; got != want { + t.Fatalf("Path=%s, want %s", got, want) + } else if got, want := r.Region, "us-west-000"; got != want { + t.Fatalf("Region=%s, want %s", got, want) + } else if got, want := r.Endpoint, "https://s3.us-west-000.backblazeb2.com"; got != want { + t.Fatalf("Endpoint=%s, want %s", got, want) + } else if got, want := r.ForcePathStyle, true; got != want { + t.Fatalf("ForcePathStyle=%v, want %v", got, want) + } + }) + + t.Run("GCS", func(t *testing.T) { + r, err := main.NewReplicaFromConfig(&main.ReplicaConfig{URL: "s3://foo.storage.googleapis.com/bar"}, nil) + if err != nil { + t.Fatal(err) + } else if r, ok := r.(*s3.Replica); !ok 
{ + t.Fatal("unexpected replica type") + } else if got, want := r.Bucket, "foo"; got != want { + t.Fatalf("Bucket=%s, want %s", got, want) + } else if got, want := r.Path, "bar"; got != want { + t.Fatalf("Path=%s, want %s", got, want) + } else if got, want := r.Region, "us-east-1"; got != want { + t.Fatalf("Region=%s, want %s", got, want) + } else if got, want := r.Endpoint, "https://storage.googleapis.com"; got != want { + t.Fatalf("Endpoint=%s, want %s", got, want) + } else if got, want := r.ForcePathStyle, true; got != want { + t.Fatalf("ForcePathStyle=%v, want %v", got, want) + } + }) +} diff --git a/cmd/litestream/replicate.go b/cmd/litestream/replicate.go index 3ae515a..41b6a0d 100644 --- a/cmd/litestream/replicate.go +++ b/cmd/litestream/replicate.go @@ -83,7 +83,7 @@ func (c *ReplicateCommand) Run(ctx context.Context) (err error) { } for _, dbConfig := range c.Config.DBs { - db, err := newDBFromConfig(&c.Config, dbConfig) + db, err := NewDBFromConfig(dbConfig) if err != nil { return err } @@ -103,7 +103,7 @@ func (c *ReplicateCommand) Run(ctx context.Context) (err error) { case *litestream.FileReplica: log.Printf("replicating to: name=%q type=%q path=%q", r.Name(), r.Type(), r.Path()) case *s3.Replica: - log.Printf("replicating to: name=%q type=%q bucket=%q path=%q region=%q", r.Name(), r.Type(), r.Bucket, r.Path, r.Region) + log.Printf("replicating to: name=%q type=%q bucket=%q path=%q region=%q endpoint=%q sync-interval=%s", r.Name(), r.Type(), r.Bucket, r.Path, r.Region, r.Endpoint, r.SyncInterval) default: log.Printf("replicating to: name=%q type=%q", r.Name(), r.Type()) } diff --git a/cmd/litestream/restore.go b/cmd/litestream/restore.go index 70c3028..7c6056a 100644 --- a/cmd/litestream/restore.go +++ b/cmd/litestream/restore.go @@ -80,7 +80,7 @@ func (c *RestoreCommand) Run(ctx context.Context, args []string) (err error) { // loadFromURL creates a replica & updates the restore options from a replica URL. 
func (c *RestoreCommand) loadFromURL(ctx context.Context, replicaURL string, opt *litestream.RestoreOptions) (litestream.Replica, error) { - r, err := NewReplicaFromURL(replicaURL) + r, err := NewReplicaFromConfig(&ReplicaConfig{URL: replicaURL}, nil) if err != nil { return nil, err } @@ -104,7 +104,7 @@ func (c *RestoreCommand) loadFromConfig(ctx context.Context, dbPath, configPath if dbConfig == nil { return nil, fmt.Errorf("database not found in config: %s", dbPath) } - db, err := newDBFromConfig(&config, dbConfig) + db, err := NewDBFromConfig(dbConfig) if err != nil { return nil, err } diff --git a/cmd/litestream/snapshots.go b/cmd/litestream/snapshots.go index 52a71d8..0e01684 100644 --- a/cmd/litestream/snapshots.go +++ b/cmd/litestream/snapshots.go @@ -33,7 +33,7 @@ func (c *SnapshotsCommand) Run(ctx context.Context, args []string) (err error) { var db *litestream.DB var r litestream.Replica if isURL(fs.Arg(0)) { - if r, err = NewReplicaFromURL(fs.Arg(0)); err != nil { + if r, err = NewReplicaFromConfig(&ReplicaConfig{URL: fs.Arg(0)}, nil); err != nil { return err } } else if configPath != "" { @@ -48,7 +48,7 @@ func (c *SnapshotsCommand) Run(ctx context.Context, args []string) (err error) { return err } else if dbc := config.DBConfig(path); dbc == nil { return fmt.Errorf("database not found in config: %s", path) - } else if db, err = newDBFromConfig(&config, dbc); err != nil { + } else if db, err = NewDBFromConfig(dbc); err != nil { return err } diff --git a/cmd/litestream/wal.go b/cmd/litestream/wal.go index 1d7bf41..c213ce5 100644 --- a/cmd/litestream/wal.go +++ b/cmd/litestream/wal.go @@ -34,7 +34,7 @@ func (c *WALCommand) Run(ctx context.Context, args []string) (err error) { var db *litestream.DB var r litestream.Replica if isURL(fs.Arg(0)) { - if r, err = NewReplicaFromURL(fs.Arg(0)); err != nil { + if r, err = NewReplicaFromConfig(&ReplicaConfig{URL: fs.Arg(0)}, nil); err != nil { return err } } else if configPath != "" { @@ -49,7 +49,7 @@ func (c 
*WALCommand) Run(ctx context.Context, args []string) (err error) { return err } else if dbc := config.DBConfig(path); dbc == nil { return fmt.Errorf("database not found in config: %s", path) - } else if db, err = newDBFromConfig(&config, dbc); err != nil { + } else if db, err = NewDBFromConfig(dbc); err != nil { return err } diff --git a/s3/s3.go b/s3/s3.go index 5897320..e9eb01b 100644 --- a/s3/s3.go +++ b/s3/s3.go @@ -7,8 +7,10 @@ import ( "io" "io/ioutil" "log" + "net" "os" "path" + "regexp" "sync" "time" @@ -69,9 +71,11 @@ type Replica struct { SecretAccessKey string // S3 bucket information - Region string - Bucket string - Path string + Region string + Bucket string + Path string + Endpoint string + ForcePathStyle bool // Time between syncs with the shadow WAL. SyncInterval time.Duration @@ -646,9 +650,11 @@ func (r *Replica) Init(ctx context.Context) (err error) { return nil } - // Look up region if not specified. + // Look up region if not specified and no endpoint is used. + // Endpoints are typically used for non-S3 object stores and do not + // necessarily require a region. region := r.Region - if region == "" { + if region == "" && r.Endpoint == "" { if region, err = r.findBucketRegion(ctx, r.Bucket); err != nil { return fmt.Errorf("cannot lookup bucket region: %w", err) } @@ -656,7 +662,9 @@ func (r *Replica) Init(ctx context.Context) (err error) { // Create new AWS session. 
config := r.config() - config.Region = aws.String(region) + if region != "" { + config.Region = aws.String(region) + } sess, err := session.NewSession(config) if err != nil { return fmt.Errorf("cannot create aws session: %w", err) @@ -673,6 +681,12 @@ func (r *Replica) config() *aws.Config { if r.AccessKeyID != "" || r.SecretAccessKey != "" { config.Credentials = credentials.NewStaticCredentials(r.AccessKeyID, r.SecretAccessKey, "") } + if r.Endpoint != "" { + config.Endpoint = aws.String(r.Endpoint) + } + if r.ForcePathStyle { + config.S3ForcePathStyle = aws.Bool(r.ForcePathStyle) + } return config } @@ -1027,6 +1041,53 @@ func (r *Replica) deleteGenerationBefore(ctx context.Context, generation string, return nil } +// ParseHost extracts data from a hostname depending on the service provider. +func ParseHost(s string) (bucket, region, endpoint string, forcePathStyle bool) { + // Extract port if one is specified. + host, port, err := net.SplitHostPort(s) + if err != nil { + host = s + } + + // Default to path-based URLs, except for with AWS S3 itself. + forcePathStyle = true + + // Extract fields from provider-specific host formats. + scheme := "https" + if a := localhostRegex.FindStringSubmatch(host); a != nil { + bucket, region = a[1], "us-east-1" + scheme, endpoint = "http", "localhost" + } else if a := gcsRegex.FindStringSubmatch(host); a != nil { + bucket, region = a[1], "us-east-1" + endpoint = "storage.googleapis.com" + } else if a := backblazeRegex.FindStringSubmatch(host); a != nil { + bucket = a[1] + region = a[2] + endpoint = fmt.Sprintf("s3.%s.backblazeb2.com", a[2]) + } else { + bucket = host + forcePathStyle = false + } + + // Add port back to endpoint, if available. + if endpoint != "" && port != "" { + endpoint = net.JoinHostPort(endpoint, port) + } + + // Prepend scheme to endpoint. 
+ if endpoint != "" { + endpoint = scheme + "://" + endpoint + } + + return bucket, region, endpoint, forcePathStyle +} + +var ( + localhostRegex = regexp.MustCompile(`^(?:(.+)\.)?localhost$`) + backblazeRegex = regexp.MustCompile(`^(?:(.+)\.)?s3\.([^.]+)\.backblazeb2\.com$`) + gcsRegex = regexp.MustCompile(`^(?:(.+)\.)?storage\.googleapis\.com$`) +) + // S3 metrics. var ( operationTotalCounterVec = promauto.NewCounterVec(prometheus.CounterOpts{ diff --git a/s3/s3_test.go b/s3/s3_test.go new file mode 100644 index 0000000..f39e10d --- /dev/null +++ b/s3/s3_test.go @@ -0,0 +1,80 @@ +package s3_test + +import ( + "testing" + + "github.com/benbjohnson/litestream/s3" +) + +func TestParseHost(t *testing.T) { + // Ensure non-specific hosts return as buckets. + t.Run("S3", func(t *testing.T) { + bucket, region, endpoint, forcePathStyle := s3.ParseHost(`test.litestream.io`) + if got, want := bucket, `test.litestream.io`; got != want { + t.Fatalf("bucket=%q, want %q", got, want) + } else if got, want := region, ``; got != want { + t.Fatalf("region=%q, want %q", got, want) + } else if got, want := endpoint, ``; got != want { + t.Fatalf("endpoint=%q, want %q", got, want) + } else if got, want := forcePathStyle, false; got != want { + t.Fatalf("forcePathStyle=%v, want %v", got, want) + } + }) + + // Ensure localhosts use an HTTP endpoint and extract the bucket name. 
+ t.Run("Localhost", func(t *testing.T) { + t.Run("WithPort", func(t *testing.T) { + bucket, region, endpoint, forcePathStyle := s3.ParseHost(`test.localhost:9000`) + if got, want := bucket, `test`; got != want { + t.Fatalf("bucket=%q, want %q", got, want) + } else if got, want := region, `us-east-1`; got != want { + t.Fatalf("region=%q, want %q", got, want) + } else if got, want := endpoint, `http://localhost:9000`; got != want { + t.Fatalf("endpoint=%q, want %q", got, want) + } else if got, want := forcePathStyle, true; got != want { + t.Fatalf("forcePathStyle=%v, want %v", got, want) + } + }) + + t.Run("WithoutPort", func(t *testing.T) { + bucket, region, endpoint, forcePathStyle := s3.ParseHost(`test.localhost`) + if got, want := bucket, `test`; got != want { + t.Fatalf("bucket=%q, want %q", got, want) + } else if got, want := region, `us-east-1`; got != want { + t.Fatalf("region=%q, want %q", got, want) + } else if got, want := endpoint, `http://localhost`; got != want { + t.Fatalf("endpoint=%q, want %q", got, want) + } else if got, want := forcePathStyle, true; got != want { + t.Fatalf("forcePathStyle=%v, want %v", got, want) + } + }) + }) + + // Ensure backblaze B2 URLs extract bucket, region, & endpoint from host. + t.Run("Backblaze", func(t *testing.T) { + bucket, region, endpoint, forcePathStyle := s3.ParseHost(`test-123.s3.us-west-000.backblazeb2.com`) + if got, want := bucket, `test-123`; got != want { + t.Fatalf("bucket=%q, want %q", got, want) + } else if got, want := region, `us-west-000`; got != want { + t.Fatalf("region=%q, want %q", got, want) + } else if got, want := endpoint, `https://s3.us-west-000.backblazeb2.com`; got != want { + t.Fatalf("endpoint=%q, want %q", got, want) + } else if got, want := forcePathStyle, true; got != want { + t.Fatalf("forcePathStyle=%v, want %v", got, want) + } + }) + + // Ensure GCS URLs extract bucket & endpoint from host. 
+ t.Run("GCS", func(t *testing.T) { + bucket, region, endpoint, forcePathStyle := s3.ParseHost(`litestream.io.storage.googleapis.com`) + if got, want := bucket, `litestream.io`; got != want { + t.Fatalf("bucket=%q, want %q", got, want) + } else if got, want := region, `us-east-1`; got != want { + t.Fatalf("region=%q, want %q", got, want) + } else if got, want := endpoint, `https://storage.googleapis.com`; got != want { + t.Fatalf("endpoint=%q, want %q", got, want) + } else if got, want := forcePathStyle, true; got != want { + t.Fatalf("forcePathStyle=%v, want %v", got, want) + } + }) +}