Compare commits
1 Commits
v0.4.0-alp
...
v0.4.0-alp
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
8ff784b34d |
48
db.go
48
db.go
@@ -723,24 +723,26 @@ func (db *DB) cleanWAL(ctx context.Context) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Remove all WAL files before minimum.
|
// Delete all WAL index directories below the minimum position.
|
||||||
itr, err := db.WALSegments(ctx, generation)
|
dir := db.ShadowWALDir(generation)
|
||||||
|
ents, err := os.ReadDir(dir)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("wal segments: %w", err)
|
return err
|
||||||
}
|
}
|
||||||
defer itr.Close()
|
|
||||||
|
|
||||||
var deleted []Pos
|
for _, ent := range ents {
|
||||||
for itr.Next() {
|
index, err := ParseIndex(ent.Name())
|
||||||
info := itr.WALSegment()
|
if err != nil {
|
||||||
if info.Index >= min.Index {
|
|
||||||
continue
|
continue
|
||||||
}
|
} else if index >= min.Index {
|
||||||
deleted = append(deleted, info.Pos())
|
continue // not below min, skip
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := db.DeleteWALSegments(ctx, deleted); err != nil {
|
if err := os.RemoveAll(filepath.Join(dir)); err != nil {
|
||||||
return fmt.Errorf("delete wal segments: %w", err)
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
db.Logger.Printf("remove shadow index: %s/%08x", generation, index)
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
@@ -1111,7 +1113,7 @@ func (db *DB) copyToShadowWAL(ctx context.Context) error {
|
|||||||
tempFilename := filepath.Join(db.ShadowWALDir(pos.Generation), FormatIndex(pos.Index), FormatOffset(pos.Offset)+".wal.tmp")
|
tempFilename := filepath.Join(db.ShadowWALDir(pos.Generation), FormatIndex(pos.Index), FormatOffset(pos.Offset)+".wal.tmp")
|
||||||
defer os.Remove(tempFilename)
|
defer os.Remove(tempFilename)
|
||||||
|
|
||||||
f, err := os.Create(tempFilename)
|
f, err := internal.CreateFile(tempFilename, db.fileInfo)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@@ -1266,21 +1268,6 @@ func (db *DB) writeWALSegment(ctx context.Context, pos Pos, rd io.Reader) error
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// DeleteWALSegments deletes WAL segments at the given positions.
|
|
||||||
func (db *DB) DeleteWALSegments(ctx context.Context, a []Pos) error {
|
|
||||||
for _, pos := range a {
|
|
||||||
if pos.Generation == "" {
|
|
||||||
return fmt.Errorf("generation required")
|
|
||||||
}
|
|
||||||
filename := filepath.Join(db.ShadowWALDir(pos.Generation), FormatIndex(pos.Index), FormatOffset(pos.Offset)+".wal.lz4")
|
|
||||||
|
|
||||||
if err := os.Remove(filename); err != nil && !os.IsNotExist(err) {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// WALSegments returns an iterator over all available WAL files for a generation.
|
// WALSegments returns an iterator over all available WAL files for a generation.
|
||||||
func (db *DB) WALSegments(ctx context.Context, generation string) (WALSegmentIterator, error) {
|
func (db *DB) WALSegments(ctx context.Context, generation string) (WALSegmentIterator, error) {
|
||||||
ents, err := os.ReadDir(db.ShadowWALDir(generation))
|
ents, err := os.ReadDir(db.ShadowWALDir(generation))
|
||||||
@@ -1353,12 +1340,15 @@ func (itr *shadowWALSegmentIterator) Next() bool {
|
|||||||
itr.err = err
|
itr.err = err
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
defer f.Close()
|
defer func() { _ = f.Close() }()
|
||||||
|
|
||||||
fis, err := f.Readdir(-1)
|
fis, err := f.Readdir(-1)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
itr.err = err
|
itr.err = err
|
||||||
return false
|
return false
|
||||||
|
} else if err := f.Close(); err != nil {
|
||||||
|
itr.err = err
|
||||||
|
return false
|
||||||
}
|
}
|
||||||
for _, fi := range fis {
|
for _, fi := range fis {
|
||||||
filename := filepath.Base(fi.Name())
|
filename := filepath.Base(fi.Name())
|
||||||
|
|||||||
@@ -427,7 +427,11 @@ func (itr *walSegmentIterator) Next() bool {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
itr.err = err
|
itr.err = err
|
||||||
return false
|
return false
|
||||||
|
} else if err := f.Close(); err != nil {
|
||||||
|
itr.err = err
|
||||||
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, fi := range fis {
|
for _, fi := range fis {
|
||||||
filename := filepath.Base(fi.Name())
|
filename := filepath.Base(fi.Name())
|
||||||
if fi.IsDir() {
|
if fi.IsDir() {
|
||||||
|
|||||||
Reference in New Issue
Block a user