Compare commits
1 Commits
v0.4.0-alp
...
v0.4.0-alp
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
8ff784b34d |
31
db.go
31
db.go
@@ -233,10 +233,6 @@ func (db *DB) invalidate(ctx context.Context) (err error) {
|
|||||||
if err := db.invalidatePos(ctx); err != nil {
|
if err := db.invalidatePos(ctx); err != nil {
|
||||||
return fmt.Errorf("cannot determine pos: %w", err)
|
return fmt.Errorf("cannot determine pos: %w", err)
|
||||||
} else if db.pos.IsZero() {
|
} else if db.pos.IsZero() {
|
||||||
db.Logger.Printf("init: no wal files available, clearing generation")
|
|
||||||
if err := db.clearGeneration(ctx); err != nil {
|
|
||||||
return fmt.Errorf("clear generation: %w", err)
|
|
||||||
}
|
|
||||||
return nil // no position, exit
|
return nil // no position, exit
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -263,7 +259,7 @@ func (db *DB) invalidatePos(ctx context.Context) error {
|
|||||||
}
|
}
|
||||||
defer itr.Close()
|
defer itr.Close()
|
||||||
|
|
||||||
var pos Pos
|
pos := Pos{Generation: generation}
|
||||||
for itr.Next() {
|
for itr.Next() {
|
||||||
info := itr.WALSegment()
|
info := itr.WALSegment()
|
||||||
pos = info.Pos()
|
pos = info.Pos()
|
||||||
@@ -621,14 +617,14 @@ func (db *DB) init() (err error) {
|
|||||||
|
|
||||||
// Determine current position, if available.
|
// Determine current position, if available.
|
||||||
if err := db.invalidate(db.ctx); err != nil {
|
if err := db.invalidate(db.ctx); err != nil {
|
||||||
return fmt.Errorf("invalidate: %w", err)
|
return fmt.Errorf("cannot determine position: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// If we have an existing shadow WAL, ensure the headers match.
|
// If we have an existing shadow WAL, ensure the headers match.
|
||||||
if err := db.verifyHeadersMatch(); err != nil {
|
if err := db.verifyHeadersMatch(); err != nil {
|
||||||
db.Logger.Printf("init: cannot determine last wal position, clearing generation; %s", err)
|
db.Logger.Printf("init: cannot determine last wal position, clearing generation; %s", err)
|
||||||
if err := db.clearGeneration(db.ctx); err != nil {
|
if err := os.Remove(db.GenerationNamePath()); err != nil && !os.IsNotExist(err) {
|
||||||
return fmt.Errorf("clear generation: %w", err)
|
return fmt.Errorf("remove generation name: %w", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -645,13 +641,6 @@ func (db *DB) init() (err error) {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (db *DB) clearGeneration(ctx context.Context) error {
|
|
||||||
if err := os.Remove(db.GenerationNamePath()); err != nil && !os.IsNotExist(err) {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// verifyHeadersMatch returns true if the primary WAL and last shadow WAL header match.
|
// verifyHeadersMatch returns true if the primary WAL and last shadow WAL header match.
|
||||||
func (db *DB) verifyHeadersMatch() error {
|
func (db *DB) verifyHeadersMatch() error {
|
||||||
// Skip verification if we have no current position.
|
// Skip verification if we have no current position.
|
||||||
@@ -719,18 +708,18 @@ func (db *DB) cleanWAL(ctx context.Context) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Determine lowest index that's been replicated to all replicas.
|
// Determine lowest index that's been replicated to all replicas.
|
||||||
minIndex := -1
|
var min Pos
|
||||||
for _, r := range db.Replicas {
|
for _, r := range db.Replicas {
|
||||||
pos := r.Pos().Truncate()
|
pos := r.Pos().Truncate()
|
||||||
if pos.Generation != generation {
|
if pos.Generation != generation {
|
||||||
continue // different generation, skip
|
continue // different generation, skip
|
||||||
} else if minIndex == -1 || pos.Index < minIndex {
|
} else if min.IsZero() || pos.Index < min.Index {
|
||||||
minIndex = pos.Index
|
min = pos
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Skip if our lowest position is too small.
|
// Skip if our lowest position is too small.
|
||||||
if minIndex <= 0 {
|
if min.IsZero() {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -745,11 +734,11 @@ func (db *DB) cleanWAL(ctx context.Context) error {
|
|||||||
index, err := ParseIndex(ent.Name())
|
index, err := ParseIndex(ent.Name())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
continue
|
continue
|
||||||
} else if index >= minIndex {
|
} else if index >= min.Index {
|
||||||
continue // not below min, skip
|
continue // not below min, skip
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := os.RemoveAll(filepath.Join(dir, FormatIndex(index))); err != nil {
|
if err := os.RemoveAll(filepath.Join(dir)); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -515,7 +515,7 @@ func (r *Replica) Snapshot(ctx context.Context) (info SnapshotInfo, err error) {
|
|||||||
return info, err
|
return info, err
|
||||||
}
|
}
|
||||||
|
|
||||||
r.Logger.Printf("snapshot written %s/%08x", pos.Generation, pos.Index)
|
log.Printf("snapshot written %s/%08x", pos.Generation, pos.Index)
|
||||||
|
|
||||||
return info, nil
|
return info, nil
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user