Fix rollover issue under load
This commit is contained in:
46
db.go
46
db.go
@@ -441,7 +441,7 @@ func (db *DB) Sync() (err error) {
|
|||||||
|
|
||||||
// Issue the checkpoint.
|
// Issue the checkpoint.
|
||||||
if checkpoint {
|
if checkpoint {
|
||||||
if err := db.checkpoint(forceCheckpoint); err != nil {
|
if err := db.checkpoint(info, forceCheckpoint); err != nil {
|
||||||
return fmt.Errorf("checkpoint: force=%v err=%w", err)
|
return fmt.Errorf("checkpoint: force=%v err=%w", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -514,20 +514,13 @@ func (db *DB) verify() (info syncInfo, err error) {
|
|||||||
return info, nil
|
return info, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Compare WAL headers. Ensure that checkpoint sequences match and the
|
// Compare WAL headers. Start a new shadow WAL if they are mismatched.
|
||||||
// header is the same or that we have moved to the next sequence. If we have
|
|
||||||
// out-of-order or skipped checkpoint sequences then we should start a new
|
|
||||||
// generation.
|
|
||||||
if hdr0, err := readWALHeader(db.WALPath()); err != nil {
|
if hdr0, err := readWALHeader(db.WALPath()); err != nil {
|
||||||
return info, fmt.Errorf("cannot read wal header: %w", err)
|
return info, fmt.Errorf("cannot read wal header: %w", err)
|
||||||
} else if hdr1, err := readWALHeader(info.shadowWALPath); err != nil {
|
} else if hdr1, err := readWALHeader(info.shadowWALPath); err != nil {
|
||||||
return info, fmt.Errorf("cannot read shadow wal header: %w", err)
|
return info, fmt.Errorf("cannot read shadow wal header: %w", err)
|
||||||
} else if seq0, seq1 := readCheckpointSeqNo(hdr0), readCheckpointSeqNo(hdr1); seq0 == seq1 && !bytes.Equal(hdr0, hdr1) {
|
} else if !bytes.Equal(hdr0, hdr1) {
|
||||||
return info, fmt.Errorf("wal header mismatch: chkpt=%d", seq0)
|
|
||||||
} else if seq0+1 == seq1 {
|
|
||||||
info.restart = !bytes.Equal(hdr0, hdr1)
|
info.restart = !bytes.Equal(hdr0, hdr1)
|
||||||
} else if seq0 != seq1 {
|
|
||||||
return info, fmt.Errorf("non-contiguous checkpoint sequence: real=%d shadow=%d", seq0, seq1)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: Handle checkpoint sequence number rollover.
|
// TODO: Handle checkpoint sequence number rollover.
|
||||||
@@ -730,7 +723,13 @@ func readLastChecksumFrom(f *os.File, pageSize int) (uint32, uint32, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// checkpoint performs a checkpoint on the WAL file.
|
// checkpoint performs a checkpoint on the WAL file.
|
||||||
func (db *DB) checkpoint(force bool) error {
|
func (db *DB) checkpoint(info syncInfo, force bool) error {
|
||||||
|
// Read WAL header before checkpoint to check if it has been restarted.
|
||||||
|
hdr, err := readWALHeader(db.WALPath())
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
// Ensure the read lock has been removed before issuing a checkpoint.
|
// Ensure the read lock has been removed before issuing a checkpoint.
|
||||||
// We defer the re-acquire to ensure it occurs even on an early return.
|
// We defer the re-acquire to ensure it occurs even on an early return.
|
||||||
if err := db.releaseReadLock(); err != nil {
|
if err := db.releaseReadLock(); err != nil {
|
||||||
@@ -759,6 +758,31 @@ func (db *DB) checkpoint(force bool) error {
|
|||||||
if err := db.acquireReadLock(); err != nil {
|
if err := db.acquireReadLock(); err != nil {
|
||||||
return fmt.Errorf("release read lock: %w", err)
|
return fmt.Errorf("release read lock: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if _, err = db.db.Exec(`INSERT INTO _litestream_seq (id, seq) VALUES (1, 1) ON CONFLICT (id) DO UPDATE SET seq = seq + 1`); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// If WAL hasn't been restarted, exit.
|
||||||
|
if other, err := readWALHeader(db.WALPath()); err != nil {
|
||||||
|
return err
|
||||||
|
} else if bytes.Equal(hdr, other) {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Parse index of current shadow WAL file.
|
||||||
|
dir, base := filepath.Split(info.shadowWALPath)
|
||||||
|
index, err := ParseWALFilename(base)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("cannot parse shadow wal filename: %s", base)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Start a new shadow WAL file with next index.
|
||||||
|
newShadowWALPath := filepath.Join(dir, FormatWALFilename(index+1))
|
||||||
|
if err := db.initShadowWALFile(newShadowWALPath); err != nil {
|
||||||
|
return fmt.Errorf("cannot init shadow wal file: name=%s err=%w", newShadowWALPath, err)
|
||||||
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -99,7 +99,7 @@ func ParseWALFilename(name string) (index int, err error) {
|
|||||||
|
|
||||||
func FormatWALFilename(index int) string {
|
func FormatWALFilename(index int) string {
|
||||||
assert(index >= 0, "wal index must be non-negative")
|
assert(index >= 0, "wal index must be non-negative")
|
||||||
return fmt.Sprintf("%016d%s", index, WALExt)
|
return fmt.Sprintf("%016x%s", index, WALExt)
|
||||||
}
|
}
|
||||||
|
|
||||||
// HexDump returns hexdump output but with duplicate lines removed.
|
// HexDump returns hexdump output but with duplicate lines removed.
|
||||||
|
|||||||
Reference in New Issue
Block a user