From 43dda4315f40be3f3befbff0ecbcb92d5ce9aa12 Mon Sep 17 00:00:00 2001 From: Ben Johnson Date: Fri, 15 Jan 2021 12:04:23 -0700 Subject: [PATCH] Allow URLs for replica config path --- cmd/litestream/main.go | 64 +++++++++++++++++++++++++++++++++++++ cmd/litestream/replicate.go | 17 ++++++++-- replica.go | 5 +++ 3 files changed, 84 insertions(+), 2 deletions(-) diff --git a/cmd/litestream/main.go b/cmd/litestream/main.go index aa35740..80b9003 100644 --- a/cmd/litestream/main.go +++ b/cmd/litestream/main.go @@ -6,8 +6,10 @@ import ( "fmt" "io/ioutil" "log" + "net/url" "os" "os/user" + "path" "path/filepath" "strings" "time" @@ -106,6 +108,15 @@ type Config struct { DBs []*DBConfig `yaml:"dbs"` } +func (c *Config) Normalize() error { + for i := range c.DBs { + if err := c.DBs[i].Normalize(); err != nil { + return err + } + } + return nil +} + // DefaultConfig returns a new instance of Config with defaults set. func DefaultConfig() Config { return Config{ @@ -145,6 +156,10 @@ func ReadConfigFile(filename string) (Config, error) { } else if err := yaml.Unmarshal(buf, &config); err != nil { return config, err } + + if err := config.Normalize(); err != nil { + return config, err + } return config, nil } @@ -153,6 +168,15 @@ type DBConfig struct { Replicas []*ReplicaConfig `yaml:"replicas"` } +func (c *DBConfig) Normalize() error { + for i := range c.Replicas { + if err := c.Replicas[i].Normalize(); err != nil { + return err + } + } + return nil +} + type ReplicaConfig struct { Type string `yaml:"type"` // "file", "s3" Name string `yaml:"name"` // name of replica, optional. @@ -168,6 +192,46 @@ type ReplicaConfig struct { Bucket string `yaml:"bucket"` } +func (c *ReplicaConfig) Normalize() error { + // Expand path filename, if necessary. + if prefix := "~" + string(os.PathSeparator); strings.HasPrefix(c.Path, prefix) { + u, err := user.Current() + if err != nil { + return err + } else if u.HomeDir == "" { + return fmt.Errorf("cannot expand replica path, no home directory available") + } + c.Path = filepath.Join(u.HomeDir, strings.TrimPrefix(c.Path, prefix)) + } + + // Attempt to parse as URL. Ignore if it is not a URL or if there is no scheme. + u, err := url.Parse(c.Path) + if err != nil || u.Scheme == "" { + return nil + } + + switch u.Scheme { + case "file": + u.Scheme = "" + c.Type = u.Scheme + c.Path = path.Clean(u.String()) + return nil + + case "s3": + c.Type = u.Scheme + c.Path = strings.TrimPrefix(path.Clean(u.Path), "/") + c.Bucket = u.Host + if u := u.User; u != nil { + c.AccessKeyID = u.Username() + c.SecretAccessKey, _ = u.Password() + } + return nil + + default: + return fmt.Errorf("unrecognized replica type in path scheme: %s", c.Path) + } +} + // DefaultConfigPath returns the default config path. func DefaultConfigPath() string { if v := os.Getenv("LITESTREAM_CONFIG"); v != "" { diff --git a/cmd/litestream/replicate.go b/cmd/litestream/replicate.go index f47c2d7..ece0139 100644 --- a/cmd/litestream/replicate.go +++ b/cmd/litestream/replicate.go @@ -13,6 +13,7 @@ import ( "os/signal" "github.com/benbjohnson/litestream" + "github.com/benbjohnson/litestream/s3" "github.com/prometheus/client_golang/prometheus/promhttp" ) @@ -75,12 +76,24 @@ func (c *ReplicateCommand) Run(ctx context.Context, args []string) (err error) { } // Notify user that initialization is done. - fmt.Printf("Initialized with %d databases.\n", len(c.DBs)) + for _, db := range c.DBs { + fmt.Printf("initialized db: %s\n", db.Path()) + for _, r := range db.Replicas { + switch r := r.(type) { + case *litestream.FileReplica: + fmt.Printf("replicating to: name=%q type=%q path=%q\n", r.Name(), r.Type(), r.Path()) + case *s3.Replica: + fmt.Printf("replicating to: name=%q type=%q bucket=%q path=%q region=%q\n", r.Name(), r.Type(), r.Bucket, r.Path, r.Region) + default: + fmt.Printf("replicating to: name=%q type=%q\n", r.Name(), r.Type()) + } + } + } // Serve metrics over HTTP if enabled. if config.Addr != "" { _, port, _ := net.SplitHostPort(config.Addr) - fmt.Printf("Serving metrics on http://localhost:%s/metrics\n", port) + fmt.Printf("serving metrics on http://localhost:%s/metrics\n", port) go func() { http.Handle("/metrics", promhttp.Handler()) http.ListenAndServe(config.Addr, nil) diff --git a/replica.go b/replica.go index a3e26a9..58df78a 100644 --- a/replica.go +++ b/replica.go @@ -141,6 +141,11 @@ func (r *FileReplica) Type() string { return "file" } +// Path returns the path the replica was initialized with. +func (r *FileReplica) Path() string { + return r.dst +} + // LastPos returns the last successfully replicated position. func (r *FileReplica) LastPos() Pos { r.mu.RLock()