Compare commits

..

1 Commits

Author SHA1 Message Date
Ben Johnson
99fe882376 Refactor shadow WAL to use segments 2021-07-22 16:03:29 -06:00
2 changed files with 22 additions and 11 deletions

31
db.go
View File

@@ -233,6 +233,10 @@ func (db *DB) invalidate(ctx context.Context) (err error) {
if err := db.invalidatePos(ctx); err != nil {
return fmt.Errorf("cannot determine pos: %w", err)
} else if db.pos.IsZero() {
db.Logger.Printf("init: no wal files available, clearing generation")
if err := db.clearGeneration(ctx); err != nil {
return fmt.Errorf("clear generation: %w", err)
}
return nil // no position, exit
}
@@ -259,7 +263,7 @@ func (db *DB) invalidatePos(ctx context.Context) error {
}
defer itr.Close()
pos := Pos{Generation: generation}
var pos Pos
for itr.Next() {
info := itr.WALSegment()
pos = info.Pos()
@@ -617,14 +621,14 @@ func (db *DB) init() (err error) {
// Determine current position, if available.
if err := db.invalidate(db.ctx); err != nil {
return fmt.Errorf("cannot determine position: %w", err)
return fmt.Errorf("invalidate: %w", err)
}
// If we have an existing shadow WAL, ensure the headers match.
if err := db.verifyHeadersMatch(); err != nil {
db.Logger.Printf("init: cannot determine last wal position, clearing generation; %s", err)
if err := os.Remove(db.GenerationNamePath()); err != nil && !os.IsNotExist(err) {
return fmt.Errorf("remove generation name: %w", err)
if err := db.clearGeneration(db.ctx); err != nil {
return fmt.Errorf("clear generation: %w", err)
}
}
@@ -641,6 +645,13 @@ func (db *DB) init() (err error) {
return nil
}
func (db *DB) clearGeneration(ctx context.Context) error {
if err := os.Remove(db.GenerationNamePath()); err != nil && !os.IsNotExist(err) {
return err
}
return nil
}
// verifyHeadersMatch returns true if the primary WAL and last shadow WAL header match.
func (db *DB) verifyHeadersMatch() error {
// Skip verification if we have no current position.
@@ -708,18 +719,18 @@ func (db *DB) cleanWAL(ctx context.Context) error {
}
// Determine lowest index that's been replicated to all replicas.
var min Pos
minIndex := -1
for _, r := range db.Replicas {
pos := r.Pos().Truncate()
if pos.Generation != generation {
continue // different generation, skip
} else if min.IsZero() || pos.Index < min.Index {
min = pos
} else if minIndex == -1 || pos.Index < minIndex {
minIndex = pos.Index
}
}
// Skip if our lowest position is too small.
if min.IsZero() {
if minIndex <= 0 {
return nil
}
@@ -734,11 +745,11 @@ func (db *DB) cleanWAL(ctx context.Context) error {
index, err := ParseIndex(ent.Name())
if err != nil {
continue
} else if index >= min.Index {
} else if index >= minIndex {
continue // not below min, skip
}
if err := os.RemoveAll(filepath.Join(dir)); err != nil {
if err := os.RemoveAll(filepath.Join(dir, FormatIndex(index))); err != nil {
return err
}

View File

@@ -515,7 +515,7 @@ func (r *Replica) Snapshot(ctx context.Context) (info SnapshotInfo, err error) {
return info, err
}
log.Printf("snapshot written %s/%08x", pos.Generation, pos.Index)
r.Logger.Printf("snapshot written %s/%08x", pos.Generation, pos.Index)
return info, nil
}