Clean shadow WAL

This commit is contained in:
Ben Johnson
2020-12-30 14:48:58 -07:00
parent 0b12efb135
commit 5cc78fafa0
2 changed files with 197 additions and 76 deletions

View File

@@ -36,6 +36,9 @@ type Replica interface {
// snapshot & WAL files as well as the time range covered.
GenerationStats(ctx context.Context, generation string) (GenerationStats, error)
// Returns the last replication position.
Pos() Pos
// Returns the highest index for a snapshot within a generation that occurs
// before timestamp. If timestamp is zero, returns the latest snapshot.
SnapshotIndexAt(ctx context.Context, generation string, timestamp time.Time) (int, error)
@@ -70,9 +73,10 @@ type FileReplica struct {
name string // replica name, optional
dst string // destination path
// mu sync.RWMutex
wg sync.WaitGroup
mu sync.RWMutex
pos Pos // last position
wg sync.WaitGroup
ctx context.Context
cancel func()
}
@@ -100,6 +104,13 @@ func (r *FileReplica) Type() string {
return "file"
}
// Pos returns the last successfully replicated position.
func (r *FileReplica) Pos() Pos {
r.mu.RLock()
defer r.mu.RUnlock()
return r.pos
}
// SnapshotDir returns the path to a generation's snapshot directory.
func (r *FileReplica) SnapshotDir(generation string) string {
return filepath.Join(r.dst, "generations", generation, "snapshots")
@@ -110,6 +121,27 @@ func (r *FileReplica) SnapshotPath(generation string, index int) string {
return filepath.Join(r.SnapshotDir(generation), fmt.Sprintf("%016x.snapshot.gz", index))
}
// MaxSnapshotIndex returns the highest index for the snapshots.
func (r *FileReplica) MaxSnapshotIndex(generation string) (int, error) {
fis, err := ioutil.ReadDir(r.SnapshotDir(generation))
if err != nil {
return 0, err
}
index := -1
for _, fi := range fis {
if idx, _, _, err := ParseSnapshotPath(fi.Name()); err != nil {
continue
} else if index == -1 || idx > index {
index = idx
}
}
if index == -1 {
return 0, fmt.Errorf("no snapshots found")
}
return index, nil
}
// WALDir returns the path to a generation's WAL directory
func (r *FileReplica) WALDir(generation string) string {
return filepath.Join(r.dst, "generations", generation, "wal")
@@ -300,8 +332,6 @@ func (r *FileReplica) monitor(ctx context.Context) {
close(ch)
var notify <-chan struct{} = ch
var pos Pos
var err error
for {
select {
case <-ctx.Done():
@@ -312,67 +342,38 @@ func (r *FileReplica) monitor(ctx context.Context) {
// Fetch new notify channel before replicating data.
notify = r.db.Notify()
// Determine position, if necessary.
if pos.IsZero() {
if pos, err = r.pos(); err != nil {
log.Printf("%s(%s): cannot determine position: %s", r.db.Path(), r.Name(), err)
continue
} else if pos.IsZero() {
log.Printf("%s(%s): no generation, waiting for data", r.db.Path(), r.Name())
continue
}
}
// If we have no replicated WALs, start from last index in shadow WAL.
if pos.Index == 0 && pos.Offset == 0 {
if pos.Index, err = r.db.CurrentShadowWALIndex(pos.Generation); err != nil {
log.Printf("%s(%s): cannot determine latest shadow wal index: %s", r.db.Path(), r.Name(), err)
continue
}
}
// Synchronize the shadow wal into the replication directory.
if pos, err = r.sync(ctx, pos); err != nil {
if err := r.Sync(ctx); err != nil {
log.Printf("%s(%s): sync error: %s", r.db.Path(), r.Name(), err)
continue
}
// Gzip any old WAL files.
if pos.Generation != "" {
if err := r.compress(ctx, pos.Generation); err != nil {
log.Printf("%s(%s): compress error: %s", r.db.Path(), r.Name(), err)
continue
}
}
}
}
// pos returns the position for the replica for the current generation.
// calcPos returns the position for the replica for the current generation.
// Returns a zero value if there is no active generation.
func (r *FileReplica) pos() (pos Pos, err error) {
// Find the current generation from the DB. Return zero pos if no generation.
generation, err := r.db.CurrentGeneration()
if err != nil {
return pos, err
} else if generation == "" {
return pos, nil // empty position
}
func (r *FileReplica) calcPos(generation string) (pos Pos, err error) {
pos.Generation = generation
// Find maximum snapshot index.
if pos.Index, err = r.MaxSnapshotIndex(generation); err != nil {
return Pos{}, err
}
// Find highest WAL subdirectory group.
subdir, err := r.MaxWALSubdirName(generation)
if os.IsNotExist(err) {
return pos, nil // no replicated wal, start at beginning of generation
return pos, nil // no replicated wal, start at snapshot index
} else if err != nil {
return pos, err
return Pos{}, err
}
// Find the max WAL file.
// Find the max WAL file within WAL group.
fis, err := ioutil.ReadDir(filepath.Join(r.WALDir(generation), subdir))
if os.IsNotExist(err) {
return pos, nil // no replicated wal, start at beginning of generation
return pos, nil // no replicated wal, start at snapshot index.
} else if err != nil {
return pos, err
return Pos{}, err
}
index := -1
@@ -384,14 +385,14 @@ func (r *FileReplica) pos() (pos Pos, err error) {
}
}
if index == -1 {
return pos, nil // wal directory exists but no wal files, return beginning pos
return pos, nil // wal directory exists but no wal files, return snapshot position
}
pos.Index = index
// Determine current offset.
fi, err := os.Stat(r.WALPath(pos.Generation, pos.Index))
if err != nil {
return pos, err
return Pos{}, err
}
pos.Offset = fi.Size()
@@ -441,61 +442,95 @@ func (r *FileReplica) snapshotN(generation string) (int, error) {
return n, nil
}
func (r *FileReplica) sync(ctx context.Context, pos Pos) (_ Pos, err error) {
func (r *FileReplica) Sync(ctx context.Context) (err error) {
// Find current position of database.
dpos, err := r.db.Pos()
if err != nil {
return fmt.Errorf("cannot determine current generation: %w", err)
} else if dpos.IsZero() {
return fmt.Errorf("no generation, waiting for data")
}
generation := dpos.Generation
// Create snapshot if no snapshots exist for generation.
if n, err := r.snapshotN(generation); err != nil {
return err
} else if n == 0 {
if err := r.snapshot(ctx, generation, dpos.Index); err != nil {
return err
}
}
// Determine position, if necessary.
if r.Pos().IsZero() {
pos, err := r.calcPos(generation)
if err != nil {
return fmt.Errorf("cannot determine replica position: %s", r.db.Path(), r.Name(), err)
}
r.mu.Lock()
r.pos = pos
r.mu.Unlock()
}
// Read all WAL files since the last position.
for {
if pos, err = r.syncNext(ctx, pos); err == io.EOF {
return pos, nil
if err = r.syncWAL(ctx); err == io.EOF {
break
} else if err != nil {
return pos, err
return err
}
}
// Gzip any old WAL files.
if generation != "" {
if err := r.compress(ctx, generation); err != nil {
return fmt.Errorf("cannot compress: %s", err)
}
}
return nil
}
func (r *FileReplica) syncNext(ctx context.Context, pos Pos) (_ Pos, err error) {
rd, err := r.db.ShadowWALReader(pos)
func (r *FileReplica) syncWAL(ctx context.Context) (err error) {
rd, err := r.db.ShadowWALReader(r.Pos())
if err == io.EOF {
return pos, err
return err
} else if err != nil {
return pos, fmt.Errorf("wal reader: %w", err)
return fmt.Errorf("wal reader: %w", err)
}
defer rd.Close()
// Create snapshot if no snapshots exist.
if n, err := r.snapshotN(rd.Pos().Generation); err != nil {
return pos, err
} else if n == 0 {
if err := r.snapshot(ctx, rd.Pos().Generation, rd.Pos().Index); err != nil {
return pos, err
}
}
// Ensure parent directory exists for WAL file.
filename := r.WALPath(rd.Pos().Generation, rd.Pos().Index)
if err := os.MkdirAll(filepath.Dir(filename), 0700); err != nil {
return pos, err
return err
}
// Create a temporary file to write into so we don't have partial writes.
w, err := os.OpenFile(filename, os.O_RDWR|os.O_CREATE, 0600)
if err != nil {
return pos, err
return err
}
defer w.Close()
// Seek, copy & sync WAL contents.
if _, err := w.Seek(rd.Pos().Offset, io.SeekStart); err != nil {
return pos, err
return err
} else if _, err := io.Copy(w, rd); err != nil {
return pos, err
return err
} else if err := w.Sync(); err != nil {
return pos, err
return err
} else if err := w.Close(); err != nil {
return pos, err
return err
}
// Return ending position of the reader.
return rd.Pos(), nil
// Save last replicated position.
r.mu.Lock()
r.pos = rd.Pos()
r.mu.Unlock()
return nil
}
// compress gzips all WAL files before the current one.