Add 'wal' command

This commit is contained in:
Ben Johnson
2020-12-30 16:03:23 -07:00
parent 8a7d8175fc
commit 11d7d22383
6 changed files with 225 additions and 3 deletions

View File

@@ -58,6 +58,8 @@ func (m *Main) Run(ctx context.Context, args []string) (err error) {
return (&SnapshotsCommand{}).Run(ctx, args)
case "version":
return (&VersionCommand{}).Run(ctx, args)
case "wal":
return (&WALCommand{}).Run(ctx, args)
default:
if cmd == "" || cmd == "help" || strings.HasPrefix(cmd, "-") {
m.Usage()
@@ -82,6 +84,7 @@ The commands are:
restore recovers database backup from a replica
snapshots list available snapshots for a database
version prints the version
wal list available WAL files for a database
`[1:])
}

View File

@@ -74,13 +74,13 @@ func (c *SnapshotsCommand) Run(ctx context.Context, args []string) (err error) {
// List all snapshots.
w := tabwriter.NewWriter(os.Stdout, 0, 8, 1, '\t', 0)
fmt.Fprintln(w, "replica\tname\tgeneration\tindex\tcreated")
fmt.Fprintln(w, "replica\tgeneration\tindex\tsize\tcreated")
for _, info := range infos {
fmt.Fprintf(w, "%s\t%s\t%s\t%d\t%s\n",
fmt.Fprintf(w, "%s\t%s\t%d\t%d\t%s\n",
info.Replica,
info.Name,
info.Generation,
info.Index,
info.Size,
info.CreatedAt.Format(time.RFC3339),
)
}

130
cmd/litestream/wal.go Normal file
View File

@@ -0,0 +1,130 @@
package main
import (
"context"
"errors"
"flag"
"fmt"
"os"
"path/filepath"
"text/tabwriter"
"time"
"github.com/benbjohnson/litestream"
)
type WALCommand struct{}
func NewWALCommand() *WALCommand {
return &WALCommand{}
}
func (c *WALCommand) Run(ctx context.Context, args []string) (err error) {
var configPath string
fs := flag.NewFlagSet("litestream-wal", flag.ContinueOnError)
registerConfigFlag(fs, &configPath)
replicaName := fs.String("replica", "", "replica name")
generation := fs.String("generation", "", "generation name")
fs.Usage = c.Usage
if err := fs.Parse(args); err != nil {
return err
} else if fs.NArg() == 0 || fs.Arg(0) == "" {
return fmt.Errorf("database path required")
} else if fs.NArg() > 1 {
return fmt.Errorf("too many arguments")
}
// Load configuration.
if configPath == "" {
return errors.New("-config required")
}
config, err := ReadConfigFile(configPath)
if err != nil {
return err
}
// Determine absolute path for database.
dbPath, err := filepath.Abs(fs.Arg(0))
if err != nil {
return err
}
// Instantiate DB.
dbConfig := config.DBConfig(dbPath)
if dbConfig == nil {
return fmt.Errorf("database not found in config: %s", dbPath)
}
db, err := newDBFromConfig(dbConfig)
if err != nil {
return err
}
// Find snapshots by db or replica.
var infos []*litestream.WALInfo
if *replicaName != "" {
if r := db.Replica(*replicaName); r == nil {
return fmt.Errorf("replica %q not found for database %q", *replicaName, dbPath)
} else if infos, err = r.WALs(ctx); err != nil {
return err
}
} else {
if infos, err = db.WALs(ctx); err != nil {
return err
}
}
// List all WAL files.
w := tabwriter.NewWriter(os.Stdout, 0, 8, 1, '\t', 0)
fmt.Fprintln(w, "replica\tgeneration\tindex\toffset\tsize\tcreated")
for _, info := range infos {
if *generation != "" && info.Generation != *generation {
continue
}
fmt.Fprintf(w, "%s\t%s\t%d\t%d\t%d\t%s\n",
info.Replica,
info.Generation,
info.Index,
info.Offset,
info.Size,
info.CreatedAt.Format(time.RFC3339),
)
}
w.Flush()
return nil
}
func (c *WALCommand) Usage() {
fmt.Printf(`
The wal command lists all wal files available for a database.
Usage:
litestream wal [arguments] DB
Arguments:
-config PATH
Specifies the configuration file.
Defaults to %s
-replica NAME
Optional, filter by a specific replica.
-generation NAME
Optional, filter by a specific generation.
Examples:
# List all WAL files for a database.
$ litestream wal /path/to/db
# List all WAL files on S3 for a specific generation.
$ litestream snapshots -replica s3 -generation xxxxxxxx /path/to/db
`[1:],
DefaultConfigPath,
)
}

18
db.go
View File

@@ -277,6 +277,24 @@ func (db *DB) Snapshots(ctx context.Context) ([]*SnapshotInfo, error) {
return infos, nil
}
// WALs returns a list of all WAL files across all replicas.
func (db *DB) WALs(ctx context.Context) ([]*WALInfo, error) {
var infos []*WALInfo
for _, r := range db.Replicas {
a, err := r.WALs(ctx)
if err != nil {
return nil, err
}
infos = append(infos, a...)
}
// Sort in order by time.
sort.Slice(infos, func(i, j int) bool {
return infos[i].CreatedAt.Before(infos[j].CreatedAt)
})
return infos, nil
}
// Init initializes the connection to the database.
// Skipped if already initialized or if the database file does not exist.
func (db *DB) Init() (err error) {

View File

@@ -47,6 +47,18 @@ type SnapshotInfo struct {
Replica string
Generation string
Index int
Size int64
CreatedAt time.Time
}
// WALInfo represents file information about a WAL file.
type WALInfo struct {
Name string
Replica string
Generation string
Index int
Offset int64
Size int64
CreatedAt time.Time
}

View File

@@ -42,6 +42,9 @@ type Replica interface {
// Returns a list of available snapshots in the replica.
Snapshots(ctx context.Context) ([]*SnapshotInfo, error)
// Returns a list of available WAL files in the replica.
WALs(ctx context.Context) ([]*WALInfo, error)
// Returns the highest index for a snapshot within a generation that occurs
// before timestamp. If timestamp is zero, returns the latest snapshot.
SnapshotIndexAt(ctx context.Context, generation string, timestamp time.Time) (int, error)
@@ -333,6 +336,7 @@ func (r *FileReplica) Snapshots(ctx context.Context) ([]*SnapshotInfo, error) {
Replica: r.Name(),
Generation: generation,
Index: index,
Size: fi.Size(),
CreatedAt: fi.ModTime().UTC(),
})
}
@@ -341,6 +345,61 @@ func (r *FileReplica) Snapshots(ctx context.Context) ([]*SnapshotInfo, error) {
return infos, nil
}
// WALs returns a list of available WAL files in the replica.
func (r *FileReplica) WALs(ctx context.Context) ([]*WALInfo, error) {
generations, err := r.Generations(ctx)
if err != nil {
return nil, err
}
var infos []*WALInfo
for _, generation := range generations {
// Find a list of all directory groups.
dir := r.WALDir(generation)
subfis, err := ioutil.ReadDir(dir)
if os.IsNotExist(err) {
continue
} else if err != nil {
return nil, err
}
// Iterate over WAL group subdirectories.
for _, subfi := range subfis {
if !subfi.IsDir() {
continue
}
// Find a list of all WAL files in the group.
fis, err := ioutil.ReadDir(filepath.Join(dir, subfi.Name()))
if os.IsNotExist(err) {
continue
} else if err != nil {
return nil, err
}
// Iterate over each WAL file.
for _, fi := range fis {
index, offset, _, err := ParseWALPath(fi.Name())
if err != nil {
continue
}
infos = append(infos, &WALInfo{
Name: fi.Name(),
Replica: r.Name(),
Generation: generation,
Index: index,
Offset: offset,
Size: fi.Size(),
CreatedAt: fi.ModTime().UTC(),
})
}
}
}
return infos, nil
}
// Start starts replication for a given generation.
func (r *FileReplica) Start(ctx context.Context) {
// Stop previous replication.