From 11d7d22383361aeb6d2d106c30e5f13b2859aa7e Mon Sep 17 00:00:00 2001 From: Ben Johnson Date: Wed, 30 Dec 2020 16:03:23 -0700 Subject: [PATCH] Add 'wal' command --- cmd/litestream/main.go | 3 + cmd/litestream/snapshots.go | 6 +- cmd/litestream/wal.go | 130 ++++++++++++++++++++++++++++++++++++ db.go | 18 +++++ litestream.go | 12 ++++ replicator.go | 59 ++++++++++++++++ 6 files changed, 225 insertions(+), 3 deletions(-) create mode 100644 cmd/litestream/wal.go diff --git a/cmd/litestream/main.go b/cmd/litestream/main.go index 4c1b7bb..7af31e3 100644 --- a/cmd/litestream/main.go +++ b/cmd/litestream/main.go @@ -58,6 +58,8 @@ func (m *Main) Run(ctx context.Context, args []string) (err error) { return (&SnapshotsCommand{}).Run(ctx, args) case "version": return (&VersionCommand{}).Run(ctx, args) + case "wal": + return (&WALCommand{}).Run(ctx, args) default: if cmd == "" || cmd == "help" || strings.HasPrefix(cmd, "-") { m.Usage() @@ -82,6 +84,7 @@ The commands are: restore recovers database backup from a replica snapshots list available snapshots for a database version prints the version + wal list available WAL files for a database `[1:]) } diff --git a/cmd/litestream/snapshots.go b/cmd/litestream/snapshots.go index 24ddc3c..b1f5dbc 100644 --- a/cmd/litestream/snapshots.go +++ b/cmd/litestream/snapshots.go @@ -74,13 +74,13 @@ func (c *SnapshotsCommand) Run(ctx context.Context, args []string) (err error) { // List all snapshots. w := tabwriter.NewWriter(os.Stdout, 0, 8, 1, '\t', 0) - fmt.Fprintln(w, "replica\tname\tgeneration\tindex\tcreated") + fmt.Fprintln(w, "replica\tgeneration\tindex\tsize\tcreated") for _, info := range infos { - fmt.Fprintf(w, "%s\t%s\t%s\t%d\t%s\n", + fmt.Fprintf(w, "%s\t%s\t%d\t%d\t%s\n", info.Replica, - info.Name, info.Generation, info.Index, + info.Size, info.CreatedAt.Format(time.RFC3339), ) } diff --git a/cmd/litestream/wal.go b/cmd/litestream/wal.go new file mode 100644 index 0000000..07d6b3b --- /dev/null +++ b/cmd/litestream/wal.go @@ -0,0 +1,130 @@ +package main + +import ( + "context" + "errors" + "flag" + "fmt" + "os" + "path/filepath" + "text/tabwriter" + "time" + + "github.com/benbjohnson/litestream" +) + +type WALCommand struct{} + +func NewWALCommand() *WALCommand { + return &WALCommand{} +} + +func (c *WALCommand) Run(ctx context.Context, args []string) (err error) { + var configPath string + fs := flag.NewFlagSet("litestream-wal", flag.ContinueOnError) + registerConfigFlag(fs, &configPath) + replicaName := fs.String("replica", "", "replica name") + generation := fs.String("generation", "", "generation name") + fs.Usage = c.Usage + if err := fs.Parse(args); err != nil { + return err + } else if fs.NArg() == 0 || fs.Arg(0) == "" { + return fmt.Errorf("database path required") + } else if fs.NArg() > 1 { + return fmt.Errorf("too many arguments") + } + + // Load configuration. + if configPath == "" { + return errors.New("-config required") + } + config, err := ReadConfigFile(configPath) + if err != nil { + return err + } + + // Determine absolute path for database. + dbPath, err := filepath.Abs(fs.Arg(0)) + if err != nil { + return err + } + + // Instantiate DB. + dbConfig := config.DBConfig(dbPath) + if dbConfig == nil { + return fmt.Errorf("database not found in config: %s", dbPath) + } + db, err := newDBFromConfig(dbConfig) + if err != nil { + return err + } + + // Find snapshots by db or replica. + var infos []*litestream.WALInfo + if *replicaName != "" { + if r := db.Replica(*replicaName); r == nil { + return fmt.Errorf("replica %q not found for database %q", *replicaName, dbPath) + } else if infos, err = r.WALs(ctx); err != nil { + return err + } + } else { + if infos, err = db.WALs(ctx); err != nil { + return err + } + } + + // List all WAL files. + w := tabwriter.NewWriter(os.Stdout, 0, 8, 1, '\t', 0) + fmt.Fprintln(w, "replica\tgeneration\tindex\toffset\tsize\tcreated") + for _, info := range infos { + if *generation != "" && info.Generation != *generation { + continue + } + + fmt.Fprintf(w, "%s\t%s\t%d\t%d\t%d\t%s\n", + info.Replica, + info.Generation, + info.Index, + info.Offset, + info.Size, + info.CreatedAt.Format(time.RFC3339), + ) + } + w.Flush() + + return nil +} + +func (c *WALCommand) Usage() { + fmt.Printf(` +The wal command lists all wal files available for a database. + +Usage: + + litestream wal [arguments] DB + +Arguments: + + -config PATH + Specifies the configuration file. + Defaults to %s + + -replica NAME + Optional, filter by a specific replica. + + -generation NAME + Optional, filter by a specific generation. + + +Examples: + + # List all WAL files for a database. + $ litestream wal /path/to/db + + # List all WAL files on S3 for a specific generation. + $ litestream snapshots -replica s3 -generation xxxxxxxx /path/to/db + +`[1:], + DefaultConfigPath, + ) +} diff --git a/db.go b/db.go index 77126b4..670eab2 100644 --- a/db.go +++ b/db.go @@ -277,6 +277,24 @@ func (db *DB) Snapshots(ctx context.Context) ([]*SnapshotInfo, error) { return infos, nil } +// WALs returns a list of all WAL files across all replicas. +func (db *DB) WALs(ctx context.Context) ([]*WALInfo, error) { + var infos []*WALInfo + for _, r := range db.Replicas { + a, err := r.WALs(ctx) + if err != nil { + return nil, err + } + infos = append(infos, a...) + } + + // Sort in order by time. + sort.Slice(infos, func(i, j int) bool { + return infos[i].CreatedAt.Before(infos[j].CreatedAt) + }) + return infos, nil +} + // Init initializes the connection to the database. // Skipped if already initialized or if the database file does not exist. func (db *DB) Init() (err error) { diff --git a/litestream.go b/litestream.go index 074e955..577ff5d 100644 --- a/litestream.go +++ b/litestream.go @@ -47,6 +47,18 @@ type SnapshotInfo struct { Replica string Generation string Index int + Size int64 + CreatedAt time.Time +} + +// WALInfo represents file information about a WAL file. +type WALInfo struct { + Name string + Replica string + Generation string + Index int + Offset int64 + Size int64 CreatedAt time.Time } diff --git a/replicator.go b/replicator.go index 8bef437..96d7c77 100644 --- a/replicator.go +++ b/replicator.go @@ -42,6 +42,9 @@ type Replica interface { // Returns a list of available snapshots in the replica. Snapshots(ctx context.Context) ([]*SnapshotInfo, error) + // Returns a list of available WAL files in the replica. + WALs(ctx context.Context) ([]*WALInfo, error) + // Returns the highest index for a snapshot within a generation that occurs // before timestamp. If timestamp is zero, returns the latest snapshot. SnapshotIndexAt(ctx context.Context, generation string, timestamp time.Time) (int, error) @@ -333,6 +336,7 @@ func (r *FileReplica) Snapshots(ctx context.Context) ([]*SnapshotInfo, error) { Replica: r.Name(), Generation: generation, Index: index, + Size: fi.Size(), CreatedAt: fi.ModTime().UTC(), }) } @@ -341,6 +345,61 @@ func (r *FileReplica) Snapshots(ctx context.Context) ([]*SnapshotInfo, error) { return infos, nil } +// WALs returns a list of available WAL files in the replica. +func (r *FileReplica) WALs(ctx context.Context) ([]*WALInfo, error) { + generations, err := r.Generations(ctx) + if err != nil { + return nil, err + } + + var infos []*WALInfo + for _, generation := range generations { + // Find a list of all directory groups. + dir := r.WALDir(generation) + subfis, err := ioutil.ReadDir(dir) + if os.IsNotExist(err) { + continue + } else if err != nil { + return nil, err + } + + // Iterate over WAL group subdirectories. + for _, subfi := range subfis { + if !subfi.IsDir() { + continue + } + + // Find a list of all WAL files in the group. + fis, err := ioutil.ReadDir(filepath.Join(dir, subfi.Name())) + if os.IsNotExist(err) { + continue + } else if err != nil { + return nil, err + } + + // Iterate over each WAL file. + for _, fi := range fis { + index, offset, _, err := ParseWALPath(fi.Name()) + if err != nil { + continue + } + + infos = append(infos, &WALInfo{ + Name: fi.Name(), + Replica: r.Name(), + Generation: generation, + Index: index, + Offset: offset, + Size: fi.Size(), + CreatedAt: fi.ModTime().UTC(), + }) + } + } + } + + return infos, nil +} + // Start starts replication for a given generation. func (r *FileReplica) Start(ctx context.Context) { // Stop previous replication.