Add generations command

This commit is contained in:
Ben Johnson
2020-12-26 09:39:21 -07:00
parent 22d44fda6d
commit 98014f4e49
7 changed files with 415 additions and 115 deletions

View File

@@ -12,14 +12,29 @@ import (
"sort"
"strings"
"sync"
"time"
)
// Replicator represents a method for replicating the snapshot & WAL data to
// a remote destination.
type Replicator interface {
// The name of the replicator. Defaults to type if no name specified.
Name() string
// String identifier for the type of replicator ("file", "s3", etc).
Type() string
// Returns a list of generation names for the replicator.
Generations() ([]string, error)
// Returns basic information about a generation including the number of
// snapshot & WAL files as well as the time range covered.
GenerationStats(generation string) (GenerationStats, error)
// Starts replicating in a background goroutine.
Start(ctx context.Context)
// Stops all replication processing. Blocks until processing stopped.
Stop()
}
@@ -71,6 +86,112 @@ func (r *FileReplicator) WALPath(generation string, index int) string {
return filepath.Join(r.dst, "generations", generation, "wal", fmt.Sprintf("%016x.wal", index))
}
// Generations returns a list of available generation names.
func (r *FileReplicator) Generations() ([]string, error) {
fis, err := ioutil.ReadDir(filepath.Join(r.dst, "generations"))
if os.IsNotExist(err) {
return nil, nil
} else if err != nil {
return nil, err
}
var generations []string
for _, fi := range fis {
if !IsGenerationName(fi.Name()) {
continue
} else if !fi.IsDir() {
continue
}
generations = append(generations, fi.Name())
}
return generations, nil
}
// GenerationStats returns stats for a generation.
func (r *FileReplicator) GenerationStats(generation string) (stats GenerationStats, err error) {
// Determine stats for all snapshots.
n, min, max, err := r.snapshotStats(generation)
if err != nil {
return stats, err
}
stats.SnapshotN = n
stats.CreatedAt, stats.UpdatedAt = min, max
// Update stats if we have WAL files.
n, min, max, err = r.walStats(generation)
if err != nil {
return stats, err
} else if n == 0 {
return stats, nil
}
stats.WALN = n
if stats.CreatedAt.IsZero() || min.Before(stats.CreatedAt) {
stats.CreatedAt = min
}
if stats.UpdatedAt.IsZero() || max.After(stats.UpdatedAt) {
stats.UpdatedAt = max
}
return stats, nil
}
func (r *FileReplicator) snapshotStats(generation string) (n int, min, max time.Time, err error) {
fis, err := ioutil.ReadDir(filepath.Join(r.dst, "generations", generation, "snapshots"))
if os.IsNotExist(err) {
return n, min, max, nil
} else if err != nil {
return n, min, max, err
}
for _, fi := range fis {
if !IsSnapshotPath(fi.Name()) {
continue
}
modTime := fi.ModTime().UTC()
n++
if min.IsZero() || modTime.Before(min) {
min = modTime
}
if max.IsZero() || modTime.After(max) {
max = modTime
}
}
return n, min, max, nil
}
func (r *FileReplicator) walStats(generation string) (n int, min, max time.Time, err error) {
fis, err := ioutil.ReadDir(filepath.Join(r.dst, "generations", generation, "wal"))
if os.IsNotExist(err) {
return n, min, max, nil
} else if err != nil {
return n, min, max, err
}
for _, fi := range fis {
if !IsWALPath(fi.Name()) {
continue
}
modTime := fi.ModTime().UTC()
n++
if min.IsZero() || modTime.Before(min) {
min = modTime
}
if max.IsZero() || modTime.After(max) {
max = modTime
}
}
return n, min, max, nil
}
type GenerationStats struct {
SnapshotN int
WALN int
CreatedAt time.Time
UpdatedAt time.Time
}
// Start starts replication for a given generation.
func (r *FileReplicator) Start(ctx context.Context) {
// Stop previous replication.