Compare commits
1 Commits
v0.4.0-alp
...
v0.4.0-alp
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
8ff784b34d |
48
db.go
48
db.go
@@ -723,24 +723,26 @@ func (db *DB) cleanWAL(ctx context.Context) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Remove all WAL files before minimum.
|
||||
itr, err := db.WALSegments(ctx, generation)
|
||||
// Delete all WAL index directories below the minimum position.
|
||||
dir := db.ShadowWALDir(generation)
|
||||
ents, err := os.ReadDir(dir)
|
||||
if err != nil {
|
||||
return fmt.Errorf("wal segments: %w", err)
|
||||
return err
|
||||
}
|
||||
defer itr.Close()
|
||||
|
||||
var deleted []Pos
|
||||
for itr.Next() {
|
||||
info := itr.WALSegment()
|
||||
if info.Index >= min.Index {
|
||||
for _, ent := range ents {
|
||||
index, err := ParseIndex(ent.Name())
|
||||
if err != nil {
|
||||
continue
|
||||
} else if index >= min.Index {
|
||||
continue // not below min, skip
|
||||
}
|
||||
deleted = append(deleted, info.Pos())
|
||||
}
|
||||
|
||||
if err := db.DeleteWALSegments(ctx, deleted); err != nil {
|
||||
return fmt.Errorf("delete wal segments: %w", err)
|
||||
if err := os.RemoveAll(filepath.Join(dir)); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
db.Logger.Printf("remove shadow index: %s/%08x", generation, index)
|
||||
}
|
||||
|
||||
return nil
|
||||
@@ -1111,7 +1113,7 @@ func (db *DB) copyToShadowWAL(ctx context.Context) error {
|
||||
tempFilename := filepath.Join(db.ShadowWALDir(pos.Generation), FormatIndex(pos.Index), FormatOffset(pos.Offset)+".wal.tmp")
|
||||
defer os.Remove(tempFilename)
|
||||
|
||||
f, err := os.Create(tempFilename)
|
||||
f, err := internal.CreateFile(tempFilename, db.fileInfo)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -1266,21 +1268,6 @@ func (db *DB) writeWALSegment(ctx context.Context, pos Pos, rd io.Reader) error
|
||||
return nil
|
||||
}
|
||||
|
||||
// DeleteWALSegments deletes WAL segments at the given positions.
|
||||
func (db *DB) DeleteWALSegments(ctx context.Context, a []Pos) error {
|
||||
for _, pos := range a {
|
||||
if pos.Generation == "" {
|
||||
return fmt.Errorf("generation required")
|
||||
}
|
||||
filename := filepath.Join(db.ShadowWALDir(pos.Generation), FormatIndex(pos.Index), FormatOffset(pos.Offset)+".wal.lz4")
|
||||
|
||||
if err := os.Remove(filename); err != nil && !os.IsNotExist(err) {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// WALSegments returns an iterator over all available WAL files for a generation.
|
||||
func (db *DB) WALSegments(ctx context.Context, generation string) (WALSegmentIterator, error) {
|
||||
ents, err := os.ReadDir(db.ShadowWALDir(generation))
|
||||
@@ -1353,12 +1340,15 @@ func (itr *shadowWALSegmentIterator) Next() bool {
|
||||
itr.err = err
|
||||
return false
|
||||
}
|
||||
defer f.Close()
|
||||
defer func() { _ = f.Close() }()
|
||||
|
||||
fis, err := f.Readdir(-1)
|
||||
if err != nil {
|
||||
itr.err = err
|
||||
return false
|
||||
} else if err := f.Close(); err != nil {
|
||||
itr.err = err
|
||||
return false
|
||||
}
|
||||
for _, fi := range fis {
|
||||
filename := filepath.Base(fi.Name())
|
||||
|
||||
@@ -427,7 +427,11 @@ func (itr *walSegmentIterator) Next() bool {
|
||||
if err != nil {
|
||||
itr.err = err
|
||||
return false
|
||||
} else if err := f.Close(); err != nil {
|
||||
itr.err = err
|
||||
return false
|
||||
}
|
||||
|
||||
for _, fi := range fis {
|
||||
filename := filepath.Base(fi.Name())
|
||||
if fi.IsDir() {
|
||||
|
||||
Reference in New Issue
Block a user