Add 'snapshots' command.

This commit is contained in:
Ben Johnson
2020-12-30 15:31:12 -07:00
parent 5cc78fafa0
commit ffc25e2654
5 changed files with 207 additions and 4 deletions

View File

@@ -54,6 +54,8 @@ func (m *Main) Run(ctx context.Context, args []string) (err error) {
return (&ReplicateCommand{}).Run(ctx, args) return (&ReplicateCommand{}).Run(ctx, args)
case "restore": case "restore":
return (&RestoreCommand{}).Run(ctx, args) return (&RestoreCommand{}).Run(ctx, args)
case "snapshots":
return (&SnapshotsCommand{}).Run(ctx, args)
case "version": case "version":
return (&VersionCommand{}).Run(ctx, args) return (&VersionCommand{}).Run(ctx, args)
default: default:
@@ -75,9 +77,10 @@ Usage:
The commands are: The commands are:
generations list available generations across all dbs & replicas generations list available generations for a database
replicate runs a server to replicate databases replicate runs a server to replicate databases
restore recovers database backup from a replica restore recovers database backup from a replica
snapshots list available snapshots for a database
version prints the version version prints the version
`[1:]) `[1:])
} }

121
cmd/litestream/snapshots.go Normal file
View File

@@ -0,0 +1,121 @@
package main
import (
"context"
"errors"
"flag"
"fmt"
"os"
"path/filepath"
"text/tabwriter"
"time"
"github.com/benbjohnson/litestream"
)
type SnapshotsCommand struct{}
func NewSnapshotsCommand() *SnapshotsCommand {
return &SnapshotsCommand{}
}
func (c *SnapshotsCommand) Run(ctx context.Context, args []string) (err error) {
var configPath string
fs := flag.NewFlagSet("litestream-snapshots", flag.ContinueOnError)
registerConfigFlag(fs, &configPath)
replicaName := fs.String("replica", "", "replica name")
fs.Usage = c.Usage
if err := fs.Parse(args); err != nil {
return err
} else if fs.NArg() == 0 || fs.Arg(0) == "" {
return fmt.Errorf("database path required")
} else if fs.NArg() > 1 {
return fmt.Errorf("too many arguments")
}
// Load configuration.
if configPath == "" {
return errors.New("-config required")
}
config, err := ReadConfigFile(configPath)
if err != nil {
return err
}
// Determine absolute path for database.
dbPath, err := filepath.Abs(fs.Arg(0))
if err != nil {
return err
}
// Instantiate DB.
dbConfig := config.DBConfig(dbPath)
if dbConfig == nil {
return fmt.Errorf("database not found in config: %s", dbPath)
}
db, err := newDBFromConfig(dbConfig)
if err != nil {
return err
}
// Find snapshots by db or replica.
var infos []*litestream.SnapshotInfo
if *replicaName != "" {
if r := db.Replica(*replicaName); r == nil {
return fmt.Errorf("replica %q not found for database %q", *replicaName, dbPath)
} else if infos, err = r.Snapshots(ctx); err != nil {
return err
}
} else {
if infos, err = db.Snapshots(ctx); err != nil {
return err
}
}
// List all snapshots.
w := tabwriter.NewWriter(os.Stdout, 0, 8, 1, '\t', 0)
fmt.Fprintln(w, "replica\tname\tgeneration\tindex\tcreated")
for _, info := range infos {
fmt.Fprintf(w, "%s\t%s\t%s\t%d\t%s\n",
info.Replica,
info.Name,
info.Generation,
info.Index,
info.CreatedAt.Format(time.RFC3339),
)
}
w.Flush()
return nil
}
func (c *SnapshotsCommand) Usage() {
fmt.Printf(`
The snapshots command lists all snapshots available for a database.
Usage:
litestream snapshots [arguments] DB
Arguments:
-config PATH
Specifies the configuration file.
Defaults to %s
-replica NAME
Optional, filter by a specific replica.
Examples:
# List all snapshots for a database.
$ litestream snapshots /path/to/db
# List all snapshots on S3.
$ litestream snapshots -replica s3 /path/to/db
`[1:],
DefaultConfigPath,
)
}

29
db.go
View File

@@ -14,6 +14,7 @@ import (
"math/rand" "math/rand"
"os" "os"
"path/filepath" "path/filepath"
"sort"
"strings" "strings"
"sync" "sync"
"time" "time"
@@ -152,6 +153,16 @@ func (db *DB) CurrentShadowWALIndex(generation string) (int, error) {
return index, nil return index, nil
} }
// Replica returns a replica by name.
func (db *DB) Replica(name string) Replica {
for _, r := range db.Replicas {
if r.Name() == name {
return r
}
}
return nil
}
// Pos returns the current position of the database. // Pos returns the current position of the database.
func (db *DB) Pos() (Pos, error) { func (db *DB) Pos() (Pos, error) {
generation, err := db.CurrentGeneration() generation, err := db.CurrentGeneration()
@@ -248,6 +259,24 @@ func (db *DB) UpdatedAt() (time.Time, error) {
return t, nil return t, nil
} }
// Snapshots returns a list of all snapshots across all replicas.
func (db *DB) Snapshots(ctx context.Context) ([]*SnapshotInfo, error) {
var infos []*SnapshotInfo
for _, r := range db.Replicas {
a, err := r.Snapshots(ctx)
if err != nil {
return nil, err
}
infos = append(infos, a...)
}
// Sort in order by time.
sort.Slice(infos, func(i, j int) bool {
return infos[i].CreatedAt.Before(infos[j].CreatedAt)
})
return infos, nil
}
// Init initializes the connection to the database. // Init initializes the connection to the database.
// Skipped if already initialized or if the database file does not exist. // Skipped if already initialized or if the database file does not exist.
func (db *DB) Init() (err error) { func (db *DB) Init() (err error) {

View File

@@ -13,6 +13,7 @@ import (
"regexp" "regexp"
"strconv" "strconv"
"strings" "strings"
"time"
_ "github.com/mattn/go-sqlite3" _ "github.com/mattn/go-sqlite3"
) )
@@ -40,6 +41,15 @@ var (
ErrNoSnapshots = errors.New("no snapshots available") ErrNoSnapshots = errors.New("no snapshots available")
) )
// SnapshotInfo represents file information about a snapshot.
type SnapshotInfo struct {
Name string
Replica string
Generation string
Index int
CreatedAt time.Time
}
// Pos is a position in the WAL for a generation. // Pos is a position in the WAL for a generation.
type Pos struct { type Pos struct {
Generation string // generation name Generation string // generation name

View File

@@ -29,6 +29,9 @@ type Replica interface {
// Stops all replication processing. Blocks until processing stopped. // Stops all replication processing. Blocks until processing stopped.
Stop() Stop()
// Returns the last replication position.
Pos() Pos
// Returns a list of generation names for the replica. // Returns a list of generation names for the replica.
Generations(ctx context.Context) ([]string, error) Generations(ctx context.Context) ([]string, error)
@@ -36,8 +39,8 @@ type Replica interface {
// snapshot & WAL files as well as the time range covered. // snapshot & WAL files as well as the time range covered.
GenerationStats(ctx context.Context, generation string) (GenerationStats, error) GenerationStats(ctx context.Context, generation string) (GenerationStats, error)
// Returns the last replication position. // Returns a list of available snapshots in the replica.
Pos() Pos Snapshots(ctx context.Context) ([]*SnapshotInfo, error)
// Returns the highest index for a snapshot within a generation that occurs // Returns the highest index for a snapshot within a generation that occurs
// before timestamp. If timestamp is zero, returns the latest snapshot. // before timestamp. If timestamp is zero, returns the latest snapshot.
@@ -301,6 +304,43 @@ func (r *FileReplica) walStats(generation string) (n int, min, max time.Time, er
return n, min, max, nil return n, min, max, nil
} }
// Snapshots returns a list of available snapshots in the replica.
func (r *FileReplica) Snapshots(ctx context.Context) ([]*SnapshotInfo, error) {
generations, err := r.Generations(ctx)
if err != nil {
return nil, err
}
var infos []*SnapshotInfo
for _, generation := range generations {
fis, err := ioutil.ReadDir(r.SnapshotDir(generation))
if os.IsNotExist(err) {
continue
} else if err != nil {
return nil, err
}
for _, fi := range fis {
index, _, _, err := ParseSnapshotPath(fi.Name())
if err != nil {
continue
}
// TODO: Add schedule name to snapshot info.
infos = append(infos, &SnapshotInfo{
Name: fi.Name(),
Replica: r.Name(),
Generation: generation,
Index: index,
CreatedAt: fi.ModTime().UTC(),
})
}
}
return infos, nil
}
// Start starts replication for a given generation. // Start starts replication for a given generation.
func (r *FileReplica) Start(ctx context.Context) { func (r *FileReplica) Start(ctx context.Context) {
// Stop previous replication. // Stop previous replication.
@@ -465,7 +505,7 @@ func (r *FileReplica) Sync(ctx context.Context) (err error) {
if r.Pos().IsZero() { if r.Pos().IsZero() {
pos, err := r.calcPos(generation) pos, err := r.calcPos(generation)
if err != nil { if err != nil {
return fmt.Errorf("cannot determine replica position: %s", r.db.Path(), r.Name(), err) return fmt.Errorf("cannot determine replica position: %s", err)
} }
r.mu.Lock() r.mu.Lock()