Compare commits

..

1 Commits

Author SHA1 Message Date
Ben Johnson
6eb290720b Refactor shadow WAL to use segments 2021-07-16 15:56:31 -06:00
3 changed files with 41 additions and 46 deletions

81
db.go
View File

@@ -233,10 +233,6 @@ func (db *DB) invalidate(ctx context.Context) (err error) {
if err := db.invalidatePos(ctx); err != nil { if err := db.invalidatePos(ctx); err != nil {
return fmt.Errorf("cannot determine pos: %w", err) return fmt.Errorf("cannot determine pos: %w", err)
} else if db.pos.IsZero() { } else if db.pos.IsZero() {
db.Logger.Printf("init: no wal files available, clearing generation")
if err := db.clearGeneration(ctx); err != nil {
return fmt.Errorf("clear generation: %w", err)
}
return nil // no position, exit return nil // no position, exit
} }
@@ -263,7 +259,7 @@ func (db *DB) invalidatePos(ctx context.Context) error {
} }
defer itr.Close() defer itr.Close()
var pos Pos pos := Pos{Generation: generation}
for itr.Next() { for itr.Next() {
info := itr.WALSegment() info := itr.WALSegment()
pos = info.Pos() pos = info.Pos()
@@ -621,14 +617,14 @@ func (db *DB) init() (err error) {
// Determine current position, if available. // Determine current position, if available.
if err := db.invalidate(db.ctx); err != nil { if err := db.invalidate(db.ctx); err != nil {
return fmt.Errorf("invalidate: %w", err) return fmt.Errorf("cannot determine position: %w", err)
} }
// If we have an existing shadow WAL, ensure the headers match. // If we have an existing shadow WAL, ensure the headers match.
if err := db.verifyHeadersMatch(); err != nil { if err := db.verifyHeadersMatch(); err != nil {
db.Logger.Printf("init: cannot determine last wal position, clearing generation; %s", err) db.Logger.Printf("init: cannot determine last wal position, clearing generation; %s", err)
if err := db.clearGeneration(db.ctx); err != nil { if err := os.Remove(db.GenerationNamePath()); err != nil && !os.IsNotExist(err) {
return fmt.Errorf("clear generation: %w", err) return fmt.Errorf("remove generation name: %w", err)
} }
} }
@@ -645,13 +641,6 @@ func (db *DB) init() (err error) {
return nil return nil
} }
func (db *DB) clearGeneration(ctx context.Context) error {
if err := os.Remove(db.GenerationNamePath()); err != nil && !os.IsNotExist(err) {
return err
}
return nil
}
// verifyHeadersMatch returns true if the primary WAL and last shadow WAL header match. // verifyHeadersMatch returns true if the primary WAL and last shadow WAL header match.
func (db *DB) verifyHeadersMatch() error { func (db *DB) verifyHeadersMatch() error {
// Skip verification if we have no current position. // Skip verification if we have no current position.
@@ -719,41 +708,39 @@ func (db *DB) cleanWAL(ctx context.Context) error {
} }
// Determine lowest index that's been replicated to all replicas. // Determine lowest index that's been replicated to all replicas.
minIndex := -1 var min Pos
for _, r := range db.Replicas { for _, r := range db.Replicas {
pos := r.Pos().Truncate() pos := r.Pos().Truncate()
if pos.Generation != generation { if pos.Generation != generation {
continue // different generation, skip continue // different generation, skip
} else if minIndex == -1 || pos.Index < minIndex { } else if min.IsZero() || pos.Index < min.Index {
minIndex = pos.Index min = pos
} }
} }
// Skip if our lowest position is too small. // Skip if our lowest position is too small.
if minIndex <= 0 { if min.IsZero() {
return nil return nil
} }
// Delete all WAL index directories below the minimum position. // Remove all WAL files before minimum.
dir := db.ShadowWALDir(generation) itr, err := db.WALSegments(ctx, generation)
ents, err := os.ReadDir(dir)
if err != nil { if err != nil {
return err return fmt.Errorf("wal segments: %w", err)
}
defer itr.Close()
var deleted []Pos
for itr.Next() {
info := itr.WALSegment()
if info.Index >= min.Index {
continue
}
deleted = append(deleted, info.Pos())
} }
for _, ent := range ents { if err := db.DeleteWALSegments(ctx, deleted); err != nil {
index, err := ParseIndex(ent.Name()) return fmt.Errorf("delete wal segments: %w", err)
if err != nil {
continue
} else if index >= minIndex {
continue // not below min, skip
}
if err := os.RemoveAll(filepath.Join(dir, FormatIndex(index))); err != nil {
return err
}
db.Logger.Printf("remove shadow index: %s/%08x", generation, index)
} }
return nil return nil
@@ -1124,7 +1111,7 @@ func (db *DB) copyToShadowWAL(ctx context.Context) error {
tempFilename := filepath.Join(db.ShadowWALDir(pos.Generation), FormatIndex(pos.Index), FormatOffset(pos.Offset)+".wal.tmp") tempFilename := filepath.Join(db.ShadowWALDir(pos.Generation), FormatIndex(pos.Index), FormatOffset(pos.Offset)+".wal.tmp")
defer os.Remove(tempFilename) defer os.Remove(tempFilename)
f, err := internal.CreateFile(tempFilename, db.fileInfo) f, err := os.Create(tempFilename)
if err != nil { if err != nil {
return err return err
} }
@@ -1279,6 +1266,21 @@ func (db *DB) writeWALSegment(ctx context.Context, pos Pos, rd io.Reader) error
return nil return nil
} }
// DeleteWALSegments deletes WAL segments at the given positions.
func (db *DB) DeleteWALSegments(ctx context.Context, a []Pos) error {
for _, pos := range a {
if pos.Generation == "" {
return fmt.Errorf("generation required")
}
filename := filepath.Join(db.ShadowWALDir(pos.Generation), FormatIndex(pos.Index), FormatOffset(pos.Offset)+".wal.lz4")
if err := os.Remove(filename); err != nil && !os.IsNotExist(err) {
return err
}
}
return nil
}
// WALSegments returns an iterator over all available WAL files for a generation. // WALSegments returns an iterator over all available WAL files for a generation.
func (db *DB) WALSegments(ctx context.Context, generation string) (WALSegmentIterator, error) { func (db *DB) WALSegments(ctx context.Context, generation string) (WALSegmentIterator, error) {
ents, err := os.ReadDir(db.ShadowWALDir(generation)) ents, err := os.ReadDir(db.ShadowWALDir(generation))
@@ -1351,15 +1353,12 @@ func (itr *shadowWALSegmentIterator) Next() bool {
itr.err = err itr.err = err
return false return false
} }
defer func() { _ = f.Close() }() defer f.Close()
fis, err := f.Readdir(-1) fis, err := f.Readdir(-1)
if err != nil { if err != nil {
itr.err = err itr.err = err
return false return false
} else if err := f.Close(); err != nil {
itr.err = err
return false
} }
for _, fi := range fis { for _, fi := range fis {
filename := filepath.Base(fi.Name()) filename := filepath.Base(fi.Name())

View File

@@ -427,11 +427,7 @@ func (itr *walSegmentIterator) Next() bool {
if err != nil { if err != nil {
itr.err = err itr.err = err
return false return false
} else if err := f.Close(); err != nil {
itr.err = err
return false
} }
for _, fi := range fis { for _, fi := range fis {
filename := filepath.Base(fi.Name()) filename := filepath.Base(fi.Name())
if fi.IsDir() { if fi.IsDir() {

View File

@@ -515,7 +515,7 @@ func (r *Replica) Snapshot(ctx context.Context) (info SnapshotInfo, err error) {
return info, err return info, err
} }
r.Logger.Printf("snapshot written %s/%08x", pos.Generation, pos.Index) log.Printf("snapshot written %s/%08x", pos.Generation, pos.Index)
return info, nil return info, nil
} }