diff --git a/cmd/litestream/main.go b/cmd/litestream/main.go index 6ff0d1e..520e778 100644 --- a/cmd/litestream/main.go +++ b/cmd/litestream/main.go @@ -10,6 +10,7 @@ import ( "os/user" "path/filepath" "strings" + "time" "github.com/benbjohnson/litestream" "gopkg.in/yaml.v2" @@ -155,9 +156,10 @@ type DBConfig struct { } type ReplicaConfig struct { - Type string `yaml:"type"` // "file", "s3" - Name string `yaml:"name"` // name of replica, optional. - Path string `yaml:"path"` // used for file replicas + Type string `yaml:"type"` // "file", "s3" + Name string `yaml:"name"` // name of replica, optional. + Path string `yaml:"path"` + Retention time.Duration `yaml:"retention"` } func registerConfigFlag(fs *flag.FlagSet, p *string) { @@ -196,5 +198,9 @@ func newFileReplicaFromConfig(db *litestream.DB, config *ReplicaConfig) (*litest if config.Path == "" { return nil, fmt.Errorf("file replica path require for db %q", db.Path()) } - return litestream.NewFileReplica(db, config.Name, config.Path), nil + r := litestream.NewFileReplica(db, config.Name, config.Path) + if v := config.Retention; v > 0 { + r.RetentionInterval = v + } + return r, nil } diff --git a/db.go b/db.go index d9a8e82..7d982bb 100644 --- a/db.go +++ b/db.go @@ -31,6 +31,7 @@ const ( DefaultCheckpointInterval = 1 * time.Minute DefaultMinCheckpointPageN = 1000 DefaultMaxCheckpointPageN = 10000 + DefaultRetentionInterval = 24 * time.Hour ) // DB represents a managed instance of a SQLite database in the file system. @@ -41,7 +42,8 @@ type DB struct { rtx *sql.Tx // long running read transaction pageSize int // page size, in bytes notify chan struct{} // closes on WAL change - uid, gid int // db user/group obtained on init + + uid, gid int // db user/group obtained on init ctx context.Context cancel func() diff --git a/litestream.go b/litestream.go index 92e19f3..1e0537f 100644 --- a/litestream.go +++ b/litestream.go @@ -52,6 +52,30 @@ type SnapshotInfo struct { CreatedAt time.Time } +// filterSnapshotsAfter returns all snapshots that were created on or after t. +func filterSnapshotsAfter(a []*SnapshotInfo, t time.Time) []*SnapshotInfo { + other := make([]*SnapshotInfo, 0, len(a)) + for _, snapshot := range a { + if !snapshot.CreatedAt.Before(t) { + other = append(other, snapshot) + } + } + return other +} + +// findMinSnapshotByGeneration finds the snapshot with the lowest index in a generation. +func findMinSnapshotByGeneration(a []*SnapshotInfo, generation string) *SnapshotInfo { + var min *SnapshotInfo + for _, snapshot := range a { + if snapshot.Generation != generation { + continue + } else if min == nil || snapshot.Index < min.Index { + min = snapshot + } + } + return min +} + // WALInfo represents file information about a WAL file. type WALInfo struct { Name string @@ -184,19 +208,19 @@ func IsSnapshotPath(s string) bool { // ParseSnapshotPath returns the index for the snapshot. // Returns an error if the path is not a valid snapshot path. -func ParseSnapshotPath(s string) (index int, typ, ext string, err error) { +func ParseSnapshotPath(s string) (index int, ext string, err error) { s = filepath.Base(s) a := snapshotPathRegex.FindStringSubmatch(s) if a == nil { - return 0, "", "", fmt.Errorf("invalid snapshot path: %s", s) + return 0, "", fmt.Errorf("invalid snapshot path: %s", s) } i64, _ := strconv.ParseUint(a[1], 16, 64) - return int(i64), a[2], a[3], nil + return int(i64), a[2], nil } -var snapshotPathRegex = regexp.MustCompile(`^([0-9a-f]{16})(?:-(\w+))?(.snapshot(?:.gz)?)$`) +var snapshotPathRegex = regexp.MustCompile(`^([0-9a-f]{16})(.snapshot(?:.gz)?)$`) // IsWALPath returns true if s is a path to a WAL file. func IsWALPath(s string) bool { diff --git a/replica.go b/replica.go index 3d3e4a9..df3ca99 100644 --- a/replica.go +++ b/replica.go @@ -11,7 +11,6 @@ import ( "os" "path/filepath" "sort" - "strconv" "sync" "time" ) @@ -90,6 +89,10 @@ type FileReplica struct { ctx context.Context cancel func() + // Time to keep snapshots and related WAL files. + // Database is snapshotted after interval and older WAL files are discarded. + RetentionInterval time.Duration + // If true, replica monitors database for changes automatically. // Set to false if replica is being used synchronously (such as in tests). MonitorEnabled bool @@ -103,7 +106,8 @@ func NewFileReplica(db *DB, name, dst string) *FileReplica { dst: dst, cancel: func() {}, - MonitorEnabled: true, + RetentionInterval: DefaultRetentionInterval, + MonitorEnabled: true, } } @@ -127,9 +131,14 @@ func (r *FileReplica) LastPos() Pos { return r.pos } +// GenerationDir returns the path to a generation's root directory. +func (r *FileReplica) GenerationDir(generation string) string { + return filepath.Join(r.dst, "generations", generation) +} + // SnapshotDir returns the path to a generation's snapshot directory. func (r *FileReplica) SnapshotDir(generation string) string { - return filepath.Join(r.dst, "generations", generation, "snapshots") + return filepath.Join(r.GenerationDir(generation), "snapshots") } // SnapshotPath returns the path to a snapshot file. @@ -146,7 +155,7 @@ func (r *FileReplica) MaxSnapshotIndex(generation string) (int, error) { index := -1 for _, fi := range fis { - if idx, _, _, err := ParseSnapshotPath(fi.Name()); err != nil { + if idx, _, err := ParseSnapshotPath(fi.Name()); err != nil { continue } else if index == -1 || idx > index { index = idx @@ -160,55 +169,12 @@ func (r *FileReplica) MaxSnapshotIndex(generation string) (int, error) { // WALDir returns the path to a generation's WAL directory func (r *FileReplica) WALDir(generation string) string { - return filepath.Join(r.dst, "generations", generation, "wal") -} - -// WALSubdir returns the directory used for grouping WAL files. -func (r *FileReplica) WALSubdir(generation string, index int) string { - return filepath.Join(r.WALDir(generation), fmt.Sprintf("%016x", uint64(index)&walDirMask)) -} - -// WALSubdirNames returns a list of all WAL subdirectory group names. -func (r *FileReplica) WALSubdirNames(generation string) ([]string, error) { - fis, err := ioutil.ReadDir(r.WALDir(generation)) - if err != nil && !os.IsNotExist(err) { - return nil, err - } - - var names []string - for _, fi := range fis { - if _, err := strconv.ParseUint(fi.Name(), 16, 64); err != nil { - continue - } - names = append(names, fi.Name()) - } - return names, nil -} - -// MaxWALSubdirName returns the highest WAL subdirectory group name. -func (r *FileReplica) MaxWALSubdirName(generation string) (string, error) { - fis, err := ioutil.ReadDir(r.WALDir(generation)) - if err != nil { - return "", err - } - - var name string - for _, fi := range fis { - if _, err := strconv.ParseUint(fi.Name(), 16, 64); err != nil { - continue - } else if name == "" || fi.Name() > name { - name = fi.Name() - } - } - if name == "" { - return "", os.ErrNotExist - } - return name, nil + return filepath.Join(r.GenerationDir(generation), "wal") } // WALPath returns the path to a WAL file. func (r *FileReplica) WALPath(generation string, index int) string { - return filepath.Join(r.WALSubdir(generation, index), fmt.Sprintf("%016x.wal", index)) + return filepath.Join(r.WALDir(generation), fmt.Sprintf("%016x.wal", index)) } // Generations returns a list of available generation names. @@ -286,32 +252,25 @@ func (r *FileReplica) snapshotStats(generation string) (n int, min, max time.Tim } func (r *FileReplica) walStats(generation string) (n int, min, max time.Time, err error) { - names, err := r.WALSubdirNames(generation) - if err != nil { + fis, err := ioutil.ReadDir(r.WALDir(generation)) + if os.IsNotExist(err) { + return n, min, max, nil + } else if err != nil { return n, min, max, err } - for _, name := range names { - fis, err := ioutil.ReadDir(filepath.Join(r.WALDir(generation), name)) - if os.IsNotExist(err) { - return n, min, max, nil - } else if err != nil { - return n, min, max, err + for _, fi := range fis { + if !IsWALPath(fi.Name()) { + continue } + modTime := fi.ModTime().UTC() - for _, fi := range fis { - if !IsWALPath(fi.Name()) { - continue - } - modTime := fi.ModTime().UTC() - - n++ - if min.IsZero() || modTime.Before(min) { - min = modTime - } - if max.IsZero() || modTime.After(max) { - max = modTime - } + n++ + if min.IsZero() || modTime.Before(min) { + min = modTime + } + if max.IsZero() || modTime.After(max) { + max = modTime } } return n, min, max, nil @@ -334,7 +293,7 @@ func (r *FileReplica) Snapshots(ctx context.Context) ([]*SnapshotInfo, error) { } for _, fi := range fis { - index, _, _, err := ParseSnapshotPath(fi.Name()) + index, _, err := ParseSnapshotPath(fi.Name()) if err != nil { continue } @@ -364,46 +323,30 @@ func (r *FileReplica) WALs(ctx context.Context) ([]*WALInfo, error) { var infos []*WALInfo for _, generation := range generations { - // Find a list of all directory groups. - dir := r.WALDir(generation) - subfis, err := ioutil.ReadDir(dir) + // Find a list of all WAL files. + fis, err := ioutil.ReadDir(r.WALDir(generation)) if os.IsNotExist(err) { continue } else if err != nil { return nil, err } - // Iterate over WAL group subdirectories. - for _, subfi := range subfis { - if !subfi.IsDir() { + // Iterate over each WAL file. + for _, fi := range fis { + index, offset, _, err := ParseWALPath(fi.Name()) + if err != nil { continue } - // Find a list of all WAL files in the group. - fis, err := ioutil.ReadDir(filepath.Join(dir, subfi.Name())) - if os.IsNotExist(err) { - continue - } else if err != nil { - return nil, err - } - - // Iterate over each WAL file. - for _, fi := range fis { - index, offset, _, err := ParseWALPath(fi.Name()) - if err != nil { - continue - } - - infos = append(infos, &WALInfo{ - Name: fi.Name(), - Replica: r.Name(), - Generation: generation, - Index: index, - Offset: offset, - Size: fi.Size(), - CreatedAt: fi.ModTime().UTC(), - }) - } + infos = append(infos, &WALInfo{ + Name: fi.Name(), + Replica: r.Name(), + Generation: generation, + Index: index, + Offset: offset, + Size: fi.Size(), + CreatedAt: fi.ModTime().UTC(), + }) } } @@ -424,8 +367,9 @@ func (r *FileReplica) Start(ctx context.Context) { ctx, r.cancel = context.WithCancel(ctx) // Start goroutine to replicate data. - r.wg.Add(1) + r.wg.Add(2) go func() { defer r.wg.Done(); r.monitor(ctx) }() + go func() { defer r.wg.Done(); r.retainer(ctx) }() } // Stop cancels any outstanding replication and blocks until finished. @@ -464,6 +408,24 @@ func (r *FileReplica) monitor(ctx context.Context) { } } +// retainer runs in a separate goroutine and handles retention. +func (r *FileReplica) retainer(ctx context.Context) { + ticker := time.NewTicker(1 * time.Minute) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + if err := r.EnforceRetention(ctx); err != nil { + log.Printf("%s(%s): retain error: %s", r.db.Path(), r.Name(), err) + continue + } + } + } +} + // CalcPos returns the position for the replica for the current generation. // Returns a zero value if there is no active generation. func (r *FileReplica) CalcPos(generation string) (pos Pos, err error) { @@ -474,16 +436,8 @@ func (r *FileReplica) CalcPos(generation string) (pos Pos, err error) { return Pos{}, err } - // Find highest WAL subdirectory group. - subdir, err := r.MaxWALSubdirName(generation) - if os.IsNotExist(err) { - return pos, nil // no replicated wal, start at snapshot index - } else if err != nil { - return Pos{}, err - } - - // Find the max WAL file within WAL group. - fis, err := ioutil.ReadDir(filepath.Join(r.WALDir(generation), subdir)) + // Find the max WAL file within WAL. + fis, err := ioutil.ReadDir(r.WALDir(generation)) if os.IsNotExist(err) { return pos, nil // no replicated wal, start at snapshot index. } else if err != nil { @@ -549,7 +503,7 @@ func (r *FileReplica) snapshotN(generation string) (int, error) { var n int for _, fi := range fis { - if _, _, _, err := ParseSnapshotPath(fi.Name()); err == nil { + if _, _, err := ParseSnapshotPath(fi.Name()); err == nil { n++ } } @@ -693,7 +647,7 @@ func (r *FileReplica) SnapshotIndexAt(ctx context.Context, generation string, ti var max time.Time for _, fi := range fis { // Read index from snapshot filename. - idx, _, _, err := ParseSnapshotPath(fi.Name()) + idx, _, err := ParseSnapshotPath(fi.Name()) if err != nil { continue // not a snapshot, skip } else if !timestamp.IsZero() && fi.ModTime().After(timestamp) { @@ -715,38 +669,28 @@ func (r *FileReplica) SnapshotIndexAt(ctx context.Context, generation string, ti // Returns the highest index for a WAL file that occurs before maxIndex & timestamp. // If timestamp is zero, returns the highest WAL index. func (r *FileReplica) WALIndexAt(ctx context.Context, generation string, maxIndex int, timestamp time.Time) (int, error) { - names, err := r.WALSubdirNames(generation) - if err != nil { + var index int + fis, err := ioutil.ReadDir(r.WALDir(generation)) + if os.IsNotExist(err) { + return 0, nil + } else if err != nil { return 0, err } - // TODO: Optimize to only read the last group if no timestamp specified. - // TODO: Perform binary search to find correct timestamp. - - var index int - for _, name := range names { - fis, err := ioutil.ReadDir(filepath.Join(r.WALDir(generation), name)) - if os.IsNotExist(err) { - return 0, nil - } else if err != nil { - return 0, err + for _, fi := range fis { + // Read index from snapshot filename. + idx, _, _, err := ParseWALPath(fi.Name()) + if err != nil { + continue // not a snapshot, skip + } else if !timestamp.IsZero() && fi.ModTime().After(timestamp) { + continue // after timestamp, skip + } else if idx > maxIndex { + continue // after timestamp, skip + } else if idx < index { + continue // earlier index, skip } - for _, fi := range fis { - // Read index from snapshot filename. - idx, _, _, err := ParseWALPath(fi.Name()) - if err != nil { - continue // not a snapshot, skip - } else if !timestamp.IsZero() && fi.ModTime().After(timestamp) { - continue // after timestamp, skip - } else if idx > maxIndex { - continue // after timestamp, skip - } else if idx < index { - continue // earlier index, skip - } - - index = idx - } + index = idx } // If max index is specified but not found, return an error. @@ -768,7 +712,7 @@ func (r *FileReplica) SnapshotReader(ctx context.Context, generation string, ind for _, fi := range fis { // Parse index from snapshot filename. Skip if no match. - idx, _, ext, err := ParseSnapshotPath(fi.Name()) + idx, ext, err := ParseSnapshotPath(fi.Name()) if err != nil || index != idx { continue } @@ -823,6 +767,118 @@ func (r *FileReplica) WALReader(ctx context.Context, generation string, index in return &gzipReadCloser{r: rd, closer: f}, nil } +// EnforceRetention forces a new snapshot once the retention interval has passed. +// Older snapshots and WAL files are then removed. +func (r *FileReplica) EnforceRetention(ctx context.Context) (err error) { + // Find current position of database. + pos, err := r.db.Pos() + if err != nil { + return fmt.Errorf("cannot determine current generation: %w", err) + } else if pos.IsZero() { + return fmt.Errorf("no generation, waiting for data") + } + + // Obtain list of snapshots that are within the retention period. + snapshots, err := r.Snapshots(ctx) + if err != nil { + return fmt.Errorf("cannot obtain snapshot list: %w", err) + } + snapshots = filterSnapshotsAfter(snapshots, time.Now().Add(-r.RetentionInterval)) + + // If no retained snapshots exist, create a new snapshot. + if len(snapshots) == 0 { + log.Printf("%s(%s): snapshots exceeds retention, creating new snapshot", r.db.Path(), r.Name()) + if err := r.snapshot(ctx, pos.Generation, pos.Index); err != nil { + return fmt.Errorf("cannot snapshot: %w", err) + } + snapshots = append(snapshots, &SnapshotInfo{Generation: pos.Generation, Index: pos.Index}) + } + + // Loop over generations and delete unretained snapshots & WAL files. + generations, err := r.Generations(ctx) + if err != nil { + return fmt.Errorf("cannot obtain generations: %w", err) + } + for _, generation := range generations { + // Find earliest retained snapshot for this generation. + snapshot := findMinSnapshotByGeneration(snapshots, generation) + + // Delete generations if it has no snapshots being retained. + if snapshot == nil { + log.Printf("%s(%s): generation %q has no retained snapshots, deleting", r.db.Path(), r.Name(), generation) + if err := os.RemoveAll(r.GenerationDir(generation)); err != nil { + return fmt.Errorf("cannot delete generation %q dir: %w", generation, err) + } + continue + } + + // Otherwise delete all snapshots & WAL files before a lowest retained index. + if err := r.deleteGenerationSnapshotsBefore(ctx, generation, snapshot.Index); err != nil { + return fmt.Errorf("cannot delete generation %q snapshots before index %d: %w", generation, snapshot.Index, err) + } else if err := r.deleteGenerationWALBefore(ctx, generation, snapshot.Index); err != nil { + return fmt.Errorf("cannot delete generation %q wal before index %d: %w", generation, snapshot.Index, err) + } + } + + return nil +} + +// deleteGenerationSnapshotsBefore deletes snapshot before a given index. +func (r *FileReplica) deleteGenerationSnapshotsBefore(ctx context.Context, generation string, index int) (err error) { + dir := r.SnapshotDir(generation) + + fis, err := ioutil.ReadDir(dir) + if os.IsNotExist(err) { + return nil + } else if err != nil { + return err + } + + for _, fi := range fis { + idx, _, err := ParseSnapshotPath(fi.Name()) + if err != nil { + continue + } else if idx >= index { + continue + } + + log.Printf("%s(%s): generation %q snapshot no longer retained, deleting %s", r.db.Path(), r.Name(), generation, fi.Name()) + if err := os.Remove(filepath.Join(dir, fi.Name())); err != nil { + return err + } + } + + return nil +} + +// deleteGenerationWALBefore deletes WAL files before a given index. +func (r *FileReplica) deleteGenerationWALBefore(ctx context.Context, generation string, index int) (err error) { + dir := r.WALDir(generation) + + fis, err := ioutil.ReadDir(dir) + if os.IsNotExist(err) { + return nil + } else if err != nil { + return err + } + + for _, fi := range fis { + idx, _, _, err := ParseWALPath(fi.Name()) + if err != nil { + continue + } else if idx >= index { + continue + } + + log.Printf("%s(%s): generation %q wal no longer retained, deleting %s", r.db.Path(), r.Name(), generation, fi.Name()) + if err := os.Remove(filepath.Join(dir, fi.Name())); err != nil { + return err + } + } + + return nil +} + // compressFile compresses a file and replaces it with a new file with a .gz extension. func compressFile(src, dst string, uid, gid int) error { r, err := os.Open(src)