Add 'restore' command.

This commit is contained in:
Ben Johnson
2020-12-28 15:58:08 -07:00
parent 44973dbbbc
commit 81e99c8035
6 changed files with 369 additions and 85 deletions

View File

@@ -72,7 +72,7 @@ func (c *GenerationsCommand) Run(ctx context.Context, args []string) (err error)
// Iterate over each replicator in the database. // Iterate over each replicator in the database.
for _, r := range db.Replicators { for _, r := range db.Replicators {
generations, err := r.Generations() generations, err := r.Generations(ctx)
if err != nil { if err != nil {
log.Printf("%s: cannot list generations", r.Name(), err) log.Printf("%s: cannot list generations", r.Name(), err)
continue continue
@@ -80,7 +80,7 @@ func (c *GenerationsCommand) Run(ctx context.Context, args []string) (err error)
// Iterate over each generation for the replicator. // Iterate over each generation for the replicator.
for _, generation := range generations { for _, generation := range generations {
stats, err := r.GenerationStats(generation) stats, err := r.GenerationStats(ctx, generation)
if err != nil { if err != nil {
log.Printf("%s: cannot find generation stats: %s", r.Name(), err) log.Printf("%s: cannot find generation stats: %s", r.Name(), err)
continue continue

View File

@@ -92,6 +92,15 @@ func DefaultConfig() Config {
return Config{} return Config{}
} }
func (c *Config) DBConfig(path string) *DBConfig {
for _, dbConfig := range c.DBs {
if dbConfig.Path == path {
return dbConfig
}
}
return nil
}
// ReadConfigFile unmarshals config from filename. Expands path if needed. // ReadConfigFile unmarshals config from filename. Expands path if needed.
func ReadConfigFile(filename string) (Config, error) { func ReadConfigFile(filename string) (Config, error) {
config := DefaultConfig() config := DefaultConfig()

View File

@@ -8,8 +8,9 @@ import (
"log" "log"
"os" "os"
"path/filepath" "path/filepath"
"text/tabwriter"
"time" "time"
"github.com/benbjohnson/litestream"
) )
type RestoreCommand struct { type RestoreCommand struct {
@@ -26,9 +27,9 @@ func (c *RestoreCommand) Run(ctx context.Context, args []string) (err error) {
fs := flag.NewFlagSet("litestream-restore", flag.ContinueOnError) fs := flag.NewFlagSet("litestream-restore", flag.ContinueOnError)
registerConfigFlag(fs, &configPath) registerConfigFlag(fs, &configPath)
fs.StringVar(&opt.OutputPath, "o", "", "output path") fs.StringVar(&opt.OutputPath, "o", "", "output path")
fs.StringVar(&opt.Replica, "replica", "", "replica name") fs.StringVar(&opt.ReplicaName, "replica", "", "replica name")
fs.StringVar(&opt.Generation, "generation", "", "generation name") fs.StringVar(&opt.Generation, "generation", "", "generation name")
fs.StringVar(&opt.DryRun, "dry-run", "", "dry run") fs.BoolVar(&opt.DryRun, "dry-run", false, "dry run")
timestampStr := fs.String("timestamp", "", "timestamp") timestampStr := fs.String("timestamp", "", "timestamp")
verbose := fs.Bool("v", false, "verbose output") verbose := fs.Bool("v", false, "verbose output")
fs.Usage = c.Usage fs.Usage = c.Usage
@@ -51,7 +52,7 @@ func (c *RestoreCommand) Run(ctx context.Context, args []string) (err error) {
// Parse timestamp, if specified. // Parse timestamp, if specified.
if *timestampStr != "" { if *timestampStr != "" {
if opts.Timestamp, err = time.Parse(time.RFC3339, *timestampStr); err != nil { if opt.Timestamp, err = time.Parse(time.RFC3339, *timestampStr); err != nil {
return errors.New("invalid -timestamp, must specify in ISO 8601 format (e.g. 2000-01-01T00:00:00Z)") return errors.New("invalid -timestamp, must specify in ISO 8601 format (e.g. 2000-01-01T00:00:00Z)")
} }
} }
@@ -81,7 +82,7 @@ func (c *RestoreCommand) Run(ctx context.Context, args []string) (err error) {
return err return err
} }
return db.Restore(opt) return db.Restore(ctx, opt)
} }
func (c *RestoreCommand) Usage() { func (c *RestoreCommand) Usage() {
@@ -95,7 +96,8 @@ Usage:
Arguments: Arguments:
-config PATH -config PATH
Specifies the configuration file. Defaults to %s Specifies the configuration file.
Defaults to %s
-replica NAME -replica NAME
Restore from a specific replica. Restore from a specific replica.
@@ -137,6 +139,7 @@ Examples:
# Restore database from specific generation on S3. # Restore database from specific generation on S3.
$ litestream restore -replica s3 -generation xxxxxxxx /path/to/db $ litestream restore -replica s3 -generation xxxxxxxx /path/to/db
`[1:], `[1:],
DefaultConfigPath, DefaultConfigPath,
) )

170
db.go
View File

@@ -777,13 +777,13 @@ func (db *DB) copyToShadowWAL(filename string) (newSize int64, err error) {
return newSize, nil return newSize, nil
} }
// WALReader opens a reader for a shadow WAL file at a given position. // ShadowWALReader opens a reader for a shadow WAL file at a given position.
// If the reader is at the end of the file, it attempts to return the next file. // If the reader is at the end of the file, it attempts to return the next file.
// //
// The caller should check Pos() & Size() on the returned reader to check offset. // The caller should check Pos() & Size() on the returned reader to check offset.
func (db *DB) WALReader(pos Pos) (r *WALReader, err error) { func (db *DB) ShadowWALReader(pos Pos) (r *ShadowWALReader, err error) {
// Fetch reader for the requested position. Return if it has data. // Fetch reader for the requested position. Return if it has data.
r, err = db.walReader(pos) r, err = db.shadowWALReader(pos)
if err != nil { if err != nil {
return nil, err return nil, err
} else if r.N() > 0 { } else if r.N() > 0 {
@@ -793,15 +793,15 @@ func (db *DB) WALReader(pos Pos) (r *WALReader, err error) {
// Otherwise attempt to read the start of the next WAL file. // Otherwise attempt to read the start of the next WAL file.
pos.Index, pos.Offset = pos.Index+1, 0 pos.Index, pos.Offset = pos.Index+1, 0
r, err = db.walReader(pos) r, err = db.shadowWALReader(pos)
if os.IsNotExist(err) { if os.IsNotExist(err) {
return nil, io.EOF return nil, io.EOF
} }
return r, err return r, err
} }
// walReader opens a file reader for a shadow WAL file at a given position. // shadowWALReader opens a file reader for a shadow WAL file at a given position.
func (db *DB) walReader(pos Pos) (r *WALReader, err error) { func (db *DB) shadowWALReader(pos Pos) (r *ShadowWALReader, err error) {
filename := db.ShadowWALPath(pos.Generation, pos.Index) filename := db.ShadowWALPath(pos.Generation, pos.Index)
f, err := os.Open(filename) f, err := os.Open(filename)
@@ -832,7 +832,7 @@ func (db *DB) walReader(pos Pos) (r *WALReader, err error) {
return nil, err return nil, err
} }
return &WALReader{ return &ShadowWALReader{
f: f, f: f,
n: fileSize - pos.Offset, n: fileSize - pos.Offset,
pos: pos, pos: pos,
@@ -854,25 +854,25 @@ func frameAlign(offset int64, pageSize int) int64 {
return (frameN * frameSize) + WALHeaderSize return (frameN * frameSize) + WALHeaderSize
} }
// WALReader represents a reader for a WAL file that tracks WAL position. // ShadowWALReader represents a reader for a shadow WAL file that tracks WAL position.
type WALReader struct { type ShadowWALReader struct {
f *os.File f *os.File
n int64 n int64
pos Pos pos Pos
} }
// Close closes the underlying WAL file handle. // Close closes the underlying WAL file handle.
func (r *WALReader) Close() error { return r.f.Close() } func (r *ShadowWALReader) Close() error { return r.f.Close() }
// N returns the remaining bytes in the reader. // N returns the remaining bytes in the reader.
func (r *WALReader) N() int64 { return r.n } func (r *ShadowWALReader) N() int64 { return r.n }
// Pos returns the current WAL position. // Pos returns the current WAL position.
func (r *WALReader) Pos() Pos { return r.pos } func (r *ShadowWALReader) Pos() Pos { return r.pos }
// Read reads bytes into p, updates the position, and returns the bytes read. // Read reads bytes into p, updates the position, and returns the bytes read.
// Returns io.EOF at the end of the available section of the WAL. // Returns io.EOF at the end of the available section of the WAL.
func (r *WALReader) Read(p []byte) (n int, err error) { func (r *ShadowWALReader) Read(p []byte) (n int, err error) {
if r.n <= 0 { if r.n <= 0 {
return 0, io.EOF return 0, io.EOF
} }
@@ -1009,7 +1009,7 @@ func (db *DB) monitor() {
// replica or generation or it will automatically choose the best one. Finally, // replica or generation or it will automatically choose the best one. Finally,
// a timestamp can be specified to restore the database to a specific // a timestamp can be specified to restore the database to a specific
// point-in-time. // point-in-time.
func (db *DB) Restore(opt RestoreOptions) { func (db *DB) Restore(ctx context.Context, opt RestoreOptions) error {
// Ensure logger exists. // Ensure logger exists.
logger := opt.Logger logger := opt.Logger
if logger == nil { if logger == nil {
@@ -1019,7 +1019,7 @@ func (db *DB) Restore(opt RestoreOptions) {
// Determine the correct output path. // Determine the correct output path.
outputPath := opt.OutputPath outputPath := opt.OutputPath
if outputPath == "" { if outputPath == "" {
outputPath = db.Path outputPath = db.Path()
} }
// Ensure output path does not already exist (unless this is a dry run). // Ensure output path does not already exist (unless this is a dry run).
@@ -1027,65 +1027,56 @@ func (db *DB) Restore(opt RestoreOptions) {
if _, err := os.Stat(outputPath); err == nil { if _, err := os.Stat(outputPath); err == nil {
return fmt.Errorf("cannot restore, output path already exists: %s", outputPath) return fmt.Errorf("cannot restore, output path already exists: %s", outputPath)
} else if err != nil && !os.IsNotExist(err) { } else if err != nil && !os.IsNotExist(err) {
return outputPath return err
} }
} }
// Determine target replica & generation to restore from. // Determine target replica & generation to restore from.
r, generation, err := db.restoreTarget(opt, logger) r, generation, err := db.restoreTarget(ctx, opt, logger)
if err != nil { if err != nil {
return err return err
} }
// Determine manifest to restore from. // Find lastest snapshot that occurs before timestamp.
snapshotPath, walPaths, err := opt.determineRestoreManifest(r, generation, opt.Timestamp, logger) minWALIndex, err := r.SnapshotIndexAt(ctx, generation, opt.Timestamp)
if err != nil { if err != nil {
return err return fmt.Errorf("cannot find snapshot index for restore: %w", err)
} }
// Find the maximum WAL index that occurs before timestamp.
maxWALIndex, err := r.WALIndexAt(ctx, generation, opt.Timestamp)
if err != nil {
return fmt.Errorf("cannot find max wal index for restore: %w", err)
}
// Initialize starting position.
pos := Pos{Generation: generation, Index: minWALIndex}
tmpPath := outputPath + ".tmp"
// Copy snapshot to output path. // Copy snapshot to output path.
logger.Printf("restoring snapshot from %s://%s/%s to %s.tmp", r.Name(), generation, snapshotPath, outputPath) logger.Printf("restoring snapshot from %s://%s/%s to %s", r.Name(), generation, minWALIndex, tmpPath)
if !opt.DryRun { if !opt.DryRun {
if f, err := os.Create(outputPath + ".tmp"); err != nil { if err := db.restoreSnapshot(ctx, r, pos.Generation, pos.Index, tmpPath); err != nil {
return err return fmt.Errorf("cannot restore snapshot: %w", err)
} else if err := r.RestoreSnapshot(f, snapshotPath); err != nil {
f.Close()
return err
} else if err := f.Sync(); err != nil {
f.Close()
return err
} else if err := f.Close(); err != nil {
return err
} }
} }
// Restore each WAL file. // Restore each WAL file until we reach our maximum index.
for _, walPath := range walPaths { for index := minWALIndex; index <= maxWALIndex; index++ {
logger.Printf("restoring wal from %s://%s/%s to %s.tmp-wal", r.Name(), generation, snapshotPath, outputPath) logger.Printf("restoring wal from %s://%s/%016x to %s-wal", r.Name(), generation, index, tmpPath)
if opt.DryRun { if opt.DryRun {
continue continue
} }
// Copy WAL from replica. if err := db.restoreWAL(ctx, r, generation, index, tmpPath); err != nil {
if f, err := os.Create(outputPath + ".tmp-wal"); err != nil { return fmt.Errorf("cannot restore wal: %w", err)
return err
} else if err := r.RestoreWAL(f, walPath); err != nil {
f.Close()
return err
} else if err := f.Sync(); err != nil {
f.Close()
return err
} else if err := f.Close(); err != nil {
return err
} }
// TODO: Open database with SQLite and force a truncated checkpoint.
} }
// Copy file to final location. // Copy file to final location.
logger.Printf("renaming database from temporary location") logger.Printf("renaming database from temporary location")
if !opt.DryRun { if !opt.DryRun {
if err := os.Rename(outputPath+".tmp", outputPath); err != nil { if err := os.Rename(tmpPath, outputPath); err != nil {
return err return err
} }
} }
@@ -1093,7 +1084,7 @@ func (db *DB) Restore(opt RestoreOptions) {
return nil return nil
} }
func (db *DB) restoreTarget(opt RestoreOptions, logger *log.Logger) (Replicator, string, error) { func (db *DB) restoreTarget(ctx context.Context, opt RestoreOptions, logger *log.Logger) (Replicator, string, error) {
var target struct { var target struct {
replicator Replicator replicator Replicator
generation string generation string
@@ -1106,15 +1097,20 @@ func (db *DB) restoreTarget(opt RestoreOptions, logger *log.Logger) (Replicator,
continue continue
} }
generations, err := r.Generations(ctx)
if err != nil {
return nil, "", fmt.Errorf("cannot fetch generations: %w", err)
}
// Search generations for one that contains the requested timestamp. // Search generations for one that contains the requested timestamp.
for _, generation := range r.Generations() { for _, generation := range generations {
// Skip generation if it does not match filter. // Skip generation if it does not match filter.
if opt.Generation != "" && generation != opt.Generation { if opt.Generation != "" && generation != opt.Generation {
continue continue
} }
// Fetch stats for generation. // Fetch stats for generation.
stats, err := r.GenerationStats(generation) stats, err := r.GenerationStats(ctx, generation)
if err != nil { if err != nil {
return nil, "", fmt.Errorf("cannot determine stats for generation (%s/%s): %s", r.Name(), generation, err) return nil, "", fmt.Errorf("cannot determine stats for generation (%s/%s): %s", r.Name(), generation, err)
} }
@@ -1145,6 +1141,72 @@ func (db *DB) restoreTarget(opt RestoreOptions, logger *log.Logger) (Replicator,
return target.replicator, target.generation, nil return target.replicator, target.generation, nil
} }
// restoreSnapshot copies a snapshot from the replica to a file.
func (db *DB) restoreSnapshot(ctx context.Context, r Replicator, generation string, index int, filename string) error {
if err := os.MkdirAll(filepath.Dir(filename), 0700); err != nil {
return err
}
f, err := os.Create(filename)
if err != nil {
return err
}
defer f.Close()
rd, err := r.SnapshotReader(ctx, generation, index)
if err != nil {
return err
}
defer rd.Close()
if _, err := io.Copy(f, rd); err != nil {
return err
}
if err := f.Sync(); err != nil {
return err
}
return f.Close()
}
// restoreWAL copies a WAL file from the replica to the local WAL and forces checkpoint.
func (db *DB) restoreWAL(ctx context.Context, r Replicator, generation string, index int, dbPath string) error {
// Open handle to destination WAL path.
f, err := os.Create(dbPath + "-wal")
if err != nil {
return err
}
defer f.Close()
rd, err := r.WALReader(ctx, generation, index)
if err != nil {
return err
}
defer rd.Close()
//
if _, err := io.Copy(f, rd); err != nil {
return err
} else if err := f.Close(); err != nil {
return err
}
// Open SQLite database and force a truncating checkpoint.
d, err := sql.Open("sqlite3", dbPath)
if err != nil {
return err
}
defer d.Close()
if _, err := d.Exec(`PRAGMA wal_checkpoint(TRUNCATE);`); err != nil {
return err
} else if err := d.Close(); err != nil {
return err
}
return nil
}
// RestoreOptions represents options for DB.Restore(). // RestoreOptions represents options for DB.Restore().
type RestoreOptions struct { type RestoreOptions struct {
// Target path to restore into. // Target path to restore into.
@@ -1163,8 +1225,12 @@ type RestoreOptions struct {
// If zero, database restore to most recent state available. // If zero, database restore to most recent state available.
Timestamp time.Time Timestamp time.Time
// If true, no actual restore is performed.
// Only equivalent log output for a regular restore.
DryRun bool
// Logger used to print status to. // Logger used to print status to.
Logger log.Logger Logger *log.Logger
} }
func headerByteOrder(hdr []byte) (binary.ByteOrder, error) { func headerByteOrder(hdr []byte) (binary.ByteOrder, error) {

View File

@@ -1,13 +1,16 @@
package litestream package litestream
import ( import (
"compress/gzip"
"database/sql" "database/sql"
"encoding/binary" "encoding/binary"
"encoding/hex" "encoding/hex"
"errors"
"fmt" "fmt"
"io" "io"
"os" "os"
"path/filepath" "path/filepath"
"regexp"
"strconv" "strconv"
"strings" "strings"
@@ -24,6 +27,11 @@ const (
GenerationNameLen = 16 GenerationNameLen = 16
) )
// Litestream errors.
var (
ErrNoSnapshots = errors.New("no snapshots available")
)
// Pos is a position in the WAL for a generation. // Pos is a position in the WAL for a generation.
type Pos struct { type Pos struct {
Generation string // generation name Generation string // generation name
@@ -153,19 +161,66 @@ func IsGenerationName(s string) bool {
// IsSnapshotPath returns true if s is a path to a snapshot file. // IsSnapshotPath returns true if s is a path to a snapshot file.
func IsSnapshotPath(s string) bool { func IsSnapshotPath(s string) bool {
return strings.HasSuffix(s, SnapshotExt) || strings.HasSuffix(s, SnapshotExt+".gz") return snapshotPathRegex.MatchString(s)
} }
// ParseSnapshotPath returns the index for the snapshot.
// Returns an error if the path is not a valid snapshot path.
func ParseSnapshotPath(s string) (index int, typ, ext string, err error) {
a := snapshotPathRegex.FindStringSubmatch(s)
if a == nil {
return 0, "", "", fmt.Errorf("invalid snapshot path: %s", s)
}
i64, _ := strconv.ParseUint(a[1], 16, 64)
return int(i64), a[2], a[3], nil
}
var snapshotPathRegex = regexp.MustCompile(`^([0-9a-f]{16})(?:-(\w+))?(.snapshot(?:.gz)?)$`)
// IsWALPath returns true if s is a path to a WAL file. // IsWALPath returns true if s is a path to a WAL file.
func IsWALPath(s string) bool { func IsWALPath(s string) bool {
return strings.HasSuffix(s, WALExt) || strings.HasSuffix(s, WALExt+".gz") return walPathRegex.MatchString(s)
} }
// ParseWALPath returns the index & offset for the WAL file.
// Returns an error if the path is not a valid snapshot path.
func ParseWALPath(s string) (index int, offset int64, ext string, err error) {
a := walPathRegex.FindStringSubmatch(s)
if a == nil {
return 0, 0, "", fmt.Errorf("invalid wal path: %s", s)
}
i64, _ := strconv.ParseUint(a[1], 16, 64)
off64, _ := strconv.ParseUint(a[2], 16, 64)
return int(i64), int64(off64), a[3], nil
}
var walPathRegex = regexp.MustCompile(`^([0-9a-f]{16})(?:_([0-9a-f]{16}))?(.wal(?:.gz)?)$`)
// isHexChar returns true if ch is a lowercase hex character. // isHexChar returns true if ch is a lowercase hex character.
func isHexChar(ch rune) bool { func isHexChar(ch rune) bool {
return (ch >= '0' && ch <= '9') || (ch >= 'a' && ch <= 'f') return (ch >= '0' && ch <= '9') || (ch >= 'a' && ch <= 'f')
} }
// gzipReadCloser wraps gzip.Reader to also close the underlying reader on close.
type gzipReadCloser struct {
r *gzip.Reader
closer io.ReadCloser
}
func (r *gzipReadCloser) Read(p []byte) (n int, err error) {
return r.r.Read(p)
}
func (r *gzipReadCloser) Close() error {
if err := r.r.Close(); err != nil {
r.closer.Close()
return err
}
return r.closer.Close()
}
// HexDump returns hexdump output but with duplicate lines removed. // HexDump returns hexdump output but with duplicate lines removed.
func HexDump(b []byte) string { func HexDump(b []byte) string {
const prefixN = len("00000000") const prefixN = len("00000000")

View File

@@ -24,18 +24,32 @@ type Replicator interface {
// String identifier for the type of replicator ("file", "s3", etc). // String identifier for the type of replicator ("file", "s3", etc).
Type() string Type() string
// Returns a list of generation names for the replicator.
Generations() ([]string, error)
// Returns basic information about a generation including the number of
// snapshot & WAL files as well as the time range covered.
GenerationStats(generation string) (GenerationStats, error)
// Starts replicating in a background goroutine. // Starts replicating in a background goroutine.
Start(ctx context.Context) Start(ctx context.Context)
// Stops all replication processing. Blocks until processing stopped. // Stops all replication processing. Blocks until processing stopped.
Stop() Stop()
// Returns a list of generation names for the replicator.
Generations(ctx context.Context) ([]string, error)
// Returns basic information about a generation including the number of
// snapshot & WAL files as well as the time range covered.
GenerationStats(ctx context.Context, generation string) (GenerationStats, error)
// Returns the highest index for a snapshot within a generation that occurs
// before timestamp. If timestamp is zero, returns the latest snapshot.
SnapshotIndexAt(ctx context.Context, generation string, timestamp time.Time) (int, error)
// Returns the highest index for a WAL file that occurs before timestamp.
// If timestamp is zero, returns the highest WAL index.
WALIndexAt(ctx context.Context, generation string, timestamp time.Time) (int, error)
// Returns a reader for snapshot data at the given generation/index.
SnapshotReader(ctx context.Context, generation string, index int) (io.ReadCloser, error)
// Returns a reader for WAL data at the given position.
WALReader(ctx context.Context, generation string, index int) (io.ReadCloser, error)
} }
var _ Replicator = (*FileReplicator)(nil) var _ Replicator = (*FileReplicator)(nil)
@@ -76,18 +90,28 @@ func (r *FileReplicator) Type() string {
return "file" return "file"
} }
// SnapshotDir returns the path to a generation's snapshot directory.
func (r *FileReplicator) SnapshotDir(generation string) string {
return filepath.Join(r.dst, "generations", generation, "snapshots")
}
// SnapshotPath returns the path to a snapshot file. // SnapshotPath returns the path to a snapshot file.
func (r *FileReplicator) SnapshotPath(generation string, index int) string { func (r *FileReplicator) SnapshotPath(generation string, index int) string {
return filepath.Join(r.dst, "generations", generation, "snapshots", fmt.Sprintf("%016x.snapshot.gz", index)) return filepath.Join(r.SnapshotDir(generation), fmt.Sprintf("%016x.snapshot.gz", index))
}
// WALDir returns the path to a generation's WAL directory
func (r *FileReplicator) WALDir(generation string) string {
return filepath.Join(r.dst, "generations", generation, "wal")
} }
// WALPath returns the path to a WAL file. // WALPath returns the path to a WAL file.
func (r *FileReplicator) WALPath(generation string, index int) string { func (r *FileReplicator) WALPath(generation string, index int) string {
return filepath.Join(r.dst, "generations", generation, "wal", fmt.Sprintf("%016x.wal", index)) return filepath.Join(r.WALDir(generation), fmt.Sprintf("%016x.wal", index))
} }
// Generations returns a list of available generation names. // Generations returns a list of available generation names.
func (r *FileReplicator) Generations() ([]string, error) { func (r *FileReplicator) Generations(ctx context.Context) ([]string, error) {
fis, err := ioutil.ReadDir(filepath.Join(r.dst, "generations")) fis, err := ioutil.ReadDir(filepath.Join(r.dst, "generations"))
if os.IsNotExist(err) { if os.IsNotExist(err) {
return nil, nil return nil, nil
@@ -108,7 +132,7 @@ func (r *FileReplicator) Generations() ([]string, error) {
} }
// GenerationStats returns stats for a generation. // GenerationStats returns stats for a generation.
func (r *FileReplicator) GenerationStats(generation string) (stats GenerationStats, err error) { func (r *FileReplicator) GenerationStats(ctx context.Context, generation string) (stats GenerationStats, err error) {
// Determine stats for all snapshots. // Determine stats for all snapshots.
n, min, max, err := r.snapshotStats(generation) n, min, max, err := r.snapshotStats(generation)
if err != nil { if err != nil {
@@ -136,7 +160,7 @@ func (r *FileReplicator) GenerationStats(generation string) (stats GenerationSta
} }
func (r *FileReplicator) snapshotStats(generation string) (n int, min, max time.Time, err error) { func (r *FileReplicator) snapshotStats(generation string) (n int, min, max time.Time, err error) {
fis, err := ioutil.ReadDir(filepath.Join(r.dst, "generations", generation, "snapshots")) fis, err := ioutil.ReadDir(r.SnapshotDir(generation))
if os.IsNotExist(err) { if os.IsNotExist(err) {
return n, min, max, nil return n, min, max, nil
} else if err != nil { } else if err != nil {
@@ -161,7 +185,7 @@ func (r *FileReplicator) snapshotStats(generation string) (n int, min, max time.
} }
func (r *FileReplicator) walStats(generation string) (n int, min, max time.Time, err error) { func (r *FileReplicator) walStats(generation string) (n int, min, max time.Time, err error) {
fis, err := ioutil.ReadDir(filepath.Join(r.dst, "generations", generation, "wal")) fis, err := ioutil.ReadDir(r.WALDir(generation))
if os.IsNotExist(err) { if os.IsNotExist(err) {
return n, min, max, nil return n, min, max, nil
} else if err != nil { } else if err != nil {
@@ -283,8 +307,8 @@ func (r *FileReplicator) pos() (pos Pos, err error) {
pos.Generation = generation pos.Generation = generation
// Find the max WAL file. // Find the max WAL file.
walDir := filepath.Join(r.dst, "generations", generation, "wal") dir := r.WALDir(generation)
fis, err := ioutil.ReadDir(walDir) fis, err := ioutil.ReadDir(dir)
if os.IsNotExist(err) { if os.IsNotExist(err) {
return pos, nil // no replicated wal, start at beginning of generation return pos, nil // no replicated wal, start at beginning of generation
} else if err != nil { } else if err != nil {
@@ -312,7 +336,7 @@ func (r *FileReplicator) pos() (pos Pos, err error) {
pos.Index = index pos.Index = index
// Determine current offset. // Determine current offset.
fi, err := os.Stat(filepath.Join(walDir, FormatWALFilename(pos.Index))) fi, err := os.Stat(filepath.Join(dir, FormatWALFilename(pos.Index)))
if err != nil { if err != nil {
return pos, err return pos, err
} }
@@ -348,7 +372,7 @@ func (r *FileReplicator) snapshot(ctx context.Context, generation string, index
// snapshotN returns the number of snapshots for a generation. // snapshotN returns the number of snapshots for a generation.
func (r *FileReplicator) snapshotN(generation string) (int, error) { func (r *FileReplicator) snapshotN(generation string) (int, error) {
fis, err := ioutil.ReadDir(filepath.Join(r.dst, "generations", generation, "snapshots")) fis, err := ioutil.ReadDir(r.SnapshotDir(generation))
if os.IsNotExist(err) { if os.IsNotExist(err) {
return 0, nil return 0, nil
} else if err != nil { } else if err != nil {
@@ -357,10 +381,7 @@ func (r *FileReplicator) snapshotN(generation string) (int, error) {
var n int var n int
for _, fi := range fis { for _, fi := range fis {
name := fi.Name() if _, _, _, err := ParseSnapshotPath(fi.Name()); err == nil {
name = strings.TrimSuffix(name, ".gz")
if strings.HasSuffix(name, SnapshotExt) {
n++ n++
} }
} }
@@ -379,7 +400,7 @@ func (r *FileReplicator) sync(ctx context.Context, pos Pos) (_ Pos, err error) {
} }
func (r *FileReplicator) syncNext(ctx context.Context, pos Pos) (_ Pos, err error) { func (r *FileReplicator) syncNext(ctx context.Context, pos Pos) (_ Pos, err error) {
rd, err := r.db.WALReader(pos) rd, err := r.db.ShadowWALReader(pos)
if err == io.EOF { if err == io.EOF {
return pos, err return pos, err
} else if err != nil { } else if err != nil {
@@ -426,7 +447,7 @@ func (r *FileReplicator) syncNext(ctx context.Context, pos Pos) (_ Pos, err erro
// compress gzips all WAL files before the current one. // compress gzips all WAL files before the current one.
func (r *FileReplicator) compress(ctx context.Context, generation string) error { func (r *FileReplicator) compress(ctx context.Context, generation string) error {
dir := filepath.Join(r.dst, "generations", generation, "wal") dir := r.WALDir(generation)
filenames, err := filepath.Glob(filepath.Join(dir, "*.wal")) filenames, err := filepath.Glob(filepath.Join(dir, "*.wal"))
if err != nil { if err != nil {
return err return err
@@ -457,6 +478,136 @@ func (r *FileReplicator) compress(ctx context.Context, generation string) error
return nil return nil
} }
// SnapsotIndexAt returns the highest index for a snapshot within a generation
// that occurs before timestamp. If timestamp is zero, returns the latest snapshot.
func (r *FileReplicator) SnapshotIndexAt(ctx context.Context, generation string, timestamp time.Time) (int, error) {
fis, err := ioutil.ReadDir(r.SnapshotDir(generation))
if os.IsNotExist(err) {
return 0, ErrNoSnapshots
} else if err != nil {
return 0, err
}
index := -1
var max time.Time
for _, fi := range fis {
// Read index from snapshot filename.
idx, _, _, err := ParseSnapshotPath(fi.Name())
if err != nil {
continue // not a snapshot, skip
} else if !timestamp.IsZero() && fi.ModTime().After(timestamp) {
continue // after timestamp, skip
}
// Use snapshot if it newer.
if max.IsZero() || fi.ModTime().After(max) {
index, max = idx, fi.ModTime()
}
}
if index == -1 {
return 0, ErrNoSnapshots
}
return index, nil
}
// Returns the highest index for a WAL file that occurs before timestamp.
// If timestamp is zero, returns the highest WAL index.
func (r *FileReplicator) WALIndexAt(ctx context.Context, generation string, timestamp time.Time) (int, error) {
fis, err := ioutil.ReadDir(r.WALDir(generation))
if os.IsNotExist(err) {
return 0, nil
} else if err != nil {
return 0, err
}
index := -1
for _, fi := range fis {
// Read index from snapshot filename.
idx, _, _, err := ParseWALPath(fi.Name())
if err != nil {
continue // not a snapshot, skip
} else if !timestamp.IsZero() && fi.ModTime().After(timestamp) {
continue // after timestamp, skip
} else if idx < index {
continue // earlier index, skip
}
index = idx
}
if index == -1 {
return 0, nil
}
return index, nil
}
// SnapshotReader returns a reader for snapshot data at the given generation/index.
// Returns os.ErrNotExist if no matching index is found.
func (r *FileReplicator) SnapshotReader(ctx context.Context, generation string, index int) (io.ReadCloser, error) {
dir := r.SnapshotDir(generation)
fis, err := ioutil.ReadDir(dir)
if err != nil {
return nil, err
}
for _, fi := range fis {
// Parse index from snapshot filename. Skip if no match.
idx, _, ext, err := ParseSnapshotPath(fi.Name())
if err != nil || index != idx {
continue
}
// Open & return the file handle if uncompressed.
f, err := os.Open(filepath.Join(dir, fi.Name()))
if err != nil {
return nil, err
} else if ext == ".snapshot" {
return f, nil // not compressed, return as-is.
}
assert(ext == ".snapshot.gz", "invalid snapshot extension")
// If compressed, wrap in a gzip reader and return with wrapper to
// ensure that the underlying file is closed.
r, err := gzip.NewReader(f)
if err != nil {
f.Close()
return nil, err
}
return &gzipReadCloser{r: r, closer: f}, nil
}
return nil, os.ErrNotExist
}
// WALReader returns a reader for WAL data at the given index.
// Returns os.ErrNotExist if no matching index is found.
func (r *FileReplicator) WALReader(ctx context.Context, generation string, index int) (io.ReadCloser, error) {
filename := r.WALPath(generation, index)
// Attempt to read uncompressed file first.
f, err := os.Open(filename)
if err == nil {
return f, nil // file exist, return
} else if err != nil && !os.IsNotExist(err) {
return nil, err
}
// Otherwise read the compressed file. Return error if file doesn't exist.
f, err = os.Open(filename + ".gz")
if err != nil {
return nil, err
}
// If compressed, wrap in a gzip reader and return with wrapper to
// ensure that the underlying file is closed.
rd, err := gzip.NewReader(f)
if err != nil {
f.Close()
return nil, err
}
return &gzipReadCloser{r: rd, closer: f}, nil
}
// compressFile compresses a file and replaces it with a new file with a .gz extension. // compressFile compresses a file and replaces it with a new file with a .gz extension.
func compressFile(src, dst string) error { func compressFile(src, dst string) error {
r, err := os.Open(src) r, err := os.Open(src)