|
|
|
@@ -233,6 +233,10 @@ func (db *DB) invalidate(ctx context.Context) (err error) {
|
|
|
|
if err := db.invalidatePos(ctx); err != nil {
|
|
|
|
if err := db.invalidatePos(ctx); err != nil {
|
|
|
|
return fmt.Errorf("cannot determine pos: %w", err)
|
|
|
|
return fmt.Errorf("cannot determine pos: %w", err)
|
|
|
|
} else if db.pos.IsZero() {
|
|
|
|
} else if db.pos.IsZero() {
|
|
|
|
|
|
|
|
db.Logger.Printf("init: no wal files available, clearing generation")
|
|
|
|
|
|
|
|
if err := db.clearGeneration(ctx); err != nil {
|
|
|
|
|
|
|
|
return fmt.Errorf("clear generation: %w", err)
|
|
|
|
|
|
|
|
}
|
|
|
|
return nil // no position, exit
|
|
|
|
return nil // no position, exit
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
@@ -259,7 +263,7 @@ func (db *DB) invalidatePos(ctx context.Context) error {
|
|
|
|
}
|
|
|
|
}
|
|
|
|
defer itr.Close()
|
|
|
|
defer itr.Close()
|
|
|
|
|
|
|
|
|
|
|
|
pos := Pos{Generation: generation}
|
|
|
|
var pos Pos
|
|
|
|
for itr.Next() {
|
|
|
|
for itr.Next() {
|
|
|
|
info := itr.WALSegment()
|
|
|
|
info := itr.WALSegment()
|
|
|
|
pos = info.Pos()
|
|
|
|
pos = info.Pos()
|
|
|
|
@@ -617,14 +621,14 @@ func (db *DB) init() (err error) {
|
|
|
|
|
|
|
|
|
|
|
|
// Determine current position, if available.
|
|
|
|
// Determine current position, if available.
|
|
|
|
if err := db.invalidate(db.ctx); err != nil {
|
|
|
|
if err := db.invalidate(db.ctx); err != nil {
|
|
|
|
return fmt.Errorf("cannot determine position: %w", err)
|
|
|
|
return fmt.Errorf("invalidate: %w", err)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// If we have an existing shadow WAL, ensure the headers match.
|
|
|
|
// If we have an existing shadow WAL, ensure the headers match.
|
|
|
|
if err := db.verifyHeadersMatch(); err != nil {
|
|
|
|
if err := db.verifyHeadersMatch(); err != nil {
|
|
|
|
db.Logger.Printf("init: cannot determine last wal position, clearing generation; %s", err)
|
|
|
|
db.Logger.Printf("init: cannot determine last wal position, clearing generation; %s", err)
|
|
|
|
if err := os.Remove(db.GenerationNamePath()); err != nil && !os.IsNotExist(err) {
|
|
|
|
if err := db.clearGeneration(db.ctx); err != nil {
|
|
|
|
return fmt.Errorf("remove generation name: %w", err)
|
|
|
|
return fmt.Errorf("clear generation: %w", err)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
@@ -641,6 +645,13 @@ func (db *DB) init() (err error) {
|
|
|
|
return nil
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
func (db *DB) clearGeneration(ctx context.Context) error {
|
|
|
|
|
|
|
|
if err := os.Remove(db.GenerationNamePath()); err != nil && !os.IsNotExist(err) {
|
|
|
|
|
|
|
|
return err
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
return nil
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// verifyHeadersMatch returns true if the primary WAL and last shadow WAL header match.
|
|
|
|
// verifyHeadersMatch returns true if the primary WAL and last shadow WAL header match.
|
|
|
|
func (db *DB) verifyHeadersMatch() error {
|
|
|
|
func (db *DB) verifyHeadersMatch() error {
|
|
|
|
// Skip verification if we have no current position.
|
|
|
|
// Skip verification if we have no current position.
|
|
|
|
@@ -708,39 +719,41 @@ func (db *DB) cleanWAL(ctx context.Context) error {
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// Determine lowest index that's been replicated to all replicas.
|
|
|
|
// Determine lowest index that's been replicated to all replicas.
|
|
|
|
var min Pos
|
|
|
|
minIndex := -1
|
|
|
|
for _, r := range db.Replicas {
|
|
|
|
for _, r := range db.Replicas {
|
|
|
|
pos := r.Pos().Truncate()
|
|
|
|
pos := r.Pos().Truncate()
|
|
|
|
if pos.Generation != generation {
|
|
|
|
if pos.Generation != generation {
|
|
|
|
continue // different generation, skip
|
|
|
|
continue // different generation, skip
|
|
|
|
} else if min.IsZero() || pos.Index < min.Index {
|
|
|
|
} else if minIndex == -1 || pos.Index < minIndex {
|
|
|
|
min = pos
|
|
|
|
minIndex = pos.Index
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// Skip if our lowest position is too small.
|
|
|
|
// Skip if our lowest position is too small.
|
|
|
|
if min.IsZero() {
|
|
|
|
if minIndex <= 0 {
|
|
|
|
return nil
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// Remove all WAL files before minimum.
|
|
|
|
// Delete all WAL index directories below the minimum position.
|
|
|
|
itr, err := db.WALSegments(ctx, generation)
|
|
|
|
dir := db.ShadowWALDir(generation)
|
|
|
|
|
|
|
|
ents, err := os.ReadDir(dir)
|
|
|
|
if err != nil {
|
|
|
|
if err != nil {
|
|
|
|
return fmt.Errorf("wal segments: %w", err)
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
}
|
|
|
|
defer itr.Close()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
var deleted []Pos
|
|
|
|
for _, ent := range ents {
|
|
|
|
for itr.Next() {
|
|
|
|
index, err := ParseIndex(ent.Name())
|
|
|
|
info := itr.WALSegment()
|
|
|
|
if err != nil {
|
|
|
|
if info.Index >= min.Index {
|
|
|
|
|
|
|
|
continue
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
} else if index >= minIndex {
|
|
|
|
deleted = append(deleted, info.Pos())
|
|
|
|
continue // not below min, skip
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
if err := db.DeleteWALSegments(ctx, deleted); err != nil {
|
|
|
|
if err := os.RemoveAll(filepath.Join(dir, FormatIndex(index))); err != nil {
|
|
|
|
return fmt.Errorf("delete wal segments: %w", err)
|
|
|
|
return err
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
db.Logger.Printf("remove shadow index: %s/%08x", generation, index)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
return nil
|
|
|
|
return nil
|
|
|
|
@@ -1111,7 +1124,7 @@ func (db *DB) copyToShadowWAL(ctx context.Context) error {
|
|
|
|
tempFilename := filepath.Join(db.ShadowWALDir(pos.Generation), FormatIndex(pos.Index), FormatOffset(pos.Offset)+".wal.tmp")
|
|
|
|
tempFilename := filepath.Join(db.ShadowWALDir(pos.Generation), FormatIndex(pos.Index), FormatOffset(pos.Offset)+".wal.tmp")
|
|
|
|
defer os.Remove(tempFilename)
|
|
|
|
defer os.Remove(tempFilename)
|
|
|
|
|
|
|
|
|
|
|
|
f, err := os.Create(tempFilename)
|
|
|
|
f, err := internal.CreateFile(tempFilename, db.fileInfo)
|
|
|
|
if err != nil {
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
}
|
|
|
|
@@ -1266,21 +1279,6 @@ func (db *DB) writeWALSegment(ctx context.Context, pos Pos, rd io.Reader) error
|
|
|
|
return nil
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// DeleteWALSegments deletes WAL segments at the given positions.
|
|
|
|
|
|
|
|
func (db *DB) DeleteWALSegments(ctx context.Context, a []Pos) error {
|
|
|
|
|
|
|
|
for _, pos := range a {
|
|
|
|
|
|
|
|
if pos.Generation == "" {
|
|
|
|
|
|
|
|
return fmt.Errorf("generation required")
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
filename := filepath.Join(db.ShadowWALDir(pos.Generation), FormatIndex(pos.Index), FormatOffset(pos.Offset)+".wal.lz4")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if err := os.Remove(filename); err != nil && !os.IsNotExist(err) {
|
|
|
|
|
|
|
|
return err
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
return nil
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// WALSegments returns an iterator over all available WAL files for a generation.
|
|
|
|
// WALSegments returns an iterator over all available WAL files for a generation.
|
|
|
|
func (db *DB) WALSegments(ctx context.Context, generation string) (WALSegmentIterator, error) {
|
|
|
|
func (db *DB) WALSegments(ctx context.Context, generation string) (WALSegmentIterator, error) {
|
|
|
|
ents, err := os.ReadDir(db.ShadowWALDir(generation))
|
|
|
|
ents, err := os.ReadDir(db.ShadowWALDir(generation))
|
|
|
|
@@ -1353,12 +1351,15 @@ func (itr *shadowWALSegmentIterator) Next() bool {
|
|
|
|
itr.err = err
|
|
|
|
itr.err = err
|
|
|
|
return false
|
|
|
|
return false
|
|
|
|
}
|
|
|
|
}
|
|
|
|
defer f.Close()
|
|
|
|
defer func() { _ = f.Close() }()
|
|
|
|
|
|
|
|
|
|
|
|
fis, err := f.Readdir(-1)
|
|
|
|
fis, err := f.Readdir(-1)
|
|
|
|
if err != nil {
|
|
|
|
if err != nil {
|
|
|
|
itr.err = err
|
|
|
|
itr.err = err
|
|
|
|
return false
|
|
|
|
return false
|
|
|
|
|
|
|
|
} else if err := f.Close(); err != nil {
|
|
|
|
|
|
|
|
itr.err = err
|
|
|
|
|
|
|
|
return false
|
|
|
|
}
|
|
|
|
}
|
|
|
|
for _, fi := range fis {
|
|
|
|
for _, fi := range fis {
|
|
|
|
filename := filepath.Base(fi.Name())
|
|
|
|
filename := filepath.Base(fi.Name())
|
|
|
|
|