Allow URLs for replica config path
This commit is contained in:
@@ -6,8 +6,10 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
"log"
|
"log"
|
||||||
|
"net/url"
|
||||||
"os"
|
"os"
|
||||||
"os/user"
|
"os/user"
|
||||||
|
"path"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
@@ -106,6 +108,15 @@ type Config struct {
|
|||||||
DBs []*DBConfig `yaml:"dbs"`
|
DBs []*DBConfig `yaml:"dbs"`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (c *Config) Normalize() error {
|
||||||
|
for i := range c.DBs {
|
||||||
|
if err := c.DBs[i].Normalize(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
// DefaultConfig returns a new instance of Config with defaults set.
|
// DefaultConfig returns a new instance of Config with defaults set.
|
||||||
func DefaultConfig() Config {
|
func DefaultConfig() Config {
|
||||||
return Config{
|
return Config{
|
||||||
@@ -145,6 +156,10 @@ func ReadConfigFile(filename string) (Config, error) {
|
|||||||
} else if err := yaml.Unmarshal(buf, &config); err != nil {
|
} else if err := yaml.Unmarshal(buf, &config); err != nil {
|
||||||
return config, err
|
return config, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if err := config.Normalize(); err != nil {
|
||||||
|
return config, err
|
||||||
|
}
|
||||||
return config, nil
|
return config, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -153,6 +168,15 @@ type DBConfig struct {
|
|||||||
Replicas []*ReplicaConfig `yaml:"replicas"`
|
Replicas []*ReplicaConfig `yaml:"replicas"`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (c *DBConfig) Normalize() error {
|
||||||
|
for i := range c.Replicas {
|
||||||
|
if err := c.Replicas[i].Normalize(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
type ReplicaConfig struct {
|
type ReplicaConfig struct {
|
||||||
Type string `yaml:"type"` // "file", "s3"
|
Type string `yaml:"type"` // "file", "s3"
|
||||||
Name string `yaml:"name"` // name of replica, optional.
|
Name string `yaml:"name"` // name of replica, optional.
|
||||||
@@ -168,6 +192,46 @@ type ReplicaConfig struct {
|
|||||||
Bucket string `yaml:"bucket"`
|
Bucket string `yaml:"bucket"`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (c *ReplicaConfig) Normalize() error {
|
||||||
|
// Expand path filename, if necessary.
|
||||||
|
if prefix := "~" + string(os.PathSeparator); strings.HasPrefix(c.Path, prefix) {
|
||||||
|
u, err := user.Current()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
} else if u.HomeDir == "" {
|
||||||
|
return fmt.Errorf("cannot expand replica path, no home directory available")
|
||||||
|
}
|
||||||
|
c.Path = filepath.Join(u.HomeDir, strings.TrimPrefix(c.Path, prefix))
|
||||||
|
}
|
||||||
|
|
||||||
|
// Attempt to parse as URL. Ignore if it is not a URL or if there is no scheme.
|
||||||
|
u, err := url.Parse(c.Path)
|
||||||
|
if err != nil || u.Scheme == "" {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
switch u.Scheme {
|
||||||
|
case "file":
|
||||||
|
u.Scheme = ""
|
||||||
|
c.Type = u.Scheme
|
||||||
|
c.Path = path.Clean(u.String())
|
||||||
|
return nil
|
||||||
|
|
||||||
|
case "s3":
|
||||||
|
c.Type = u.Scheme
|
||||||
|
c.Path = strings.TrimPrefix(path.Clean(u.Path), "/")
|
||||||
|
c.Bucket = u.Host
|
||||||
|
if u := u.User; u != nil {
|
||||||
|
c.AccessKeyID = u.Username()
|
||||||
|
c.SecretAccessKey, _ = u.Password()
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
|
||||||
|
default:
|
||||||
|
return fmt.Errorf("unrecognized replica type in path scheme: %s", c.Path)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// DefaultConfigPath returns the default config path.
|
// DefaultConfigPath returns the default config path.
|
||||||
func DefaultConfigPath() string {
|
func DefaultConfigPath() string {
|
||||||
if v := os.Getenv("LITESTREAM_CONFIG"); v != "" {
|
if v := os.Getenv("LITESTREAM_CONFIG"); v != "" {
|
||||||
|
|||||||
@@ -13,6 +13,7 @@ import (
|
|||||||
"os/signal"
|
"os/signal"
|
||||||
|
|
||||||
"github.com/benbjohnson/litestream"
|
"github.com/benbjohnson/litestream"
|
||||||
|
"github.com/benbjohnson/litestream/s3"
|
||||||
"github.com/prometheus/client_golang/prometheus/promhttp"
|
"github.com/prometheus/client_golang/prometheus/promhttp"
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -75,12 +76,24 @@ func (c *ReplicateCommand) Run(ctx context.Context, args []string) (err error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Notify user that initialization is done.
|
// Notify user that initialization is done.
|
||||||
fmt.Printf("Initialized with %d databases.\n", len(c.DBs))
|
for _, db := range c.DBs {
|
||||||
|
fmt.Printf("initialized db: %s\n", db.Path())
|
||||||
|
for _, r := range db.Replicas {
|
||||||
|
switch r := r.(type) {
|
||||||
|
case *litestream.FileReplica:
|
||||||
|
fmt.Printf("replicating to: name=%q type=%q path=%q\n", r.Name(), r.Type(), r.Path())
|
||||||
|
case *s3.Replica:
|
||||||
|
fmt.Printf("replicating to: name=%q type=%q bucket=%q path=%q region=%q\n", r.Name(), r.Type(), r.Bucket, r.Path, r.Region)
|
||||||
|
default:
|
||||||
|
fmt.Printf("replicating to: name=%q type=%q\n", r.Name(), r.Type())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Serve metrics over HTTP if enabled.
|
// Serve metrics over HTTP if enabled.
|
||||||
if config.Addr != "" {
|
if config.Addr != "" {
|
||||||
_, port, _ := net.SplitHostPort(config.Addr)
|
_, port, _ := net.SplitHostPort(config.Addr)
|
||||||
fmt.Printf("Serving metrics on http://localhost:%s/metrics\n", port)
|
fmt.Printf("serving metrics on http://localhost:%s/metrics\n", port)
|
||||||
go func() {
|
go func() {
|
||||||
http.Handle("/metrics", promhttp.Handler())
|
http.Handle("/metrics", promhttp.Handler())
|
||||||
http.ListenAndServe(config.Addr, nil)
|
http.ListenAndServe(config.Addr, nil)
|
||||||
|
|||||||
@@ -141,6 +141,11 @@ func (r *FileReplica) Type() string {
|
|||||||
return "file"
|
return "file"
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Path returns the path the replica was initialized with.
|
||||||
|
func (r *FileReplica) Path() string {
|
||||||
|
return r.dst
|
||||||
|
}
|
||||||
|
|
||||||
// LastPos returns the last successfully replicated position.
|
// LastPos returns the last successfully replicated position.
|
||||||
func (r *FileReplica) LastPos() Pos {
|
func (r *FileReplica) LastPos() Pos {
|
||||||
r.mu.RLock()
|
r.mu.RLock()
|
||||||
|
|||||||
Reference in New Issue
Block a user