diff --git a/cmd/litestream/generations.go b/cmd/litestream/generations.go index b83fcbb..c900536 100644 --- a/cmd/litestream/generations.go +++ b/cmd/litestream/generations.go @@ -72,7 +72,7 @@ func (c *GenerationsCommand) Run(ctx context.Context, args []string) (err error) // Iterate over each replicator in the database. for _, r := range db.Replicators { - generations, err := r.Generations() + generations, err := r.Generations(ctx) if err != nil { log.Printf("%s: cannot list generations", r.Name(), err) continue @@ -80,7 +80,7 @@ func (c *GenerationsCommand) Run(ctx context.Context, args []string) (err error) // Iterate over each generation for the replicator. for _, generation := range generations { - stats, err := r.GenerationStats(generation) + stats, err := r.GenerationStats(ctx, generation) if err != nil { log.Printf("%s: cannot find generation stats: %s", r.Name(), err) continue diff --git a/cmd/litestream/main.go b/cmd/litestream/main.go index 89c62c5..b729ed5 100644 --- a/cmd/litestream/main.go +++ b/cmd/litestream/main.go @@ -92,6 +92,15 @@ func DefaultConfig() Config { return Config{} } +func (c *Config) DBConfig(path string) *DBConfig { + for _, dbConfig := range c.DBs { + if dbConfig.Path == path { + return dbConfig + } + } + return nil +} + // ReadConfigFile unmarshals config from filename. Expands path if needed. func ReadConfigFile(filename string) (Config, error) { config := DefaultConfig() diff --git a/cmd/litestream/restore.go b/cmd/litestream/restore.go index 9eee7dc..985a002 100644 --- a/cmd/litestream/restore.go +++ b/cmd/litestream/restore.go @@ -8,8 +8,9 @@ import ( "log" "os" "path/filepath" - "text/tabwriter" "time" + + "github.com/benbjohnson/litestream" ) type RestoreCommand struct { @@ -26,9 +27,9 @@ func (c *RestoreCommand) Run(ctx context.Context, args []string) (err error) { fs := flag.NewFlagSet("litestream-restore", flag.ContinueOnError) registerConfigFlag(fs, &configPath) fs.StringVar(&opt.OutputPath, "o", "", "output path") - fs.StringVar(&opt.Replica, "replica", "", "replica name") + fs.StringVar(&opt.ReplicaName, "replica", "", "replica name") fs.StringVar(&opt.Generation, "generation", "", "generation name") - fs.StringVar(&opt.DryRun, "dry-run", "", "dry run") + fs.BoolVar(&opt.DryRun, "dry-run", false, "dry run") timestampStr := fs.String("timestamp", "", "timestamp") verbose := fs.Bool("v", false, "verbose output") fs.Usage = c.Usage @@ -51,7 +52,7 @@ func (c *RestoreCommand) Run(ctx context.Context, args []string) (err error) { // Parse timestamp, if specified. if *timestampStr != "" { - if opts.Timestamp, err = time.Parse(time.RFC3339, *timestampStr); err != nil { + if opt.Timestamp, err = time.Parse(time.RFC3339, *timestampStr); err != nil { return errors.New("invalid -timestamp, must specify in ISO 8601 format (e.g. 2000-01-01T00:00:00Z)") } } @@ -81,7 +82,7 @@ func (c *RestoreCommand) Run(ctx context.Context, args []string) (err error) { return err } - return db.Restore(opt) + return db.Restore(ctx, opt) } func (c *RestoreCommand) Usage() { @@ -95,7 +96,8 @@ Usage: Arguments: -config PATH - Specifies the configuration file. Defaults to %s + Specifies the configuration file. + Defaults to %s -replica NAME Restore from a specific replica. @@ -137,6 +139,7 @@ Examples: # Restore database from specific generation on S3. $ litestream restore -replica s3 -generation xxxxxxxx /path/to/db + `[1:], DefaultConfigPath, ) diff --git a/db.go b/db.go index 88a8e31..42f17d1 100644 --- a/db.go +++ b/db.go @@ -777,13 +777,13 @@ func (db *DB) copyToShadowWAL(filename string) (newSize int64, err error) { return newSize, nil } -// WALReader opens a reader for a shadow WAL file at a given position. +// ShadowWALReader opens a reader for a shadow WAL file at a given position. // If the reader is at the end of the file, it attempts to return the next file. // // The caller should check Pos() & Size() on the returned reader to check offset. -func (db *DB) WALReader(pos Pos) (r *WALReader, err error) { +func (db *DB) ShadowWALReader(pos Pos) (r *ShadowWALReader, err error) { // Fetch reader for the requested position. Return if it has data. - r, err = db.walReader(pos) + r, err = db.shadowWALReader(pos) if err != nil { return nil, err } else if r.N() > 0 { @@ -793,15 +793,15 @@ func (db *DB) WALReader(pos Pos) (r *WALReader, err error) { // Otherwise attempt to read the start of the next WAL file. pos.Index, pos.Offset = pos.Index+1, 0 - r, err = db.walReader(pos) + r, err = db.shadowWALReader(pos) if os.IsNotExist(err) { return nil, io.EOF } return r, err } -// walReader opens a file reader for a shadow WAL file at a given position. -func (db *DB) walReader(pos Pos) (r *WALReader, err error) { +// shadowWALReader opens a file reader for a shadow WAL file at a given position. +func (db *DB) shadowWALReader(pos Pos) (r *ShadowWALReader, err error) { filename := db.ShadowWALPath(pos.Generation, pos.Index) f, err := os.Open(filename) @@ -832,7 +832,7 @@ func (db *DB) walReader(pos Pos) (r *WALReader, err error) { return nil, err } - return &WALReader{ + return &ShadowWALReader{ f: f, n: fileSize - pos.Offset, pos: pos, @@ -854,25 +854,25 @@ func frameAlign(offset int64, pageSize int) int64 { return (frameN * frameSize) + WALHeaderSize } -// WALReader represents a reader for a WAL file that tracks WAL position. -type WALReader struct { +// ShadowWALReader represents a reader for a shadow WAL file that tracks WAL position. +type ShadowWALReader struct { f *os.File n int64 pos Pos } // Close closes the underlying WAL file handle. -func (r *WALReader) Close() error { return r.f.Close() } +func (r *ShadowWALReader) Close() error { return r.f.Close() } // N returns the remaining bytes in the reader. -func (r *WALReader) N() int64 { return r.n } +func (r *ShadowWALReader) N() int64 { return r.n } // Pos returns the current WAL position. -func (r *WALReader) Pos() Pos { return r.pos } +func (r *ShadowWALReader) Pos() Pos { return r.pos } // Read reads bytes into p, updates the position, and returns the bytes read. // Returns io.EOF at the end of the available section of the WAL. -func (r *WALReader) Read(p []byte) (n int, err error) { +func (r *ShadowWALReader) Read(p []byte) (n int, err error) { if r.n <= 0 { return 0, io.EOF } @@ -1009,7 +1009,7 @@ func (db *DB) monitor() { // replica or generation or it will automatically choose the best one. Finally, // a timestamp can be specified to restore the database to a specific // point-in-time. -func (db *DB) Restore(opt RestoreOptions) { +func (db *DB) Restore(ctx context.Context, opt RestoreOptions) error { // Ensure logger exists. logger := opt.Logger if logger == nil { @@ -1019,7 +1019,7 @@ func (db *DB) Restore(opt RestoreOptions) { // Determine the correct output path. outputPath := opt.OutputPath if outputPath == "" { - outputPath = db.Path + outputPath = db.Path() } // Ensure output path does not already exist (unless this is a dry run). @@ -1027,65 +1027,56 @@ func (db *DB) Restore(opt RestoreOptions) { if _, err := os.Stat(outputPath); err == nil { return fmt.Errorf("cannot restore, output path already exists: %s", outputPath) } else if err != nil && !os.IsNotExist(err) { - return outputPath + return err } } // Determine target replica & generation to restore from. - r, generation, err := db.restoreTarget(opt, logger) + r, generation, err := db.restoreTarget(ctx, opt, logger) if err != nil { return err } - // Determine manifest to restore from. - snapshotPath, walPaths, err := opt.determineRestoreManifest(r, generation, opt.Timestamp, logger) + // Find lastest snapshot that occurs before timestamp. + minWALIndex, err := r.SnapshotIndexAt(ctx, generation, opt.Timestamp) if err != nil { - return err + return fmt.Errorf("cannot find snapshot index for restore: %w", err) } + // Find the maximum WAL index that occurs before timestamp. + maxWALIndex, err := r.WALIndexAt(ctx, generation, opt.Timestamp) + if err != nil { + return fmt.Errorf("cannot find max wal index for restore: %w", err) + } + + // Initialize starting position. + pos := Pos{Generation: generation, Index: minWALIndex} + tmpPath := outputPath + ".tmp" + // Copy snapshot to output path. - logger.Printf("restoring snapshot from %s://%s/%s to %s.tmp", r.Name(), generation, snapshotPath, outputPath) + logger.Printf("restoring snapshot from %s://%s/%s to %s", r.Name(), generation, minWALIndex, tmpPath) if !opt.DryRun { - if f, err := os.Create(outputPath + ".tmp"); err != nil { - return err - } else if err := r.RestoreSnapshot(f, snapshotPath); err != nil { - f.Close() - return err - } else if err := f.Sync(); err != nil { - f.Close() - return err - } else if err := f.Close(); err != nil { - return err + if err := db.restoreSnapshot(ctx, r, pos.Generation, pos.Index, tmpPath); err != nil { + return fmt.Errorf("cannot restore snapshot: %w", err) } } - // Restore each WAL file. - for _, walPath := range walPaths { - logger.Printf("restoring wal from %s://%s/%s to %s.tmp-wal", r.Name(), generation, snapshotPath, outputPath) + // Restore each WAL file until we reach our maximum index. + for index := minWALIndex; index <= maxWALIndex; index++ { + logger.Printf("restoring wal from %s://%s/%016x to %s-wal", r.Name(), generation, index, tmpPath) if opt.DryRun { continue } - // Copy WAL from replica. - if f, err := os.Create(outputPath + ".tmp-wal"); err != nil { - return err - } else if err := r.RestoreWAL(f, walPath); err != nil { - f.Close() - return err - } else if err := f.Sync(); err != nil { - f.Close() - return err - } else if err := f.Close(); err != nil { - return err + if err := db.restoreWAL(ctx, r, generation, index, tmpPath); err != nil { + return fmt.Errorf("cannot restore wal: %w", err) } - - // TODO: Open database with SQLite and force a truncated checkpoint. } // Copy file to final location. logger.Printf("renaming database from temporary location") if !opt.DryRun { - if err := os.Rename(outputPath+".tmp", outputPath); err != nil { + if err := os.Rename(tmpPath, outputPath); err != nil { return err } } @@ -1093,7 +1084,7 @@ func (db *DB) Restore(opt RestoreOptions) { return nil } -func (db *DB) restoreTarget(opt RestoreOptions, logger *log.Logger) (Replicator, string, error) { +func (db *DB) restoreTarget(ctx context.Context, opt RestoreOptions, logger *log.Logger) (Replicator, string, error) { var target struct { replicator Replicator generation string @@ -1106,15 +1097,20 @@ func (db *DB) restoreTarget(opt RestoreOptions, logger *log.Logger) (Replicator, continue } + generations, err := r.Generations(ctx) + if err != nil { + return nil, "", fmt.Errorf("cannot fetch generations: %w", err) + } + // Search generations for one that contains the requested timestamp. - for _, generation := range r.Generations() { + for _, generation := range generations { // Skip generation if it does not match filter. if opt.Generation != "" && generation != opt.Generation { continue } // Fetch stats for generation. - stats, err := r.GenerationStats(generation) + stats, err := r.GenerationStats(ctx, generation) if err != nil { return nil, "", fmt.Errorf("cannot determine stats for generation (%s/%s): %s", r.Name(), generation, err) } @@ -1145,6 +1141,72 @@ func (db *DB) restoreTarget(opt RestoreOptions, logger *log.Logger) (Replicator, return target.replicator, target.generation, nil } +// restoreSnapshot copies a snapshot from the replica to a file. +func (db *DB) restoreSnapshot(ctx context.Context, r Replicator, generation string, index int, filename string) error { + if err := os.MkdirAll(filepath.Dir(filename), 0700); err != nil { + return err + } + + f, err := os.Create(filename) + if err != nil { + return err + } + defer f.Close() + + rd, err := r.SnapshotReader(ctx, generation, index) + if err != nil { + return err + } + defer rd.Close() + + if _, err := io.Copy(f, rd); err != nil { + return err + } + + if err := f.Sync(); err != nil { + return err + } + return f.Close() +} + +// restoreWAL copies a WAL file from the replica to the local WAL and forces checkpoint. +func (db *DB) restoreWAL(ctx context.Context, r Replicator, generation string, index int, dbPath string) error { + // Open handle to destination WAL path. + f, err := os.Create(dbPath + "-wal") + if err != nil { + return err + } + defer f.Close() + + rd, err := r.WALReader(ctx, generation, index) + if err != nil { + return err + } + defer rd.Close() + + // + if _, err := io.Copy(f, rd); err != nil { + return err + } else if err := f.Close(); err != nil { + return err + } + + // Open SQLite database and force a truncating checkpoint. + d, err := sql.Open("sqlite3", dbPath) + if err != nil { + return err + } + defer d.Close() + + if _, err := d.Exec(`PRAGMA wal_checkpoint(TRUNCATE);`); err != nil { + return err + } else if err := d.Close(); err != nil { + return err + } + + return nil +} + // RestoreOptions represents options for DB.Restore(). type RestoreOptions struct { // Target path to restore into. @@ -1163,8 +1225,12 @@ type RestoreOptions struct { // If zero, database restore to most recent state available. Timestamp time.Time + // If true, no actual restore is performed. + // Only equivalent log output for a regular restore. + DryRun bool + // Logger used to print status to. - Logger log.Logger + Logger *log.Logger } func headerByteOrder(hdr []byte) (binary.ByteOrder, error) { diff --git a/litestream.go b/litestream.go index ed9f3d7..c5fae8b 100644 --- a/litestream.go +++ b/litestream.go @@ -1,13 +1,16 @@ package litestream import ( + "compress/gzip" "database/sql" "encoding/binary" "encoding/hex" + "errors" "fmt" "io" "os" "path/filepath" + "regexp" "strconv" "strings" @@ -24,6 +27,11 @@ const ( GenerationNameLen = 16 ) +// Litestream errors. +var ( + ErrNoSnapshots = errors.New("no snapshots available") +) + // Pos is a position in the WAL for a generation. type Pos struct { Generation string // generation name @@ -153,19 +161,66 @@ func IsGenerationName(s string) bool { // IsSnapshotPath returns true if s is a path to a snapshot file. func IsSnapshotPath(s string) bool { - return strings.HasSuffix(s, SnapshotExt) || strings.HasSuffix(s, SnapshotExt+".gz") + return snapshotPathRegex.MatchString(s) } +// ParseSnapshotPath returns the index for the snapshot. +// Returns an error if the path is not a valid snapshot path. +func ParseSnapshotPath(s string) (index int, typ, ext string, err error) { + a := snapshotPathRegex.FindStringSubmatch(s) + if a == nil { + return 0, "", "", fmt.Errorf("invalid snapshot path: %s", s) + } + + i64, _ := strconv.ParseUint(a[1], 16, 64) + return int(i64), a[2], a[3], nil +} + +var snapshotPathRegex = regexp.MustCompile(`^([0-9a-f]{16})(?:-(\w+))?(.snapshot(?:.gz)?)$`) + // IsWALPath returns true if s is a path to a WAL file. func IsWALPath(s string) bool { - return strings.HasSuffix(s, WALExt) || strings.HasSuffix(s, WALExt+".gz") + return walPathRegex.MatchString(s) } +// ParseWALPath returns the index & offset for the WAL file. +// Returns an error if the path is not a valid snapshot path. +func ParseWALPath(s string) (index int, offset int64, ext string, err error) { + a := walPathRegex.FindStringSubmatch(s) + if a == nil { + return 0, 0, "", fmt.Errorf("invalid wal path: %s", s) + } + + i64, _ := strconv.ParseUint(a[1], 16, 64) + off64, _ := strconv.ParseUint(a[2], 16, 64) + return int(i64), int64(off64), a[3], nil +} + +var walPathRegex = regexp.MustCompile(`^([0-9a-f]{16})(?:_([0-9a-f]{16}))?(.wal(?:.gz)?)$`) + // isHexChar returns true if ch is a lowercase hex character. func isHexChar(ch rune) bool { return (ch >= '0' && ch <= '9') || (ch >= 'a' && ch <= 'f') } +// gzipReadCloser wraps gzip.Reader to also close the underlying reader on close. +type gzipReadCloser struct { + r *gzip.Reader + closer io.ReadCloser +} + +func (r *gzipReadCloser) Read(p []byte) (n int, err error) { + return r.r.Read(p) +} + +func (r *gzipReadCloser) Close() error { + if err := r.r.Close(); err != nil { + r.closer.Close() + return err + } + return r.closer.Close() +} + // HexDump returns hexdump output but with duplicate lines removed. func HexDump(b []byte) string { const prefixN = len("00000000") diff --git a/replicator.go b/replicator.go index 1357629..d10e236 100644 --- a/replicator.go +++ b/replicator.go @@ -24,18 +24,32 @@ type Replicator interface { // String identifier for the type of replicator ("file", "s3", etc). Type() string - // Returns a list of generation names for the replicator. - Generations() ([]string, error) - - // Returns basic information about a generation including the number of - // snapshot & WAL files as well as the time range covered. - GenerationStats(generation string) (GenerationStats, error) - // Starts replicating in a background goroutine. Start(ctx context.Context) // Stops all replication processing. Blocks until processing stopped. Stop() + + // Returns a list of generation names for the replicator. + Generations(ctx context.Context) ([]string, error) + + // Returns basic information about a generation including the number of + // snapshot & WAL files as well as the time range covered. + GenerationStats(ctx context.Context, generation string) (GenerationStats, error) + + // Returns the highest index for a snapshot within a generation that occurs + // before timestamp. If timestamp is zero, returns the latest snapshot. + SnapshotIndexAt(ctx context.Context, generation string, timestamp time.Time) (int, error) + + // Returns the highest index for a WAL file that occurs before timestamp. + // If timestamp is zero, returns the highest WAL index. + WALIndexAt(ctx context.Context, generation string, timestamp time.Time) (int, error) + + // Returns a reader for snapshot data at the given generation/index. + SnapshotReader(ctx context.Context, generation string, index int) (io.ReadCloser, error) + + // Returns a reader for WAL data at the given position. + WALReader(ctx context.Context, generation string, index int) (io.ReadCloser, error) } var _ Replicator = (*FileReplicator)(nil) @@ -76,18 +90,28 @@ func (r *FileReplicator) Type() string { return "file" } +// SnapshotDir returns the path to a generation's snapshot directory. +func (r *FileReplicator) SnapshotDir(generation string) string { + return filepath.Join(r.dst, "generations", generation, "snapshots") +} + // SnapshotPath returns the path to a snapshot file. func (r *FileReplicator) SnapshotPath(generation string, index int) string { - return filepath.Join(r.dst, "generations", generation, "snapshots", fmt.Sprintf("%016x.snapshot.gz", index)) + return filepath.Join(r.SnapshotDir(generation), fmt.Sprintf("%016x.snapshot.gz", index)) +} + +// WALDir returns the path to a generation's WAL directory +func (r *FileReplicator) WALDir(generation string) string { + return filepath.Join(r.dst, "generations", generation, "wal") } // WALPath returns the path to a WAL file. func (r *FileReplicator) WALPath(generation string, index int) string { - return filepath.Join(r.dst, "generations", generation, "wal", fmt.Sprintf("%016x.wal", index)) + return filepath.Join(r.WALDir(generation), fmt.Sprintf("%016x.wal", index)) } // Generations returns a list of available generation names. -func (r *FileReplicator) Generations() ([]string, error) { +func (r *FileReplicator) Generations(ctx context.Context) ([]string, error) { fis, err := ioutil.ReadDir(filepath.Join(r.dst, "generations")) if os.IsNotExist(err) { return nil, nil @@ -108,7 +132,7 @@ func (r *FileReplicator) Generations() ([]string, error) { } // GenerationStats returns stats for a generation. -func (r *FileReplicator) GenerationStats(generation string) (stats GenerationStats, err error) { +func (r *FileReplicator) GenerationStats(ctx context.Context, generation string) (stats GenerationStats, err error) { // Determine stats for all snapshots. n, min, max, err := r.snapshotStats(generation) if err != nil { @@ -136,7 +160,7 @@ func (r *FileReplicator) GenerationStats(generation string) (stats GenerationSta } func (r *FileReplicator) snapshotStats(generation string) (n int, min, max time.Time, err error) { - fis, err := ioutil.ReadDir(filepath.Join(r.dst, "generations", generation, "snapshots")) + fis, err := ioutil.ReadDir(r.SnapshotDir(generation)) if os.IsNotExist(err) { return n, min, max, nil } else if err != nil { @@ -161,7 +185,7 @@ func (r *FileReplicator) snapshotStats(generation string) (n int, min, max time. } func (r *FileReplicator) walStats(generation string) (n int, min, max time.Time, err error) { - fis, err := ioutil.ReadDir(filepath.Join(r.dst, "generations", generation, "wal")) + fis, err := ioutil.ReadDir(r.WALDir(generation)) if os.IsNotExist(err) { return n, min, max, nil } else if err != nil { @@ -283,8 +307,8 @@ func (r *FileReplicator) pos() (pos Pos, err error) { pos.Generation = generation // Find the max WAL file. - walDir := filepath.Join(r.dst, "generations", generation, "wal") - fis, err := ioutil.ReadDir(walDir) + dir := r.WALDir(generation) + fis, err := ioutil.ReadDir(dir) if os.IsNotExist(err) { return pos, nil // no replicated wal, start at beginning of generation } else if err != nil { @@ -312,7 +336,7 @@ func (r *FileReplicator) pos() (pos Pos, err error) { pos.Index = index // Determine current offset. - fi, err := os.Stat(filepath.Join(walDir, FormatWALFilename(pos.Index))) + fi, err := os.Stat(filepath.Join(dir, FormatWALFilename(pos.Index))) if err != nil { return pos, err } @@ -348,7 +372,7 @@ func (r *FileReplicator) snapshot(ctx context.Context, generation string, index // snapshotN returns the number of snapshots for a generation. func (r *FileReplicator) snapshotN(generation string) (int, error) { - fis, err := ioutil.ReadDir(filepath.Join(r.dst, "generations", generation, "snapshots")) + fis, err := ioutil.ReadDir(r.SnapshotDir(generation)) if os.IsNotExist(err) { return 0, nil } else if err != nil { @@ -357,10 +381,7 @@ func (r *FileReplicator) snapshotN(generation string) (int, error) { var n int for _, fi := range fis { - name := fi.Name() - name = strings.TrimSuffix(name, ".gz") - - if strings.HasSuffix(name, SnapshotExt) { + if _, _, _, err := ParseSnapshotPath(fi.Name()); err == nil { n++ } } @@ -379,7 +400,7 @@ func (r *FileReplicator) sync(ctx context.Context, pos Pos) (_ Pos, err error) { } func (r *FileReplicator) syncNext(ctx context.Context, pos Pos) (_ Pos, err error) { - rd, err := r.db.WALReader(pos) + rd, err := r.db.ShadowWALReader(pos) if err == io.EOF { return pos, err } else if err != nil { @@ -426,7 +447,7 @@ func (r *FileReplicator) syncNext(ctx context.Context, pos Pos) (_ Pos, err erro // compress gzips all WAL files before the current one. func (r *FileReplicator) compress(ctx context.Context, generation string) error { - dir := filepath.Join(r.dst, "generations", generation, "wal") + dir := r.WALDir(generation) filenames, err := filepath.Glob(filepath.Join(dir, "*.wal")) if err != nil { return err @@ -457,6 +478,136 @@ func (r *FileReplicator) compress(ctx context.Context, generation string) error return nil } +// SnapsotIndexAt returns the highest index for a snapshot within a generation +// that occurs before timestamp. If timestamp is zero, returns the latest snapshot. +func (r *FileReplicator) SnapshotIndexAt(ctx context.Context, generation string, timestamp time.Time) (int, error) { + fis, err := ioutil.ReadDir(r.SnapshotDir(generation)) + if os.IsNotExist(err) { + return 0, ErrNoSnapshots + } else if err != nil { + return 0, err + } + + index := -1 + var max time.Time + for _, fi := range fis { + // Read index from snapshot filename. + idx, _, _, err := ParseSnapshotPath(fi.Name()) + if err != nil { + continue // not a snapshot, skip + } else if !timestamp.IsZero() && fi.ModTime().After(timestamp) { + continue // after timestamp, skip + } + + // Use snapshot if it newer. + if max.IsZero() || fi.ModTime().After(max) { + index, max = idx, fi.ModTime() + } + } + + if index == -1 { + return 0, ErrNoSnapshots + } + return index, nil +} + +// Returns the highest index for a WAL file that occurs before timestamp. +// If timestamp is zero, returns the highest WAL index. +func (r *FileReplicator) WALIndexAt(ctx context.Context, generation string, timestamp time.Time) (int, error) { + fis, err := ioutil.ReadDir(r.WALDir(generation)) + if os.IsNotExist(err) { + return 0, nil + } else if err != nil { + return 0, err + } + + index := -1 + for _, fi := range fis { + // Read index from snapshot filename. + idx, _, _, err := ParseWALPath(fi.Name()) + if err != nil { + continue // not a snapshot, skip + } else if !timestamp.IsZero() && fi.ModTime().After(timestamp) { + continue // after timestamp, skip + } else if idx < index { + continue // earlier index, skip + } + + index = idx + } + + if index == -1 { + return 0, nil + } + return index, nil +} + +// SnapshotReader returns a reader for snapshot data at the given generation/index. +// Returns os.ErrNotExist if no matching index is found. +func (r *FileReplicator) SnapshotReader(ctx context.Context, generation string, index int) (io.ReadCloser, error) { + dir := r.SnapshotDir(generation) + fis, err := ioutil.ReadDir(dir) + if err != nil { + return nil, err + } + + for _, fi := range fis { + // Parse index from snapshot filename. Skip if no match. + idx, _, ext, err := ParseSnapshotPath(fi.Name()) + if err != nil || index != idx { + continue + } + + // Open & return the file handle if uncompressed. + f, err := os.Open(filepath.Join(dir, fi.Name())) + if err != nil { + return nil, err + } else if ext == ".snapshot" { + return f, nil // not compressed, return as-is. + } + assert(ext == ".snapshot.gz", "invalid snapshot extension") + + // If compressed, wrap in a gzip reader and return with wrapper to + // ensure that the underlying file is closed. + r, err := gzip.NewReader(f) + if err != nil { + f.Close() + return nil, err + } + return &gzipReadCloser{r: r, closer: f}, nil + } + return nil, os.ErrNotExist +} + +// WALReader returns a reader for WAL data at the given index. +// Returns os.ErrNotExist if no matching index is found. +func (r *FileReplicator) WALReader(ctx context.Context, generation string, index int) (io.ReadCloser, error) { + filename := r.WALPath(generation, index) + + // Attempt to read uncompressed file first. + f, err := os.Open(filename) + if err == nil { + return f, nil // file exist, return + } else if err != nil && !os.IsNotExist(err) { + return nil, err + } + + // Otherwise read the compressed file. Return error if file doesn't exist. + f, err = os.Open(filename + ".gz") + if err != nil { + return nil, err + } + + // If compressed, wrap in a gzip reader and return with wrapper to + // ensure that the underlying file is closed. + rd, err := gzip.NewReader(f) + if err != nil { + f.Close() + return nil, err + } + return &gzipReadCloser{r: rd, closer: f}, nil +} + // compressFile compresses a file and replaces it with a new file with a .gz extension. func compressFile(src, dst string) error { r, err := os.Open(src)