From ffc25e26540b09dbf1fd9ff213685885fe462b01 Mon Sep 17 00:00:00 2001 From: Ben Johnson Date: Wed, 30 Dec 2020 15:31:12 -0700 Subject: [PATCH] Add 'snapshots' command. --- cmd/litestream/main.go | 5 +- cmd/litestream/snapshots.go | 121 ++++++++++++++++++++++++++++++++++++ db.go | 29 +++++++++ litestream.go | 10 +++ replicator.go | 46 +++++++++++++- 5 files changed, 207 insertions(+), 4 deletions(-) create mode 100644 cmd/litestream/snapshots.go diff --git a/cmd/litestream/main.go b/cmd/litestream/main.go index 621f971..4c1b7bb 100644 --- a/cmd/litestream/main.go +++ b/cmd/litestream/main.go @@ -54,6 +54,8 @@ func (m *Main) Run(ctx context.Context, args []string) (err error) { return (&ReplicateCommand{}).Run(ctx, args) case "restore": return (&RestoreCommand{}).Run(ctx, args) + case "snapshots": + return (&SnapshotsCommand{}).Run(ctx, args) case "version": return (&VersionCommand{}).Run(ctx, args) default: @@ -75,9 +77,10 @@ Usage: The commands are: - generations list available generations across all dbs & replicas + generations list available generations for a database replicate runs a server to replicate databases restore recovers database backup from a replica + snapshots list available snapshots for a database version prints the version `[1:]) } diff --git a/cmd/litestream/snapshots.go b/cmd/litestream/snapshots.go new file mode 100644 index 0000000..24ddc3c --- /dev/null +++ b/cmd/litestream/snapshots.go @@ -0,0 +1,121 @@ +package main + +import ( + "context" + "errors" + "flag" + "fmt" + "os" + "path/filepath" + "text/tabwriter" + "time" + + "github.com/benbjohnson/litestream" +) + +type SnapshotsCommand struct{} + +func NewSnapshotsCommand() *SnapshotsCommand { + return &SnapshotsCommand{} +} + +func (c *SnapshotsCommand) Run(ctx context.Context, args []string) (err error) { + var configPath string + fs := flag.NewFlagSet("litestream-snapshots", flag.ContinueOnError) + registerConfigFlag(fs, &configPath) + replicaName := fs.String("replica", "", "replica name") + fs.Usage = c.Usage + if err := fs.Parse(args); err != nil { + return err + } else if fs.NArg() == 0 || fs.Arg(0) == "" { + return fmt.Errorf("database path required") + } else if fs.NArg() > 1 { + return fmt.Errorf("too many arguments") + } + + // Load configuration. + if configPath == "" { + return errors.New("-config required") + } + config, err := ReadConfigFile(configPath) + if err != nil { + return err + } + + // Determine absolute path for database. + dbPath, err := filepath.Abs(fs.Arg(0)) + if err != nil { + return err + } + + // Instantiate DB. + dbConfig := config.DBConfig(dbPath) + if dbConfig == nil { + return fmt.Errorf("database not found in config: %s", dbPath) + } + db, err := newDBFromConfig(dbConfig) + if err != nil { + return err + } + + // Find snapshots by db or replica. + var infos []*litestream.SnapshotInfo + if *replicaName != "" { + if r := db.Replica(*replicaName); r == nil { + return fmt.Errorf("replica %q not found for database %q", *replicaName, dbPath) + } else if infos, err = r.Snapshots(ctx); err != nil { + return err + } + } else { + if infos, err = db.Snapshots(ctx); err != nil { + return err + } + } + + // List all snapshots. + w := tabwriter.NewWriter(os.Stdout, 0, 8, 1, '\t', 0) + fmt.Fprintln(w, "replica\tname\tgeneration\tindex\tcreated") + for _, info := range infos { + fmt.Fprintf(w, "%s\t%s\t%s\t%d\t%s\n", + info.Replica, + info.Name, + info.Generation, + info.Index, + info.CreatedAt.Format(time.RFC3339), + ) + } + w.Flush() + + return nil +} + +func (c *SnapshotsCommand) Usage() { + fmt.Printf(` +The snapshots command lists all snapshots available for a database. + +Usage: + + litestream snapshots [arguments] DB + +Arguments: + + -config PATH + Specifies the configuration file. + Defaults to %s + + -replica NAME + Optional, filter by a specific replica. + + +Examples: + + # List all snapshots for a database. + $ litestream snapshots /path/to/db + + # List all snapshots on S3. + $ litestream snapshots -replica s3 /path/to/db + +`[1:], + DefaultConfigPath, + ) +} diff --git a/db.go b/db.go index f042110..77126b4 100644 --- a/db.go +++ b/db.go @@ -14,6 +14,7 @@ import ( "math/rand" "os" "path/filepath" + "sort" "strings" "sync" "time" @@ -152,6 +153,16 @@ func (db *DB) CurrentShadowWALIndex(generation string) (int, error) { return index, nil } +// Replica returns a replica by name. +func (db *DB) Replica(name string) Replica { + for _, r := range db.Replicas { + if r.Name() == name { + return r + } + } + return nil +} + // Pos returns the current position of the database. func (db *DB) Pos() (Pos, error) { generation, err := db.CurrentGeneration() @@ -248,6 +259,24 @@ func (db *DB) UpdatedAt() (time.Time, error) { return t, nil } +// Snapshots returns a list of all snapshots across all replicas. +func (db *DB) Snapshots(ctx context.Context) ([]*SnapshotInfo, error) { + var infos []*SnapshotInfo + for _, r := range db.Replicas { + a, err := r.Snapshots(ctx) + if err != nil { + return nil, err + } + infos = append(infos, a...) + } + + // Sort in order by time. + sort.Slice(infos, func(i, j int) bool { + return infos[i].CreatedAt.Before(infos[j].CreatedAt) + }) + return infos, nil +} + // Init initializes the connection to the database. // Skipped if already initialized or if the database file does not exist. func (db *DB) Init() (err error) { diff --git a/litestream.go b/litestream.go index d9783d5..074e955 100644 --- a/litestream.go +++ b/litestream.go @@ -13,6 +13,7 @@ import ( "regexp" "strconv" "strings" + "time" _ "github.com/mattn/go-sqlite3" ) @@ -40,6 +41,15 @@ var ( ErrNoSnapshots = errors.New("no snapshots available") ) +// SnapshotInfo represents file information about a snapshot. +type SnapshotInfo struct { + Name string + Replica string + Generation string + Index int + CreatedAt time.Time +} + // Pos is a position in the WAL for a generation. type Pos struct { Generation string // generation name diff --git a/replicator.go b/replicator.go index 99a2174..8bef437 100644 --- a/replicator.go +++ b/replicator.go @@ -29,6 +29,9 @@ type Replica interface { // Stops all replication processing. Blocks until processing stopped. Stop() + // Returns the last replication position. + Pos() Pos + // Returns a list of generation names for the replica. Generations(ctx context.Context) ([]string, error) @@ -36,8 +39,8 @@ type Replica interface { // snapshot & WAL files as well as the time range covered. GenerationStats(ctx context.Context, generation string) (GenerationStats, error) - // Returns the last replication position. - Pos() Pos + // Returns a list of available snapshots in the replica. + Snapshots(ctx context.Context) ([]*SnapshotInfo, error) // Returns the highest index for a snapshot within a generation that occurs // before timestamp. If timestamp is zero, returns the latest snapshot. @@ -301,6 +304,43 @@ func (r *FileReplica) walStats(generation string) (n int, min, max time.Time, er return n, min, max, nil } +// Snapshots returns a list of available snapshots in the replica. +func (r *FileReplica) Snapshots(ctx context.Context) ([]*SnapshotInfo, error) { + generations, err := r.Generations(ctx) + if err != nil { + return nil, err + } + + var infos []*SnapshotInfo + for _, generation := range generations { + fis, err := ioutil.ReadDir(r.SnapshotDir(generation)) + if os.IsNotExist(err) { + continue + } else if err != nil { + return nil, err + } + + for _, fi := range fis { + index, _, _, err := ParseSnapshotPath(fi.Name()) + if err != nil { + continue + } + + // TODO: Add schedule name to snapshot info. + + infos = append(infos, &SnapshotInfo{ + Name: fi.Name(), + Replica: r.Name(), + Generation: generation, + Index: index, + CreatedAt: fi.ModTime().UTC(), + }) + } + } + + return infos, nil +} + // Start starts replication for a given generation. func (r *FileReplica) Start(ctx context.Context) { // Stop previous replication. @@ -465,7 +505,7 @@ func (r *FileReplica) Sync(ctx context.Context) (err error) { if r.Pos().IsZero() { pos, err := r.calcPos(generation) if err != nil { - return fmt.Errorf("cannot determine replica position: %s", r.db.Path(), r.Name(), err) + return fmt.Errorf("cannot determine replica position: %s", err) } r.mu.Lock()