Add retention policy, remove WAL subdir
This commit is contained in:
@@ -10,6 +10,7 @@ import (
|
|||||||
"os/user"
|
"os/user"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
"strings"
|
"strings"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/benbjohnson/litestream"
|
"github.com/benbjohnson/litestream"
|
||||||
"gopkg.in/yaml.v2"
|
"gopkg.in/yaml.v2"
|
||||||
@@ -157,7 +158,8 @@ type DBConfig struct {
|
|||||||
type ReplicaConfig struct {
|
type ReplicaConfig struct {
|
||||||
Type string `yaml:"type"` // "file", "s3"
|
Type string `yaml:"type"` // "file", "s3"
|
||||||
Name string `yaml:"name"` // name of replica, optional.
|
Name string `yaml:"name"` // name of replica, optional.
|
||||||
Path string `yaml:"path"` // used for file replicas
|
Path string `yaml:"path"`
|
||||||
|
Retention time.Duration `yaml:"retention"`
|
||||||
}
|
}
|
||||||
|
|
||||||
func registerConfigFlag(fs *flag.FlagSet, p *string) {
|
func registerConfigFlag(fs *flag.FlagSet, p *string) {
|
||||||
@@ -196,5 +198,9 @@ func newFileReplicaFromConfig(db *litestream.DB, config *ReplicaConfig) (*litest
|
|||||||
if config.Path == "" {
|
if config.Path == "" {
|
||||||
return nil, fmt.Errorf("file replica path require for db %q", db.Path())
|
return nil, fmt.Errorf("file replica path require for db %q", db.Path())
|
||||||
}
|
}
|
||||||
return litestream.NewFileReplica(db, config.Name, config.Path), nil
|
r := litestream.NewFileReplica(db, config.Name, config.Path)
|
||||||
|
if v := config.Retention; v > 0 {
|
||||||
|
r.RetentionInterval = v
|
||||||
|
}
|
||||||
|
return r, nil
|
||||||
}
|
}
|
||||||
|
|||||||
2
db.go
2
db.go
@@ -31,6 +31,7 @@ const (
|
|||||||
DefaultCheckpointInterval = 1 * time.Minute
|
DefaultCheckpointInterval = 1 * time.Minute
|
||||||
DefaultMinCheckpointPageN = 1000
|
DefaultMinCheckpointPageN = 1000
|
||||||
DefaultMaxCheckpointPageN = 10000
|
DefaultMaxCheckpointPageN = 10000
|
||||||
|
DefaultRetentionInterval = 24 * time.Hour
|
||||||
)
|
)
|
||||||
|
|
||||||
// DB represents a managed instance of a SQLite database in the file system.
|
// DB represents a managed instance of a SQLite database in the file system.
|
||||||
@@ -41,6 +42,7 @@ type DB struct {
|
|||||||
rtx *sql.Tx // long running read transaction
|
rtx *sql.Tx // long running read transaction
|
||||||
pageSize int // page size, in bytes
|
pageSize int // page size, in bytes
|
||||||
notify chan struct{} // closes on WAL change
|
notify chan struct{} // closes on WAL change
|
||||||
|
|
||||||
uid, gid int // db user/group obtained on init
|
uid, gid int // db user/group obtained on init
|
||||||
|
|
||||||
ctx context.Context
|
ctx context.Context
|
||||||
|
|||||||
@@ -52,6 +52,30 @@ type SnapshotInfo struct {
|
|||||||
CreatedAt time.Time
|
CreatedAt time.Time
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// filterSnapshotsAfter returns all snapshots that were created on or after t.
|
||||||
|
func filterSnapshotsAfter(a []*SnapshotInfo, t time.Time) []*SnapshotInfo {
|
||||||
|
other := make([]*SnapshotInfo, 0, len(a))
|
||||||
|
for _, snapshot := range a {
|
||||||
|
if !snapshot.CreatedAt.Before(t) {
|
||||||
|
other = append(other, snapshot)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return other
|
||||||
|
}
|
||||||
|
|
||||||
|
// findMinSnapshotByGeneration finds the snapshot with the lowest index in a generation.
|
||||||
|
func findMinSnapshotByGeneration(a []*SnapshotInfo, generation string) *SnapshotInfo {
|
||||||
|
var min *SnapshotInfo
|
||||||
|
for _, snapshot := range a {
|
||||||
|
if snapshot.Generation != generation {
|
||||||
|
continue
|
||||||
|
} else if min == nil || snapshot.Index < min.Index {
|
||||||
|
min = snapshot
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return min
|
||||||
|
}
|
||||||
|
|
||||||
// WALInfo represents file information about a WAL file.
|
// WALInfo represents file information about a WAL file.
|
||||||
type WALInfo struct {
|
type WALInfo struct {
|
||||||
Name string
|
Name string
|
||||||
@@ -184,19 +208,19 @@ func IsSnapshotPath(s string) bool {
|
|||||||
|
|
||||||
// ParseSnapshotPath returns the index for the snapshot.
|
// ParseSnapshotPath returns the index for the snapshot.
|
||||||
// Returns an error if the path is not a valid snapshot path.
|
// Returns an error if the path is not a valid snapshot path.
|
||||||
func ParseSnapshotPath(s string) (index int, typ, ext string, err error) {
|
func ParseSnapshotPath(s string) (index int, ext string, err error) {
|
||||||
s = filepath.Base(s)
|
s = filepath.Base(s)
|
||||||
|
|
||||||
a := snapshotPathRegex.FindStringSubmatch(s)
|
a := snapshotPathRegex.FindStringSubmatch(s)
|
||||||
if a == nil {
|
if a == nil {
|
||||||
return 0, "", "", fmt.Errorf("invalid snapshot path: %s", s)
|
return 0, "", fmt.Errorf("invalid snapshot path: %s", s)
|
||||||
}
|
}
|
||||||
|
|
||||||
i64, _ := strconv.ParseUint(a[1], 16, 64)
|
i64, _ := strconv.ParseUint(a[1], 16, 64)
|
||||||
return int(i64), a[2], a[3], nil
|
return int(i64), a[2], nil
|
||||||
}
|
}
|
||||||
|
|
||||||
var snapshotPathRegex = regexp.MustCompile(`^([0-9a-f]{16})(?:-(\w+))?(.snapshot(?:.gz)?)$`)
|
var snapshotPathRegex = regexp.MustCompile(`^([0-9a-f]{16})(.snapshot(?:.gz)?)$`)
|
||||||
|
|
||||||
// IsWALPath returns true if s is a path to a WAL file.
|
// IsWALPath returns true if s is a path to a WAL file.
|
||||||
func IsWALPath(s string) bool {
|
func IsWALPath(s string) bool {
|
||||||
|
|||||||
256
replica.go
256
replica.go
@@ -11,7 +11,6 @@ import (
|
|||||||
"os"
|
"os"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
"sort"
|
"sort"
|
||||||
"strconv"
|
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
@@ -90,6 +89,10 @@ type FileReplica struct {
|
|||||||
ctx context.Context
|
ctx context.Context
|
||||||
cancel func()
|
cancel func()
|
||||||
|
|
||||||
|
// Time to keep snapshots and related WAL files.
|
||||||
|
// Database is snapshotted after interval and older WAL files are discarded.
|
||||||
|
RetentionInterval time.Duration
|
||||||
|
|
||||||
// If true, replica monitors database for changes automatically.
|
// If true, replica monitors database for changes automatically.
|
||||||
// Set to false if replica is being used synchronously (such as in tests).
|
// Set to false if replica is being used synchronously (such as in tests).
|
||||||
MonitorEnabled bool
|
MonitorEnabled bool
|
||||||
@@ -103,6 +106,7 @@ func NewFileReplica(db *DB, name, dst string) *FileReplica {
|
|||||||
dst: dst,
|
dst: dst,
|
||||||
cancel: func() {},
|
cancel: func() {},
|
||||||
|
|
||||||
|
RetentionInterval: DefaultRetentionInterval,
|
||||||
MonitorEnabled: true,
|
MonitorEnabled: true,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -127,9 +131,14 @@ func (r *FileReplica) LastPos() Pos {
|
|||||||
return r.pos
|
return r.pos
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// GenerationDir returns the path to a generation's root directory.
|
||||||
|
func (r *FileReplica) GenerationDir(generation string) string {
|
||||||
|
return filepath.Join(r.dst, "generations", generation)
|
||||||
|
}
|
||||||
|
|
||||||
// SnapshotDir returns the path to a generation's snapshot directory.
|
// SnapshotDir returns the path to a generation's snapshot directory.
|
||||||
func (r *FileReplica) SnapshotDir(generation string) string {
|
func (r *FileReplica) SnapshotDir(generation string) string {
|
||||||
return filepath.Join(r.dst, "generations", generation, "snapshots")
|
return filepath.Join(r.GenerationDir(generation), "snapshots")
|
||||||
}
|
}
|
||||||
|
|
||||||
// SnapshotPath returns the path to a snapshot file.
|
// SnapshotPath returns the path to a snapshot file.
|
||||||
@@ -146,7 +155,7 @@ func (r *FileReplica) MaxSnapshotIndex(generation string) (int, error) {
|
|||||||
|
|
||||||
index := -1
|
index := -1
|
||||||
for _, fi := range fis {
|
for _, fi := range fis {
|
||||||
if idx, _, _, err := ParseSnapshotPath(fi.Name()); err != nil {
|
if idx, _, err := ParseSnapshotPath(fi.Name()); err != nil {
|
||||||
continue
|
continue
|
||||||
} else if index == -1 || idx > index {
|
} else if index == -1 || idx > index {
|
||||||
index = idx
|
index = idx
|
||||||
@@ -160,55 +169,12 @@ func (r *FileReplica) MaxSnapshotIndex(generation string) (int, error) {
|
|||||||
|
|
||||||
// WALDir returns the path to a generation's WAL directory
|
// WALDir returns the path to a generation's WAL directory
|
||||||
func (r *FileReplica) WALDir(generation string) string {
|
func (r *FileReplica) WALDir(generation string) string {
|
||||||
return filepath.Join(r.dst, "generations", generation, "wal")
|
return filepath.Join(r.GenerationDir(generation), "wal")
|
||||||
}
|
|
||||||
|
|
||||||
// WALSubdir returns the directory used for grouping WAL files.
|
|
||||||
func (r *FileReplica) WALSubdir(generation string, index int) string {
|
|
||||||
return filepath.Join(r.WALDir(generation), fmt.Sprintf("%016x", uint64(index)&walDirMask))
|
|
||||||
}
|
|
||||||
|
|
||||||
// WALSubdirNames returns a list of all WAL subdirectory group names.
|
|
||||||
func (r *FileReplica) WALSubdirNames(generation string) ([]string, error) {
|
|
||||||
fis, err := ioutil.ReadDir(r.WALDir(generation))
|
|
||||||
if err != nil && !os.IsNotExist(err) {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
var names []string
|
|
||||||
for _, fi := range fis {
|
|
||||||
if _, err := strconv.ParseUint(fi.Name(), 16, 64); err != nil {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
names = append(names, fi.Name())
|
|
||||||
}
|
|
||||||
return names, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// MaxWALSubdirName returns the highest WAL subdirectory group name.
|
|
||||||
func (r *FileReplica) MaxWALSubdirName(generation string) (string, error) {
|
|
||||||
fis, err := ioutil.ReadDir(r.WALDir(generation))
|
|
||||||
if err != nil {
|
|
||||||
return "", err
|
|
||||||
}
|
|
||||||
|
|
||||||
var name string
|
|
||||||
for _, fi := range fis {
|
|
||||||
if _, err := strconv.ParseUint(fi.Name(), 16, 64); err != nil {
|
|
||||||
continue
|
|
||||||
} else if name == "" || fi.Name() > name {
|
|
||||||
name = fi.Name()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if name == "" {
|
|
||||||
return "", os.ErrNotExist
|
|
||||||
}
|
|
||||||
return name, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// WALPath returns the path to a WAL file.
|
// WALPath returns the path to a WAL file.
|
||||||
func (r *FileReplica) WALPath(generation string, index int) string {
|
func (r *FileReplica) WALPath(generation string, index int) string {
|
||||||
return filepath.Join(r.WALSubdir(generation, index), fmt.Sprintf("%016x.wal", index))
|
return filepath.Join(r.WALDir(generation), fmt.Sprintf("%016x.wal", index))
|
||||||
}
|
}
|
||||||
|
|
||||||
// Generations returns a list of available generation names.
|
// Generations returns a list of available generation names.
|
||||||
@@ -286,13 +252,7 @@ func (r *FileReplica) snapshotStats(generation string) (n int, min, max time.Tim
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (r *FileReplica) walStats(generation string) (n int, min, max time.Time, err error) {
|
func (r *FileReplica) walStats(generation string) (n int, min, max time.Time, err error) {
|
||||||
names, err := r.WALSubdirNames(generation)
|
fis, err := ioutil.ReadDir(r.WALDir(generation))
|
||||||
if err != nil {
|
|
||||||
return n, min, max, err
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, name := range names {
|
|
||||||
fis, err := ioutil.ReadDir(filepath.Join(r.WALDir(generation), name))
|
|
||||||
if os.IsNotExist(err) {
|
if os.IsNotExist(err) {
|
||||||
return n, min, max, nil
|
return n, min, max, nil
|
||||||
} else if err != nil {
|
} else if err != nil {
|
||||||
@@ -313,7 +273,6 @@ func (r *FileReplica) walStats(generation string) (n int, min, max time.Time, er
|
|||||||
max = modTime
|
max = modTime
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
return n, min, max, nil
|
return n, min, max, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -334,7 +293,7 @@ func (r *FileReplica) Snapshots(ctx context.Context) ([]*SnapshotInfo, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
for _, fi := range fis {
|
for _, fi := range fis {
|
||||||
index, _, _, err := ParseSnapshotPath(fi.Name())
|
index, _, err := ParseSnapshotPath(fi.Name())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
@@ -364,23 +323,8 @@ func (r *FileReplica) WALs(ctx context.Context) ([]*WALInfo, error) {
|
|||||||
|
|
||||||
var infos []*WALInfo
|
var infos []*WALInfo
|
||||||
for _, generation := range generations {
|
for _, generation := range generations {
|
||||||
// Find a list of all directory groups.
|
// Find a list of all WAL files.
|
||||||
dir := r.WALDir(generation)
|
fis, err := ioutil.ReadDir(r.WALDir(generation))
|
||||||
subfis, err := ioutil.ReadDir(dir)
|
|
||||||
if os.IsNotExist(err) {
|
|
||||||
continue
|
|
||||||
} else if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
// Iterate over WAL group subdirectories.
|
|
||||||
for _, subfi := range subfis {
|
|
||||||
if !subfi.IsDir() {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
// Find a list of all WAL files in the group.
|
|
||||||
fis, err := ioutil.ReadDir(filepath.Join(dir, subfi.Name()))
|
|
||||||
if os.IsNotExist(err) {
|
if os.IsNotExist(err) {
|
||||||
continue
|
continue
|
||||||
} else if err != nil {
|
} else if err != nil {
|
||||||
@@ -405,7 +349,6 @@ func (r *FileReplica) WALs(ctx context.Context) ([]*WALInfo, error) {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
return infos, nil
|
return infos, nil
|
||||||
}
|
}
|
||||||
@@ -424,8 +367,9 @@ func (r *FileReplica) Start(ctx context.Context) {
|
|||||||
ctx, r.cancel = context.WithCancel(ctx)
|
ctx, r.cancel = context.WithCancel(ctx)
|
||||||
|
|
||||||
// Start goroutine to replicate data.
|
// Start goroutine to replicate data.
|
||||||
r.wg.Add(1)
|
r.wg.Add(2)
|
||||||
go func() { defer r.wg.Done(); r.monitor(ctx) }()
|
go func() { defer r.wg.Done(); r.monitor(ctx) }()
|
||||||
|
go func() { defer r.wg.Done(); r.retainer(ctx) }()
|
||||||
}
|
}
|
||||||
|
|
||||||
// Stop cancels any outstanding replication and blocks until finished.
|
// Stop cancels any outstanding replication and blocks until finished.
|
||||||
@@ -464,6 +408,24 @@ func (r *FileReplica) monitor(ctx context.Context) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// retainer runs in a separate goroutine and handles retention.
|
||||||
|
func (r *FileReplica) retainer(ctx context.Context) {
|
||||||
|
ticker := time.NewTicker(1 * time.Minute)
|
||||||
|
defer ticker.Stop()
|
||||||
|
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return
|
||||||
|
case <-ticker.C:
|
||||||
|
if err := r.EnforceRetention(ctx); err != nil {
|
||||||
|
log.Printf("%s(%s): retain error: %s", r.db.Path(), r.Name(), err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// CalcPos returns the position for the replica for the current generation.
|
// CalcPos returns the position for the replica for the current generation.
|
||||||
// Returns a zero value if there is no active generation.
|
// Returns a zero value if there is no active generation.
|
||||||
func (r *FileReplica) CalcPos(generation string) (pos Pos, err error) {
|
func (r *FileReplica) CalcPos(generation string) (pos Pos, err error) {
|
||||||
@@ -474,16 +436,8 @@ func (r *FileReplica) CalcPos(generation string) (pos Pos, err error) {
|
|||||||
return Pos{}, err
|
return Pos{}, err
|
||||||
}
|
}
|
||||||
|
|
||||||
// Find highest WAL subdirectory group.
|
// Find the max WAL file within WAL.
|
||||||
subdir, err := r.MaxWALSubdirName(generation)
|
fis, err := ioutil.ReadDir(r.WALDir(generation))
|
||||||
if os.IsNotExist(err) {
|
|
||||||
return pos, nil // no replicated wal, start at snapshot index
|
|
||||||
} else if err != nil {
|
|
||||||
return Pos{}, err
|
|
||||||
}
|
|
||||||
|
|
||||||
// Find the max WAL file within WAL group.
|
|
||||||
fis, err := ioutil.ReadDir(filepath.Join(r.WALDir(generation), subdir))
|
|
||||||
if os.IsNotExist(err) {
|
if os.IsNotExist(err) {
|
||||||
return pos, nil // no replicated wal, start at snapshot index.
|
return pos, nil // no replicated wal, start at snapshot index.
|
||||||
} else if err != nil {
|
} else if err != nil {
|
||||||
@@ -549,7 +503,7 @@ func (r *FileReplica) snapshotN(generation string) (int, error) {
|
|||||||
|
|
||||||
var n int
|
var n int
|
||||||
for _, fi := range fis {
|
for _, fi := range fis {
|
||||||
if _, _, _, err := ParseSnapshotPath(fi.Name()); err == nil {
|
if _, _, err := ParseSnapshotPath(fi.Name()); err == nil {
|
||||||
n++
|
n++
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -693,7 +647,7 @@ func (r *FileReplica) SnapshotIndexAt(ctx context.Context, generation string, ti
|
|||||||
var max time.Time
|
var max time.Time
|
||||||
for _, fi := range fis {
|
for _, fi := range fis {
|
||||||
// Read index from snapshot filename.
|
// Read index from snapshot filename.
|
||||||
idx, _, _, err := ParseSnapshotPath(fi.Name())
|
idx, _, err := ParseSnapshotPath(fi.Name())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
continue // not a snapshot, skip
|
continue // not a snapshot, skip
|
||||||
} else if !timestamp.IsZero() && fi.ModTime().After(timestamp) {
|
} else if !timestamp.IsZero() && fi.ModTime().After(timestamp) {
|
||||||
@@ -715,17 +669,8 @@ func (r *FileReplica) SnapshotIndexAt(ctx context.Context, generation string, ti
|
|||||||
// Returns the highest index for a WAL file that occurs before maxIndex & timestamp.
|
// Returns the highest index for a WAL file that occurs before maxIndex & timestamp.
|
||||||
// If timestamp is zero, returns the highest WAL index.
|
// If timestamp is zero, returns the highest WAL index.
|
||||||
func (r *FileReplica) WALIndexAt(ctx context.Context, generation string, maxIndex int, timestamp time.Time) (int, error) {
|
func (r *FileReplica) WALIndexAt(ctx context.Context, generation string, maxIndex int, timestamp time.Time) (int, error) {
|
||||||
names, err := r.WALSubdirNames(generation)
|
|
||||||
if err != nil {
|
|
||||||
return 0, err
|
|
||||||
}
|
|
||||||
|
|
||||||
// TODO: Optimize to only read the last group if no timestamp specified.
|
|
||||||
// TODO: Perform binary search to find correct timestamp.
|
|
||||||
|
|
||||||
var index int
|
var index int
|
||||||
for _, name := range names {
|
fis, err := ioutil.ReadDir(r.WALDir(generation))
|
||||||
fis, err := ioutil.ReadDir(filepath.Join(r.WALDir(generation), name))
|
|
||||||
if os.IsNotExist(err) {
|
if os.IsNotExist(err) {
|
||||||
return 0, nil
|
return 0, nil
|
||||||
} else if err != nil {
|
} else if err != nil {
|
||||||
@@ -747,7 +692,6 @@ func (r *FileReplica) WALIndexAt(ctx context.Context, generation string, maxInde
|
|||||||
|
|
||||||
index = idx
|
index = idx
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
// If max index is specified but not found, return an error.
|
// If max index is specified but not found, return an error.
|
||||||
if maxIndex != math.MaxInt64 && index != maxIndex {
|
if maxIndex != math.MaxInt64 && index != maxIndex {
|
||||||
@@ -768,7 +712,7 @@ func (r *FileReplica) SnapshotReader(ctx context.Context, generation string, ind
|
|||||||
|
|
||||||
for _, fi := range fis {
|
for _, fi := range fis {
|
||||||
// Parse index from snapshot filename. Skip if no match.
|
// Parse index from snapshot filename. Skip if no match.
|
||||||
idx, _, ext, err := ParseSnapshotPath(fi.Name())
|
idx, ext, err := ParseSnapshotPath(fi.Name())
|
||||||
if err != nil || index != idx {
|
if err != nil || index != idx {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
@@ -823,6 +767,118 @@ func (r *FileReplica) WALReader(ctx context.Context, generation string, index in
|
|||||||
return &gzipReadCloser{r: rd, closer: f}, nil
|
return &gzipReadCloser{r: rd, closer: f}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// EnforceRetention forces a new snapshot once the retention interval has passed.
|
||||||
|
// Older snapshots and WAL files are then removed.
|
||||||
|
func (r *FileReplica) EnforceRetention(ctx context.Context) (err error) {
|
||||||
|
// Find current position of database.
|
||||||
|
pos, err := r.db.Pos()
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("cannot determine current generation: %w", err)
|
||||||
|
} else if pos.IsZero() {
|
||||||
|
return fmt.Errorf("no generation, waiting for data")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Obtain list of snapshots that are within the retention period.
|
||||||
|
snapshots, err := r.Snapshots(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("cannot obtain snapshot list: %w", err)
|
||||||
|
}
|
||||||
|
snapshots = filterSnapshotsAfter(snapshots, time.Now().Add(-r.RetentionInterval))
|
||||||
|
|
||||||
|
// If no retained snapshots exist, create a new snapshot.
|
||||||
|
if len(snapshots) == 0 {
|
||||||
|
log.Printf("%s(%s): snapshots exceeds retention, creating new snapshot", r.db.Path(), r.Name())
|
||||||
|
if err := r.snapshot(ctx, pos.Generation, pos.Index); err != nil {
|
||||||
|
return fmt.Errorf("cannot snapshot: %w", err)
|
||||||
|
}
|
||||||
|
snapshots = append(snapshots, &SnapshotInfo{Generation: pos.Generation, Index: pos.Index})
|
||||||
|
}
|
||||||
|
|
||||||
|
// Loop over generations and delete unretained snapshots & WAL files.
|
||||||
|
generations, err := r.Generations(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("cannot obtain generations: %w", err)
|
||||||
|
}
|
||||||
|
for _, generation := range generations {
|
||||||
|
// Find earliest retained snapshot for this generation.
|
||||||
|
snapshot := findMinSnapshotByGeneration(snapshots, generation)
|
||||||
|
|
||||||
|
// Delete generations if it has no snapshots being retained.
|
||||||
|
if snapshot == nil {
|
||||||
|
log.Printf("%s(%s): generation %q has no retained snapshots, deleting", r.db.Path(), r.Name(), generation)
|
||||||
|
if err := os.RemoveAll(r.GenerationDir(generation)); err != nil {
|
||||||
|
return fmt.Errorf("cannot delete generation %q dir: %w", generation, err)
|
||||||
|
}
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// Otherwise delete all snapshots & WAL files before a lowest retained index.
|
||||||
|
if err := r.deleteGenerationSnapshotsBefore(ctx, generation, snapshot.Index); err != nil {
|
||||||
|
return fmt.Errorf("cannot delete generation %q snapshots before index %d: %w", generation, snapshot.Index, err)
|
||||||
|
} else if err := r.deleteGenerationWALBefore(ctx, generation, snapshot.Index); err != nil {
|
||||||
|
return fmt.Errorf("cannot delete generation %q wal before index %d: %w", generation, snapshot.Index, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// deleteGenerationSnapshotsBefore deletes snapshot before a given index.
|
||||||
|
func (r *FileReplica) deleteGenerationSnapshotsBefore(ctx context.Context, generation string, index int) (err error) {
|
||||||
|
dir := r.SnapshotDir(generation)
|
||||||
|
|
||||||
|
fis, err := ioutil.ReadDir(dir)
|
||||||
|
if os.IsNotExist(err) {
|
||||||
|
return nil
|
||||||
|
} else if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, fi := range fis {
|
||||||
|
idx, _, err := ParseSnapshotPath(fi.Name())
|
||||||
|
if err != nil {
|
||||||
|
continue
|
||||||
|
} else if idx >= index {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Printf("%s(%s): generation %q snapshot no longer retained, deleting %s", r.db.Path(), r.Name(), generation, fi.Name())
|
||||||
|
if err := os.Remove(filepath.Join(dir, fi.Name())); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// deleteGenerationWALBefore deletes WAL files before a given index.
|
||||||
|
func (r *FileReplica) deleteGenerationWALBefore(ctx context.Context, generation string, index int) (err error) {
|
||||||
|
dir := r.WALDir(generation)
|
||||||
|
|
||||||
|
fis, err := ioutil.ReadDir(dir)
|
||||||
|
if os.IsNotExist(err) {
|
||||||
|
return nil
|
||||||
|
} else if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, fi := range fis {
|
||||||
|
idx, _, _, err := ParseWALPath(fi.Name())
|
||||||
|
if err != nil {
|
||||||
|
continue
|
||||||
|
} else if idx >= index {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Printf("%s(%s): generation %q wal no longer retained, deleting %s", r.db.Path(), r.Name(), generation, fi.Name())
|
||||||
|
if err := os.Remove(filepath.Join(dir, fi.Name())); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
// compressFile compresses a file and replaces it with a new file with a .gz extension.
|
// compressFile compresses a file and replaces it with a new file with a .gz extension.
|
||||||
func compressFile(src, dst string, uid, gid int) error {
|
func compressFile(src, dst string, uid, gid int) error {
|
||||||
r, err := os.Open(src)
|
r, err := os.Open(src)
|
||||||
|
|||||||
Reference in New Issue
Block a user