Merge pull request #66 from benbjohnson/s3-compatible
This commit is contained in:
@@ -22,7 +22,7 @@ func (c *DatabasesCommand) Run(ctx context.Context, args []string) (err error) {
|
|||||||
if err := fs.Parse(args); err != nil {
|
if err := fs.Parse(args); err != nil {
|
||||||
return err
|
return err
|
||||||
} else if fs.NArg() != 0 {
|
} else if fs.NArg() != 0 {
|
||||||
return fmt.Errorf("too many argument")
|
return fmt.Errorf("too many arguments")
|
||||||
}
|
}
|
||||||
|
|
||||||
// Load configuration.
|
// Load configuration.
|
||||||
@@ -40,7 +40,7 @@ func (c *DatabasesCommand) Run(ctx context.Context, args []string) (err error) {
|
|||||||
|
|
||||||
fmt.Fprintln(w, "path\treplicas")
|
fmt.Fprintln(w, "path\treplicas")
|
||||||
for _, dbConfig := range config.DBs {
|
for _, dbConfig := range config.DBs {
|
||||||
db, err := newDBFromConfig(&config, dbConfig)
|
db, err := NewDBFromConfig(dbConfig)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -35,7 +35,7 @@ func (c *GenerationsCommand) Run(ctx context.Context, args []string) (err error)
|
|||||||
var r litestream.Replica
|
var r litestream.Replica
|
||||||
updatedAt := time.Now()
|
updatedAt := time.Now()
|
||||||
if isURL(fs.Arg(0)) {
|
if isURL(fs.Arg(0)) {
|
||||||
if r, err = NewReplicaFromURL(fs.Arg(0)); err != nil {
|
if r, err = NewReplicaFromConfig(&ReplicaConfig{URL: fs.Arg(0)}, nil); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
} else if configPath != "" {
|
} else if configPath != "" {
|
||||||
@@ -50,7 +50,7 @@ func (c *GenerationsCommand) Run(ctx context.Context, args []string) (err error)
|
|||||||
return err
|
return err
|
||||||
} else if dbc := config.DBConfig(path); dbc == nil {
|
} else if dbc := config.DBConfig(path); dbc == nil {
|
||||||
return fmt.Errorf("database not found in config: %s", path)
|
return fmt.Errorf("database not found in config: %s", path)
|
||||||
} else if db, err = newDBFromConfig(&config, dbc); err != nil {
|
} else if db, err = NewDBFromConfig(dbc); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -143,8 +143,20 @@ type Config struct {
|
|||||||
// Global S3 settings
|
// Global S3 settings
|
||||||
AccessKeyID string `yaml:"access-key-id"`
|
AccessKeyID string `yaml:"access-key-id"`
|
||||||
SecretAccessKey string `yaml:"secret-access-key"`
|
SecretAccessKey string `yaml:"secret-access-key"`
|
||||||
Region string `yaml:"region"`
|
}
|
||||||
Bucket string `yaml:"bucket"`
|
|
||||||
|
// propagateGlobalSettings copies global S3 settings to replica configs.
|
||||||
|
func (c *Config) propagateGlobalSettings() {
|
||||||
|
for _, dbc := range c.DBs {
|
||||||
|
for _, rc := range dbc.Replicas {
|
||||||
|
if rc.AccessKeyID != "" {
|
||||||
|
rc.AccessKeyID = c.AccessKeyID
|
||||||
|
}
|
||||||
|
if rc.SecretAccessKey != "" {
|
||||||
|
rc.SecretAccessKey = c.SecretAccessKey
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// DefaultConfig returns a new instance of Config with defaults set.
|
// DefaultConfig returns a new instance of Config with defaults set.
|
||||||
@@ -188,6 +200,9 @@ func ReadConfigFile(filename string) (_ Config, err error) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Propage settings from global config to replica configs.
|
||||||
|
config.propagateGlobalSettings()
|
||||||
|
|
||||||
return config, nil
|
return config, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -197,6 +212,28 @@ type DBConfig struct {
|
|||||||
Replicas []*ReplicaConfig `yaml:"replicas"`
|
Replicas []*ReplicaConfig `yaml:"replicas"`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// NewDBFromConfig instantiates a DB based on a configuration.
|
||||||
|
func NewDBFromConfig(dbc *DBConfig) (*litestream.DB, error) {
|
||||||
|
path, err := expand(dbc.Path)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Initialize database with given path.
|
||||||
|
db := litestream.NewDB(path)
|
||||||
|
|
||||||
|
// Instantiate and attach replicas.
|
||||||
|
for _, rc := range dbc.Replicas {
|
||||||
|
r, err := NewReplicaFromConfig(rc, db)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
db.Replicas = append(db.Replicas, r)
|
||||||
|
}
|
||||||
|
|
||||||
|
return db, nil
|
||||||
|
}
|
||||||
|
|
||||||
// ReplicaConfig represents the configuration for a single replica in a database.
|
// ReplicaConfig represents the configuration for a single replica in a database.
|
||||||
type ReplicaConfig struct {
|
type ReplicaConfig struct {
|
||||||
Type string `yaml:"type"` // "file", "s3"
|
Type string `yaml:"type"` // "file", "s3"
|
||||||
@@ -213,26 +250,132 @@ type ReplicaConfig struct {
|
|||||||
SecretAccessKey string `yaml:"secret-access-key"`
|
SecretAccessKey string `yaml:"secret-access-key"`
|
||||||
Region string `yaml:"region"`
|
Region string `yaml:"region"`
|
||||||
Bucket string `yaml:"bucket"`
|
Bucket string `yaml:"bucket"`
|
||||||
|
Endpoint string `yaml:"endpoint"`
|
||||||
|
ForcePathStyle bool `yaml:"force-path-style"`
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewReplicaFromURL returns a new Replica instance configured from a URL.
|
// NewReplicaFromConfig instantiates a replica for a DB based on a config.
|
||||||
// The replica's database is not set.
|
func NewReplicaFromConfig(c *ReplicaConfig, db *litestream.DB) (litestream.Replica, error) {
|
||||||
func NewReplicaFromURL(s string) (litestream.Replica, error) {
|
// Ensure user did not specify URL in path.
|
||||||
scheme, host, path, err := ParseReplicaURL(s)
|
if isURL(c.Path) {
|
||||||
if err != nil {
|
return nil, fmt.Errorf("replica path cannot be a url, please use the 'url' field instead: %s", c.Path)
|
||||||
|
}
|
||||||
|
|
||||||
|
switch c.ReplicaType() {
|
||||||
|
case "file":
|
||||||
|
return newFileReplicaFromConfig(c, db)
|
||||||
|
case "s3":
|
||||||
|
return newS3ReplicaFromConfig(c, db)
|
||||||
|
default:
|
||||||
|
return nil, fmt.Errorf("unknown replica type in config: %q", c.Type)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// newFileReplicaFromConfig returns a new instance of FileReplica build from config.
|
||||||
|
func newFileReplicaFromConfig(c *ReplicaConfig, db *litestream.DB) (_ *litestream.FileReplica, err error) {
|
||||||
|
// Ensure URL & path are not both specified.
|
||||||
|
if c.URL != "" && c.Path != "" {
|
||||||
|
return nil, fmt.Errorf("cannot specify url & path for file replica")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Parse path from URL, if specified.
|
||||||
|
path := c.Path
|
||||||
|
if c.URL != "" {
|
||||||
|
if _, _, path, err = ParseReplicaURL(c.URL); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Ensure path is set explicitly or derived from URL field.
|
||||||
|
if path == "" {
|
||||||
|
return nil, fmt.Errorf("file replica path required")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Expand home prefix and return absolute path.
|
||||||
|
if path, err = expand(path); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
switch scheme {
|
// Instantiate replica and apply time fields, if set.
|
||||||
case "file":
|
r := litestream.NewFileReplica(db, c.Name, path)
|
||||||
return litestream.NewFileReplica(nil, "", path), nil
|
if v := c.Retention; v > 0 {
|
||||||
case "s3":
|
r.Retention = v
|
||||||
r := s3.NewReplica(nil, "")
|
|
||||||
r.Bucket, r.Path = host, path
|
|
||||||
return r, nil
|
|
||||||
default:
|
|
||||||
return nil, fmt.Errorf("invalid replica url type: %s", s)
|
|
||||||
}
|
}
|
||||||
|
if v := c.RetentionCheckInterval; v > 0 {
|
||||||
|
r.RetentionCheckInterval = v
|
||||||
|
}
|
||||||
|
if v := c.ValidationInterval; v > 0 {
|
||||||
|
r.ValidationInterval = v
|
||||||
|
}
|
||||||
|
return r, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// newS3ReplicaFromConfig returns a new instance of S3Replica build from config.
|
||||||
|
func newS3ReplicaFromConfig(c *ReplicaConfig, db *litestream.DB) (_ *s3.Replica, err error) {
|
||||||
|
// Ensure URL & constituent parts are not both specified.
|
||||||
|
if c.URL != "" && c.Path != "" {
|
||||||
|
return nil, fmt.Errorf("cannot specify url & path for s3 replica")
|
||||||
|
} else if c.URL != "" && c.Bucket != "" {
|
||||||
|
return nil, fmt.Errorf("cannot specify url & bucket for s3 replica")
|
||||||
|
}
|
||||||
|
|
||||||
|
bucket, path := c.Bucket, c.Path
|
||||||
|
region, endpoint, forcePathStyle := c.Region, c.Endpoint, c.ForcePathStyle
|
||||||
|
|
||||||
|
// Apply settings from URL, if specified.
|
||||||
|
if c.URL != "" {
|
||||||
|
_, host, upath, err := ParseReplicaURL(c.URL)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
ubucket, uregion, uendpoint, uforcePathStyle := s3.ParseHost(host)
|
||||||
|
|
||||||
|
// Only apply URL parts to field that have not been overridden.
|
||||||
|
if path == "" {
|
||||||
|
path = upath
|
||||||
|
}
|
||||||
|
if bucket == "" {
|
||||||
|
bucket = ubucket
|
||||||
|
}
|
||||||
|
if region == "" {
|
||||||
|
region = uregion
|
||||||
|
}
|
||||||
|
if endpoint == "" {
|
||||||
|
endpoint = uendpoint
|
||||||
|
}
|
||||||
|
if !forcePathStyle {
|
||||||
|
forcePathStyle = uforcePathStyle
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Ensure required settings are set.
|
||||||
|
if bucket == "" {
|
||||||
|
return nil, fmt.Errorf("bucket required for s3 replica")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Build replica.
|
||||||
|
r := s3.NewReplica(db, c.Name)
|
||||||
|
r.AccessKeyID = c.AccessKeyID
|
||||||
|
r.SecretAccessKey = c.SecretAccessKey
|
||||||
|
r.Bucket = bucket
|
||||||
|
r.Path = path
|
||||||
|
r.Region = region
|
||||||
|
r.Endpoint = endpoint
|
||||||
|
r.ForcePathStyle = forcePathStyle
|
||||||
|
|
||||||
|
if v := c.Retention; v > 0 {
|
||||||
|
r.Retention = v
|
||||||
|
}
|
||||||
|
if v := c.RetentionCheckInterval; v > 0 {
|
||||||
|
r.RetentionCheckInterval = v
|
||||||
|
}
|
||||||
|
if v := c.SyncInterval; v > 0 {
|
||||||
|
r.SyncInterval = v
|
||||||
|
}
|
||||||
|
if v := c.ValidationInterval; v > 0 {
|
||||||
|
r.ValidationInterval = v
|
||||||
|
}
|
||||||
|
return r, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// ParseReplicaURL parses a replica URL.
|
// ParseReplicaURL parses a replica URL.
|
||||||
@@ -262,9 +405,9 @@ func isURL(s string) bool {
|
|||||||
|
|
||||||
// ReplicaType returns the type based on the type field or extracted from the URL.
|
// ReplicaType returns the type based on the type field or extracted from the URL.
|
||||||
func (c *ReplicaConfig) ReplicaType() string {
|
func (c *ReplicaConfig) ReplicaType() string {
|
||||||
typ, _, _, _ := ParseReplicaURL(c.URL)
|
scheme, _, _, _ := ParseReplicaURL(c.URL)
|
||||||
if typ != "" {
|
if scheme != "" {
|
||||||
return typ
|
return scheme
|
||||||
} else if c.Type != "" {
|
} else if c.Type != "" {
|
||||||
return c.Type
|
return c.Type
|
||||||
}
|
}
|
||||||
@@ -283,133 +426,6 @@ func registerConfigFlag(fs *flag.FlagSet, p *string) {
|
|||||||
fs.StringVar(p, "config", DefaultConfigPath(), "config path")
|
fs.StringVar(p, "config", DefaultConfigPath(), "config path")
|
||||||
}
|
}
|
||||||
|
|
||||||
// newDBFromConfig instantiates a DB based on a configuration.
|
|
||||||
func newDBFromConfig(c *Config, dbc *DBConfig) (*litestream.DB, error) {
|
|
||||||
path, err := expand(dbc.Path)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
// Initialize database with given path.
|
|
||||||
db := litestream.NewDB(path)
|
|
||||||
|
|
||||||
// Instantiate and attach replicas.
|
|
||||||
for _, rc := range dbc.Replicas {
|
|
||||||
r, err := newReplicaFromConfig(db, c, dbc, rc)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
db.Replicas = append(db.Replicas, r)
|
|
||||||
}
|
|
||||||
|
|
||||||
return db, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// newReplicaFromConfig instantiates a replica for a DB based on a config.
|
|
||||||
func newReplicaFromConfig(db *litestream.DB, c *Config, dbc *DBConfig, rc *ReplicaConfig) (litestream.Replica, error) {
|
|
||||||
// Ensure user did not specify URL in path.
|
|
||||||
if isURL(rc.Path) {
|
|
||||||
return nil, fmt.Errorf("replica path cannot be a url, please use the 'url' field instead: %s", rc.Path)
|
|
||||||
}
|
|
||||||
|
|
||||||
switch rc.ReplicaType() {
|
|
||||||
case "file":
|
|
||||||
return newFileReplicaFromConfig(db, c, dbc, rc)
|
|
||||||
case "s3":
|
|
||||||
return newS3ReplicaFromConfig(db, c, dbc, rc)
|
|
||||||
default:
|
|
||||||
return nil, fmt.Errorf("unknown replica type in config: %q", rc.Type)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// newFileReplicaFromConfig returns a new instance of FileReplica build from config.
|
|
||||||
func newFileReplicaFromConfig(db *litestream.DB, c *Config, dbc *DBConfig, rc *ReplicaConfig) (_ *litestream.FileReplica, err error) {
|
|
||||||
path := rc.Path
|
|
||||||
if rc.URL != "" {
|
|
||||||
_, _, path, err = ParseReplicaURL(rc.URL)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if path == "" {
|
|
||||||
return nil, fmt.Errorf("%s: file replica path required", db.Path())
|
|
||||||
}
|
|
||||||
|
|
||||||
if path, err = expand(path); err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
r := litestream.NewFileReplica(db, rc.Name, path)
|
|
||||||
if v := rc.Retention; v > 0 {
|
|
||||||
r.Retention = v
|
|
||||||
}
|
|
||||||
if v := rc.RetentionCheckInterval; v > 0 {
|
|
||||||
r.RetentionCheckInterval = v
|
|
||||||
}
|
|
||||||
if v := rc.ValidationInterval; v > 0 {
|
|
||||||
r.ValidationInterval = v
|
|
||||||
}
|
|
||||||
return r, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// newS3ReplicaFromConfig returns a new instance of S3Replica build from config.
|
|
||||||
func newS3ReplicaFromConfig(db *litestream.DB, c *Config, dbc *DBConfig, rc *ReplicaConfig) (_ *s3.Replica, err error) {
|
|
||||||
bucket := c.Bucket
|
|
||||||
if v := rc.Bucket; v != "" {
|
|
||||||
bucket = v
|
|
||||||
}
|
|
||||||
|
|
||||||
path := rc.Path
|
|
||||||
if rc.URL != "" {
|
|
||||||
_, bucket, path, err = ParseReplicaURL(rc.URL)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Use global or replica-specific S3 settings.
|
|
||||||
accessKeyID := c.AccessKeyID
|
|
||||||
if v := rc.AccessKeyID; v != "" {
|
|
||||||
accessKeyID = v
|
|
||||||
}
|
|
||||||
secretAccessKey := c.SecretAccessKey
|
|
||||||
if v := rc.SecretAccessKey; v != "" {
|
|
||||||
secretAccessKey = v
|
|
||||||
}
|
|
||||||
region := c.Region
|
|
||||||
if v := rc.Region; v != "" {
|
|
||||||
region = v
|
|
||||||
}
|
|
||||||
|
|
||||||
// Ensure required settings are set.
|
|
||||||
if bucket == "" {
|
|
||||||
return nil, fmt.Errorf("%s: s3 bucket required", db.Path())
|
|
||||||
}
|
|
||||||
|
|
||||||
// Build replica.
|
|
||||||
r := s3.NewReplica(db, rc.Name)
|
|
||||||
r.AccessKeyID = accessKeyID
|
|
||||||
r.SecretAccessKey = secretAccessKey
|
|
||||||
r.Region = region
|
|
||||||
r.Bucket = bucket
|
|
||||||
r.Path = path
|
|
||||||
|
|
||||||
if v := rc.Retention; v > 0 {
|
|
||||||
r.Retention = v
|
|
||||||
}
|
|
||||||
if v := rc.RetentionCheckInterval; v > 0 {
|
|
||||||
r.RetentionCheckInterval = v
|
|
||||||
}
|
|
||||||
if v := rc.SyncInterval; v > 0 {
|
|
||||||
r.SyncInterval = v
|
|
||||||
}
|
|
||||||
if v := rc.ValidationInterval; v > 0 {
|
|
||||||
r.ValidationInterval = v
|
|
||||||
}
|
|
||||||
return r, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// expand returns an absolute path for s.
|
// expand returns an absolute path for s.
|
||||||
func expand(s string) (string, error) {
|
func expand(s string) (string, error) {
|
||||||
// Just expand to absolute path if there is no home directory prefix.
|
// Just expand to absolute path if there is no home directory prefix.
|
||||||
|
|||||||
98
cmd/litestream/main_test.go
Normal file
98
cmd/litestream/main_test.go
Normal file
@@ -0,0 +1,98 @@
|
|||||||
|
package main_test
|
||||||
|
|
||||||
|
import (
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/benbjohnson/litestream"
|
||||||
|
main "github.com/benbjohnson/litestream/cmd/litestream"
|
||||||
|
"github.com/benbjohnson/litestream/s3"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestNewFileReplicaFromConfig(t *testing.T) {
|
||||||
|
r, err := main.NewReplicaFromConfig(&main.ReplicaConfig{Path: "/foo"}, nil)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
} else if r, ok := r.(*litestream.FileReplica); !ok {
|
||||||
|
t.Fatal("unexpected replica type")
|
||||||
|
} else if got, want := r.Path(), "/foo"; got != want {
|
||||||
|
t.Fatalf("Path=%s, want %s", got, want)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestNewS3ReplicaFromConfig(t *testing.T) {
|
||||||
|
t.Run("URL", func(t *testing.T) {
|
||||||
|
r, err := main.NewReplicaFromConfig(&main.ReplicaConfig{URL: "s3://foo/bar"}, nil)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
} else if r, ok := r.(*s3.Replica); !ok {
|
||||||
|
t.Fatal("unexpected replica type")
|
||||||
|
} else if got, want := r.Bucket, "foo"; got != want {
|
||||||
|
t.Fatalf("Bucket=%s, want %s", got, want)
|
||||||
|
} else if got, want := r.Path, "bar"; got != want {
|
||||||
|
t.Fatalf("Path=%s, want %s", got, want)
|
||||||
|
} else if got, want := r.Region, ""; got != want {
|
||||||
|
t.Fatalf("Region=%s, want %s", got, want)
|
||||||
|
} else if got, want := r.Endpoint, ""; got != want {
|
||||||
|
t.Fatalf("Endpoint=%s, want %s", got, want)
|
||||||
|
} else if got, want := r.ForcePathStyle, false; got != want {
|
||||||
|
t.Fatalf("ForcePathStyle=%v, want %v", got, want)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("MinIO", func(t *testing.T) {
|
||||||
|
r, err := main.NewReplicaFromConfig(&main.ReplicaConfig{URL: "s3://foo.localhost:9000/bar"}, nil)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
} else if r, ok := r.(*s3.Replica); !ok {
|
||||||
|
t.Fatal("unexpected replica type")
|
||||||
|
} else if got, want := r.Bucket, "foo"; got != want {
|
||||||
|
t.Fatalf("Bucket=%s, want %s", got, want)
|
||||||
|
} else if got, want := r.Path, "bar"; got != want {
|
||||||
|
t.Fatalf("Path=%s, want %s", got, want)
|
||||||
|
} else if got, want := r.Region, "us-east-1"; got != want {
|
||||||
|
t.Fatalf("Region=%s, want %s", got, want)
|
||||||
|
} else if got, want := r.Endpoint, "http://localhost:9000"; got != want {
|
||||||
|
t.Fatalf("Endpoint=%s, want %s", got, want)
|
||||||
|
} else if got, want := r.ForcePathStyle, true; got != want {
|
||||||
|
t.Fatalf("ForcePathStyle=%v, want %v", got, want)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("Backblaze", func(t *testing.T) {
|
||||||
|
r, err := main.NewReplicaFromConfig(&main.ReplicaConfig{URL: "s3://foo.s3.us-west-000.backblazeb2.com/bar"}, nil)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
} else if r, ok := r.(*s3.Replica); !ok {
|
||||||
|
t.Fatal("unexpected replica type")
|
||||||
|
} else if got, want := r.Bucket, "foo"; got != want {
|
||||||
|
t.Fatalf("Bucket=%s, want %s", got, want)
|
||||||
|
} else if got, want := r.Path, "bar"; got != want {
|
||||||
|
t.Fatalf("Path=%s, want %s", got, want)
|
||||||
|
} else if got, want := r.Region, "us-west-000"; got != want {
|
||||||
|
t.Fatalf("Region=%s, want %s", got, want)
|
||||||
|
} else if got, want := r.Endpoint, "https://s3.us-west-000.backblazeb2.com"; got != want {
|
||||||
|
t.Fatalf("Endpoint=%s, want %s", got, want)
|
||||||
|
} else if got, want := r.ForcePathStyle, true; got != want {
|
||||||
|
t.Fatalf("ForcePathStyle=%v, want %v", got, want)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("GCS", func(t *testing.T) {
|
||||||
|
r, err := main.NewReplicaFromConfig(&main.ReplicaConfig{URL: "s3://foo.storage.googleapis.com/bar"}, nil)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
} else if r, ok := r.(*s3.Replica); !ok {
|
||||||
|
t.Fatal("unexpected replica type")
|
||||||
|
} else if got, want := r.Bucket, "foo"; got != want {
|
||||||
|
t.Fatalf("Bucket=%s, want %s", got, want)
|
||||||
|
} else if got, want := r.Path, "bar"; got != want {
|
||||||
|
t.Fatalf("Path=%s, want %s", got, want)
|
||||||
|
} else if got, want := r.Region, "us-east-1"; got != want {
|
||||||
|
t.Fatalf("Region=%s, want %s", got, want)
|
||||||
|
} else if got, want := r.Endpoint, "https://storage.googleapis.com"; got != want {
|
||||||
|
t.Fatalf("Endpoint=%s, want %s", got, want)
|
||||||
|
} else if got, want := r.ForcePathStyle, true; got != want {
|
||||||
|
t.Fatalf("ForcePathStyle=%v, want %v", got, want)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
@@ -83,7 +83,7 @@ func (c *ReplicateCommand) Run(ctx context.Context) (err error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
for _, dbConfig := range c.Config.DBs {
|
for _, dbConfig := range c.Config.DBs {
|
||||||
db, err := newDBFromConfig(&c.Config, dbConfig)
|
db, err := NewDBFromConfig(dbConfig)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@@ -103,7 +103,7 @@ func (c *ReplicateCommand) Run(ctx context.Context) (err error) {
|
|||||||
case *litestream.FileReplica:
|
case *litestream.FileReplica:
|
||||||
log.Printf("replicating to: name=%q type=%q path=%q", r.Name(), r.Type(), r.Path())
|
log.Printf("replicating to: name=%q type=%q path=%q", r.Name(), r.Type(), r.Path())
|
||||||
case *s3.Replica:
|
case *s3.Replica:
|
||||||
log.Printf("replicating to: name=%q type=%q bucket=%q path=%q region=%q", r.Name(), r.Type(), r.Bucket, r.Path, r.Region)
|
log.Printf("replicating to: name=%q type=%q bucket=%q path=%q region=%q endpoint=%q sync-interval=%s", r.Name(), r.Type(), r.Bucket, r.Path, r.Region, r.Endpoint, r.SyncInterval)
|
||||||
default:
|
default:
|
||||||
log.Printf("replicating to: name=%q type=%q", r.Name(), r.Type())
|
log.Printf("replicating to: name=%q type=%q", r.Name(), r.Type())
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -80,7 +80,7 @@ func (c *RestoreCommand) Run(ctx context.Context, args []string) (err error) {
|
|||||||
|
|
||||||
// loadFromURL creates a replica & updates the restore options from a replica URL.
|
// loadFromURL creates a replica & updates the restore options from a replica URL.
|
||||||
func (c *RestoreCommand) loadFromURL(ctx context.Context, replicaURL string, opt *litestream.RestoreOptions) (litestream.Replica, error) {
|
func (c *RestoreCommand) loadFromURL(ctx context.Context, replicaURL string, opt *litestream.RestoreOptions) (litestream.Replica, error) {
|
||||||
r, err := NewReplicaFromURL(replicaURL)
|
r, err := NewReplicaFromConfig(&ReplicaConfig{URL: replicaURL}, nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@@ -104,7 +104,7 @@ func (c *RestoreCommand) loadFromConfig(ctx context.Context, dbPath, configPath
|
|||||||
if dbConfig == nil {
|
if dbConfig == nil {
|
||||||
return nil, fmt.Errorf("database not found in config: %s", dbPath)
|
return nil, fmt.Errorf("database not found in config: %s", dbPath)
|
||||||
}
|
}
|
||||||
db, err := newDBFromConfig(&config, dbConfig)
|
db, err := NewDBFromConfig(dbConfig)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -33,7 +33,7 @@ func (c *SnapshotsCommand) Run(ctx context.Context, args []string) (err error) {
|
|||||||
var db *litestream.DB
|
var db *litestream.DB
|
||||||
var r litestream.Replica
|
var r litestream.Replica
|
||||||
if isURL(fs.Arg(0)) {
|
if isURL(fs.Arg(0)) {
|
||||||
if r, err = NewReplicaFromURL(fs.Arg(0)); err != nil {
|
if r, err = NewReplicaFromConfig(&ReplicaConfig{URL: fs.Arg(0)}, nil); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
} else if configPath != "" {
|
} else if configPath != "" {
|
||||||
@@ -48,7 +48,7 @@ func (c *SnapshotsCommand) Run(ctx context.Context, args []string) (err error) {
|
|||||||
return err
|
return err
|
||||||
} else if dbc := config.DBConfig(path); dbc == nil {
|
} else if dbc := config.DBConfig(path); dbc == nil {
|
||||||
return fmt.Errorf("database not found in config: %s", path)
|
return fmt.Errorf("database not found in config: %s", path)
|
||||||
} else if db, err = newDBFromConfig(&config, dbc); err != nil {
|
} else if db, err = NewDBFromConfig(dbc); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -34,7 +34,7 @@ func (c *WALCommand) Run(ctx context.Context, args []string) (err error) {
|
|||||||
var db *litestream.DB
|
var db *litestream.DB
|
||||||
var r litestream.Replica
|
var r litestream.Replica
|
||||||
if isURL(fs.Arg(0)) {
|
if isURL(fs.Arg(0)) {
|
||||||
if r, err = NewReplicaFromURL(fs.Arg(0)); err != nil {
|
if r, err = NewReplicaFromConfig(&ReplicaConfig{URL: fs.Arg(0)}, nil); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
} else if configPath != "" {
|
} else if configPath != "" {
|
||||||
@@ -49,7 +49,7 @@ func (c *WALCommand) Run(ctx context.Context, args []string) (err error) {
|
|||||||
return err
|
return err
|
||||||
} else if dbc := config.DBConfig(path); dbc == nil {
|
} else if dbc := config.DBConfig(path); dbc == nil {
|
||||||
return fmt.Errorf("database not found in config: %s", path)
|
return fmt.Errorf("database not found in config: %s", path)
|
||||||
} else if db, err = newDBFromConfig(&config, dbc); err != nil {
|
} else if db, err = NewDBFromConfig(dbc); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
65
s3/s3.go
65
s3/s3.go
@@ -7,8 +7,10 @@ import (
|
|||||||
"io"
|
"io"
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
"log"
|
"log"
|
||||||
|
"net"
|
||||||
"os"
|
"os"
|
||||||
"path"
|
"path"
|
||||||
|
"regexp"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
@@ -72,6 +74,8 @@ type Replica struct {
|
|||||||
Region string
|
Region string
|
||||||
Bucket string
|
Bucket string
|
||||||
Path string
|
Path string
|
||||||
|
Endpoint string
|
||||||
|
ForcePathStyle bool
|
||||||
|
|
||||||
// Time between syncs with the shadow WAL.
|
// Time between syncs with the shadow WAL.
|
||||||
SyncInterval time.Duration
|
SyncInterval time.Duration
|
||||||
@@ -646,9 +650,11 @@ func (r *Replica) Init(ctx context.Context) (err error) {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Look up region if not specified.
|
// Look up region if not specified and no endpoint is used.
|
||||||
|
// Endpoints are typically used for non-S3 object stores and do not
|
||||||
|
// necessarily require a region.
|
||||||
region := r.Region
|
region := r.Region
|
||||||
if region == "" {
|
if region == "" && r.Endpoint == "" {
|
||||||
if region, err = r.findBucketRegion(ctx, r.Bucket); err != nil {
|
if region, err = r.findBucketRegion(ctx, r.Bucket); err != nil {
|
||||||
return fmt.Errorf("cannot lookup bucket region: %w", err)
|
return fmt.Errorf("cannot lookup bucket region: %w", err)
|
||||||
}
|
}
|
||||||
@@ -656,7 +662,9 @@ func (r *Replica) Init(ctx context.Context) (err error) {
|
|||||||
|
|
||||||
// Create new AWS session.
|
// Create new AWS session.
|
||||||
config := r.config()
|
config := r.config()
|
||||||
|
if region != "" {
|
||||||
config.Region = aws.String(region)
|
config.Region = aws.String(region)
|
||||||
|
}
|
||||||
sess, err := session.NewSession(config)
|
sess, err := session.NewSession(config)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("cannot create aws session: %w", err)
|
return fmt.Errorf("cannot create aws session: %w", err)
|
||||||
@@ -673,6 +681,12 @@ func (r *Replica) config() *aws.Config {
|
|||||||
if r.AccessKeyID != "" || r.SecretAccessKey != "" {
|
if r.AccessKeyID != "" || r.SecretAccessKey != "" {
|
||||||
config.Credentials = credentials.NewStaticCredentials(r.AccessKeyID, r.SecretAccessKey, "")
|
config.Credentials = credentials.NewStaticCredentials(r.AccessKeyID, r.SecretAccessKey, "")
|
||||||
}
|
}
|
||||||
|
if r.Endpoint != "" {
|
||||||
|
config.Endpoint = aws.String(r.Endpoint)
|
||||||
|
}
|
||||||
|
if r.ForcePathStyle {
|
||||||
|
config.S3ForcePathStyle = aws.Bool(r.ForcePathStyle)
|
||||||
|
}
|
||||||
return config
|
return config
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -1027,6 +1041,53 @@ func (r *Replica) deleteGenerationBefore(ctx context.Context, generation string,
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ParseHost extracts data from a hostname depending on the service provider.
|
||||||
|
func ParseHost(s string) (bucket, region, endpoint string, forcePathStyle bool) {
|
||||||
|
// Extract port if one is specified.
|
||||||
|
host, port, err := net.SplitHostPort(s)
|
||||||
|
if err != nil {
|
||||||
|
host = s
|
||||||
|
}
|
||||||
|
|
||||||
|
// Default to path-based URLs, except for with AWS S3 itself.
|
||||||
|
forcePathStyle = true
|
||||||
|
|
||||||
|
// Extract fields from provider-specific host formats.
|
||||||
|
scheme := "https"
|
||||||
|
if a := localhostRegex.FindStringSubmatch(host); a != nil {
|
||||||
|
bucket, region = a[1], "us-east-1"
|
||||||
|
scheme, endpoint = "http", "localhost"
|
||||||
|
} else if a := gcsRegex.FindStringSubmatch(host); a != nil {
|
||||||
|
bucket, region = a[1], "us-east-1"
|
||||||
|
endpoint = "storage.googleapis.com"
|
||||||
|
} else if a := backblazeRegex.FindStringSubmatch(host); a != nil {
|
||||||
|
bucket = a[1]
|
||||||
|
region = a[2]
|
||||||
|
endpoint = fmt.Sprintf("s3.%s.backblazeb2.com", a[2])
|
||||||
|
} else {
|
||||||
|
bucket = host
|
||||||
|
forcePathStyle = false
|
||||||
|
}
|
||||||
|
|
||||||
|
// Add port back to endpoint, if available.
|
||||||
|
if endpoint != "" && port != "" {
|
||||||
|
endpoint = net.JoinHostPort(endpoint, port)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Prepend scheme to endpoint.
|
||||||
|
if endpoint != "" {
|
||||||
|
endpoint = scheme + "://" + endpoint
|
||||||
|
}
|
||||||
|
|
||||||
|
return bucket, region, endpoint, forcePathStyle
|
||||||
|
}
|
||||||
|
|
||||||
|
var (
|
||||||
|
localhostRegex = regexp.MustCompile(`^(?:(.+)\.)?localhost$`)
|
||||||
|
backblazeRegex = regexp.MustCompile(`^(?:(.+)\.)?s3.([^.]+)\.backblazeb2.com$`)
|
||||||
|
gcsRegex = regexp.MustCompile(`^(?:(.+)\.)?storage.googleapis.com$`)
|
||||||
|
)
|
||||||
|
|
||||||
// S3 metrics.
|
// S3 metrics.
|
||||||
var (
|
var (
|
||||||
operationTotalCounterVec = promauto.NewCounterVec(prometheus.CounterOpts{
|
operationTotalCounterVec = promauto.NewCounterVec(prometheus.CounterOpts{
|
||||||
|
|||||||
80
s3/s3_test.go
Normal file
80
s3/s3_test.go
Normal file
@@ -0,0 +1,80 @@
|
|||||||
|
package s3_test
|
||||||
|
|
||||||
|
import (
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/benbjohnson/litestream/s3"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestParseHost(t *testing.T) {
|
||||||
|
// Ensure non-specific hosts return as buckets.
|
||||||
|
t.Run("S3", func(t *testing.T) {
|
||||||
|
bucket, region, endpoint, forcePathStyle := s3.ParseHost(`test.litestream.io`)
|
||||||
|
if got, want := bucket, `test.litestream.io`; got != want {
|
||||||
|
t.Fatalf("bucket=%q, want %q", got, want)
|
||||||
|
} else if got, want := region, ``; got != want {
|
||||||
|
t.Fatalf("region=%q, want %q", got, want)
|
||||||
|
} else if got, want := endpoint, ``; got != want {
|
||||||
|
t.Fatalf("endpoint=%q, want %q", got, want)
|
||||||
|
} else if got, want := forcePathStyle, false; got != want {
|
||||||
|
t.Fatalf("forcePathStyle=%v, want %v", got, want)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
// Ensure localhosts use an HTTP endpoint and extract the bucket name.
|
||||||
|
t.Run("Localhost", func(t *testing.T) {
|
||||||
|
t.Run("WithPort", func(t *testing.T) {
|
||||||
|
bucket, region, endpoint, forcePathStyle := s3.ParseHost(`test.localhost:9000`)
|
||||||
|
if got, want := bucket, `test`; got != want {
|
||||||
|
t.Fatalf("bucket=%q, want %q", got, want)
|
||||||
|
} else if got, want := region, `us-east-1`; got != want {
|
||||||
|
t.Fatalf("region=%q, want %q", got, want)
|
||||||
|
} else if got, want := endpoint, `http://localhost:9000`; got != want {
|
||||||
|
t.Fatalf("endpoint=%q, want %q", got, want)
|
||||||
|
} else if got, want := forcePathStyle, true; got != want {
|
||||||
|
t.Fatalf("forcePathStyle=%v, want %v", got, want)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("WithoutPort", func(t *testing.T) {
|
||||||
|
bucket, region, endpoint, forcePathStyle := s3.ParseHost(`test.localhost`)
|
||||||
|
if got, want := bucket, `test`; got != want {
|
||||||
|
t.Fatalf("bucket=%q, want %q", got, want)
|
||||||
|
} else if got, want := region, `us-east-1`; got != want {
|
||||||
|
t.Fatalf("region=%q, want %q", got, want)
|
||||||
|
} else if got, want := endpoint, `http://localhost`; got != want {
|
||||||
|
t.Fatalf("endpoint=%q, want %q", got, want)
|
||||||
|
} else if got, want := forcePathStyle, true; got != want {
|
||||||
|
t.Fatalf("forcePathStyle=%v, want %v", got, want)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
})
|
||||||
|
|
||||||
|
// Ensure backblaze B2 URLs extract bucket, region, & endpoint from host.
|
||||||
|
t.Run("Backblaze", func(t *testing.T) {
|
||||||
|
bucket, region, endpoint, forcePathStyle := s3.ParseHost(`test-123.s3.us-west-000.backblazeb2.com`)
|
||||||
|
if got, want := bucket, `test-123`; got != want {
|
||||||
|
t.Fatalf("bucket=%q, want %q", got, want)
|
||||||
|
} else if got, want := region, `us-west-000`; got != want {
|
||||||
|
t.Fatalf("region=%q, want %q", got, want)
|
||||||
|
} else if got, want := endpoint, `https://s3.us-west-000.backblazeb2.com`; got != want {
|
||||||
|
t.Fatalf("endpoint=%q, want %q", got, want)
|
||||||
|
} else if got, want := forcePathStyle, true; got != want {
|
||||||
|
t.Fatalf("forcePathStyle=%v, want %v", got, want)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
// Ensure GCS URLs extract bucket & endpoint from host.
|
||||||
|
t.Run("GCS", func(t *testing.T) {
|
||||||
|
bucket, region, endpoint, forcePathStyle := s3.ParseHost(`litestream.io.storage.googleapis.com`)
|
||||||
|
if got, want := bucket, `litestream.io`; got != want {
|
||||||
|
t.Fatalf("bucket=%q, want %q", got, want)
|
||||||
|
} else if got, want := region, `us-east-1`; got != want {
|
||||||
|
t.Fatalf("region=%q, want %q", got, want)
|
||||||
|
} else if got, want := endpoint, `https://storage.googleapis.com`; got != want {
|
||||||
|
t.Fatalf("endpoint=%q, want %q", got, want)
|
||||||
|
} else if got, want := forcePathStyle, true; got != want {
|
||||||
|
t.Fatalf("forcePathStyle=%v, want %v", got, want)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user