From 98014f4e4926a85e1ec0c113a2a756603811ae1e Mon Sep 17 00:00:00 2001 From: Ben Johnson Date: Sat, 26 Dec 2020 09:39:21 -0700 Subject: [PATCH] Add generations command --- cmd/litestream/config.go | 69 ----------------- cmd/litestream/generations.go | 137 ++++++++++++++++++++++++++++++++++ cmd/litestream/main.go | 97 ++++++++++++++++++++++++ cmd/litestream/replicate.go | 58 +++----------- db.go | 20 +++++ litestream.go | 28 +++++++ replicator.go | 121 ++++++++++++++++++++++++++++++ 7 files changed, 415 insertions(+), 115 deletions(-) delete mode 100644 cmd/litestream/config.go create mode 100644 cmd/litestream/generations.go diff --git a/cmd/litestream/config.go b/cmd/litestream/config.go deleted file mode 100644 index a473bc8..0000000 --- a/cmd/litestream/config.go +++ /dev/null @@ -1,69 +0,0 @@ -package main - -import ( - "flag" - "fmt" - "io/ioutil" - "os" - "os/user" - "path/filepath" - "strings" - - "gopkg.in/yaml.v2" -) - -// Default settings. -const ( - DefaultConfigPath = "~/litestream.yml" -) - -// Config represents a configuration file for the litestream daemon. -type Config struct { - DBs []*DBConfig `yaml:"databases"` -} - -// DefaultConfig returns a new instance of Config with defaults set. -func DefaultConfig() Config { - return Config{} -} - -// ReadConfigFile unmarshals config from filename. Expands path if needed. -func ReadConfigFile(filename string) (Config, error) { - config := DefaultConfig() - - // Expand filename, if necessary. - if prefix := "~" + string(os.PathSeparator); strings.HasPrefix(filename, prefix) { - u, err := user.Current() - if err != nil { - return config, err - } else if u.HomeDir == "" { - return config, fmt.Errorf("home directory unset") - } - filename = filepath.Join(u.HomeDir, strings.TrimPrefix(filename, prefix)) - } - - // Read & deserialize configuration. 
- if buf, err := ioutil.ReadFile(filename); os.IsNotExist(err) { - return config, fmt.Errorf("config file not found: %s", filename) - } else if err != nil { - return config, err - } else if err := yaml.Unmarshal(buf, &config); err != nil { - return config, err - } - return config, nil -} - -type DBConfig struct { - Path string `yaml:"path"` - Replicas []*ReplicaConfig `yaml:"replicas"` -} - -type ReplicaConfig struct { - Type string `yaml:"type"` // "file", "s3" - Name string `yaml:"name"` // name of replicator, optional. - Path string `yaml:"path"` // used for file replicators -} - -func registerConfigFlag(fs *flag.FlagSet, p *string) { - fs.StringVar(p, "config", DefaultConfigPath, "config path") -} diff --git a/cmd/litestream/generations.go b/cmd/litestream/generations.go new file mode 100644 index 0000000..b83fcbb --- /dev/null +++ b/cmd/litestream/generations.go @@ -0,0 +1,137 @@ +package main + +import ( + "context" + "errors" + "flag" + "fmt" + "log" + "os" + "path/filepath" + "text/tabwriter" + "time" +) + +type GenerationsCommand struct { + ConfigPath string + Config Config + + DBPath string +} + +func NewGenerationsCommand() *GenerationsCommand { + return &GenerationsCommand{} +} + +func (c *GenerationsCommand) Run(ctx context.Context, args []string) (err error) { + fs := flag.NewFlagSet("litestream-generations", flag.ContinueOnError) + registerConfigFlag(fs, &c.ConfigPath) + fs.Usage = c.Usage + if err := fs.Parse(args); err != nil { + return err + } else if fs.NArg() > 1 { + return fmt.Errorf("too many arguments") + } + + // Load configuration. + if c.ConfigPath == "" { + return errors.New("-config required") + } + config, err := ReadConfigFile(c.ConfigPath) + if err != nil { + return err + } + + // Determine absolute path for database, if specified. + if c.DBPath = fs.Arg(0); c.DBPath != "" { + if c.DBPath, err = filepath.Abs(c.DBPath); err != nil { + return err + } + } + + // List each generation. 
+ w := tabwriter.NewWriter(os.Stdout, 0, 8, 1, '\t', 0) + fmt.Fprintln(w, "db\tname\tgeneration\tlag\tstart\tend") + for _, dbConfig := range config.DBs { + // Filter database, if specified in the arguments. + if c.DBPath != "" && dbConfig.Path != c.DBPath { + continue + } + + // Instantiate DB from configuration. + db, err := newDBFromConfig(dbConfig) + if err != nil { + return err + } + + // Determine last time database or WAL was updated. + updatedAt, err := db.UpdatedAt() + if err != nil { + return err + } + + // Iterate over each replicator in the database. + for _, r := range db.Replicators { + generations, err := r.Generations() + if err != nil { + log.Printf("%s: cannot list generations: %s", r.Name(), err) + continue + } + + // Iterate over each generation for the replicator. + for _, generation := range generations { + stats, err := r.GenerationStats(generation) + if err != nil { + log.Printf("%s: cannot find generation stats: %s", r.Name(), err) + continue + } + + fmt.Fprintf(w, "%s\t%s\t%s\t%s\t%s\t%s\n", + db.Path(), + r.Name(), + generation, + truncateDuration(stats.UpdatedAt.Sub(updatedAt)).String(), + stats.CreatedAt.Format(time.RFC3339), + stats.UpdatedAt.Format(time.RFC3339), + ) + } + } + } + w.Flush() + + return nil +} + +func (c *GenerationsCommand) Usage() { + fmt.Printf(` +The generations command lists all generations across all replicas along with +stats about their lag behind the primary database and the time range they cover. + +Usage: + + litestream generations [arguments] DB + +Arguments: + + -config PATH + Specifies the configuration file. 
Defaults to %s + +`[1:], + DefaultConfigPath, + ) +} + +func truncateDuration(d time.Duration) time.Duration { + if d > time.Hour { + return d.Truncate(time.Hour) + } else if d > time.Minute { + return d.Truncate(time.Minute) + } else if d > time.Second { + return d.Truncate(time.Second) + } else if d > time.Millisecond { + return d.Truncate(time.Millisecond) + } else if d > time.Microsecond { + return d.Truncate(time.Microsecond) + } + return d +} diff --git a/cmd/litestream/main.go b/cmd/litestream/main.go index c5f8b27..81068bb 100644 --- a/cmd/litestream/main.go +++ b/cmd/litestream/main.go @@ -4,9 +4,15 @@ import ( "context" "flag" "fmt" + "io/ioutil" "log" "os" + "os/user" + "path/filepath" "strings" + + "github.com/benbjohnson/litestream" + "gopkg.in/yaml.v2" ) // Build information. @@ -14,6 +20,9 @@ var ( Version = "(development build)" ) +// DefaultConfigPath is the default configuration path. +const DefaultConfigPath = "/etc/litestream.yml" + func main() { log.SetFlags(0) @@ -39,6 +48,8 @@ func (m *Main) Run(ctx context.Context, args []string) (err error) { } switch cmd { + case "generations": + return (&GenerationsCommand{}).Run(ctx, args) case "replicate": return (&ReplicateCommand{}).Run(ctx, args) case "version": @@ -66,3 +77,89 @@ The commands are: version prints the version `[1:]) } + +// Config represents a configuration file for the litestream daemon. +type Config struct { + DBs []*DBConfig `yaml:"databases"` +} + +// DefaultConfig returns a new instance of Config with defaults set. +func DefaultConfig() Config { + return Config{} +} + +// ReadConfigFile unmarshals config from filename. Expands path if needed. +func ReadConfigFile(filename string) (Config, error) { + config := DefaultConfig() + + // Expand filename, if necessary. 
+ if prefix := "~" + string(os.PathSeparator); strings.HasPrefix(filename, prefix) { + u, err := user.Current() + if err != nil { + return config, err + } else if u.HomeDir == "" { + return config, fmt.Errorf("home directory unset") + } + filename = filepath.Join(u.HomeDir, strings.TrimPrefix(filename, prefix)) + } + + // Read & deserialize configuration. + if buf, err := ioutil.ReadFile(filename); os.IsNotExist(err) { + return config, fmt.Errorf("config file not found: %s", filename) + } else if err != nil { + return config, err + } else if err := yaml.Unmarshal(buf, &config); err != nil { + return config, err + } + return config, nil +} + +type DBConfig struct { + Path string `yaml:"path"` + Replicas []*ReplicaConfig `yaml:"replicas"` +} + +type ReplicaConfig struct { + Type string `yaml:"type"` // "file", "s3" + Name string `yaml:"name"` // name of replicator, optional. + Path string `yaml:"path"` // used for file replicators +} + +func registerConfigFlag(fs *flag.FlagSet, p *string) { + fs.StringVar(p, "config", DefaultConfigPath, "config path") +} + +// newDBFromConfig instantiates a DB based on a configuration. +func newDBFromConfig(config *DBConfig) (*litestream.DB, error) { + // Initialize database with given path. + db := litestream.NewDB(config.Path) + + // Instantiate and attach replicators. + for _, rconfig := range config.Replicas { + r, err := newReplicatorFromConfig(db, rconfig) + if err != nil { + return nil, err + } + db.Replicators = append(db.Replicators, r) + } + + return db, nil +} + +// newReplicatorFromConfig instantiates a replicator for a DB based on a config. +func newReplicatorFromConfig(db *litestream.DB, config *ReplicaConfig) (litestream.Replicator, error) { + switch config.Type { + case "", "file": + return newFileReplicatorFromConfig(db, config) + default: + return nil, fmt.Errorf("unknown replicator type in config: %q", config.Type) + } +} + +// newFileReplicatorFromConfig returns a new instance of FileReplicator built from config. 
+func newFileReplicatorFromConfig(db *litestream.DB, config *ReplicaConfig) (*litestream.FileReplicator, error) { + if config.Path == "" { + return nil, fmt.Errorf("file replicator path required for db %q", db.Path()) + } + return litestream.NewFileReplicator(db, config.Name, config.Path), nil +} diff --git a/cmd/litestream/replicate.go b/cmd/litestream/replicate.go index 76b3d60..361e5c9 100644 --- a/cmd/litestream/replicate.go +++ b/cmd/litestream/replicate.go @@ -54,10 +54,17 @@ func (c *ReplicateCommand) Run(ctx context.Context, args []string) (err error) { return errors.New("configuration must specify at least one database") } - for _, dbc := range config.DBs { - if err := c.openDB(dbc); err != nil { + for _, dbConfig := range config.DBs { + db, err := newDBFromConfig(dbConfig) + if err != nil { return err } + + // Open database & attach to program. + if err := db.Open(); err != nil { + return err + } + c.DBs = append(c.DBs, db) } // Notify user that initialization is done. @@ -76,47 +83,6 @@ func (c *ReplicateCommand) Run(ctx context.Context, args []string) (err error) { return nil } -// openDB instantiates and initializes a DB based on a configuration. -func (c *ReplicateCommand) openDB(config *DBConfig) error { - // Initialize database with given path. - db := litestream.NewDB(config.Path) - - // Instantiate and attach replicators. - for _, rconfig := range config.Replicas { - r, err := c.createReplicator(db, rconfig) - if err != nil { - return err - } - db.Replicators = append(db.Replicators, r) - } - - // Open database & attach to program. - if err := db.Open(); err != nil { - return err - } - c.DBs = append(c.DBs, db) - - return nil -} - -// createReplicator instantiates a replicator for a DB based on a config. 
-func (c *ReplicateCommand) createReplicator(db *litestream.DB, config *ReplicaConfig) (litestream.Replicator, error) { - switch config.Type { - case "", "file": - return c.createFileReplicator(db, config) - default: - return nil, fmt.Errorf("unknown replicator type in config: %q", config.Type) - } -} - -// createFileReplicator returns a new instance of FileReplicator build from config. -func (c *ReplicateCommand) createFileReplicator(db *litestream.DB, config *ReplicaConfig) (*litestream.FileReplicator, error) { - if config.Path == "" { - return nil, fmt.Errorf("file replicator path require for db %q", db.Path()) - } - return litestream.NewFileReplicator(db, config.Name, config.Path), nil -} - // Close closes all open databases. func (c *ReplicateCommand) Close() (err error) { for _, db := range c.DBs { @@ -131,7 +97,7 @@ func (c *ReplicateCommand) Close() (err error) { } func (c *ReplicateCommand) Usage() { - fmt.Println(` + fmt.Printf(` The replicate command starts a server to monitor & replicate databases specified in your configuration file. @@ -142,7 +108,7 @@ Usage: Arguments: -config PATH - Specifies the configuration file. Defaults to ~/litestream.yml + Specifies the configuration file. Defaults to %s -`[1:]) +`[1:], DefaultConfigPath) } diff --git a/db.go b/db.go index cfc848c..78b90f2 100644 --- a/db.go +++ b/db.go @@ -190,6 +190,26 @@ func (db *DB) Close() (err error) { return err } +// UpdatedAt returns the last modified time of the database or WAL file. +func (db *DB) UpdatedAt() (time.Time, error) { + // Determine database modified time. + fi, err := os.Stat(db.Path()) + if err != nil { + return time.Time{}, err + } + t := fi.ModTime().UTC() + + // Use WAL modified time, if available & later. + if fi, err := os.Stat(db.WALPath()); os.IsNotExist(err) { + return t, nil + } else if err != nil { + return t, err + } else if fi.ModTime().After(t) { + t = fi.ModTime().UTC() + } + return t, nil +} + // Init initializes the connection to the database. 
// Skipped if already initialized or if the database file does not exist. func (db *DB) Init() (err error) { diff --git a/litestream.go b/litestream.go index 44e8b3d..ed9f3d7 100644 --- a/litestream.go +++ b/litestream.go @@ -138,6 +138,34 @@ func removeTmpFiles(root string) error { }) } +// IsGenerationName returns true if s is the correct length and is only lowercase hex characters. +func IsGenerationName(s string) bool { + if len(s) != GenerationNameLen { + return false + } + for _, ch := range s { + if !isHexChar(ch) { + return false + } + } + return true +} + +// IsSnapshotPath returns true if s is a path to a snapshot file. +func IsSnapshotPath(s string) bool { + return strings.HasSuffix(s, SnapshotExt) || strings.HasSuffix(s, SnapshotExt+".gz") +} + +// IsWALPath returns true if s is a path to a WAL file. +func IsWALPath(s string) bool { + return strings.HasSuffix(s, WALExt) || strings.HasSuffix(s, WALExt+".gz") +} + +// isHexChar returns true if ch is a lowercase hex character. +func isHexChar(ch rune) bool { + return (ch >= '0' && ch <= '9') || (ch >= 'a' && ch <= 'f') +} + // HexDump returns hexdump output but with duplicate lines removed. func HexDump(b []byte) string { const prefixN = len("00000000") diff --git a/replicator.go b/replicator.go index 2f4dabc..1357629 100644 --- a/replicator.go +++ b/replicator.go @@ -12,14 +12,29 @@ import ( "sort" "strings" "sync" + "time" ) // Replicator represents a method for replicating the snapshot & WAL data to // a remote destination. type Replicator interface { + // The name of the replicator. Defaults to type if no name specified. Name() string + + // String identifier for the type of replicator ("file", "s3", etc). Type() string + + // Returns a list of generation names for the replicator. + Generations() ([]string, error) + + // Returns basic information about a generation including the number of + // snapshot & WAL files as well as the time range covered. 
+ GenerationStats(generation string) (GenerationStats, error) + + // Starts replicating in a background goroutine. Start(ctx context.Context) + + // Stops all replication processing. Blocks until processing stopped. Stop() } @@ -71,6 +86,112 @@ func (r *FileReplicator) WALPath(generation string, index int) string { return filepath.Join(r.dst, "generations", generation, "wal", fmt.Sprintf("%016x.wal", index)) } +// Generations returns a list of available generation names. +func (r *FileReplicator) Generations() ([]string, error) { + fis, err := ioutil.ReadDir(filepath.Join(r.dst, "generations")) + if os.IsNotExist(err) { + return nil, nil + } else if err != nil { + return nil, err + } + + var generations []string + for _, fi := range fis { + if !IsGenerationName(fi.Name()) { + continue + } else if !fi.IsDir() { + continue + } + generations = append(generations, fi.Name()) + } + return generations, nil +} + +// GenerationStats returns stats for a generation. +func (r *FileReplicator) GenerationStats(generation string) (stats GenerationStats, err error) { + // Determine stats for all snapshots. + n, min, max, err := r.snapshotStats(generation) + if err != nil { + return stats, err + } + stats.SnapshotN = n + stats.CreatedAt, stats.UpdatedAt = min, max + + // Update stats if we have WAL files. 
+ n, min, max, err = r.walStats(generation) + if err != nil { + return stats, err + } else if n == 0 { + return stats, nil + } + + stats.WALN = n + if stats.CreatedAt.IsZero() || min.Before(stats.CreatedAt) { + stats.CreatedAt = min + } + if stats.UpdatedAt.IsZero() || max.After(stats.UpdatedAt) { + stats.UpdatedAt = max + } + return stats, nil +} + +func (r *FileReplicator) snapshotStats(generation string) (n int, min, max time.Time, err error) { + fis, err := ioutil.ReadDir(filepath.Join(r.dst, "generations", generation, "snapshots")) + if os.IsNotExist(err) { + return n, min, max, nil + } else if err != nil { + return n, min, max, err + } + + for _, fi := range fis { + if !IsSnapshotPath(fi.Name()) { + continue + } + modTime := fi.ModTime().UTC() + + n++ + if min.IsZero() || modTime.Before(min) { + min = modTime + } + if max.IsZero() || modTime.After(max) { + max = modTime + } + } + return n, min, max, nil +} + +func (r *FileReplicator) walStats(generation string) (n int, min, max time.Time, err error) { + fis, err := ioutil.ReadDir(filepath.Join(r.dst, "generations", generation, "wal")) + if os.IsNotExist(err) { + return n, min, max, nil + } else if err != nil { + return n, min, max, err + } + + for _, fi := range fis { + if !IsWALPath(fi.Name()) { + continue + } + modTime := fi.ModTime().UTC() + + n++ + if min.IsZero() || modTime.Before(min) { + min = modTime + } + if max.IsZero() || modTime.After(max) { + max = modTime + } + } + return n, min, max, nil +} + +type GenerationStats struct { + SnapshotN int + WALN int + CreatedAt time.Time + UpdatedAt time.Time +} + // Start starts replication for a given generation. func (r *FileReplicator) Start(ctx context.Context) { // Stop previous replication.