Compare commits

..

1 Commits

Author SHA1 Message Date
Ben Johnson
8ff784b34d Refactor shadow WAL to use segments 2021-07-18 08:45:49 -06:00
2 changed files with 23 additions and 29 deletions

48
db.go
View File

@@ -723,24 +723,26 @@ func (db *DB) cleanWAL(ctx context.Context) error {
return nil return nil
} }
// Remove all WAL files before minimum. // Delete all WAL index directories below the minimum position.
itr, err := db.WALSegments(ctx, generation) dir := db.ShadowWALDir(generation)
ents, err := os.ReadDir(dir)
if err != nil { if err != nil {
return fmt.Errorf("wal segments: %w", err) return err
} }
defer itr.Close()
var deleted []Pos for _, ent := range ents {
for itr.Next() { index, err := ParseIndex(ent.Name())
info := itr.WALSegment() if err != nil {
if info.Index >= min.Index {
continue continue
} else if index >= min.Index {
continue // not below min, skip
} }
deleted = append(deleted, info.Pos())
}
if err := db.DeleteWALSegments(ctx, deleted); err != nil { if err := os.RemoveAll(filepath.Join(dir)); err != nil {
return fmt.Errorf("delete wal segments: %w", err) return err
}
db.Logger.Printf("remove shadow index: %s/%08x", generation, index)
} }
return nil return nil
@@ -1111,7 +1113,7 @@ func (db *DB) copyToShadowWAL(ctx context.Context) error {
tempFilename := filepath.Join(db.ShadowWALDir(pos.Generation), FormatIndex(pos.Index), FormatOffset(pos.Offset)+".wal.tmp") tempFilename := filepath.Join(db.ShadowWALDir(pos.Generation), FormatIndex(pos.Index), FormatOffset(pos.Offset)+".wal.tmp")
defer os.Remove(tempFilename) defer os.Remove(tempFilename)
f, err := os.Create(tempFilename) f, err := internal.CreateFile(tempFilename, db.fileInfo)
if err != nil { if err != nil {
return err return err
} }
@@ -1266,21 +1268,6 @@ func (db *DB) writeWALSegment(ctx context.Context, pos Pos, rd io.Reader) error
return nil return nil
} }
// DeleteWALSegments deletes WAL segments at the given positions.
func (db *DB) DeleteWALSegments(ctx context.Context, a []Pos) error {
for _, pos := range a {
if pos.Generation == "" {
return fmt.Errorf("generation required")
}
filename := filepath.Join(db.ShadowWALDir(pos.Generation), FormatIndex(pos.Index), FormatOffset(pos.Offset)+".wal.lz4")
if err := os.Remove(filename); err != nil && !os.IsNotExist(err) {
return err
}
}
return nil
}
// WALSegments returns an iterator over all available WAL files for a generation. // WALSegments returns an iterator over all available WAL files for a generation.
func (db *DB) WALSegments(ctx context.Context, generation string) (WALSegmentIterator, error) { func (db *DB) WALSegments(ctx context.Context, generation string) (WALSegmentIterator, error) {
ents, err := os.ReadDir(db.ShadowWALDir(generation)) ents, err := os.ReadDir(db.ShadowWALDir(generation))
@@ -1353,12 +1340,15 @@ func (itr *shadowWALSegmentIterator) Next() bool {
itr.err = err itr.err = err
return false return false
} }
defer f.Close() defer func() { _ = f.Close() }()
fis, err := f.Readdir(-1) fis, err := f.Readdir(-1)
if err != nil { if err != nil {
itr.err = err itr.err = err
return false return false
} else if err := f.Close(); err != nil {
itr.err = err
return false
} }
for _, fi := range fis { for _, fi := range fis {
filename := filepath.Base(fi.Name()) filename := filepath.Base(fi.Name())

View File

@@ -427,7 +427,11 @@ func (itr *walSegmentIterator) Next() bool {
if err != nil { if err != nil {
itr.err = err itr.err = err
return false return false
} else if err := f.Close(); err != nil {
itr.err = err
return false
} }
for _, fi := range fis { for _, fi := range fis {
filename := filepath.Base(fi.Name()) filename := filepath.Base(fi.Name())
if fi.IsDir() { if fi.IsDir() {