From a4e66eb8d82d535faae38c6672a7a556bd6daccd Mon Sep 17 00:00:00 2001 From: Ben Johnson Date: Fri, 18 Dec 2020 13:21:29 -0700 Subject: [PATCH] Change config format to yaml; add replicators --- cmd/litestream/config.go | 17 +++++++-- cmd/litestream/main.go | 80 +++++++++++++++++++++++++++++++++------- db.go | 8 ++-- go.mod | 1 + go.sum | 3 ++ replicator.go | 18 +++++++++ 6 files changed, 107 insertions(+), 20 deletions(-) create mode 100644 replicator.go diff --git a/cmd/litestream/config.go b/cmd/litestream/config.go index e21960c..ed3ea62 100644 --- a/cmd/litestream/config.go +++ b/cmd/litestream/config.go @@ -1,16 +1,19 @@ package main import ( + "fmt" "io/ioutil" "os" + "os/user" + "path/filepath" "strings" - "github.com/pelletier/go-toml" + "gopkg.in/yaml.v2" ) // Config represents a configuration file for the litestream CLI. type Config struct { - DBs []DBConfig `toml:"db"` + DBs []*DBConfig `yaml:"databases"` } // DefaultConfig returns a new instance of Config with defaults set. @@ -38,12 +41,18 @@ func ReadConfigFile(filename string) (Config, error) { return config, fmt.Errorf("config file not found: %s", filename) } else if err != nil { return config, err - } else if err := toml.Unmarshal(buf, &config); err != nil { + } else if err := yaml.Unmarshal(buf, &config); err != nil { return config, err } return config, nil } type DBConfig struct { - Path string `toml:"path"` + Path string `yaml:"path"` + Replicators []*ReplicatorConfig `yaml:"replicators` +} + +type ReplicatorConfig struct { + Type string `yaml:"type"` // "file", "s3" + Path string `yaml:"path"` // used for file replicators } diff --git a/cmd/litestream/main.go b/cmd/litestream/main.go index 5bace17..af6aafa 100644 --- a/cmd/litestream/main.go +++ b/cmd/litestream/main.go @@ -2,19 +2,25 @@ package main import ( "context" - "encoding/json" "errors" "flag" "fmt" - "io/ioutil" - "log" "os" "os/signal" - "path/filepath" "github.com/benbjohnson/litestream" ) +// Build information. +var ( + Version = "(development build)" +) + +// Default settings. +const ( + DefaultConfigPath = "~/litestream.yml" +) + func main() { // Setup signal handler. ctx, cancel := context.WithCancel(context.Background()) @@ -24,19 +30,26 @@ func main() { // Initialize program and read flags/config. m := NewMain() - if err := m.ParseFlags(os.Args[1:]); err == flag.ErrHelp { + if err := m.ParseFlags(ctx, os.Args[1:]); err == flag.ErrHelp { os.Exit(1) } else if err != nil { fmt.Fprintln(os.Stderr, err) os.Exit(1) } + // Display version information. + fmt.Printf("Litestream %s\n", Version) + // Start monitoring databases. - if err := m.Run(); err != nil { + if err := m.Run(ctx); err != nil { + m.Close() fmt.Fprintln(os.Stderr, err) os.Exit(1) } + // Notify user that initialization is done. + fmt.Printf("Initialized with %d databases; replication initialized.\n", len(m.DBs)) + // Wait for signal to stop program. <-ctx.Done() signal.Reset() @@ -51,6 +64,9 @@ func main() { type Main struct { ConfigPath string Config Config + + // List of managed databases specified in the config. + DBs []*litestream.DB } func NewMain() *Main { @@ -60,8 +76,8 @@ func NewMain() *Main { // ParseFlags parses the flag set from args & loads the configuration. func (m *Main) ParseFlags(ctx context.Context, args []string) (err error) { fs := flag.NewFlagSet("litestream", flag.ContinueOnError) - fs.StringVar(&m.ConfigPath, "config", "", "configuration path") - fs.Usage = m.usage + fs.StringVar(&m.ConfigPath, "config", DefaultConfigPath, "configuration path") + fs.Usage = m.Usage if err := fs.Parse(args); err != nil { return err } @@ -83,22 +99,60 @@ func (m *Main) Run(ctx context.Context) (err error) { } for _, dbc := range m.Config.DBs { - db := litestream.NewDB() - db.Path = dbc.Path - if err := db.Open(); err != nil { + if err := m.openDB(dbc); err != nil { return err } - m.DBs = append(m.DBs, db) } return nil } +// openDB instantiates and initializes a DB based on a configuration. +func (m *Main) openDB(config *DBConfig) error { + // Initialize database with given path. + db := litestream.NewDB(config.Path) + + // Instantiate and attach replicators. + for _, rconfig := range config.Replicators { + r, err := m.createReplicator(db, rconfig) + if err != nil { + return err + } + db.Replicators = append(db.Replicators, r) + } + + // Open database & attach to program. + if err := db.Open(); err != nil { + return err + } + m.DBs = append(m.DBs, db) + + return nil +} + +// createReplicator instantiates a replicator for a DB based on a config. +func (m *Main) createReplicator(db *litestream.DB, config *ReplicatorConfig) (litestream.Replicator, error) { + switch config.Type { + case "", "file": + return m.createFileReplicator(db, config) + default: + return nil, fmt.Errorf("unknown replicator type in config: %q", config.Type) + } +} + +// createFileReplicator returns a new instance of FileReplicator build from config. +func (m *Main) createFileReplicator(db *litestream.DB, config *ReplicatorConfig) (*litestream.FileReplicator, error) { + if config.Path == "" { + return nil, fmt.Errorf("file replicator path require for db %q", db.Path()) + } + return litestream.NewFileReplicator(db, config.Path), nil +} + // Close closes all open databases. func (m *Main) Close() (err error) { for _, db := range m.DBs { if e := db.Close(); e != nil { - log.Printf("error closing db: path=%s err=%s", db.Path, e) + fmt.Printf("error closing db: path=%s err=%s\n", db.Path(), e) if err == nil { err = e } diff --git a/db.go b/db.go index 67003b5..a36d2cd 100644 --- a/db.go +++ b/db.go @@ -1,16 +1,14 @@ package litestream import ( - "bytes" "context" "database/sql" "fmt" - "io" "log" "os" "path/filepath" - "strings" "sync" + "time" ) const ( @@ -34,6 +32,10 @@ type DB struct { cancel func() wg sync.WaitGroup + // List of replicators for the database. + // Must be set before calling Open(). + Replicators []Replicator + // Frequency at which to perform db sync. MonitorInterval time.Duration } diff --git a/go.mod b/go.mod index b2d3f26..1ca941e 100644 --- a/go.mod +++ b/go.mod @@ -5,4 +5,5 @@ go 1.15 require ( github.com/mattn/go-sqlite3 v1.14.5 github.com/pelletier/go-toml v1.8.1 + gopkg.in/yaml.v2 v2.4.0 ) diff --git a/go.sum b/go.sum index 52dd6b6..32126b1 100644 --- a/go.sum +++ b/go.sum @@ -33,3 +33,6 @@ golang.org/x/tools v0.0.0-20200423201157-2723c5de0d66/go.mod h1:EkVYQZoAsY45+roY golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= +gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= diff --git a/replicator.go b/replicator.go new file mode 100644 index 0000000..36d382c --- /dev/null +++ b/replicator.go @@ -0,0 +1,18 @@ +package litestream + +type Replicator interface { +} + +// FileReplicator is a replicator that replicates a DB to a local file path. +type FileReplicator struct { + db *DB // source database + dst string // destination path +} + +// NewFileReplicator returns a new instance of FileReplicator. +func NewFileReplicator(db *DB, dst string) *FileReplicator { + return &FileReplicator{ + db: db, + dst: dst, + } +}