Add checkpoint seq no verification.
This commit is contained in:
29
db.go
29
db.go
@@ -402,7 +402,7 @@ func (db *DB) Sync() (err error) {
|
|||||||
// position of the real WAL hasn't been overwritten by another process.
|
// position of the real WAL hasn't been overwritten by another process.
|
||||||
//
|
//
|
||||||
// If we are unable to verify the WAL state then we start a new generation.
|
// If we are unable to verify the WAL state then we start a new generation.
|
||||||
info, err := db.verifyWAL()
|
info, err := db.verify()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("cannot verify wal state: %w", err)
|
return fmt.Errorf("cannot verify wal state: %w", err)
|
||||||
} else if info.reason != "" {
|
} else if info.reason != "" {
|
||||||
@@ -461,10 +461,10 @@ func (db *DB) ensureWALExists() (err error) {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
// verifyWAL ensures the current shadow WAL state matches where it left off from
|
// verify ensures the current shadow WAL state matches where it left off from
|
||||||
// the real WAL. Returns generation & WAL sync information. If info.reason is
|
// the real WAL. Returns generation & WAL sync information. If info.reason is
|
||||||
// not blank, verification failed and a new generation should be started.
|
// not blank, verification failed and a new generation should be started.
|
||||||
func (db *DB) verifyWAL() (info syncInfo, err error) {
|
func (db *DB) verify() (info syncInfo, err error) {
|
||||||
// Look up existing generation.
|
// Look up existing generation.
|
||||||
generation, err := db.CurrentGeneration()
|
generation, err := db.CurrentGeneration()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -498,7 +498,14 @@ func (db *DB) verifyWAL() (info syncInfo, err error) {
|
|||||||
}
|
}
|
||||||
info.shadowWALSize = fi.Size()
|
info.shadowWALSize = fi.Size()
|
||||||
|
|
||||||
// TODO: Truncate shadow WAL if there is a partial page.
|
// Truncate shadow WAL if there is a partial page.
|
||||||
|
// Exit if shadow WAL does not contain a full header.
|
||||||
|
frameSize := int64(db.pageSize) + WALFrameHeaderSize
|
||||||
|
if info.shadowWALSize < WALHeaderSize {
|
||||||
|
return info, fmt.Errorf("short shadow wal: %s", info.shadowWALPath)
|
||||||
|
} else if (info.shadowWALSize-WALHeaderSize)%frameSize != 0 {
|
||||||
|
info.shadowWALSize = ((info.shadowWALSize - WALHeaderSize) / frameSize) + WALHeaderSize
|
||||||
|
}
|
||||||
|
|
||||||
// If shadow WAL is larger than real WAL then the WAL has been truncated
|
// If shadow WAL is larger than real WAL then the WAL has been truncated
|
||||||
// so we cannot determine our last state.
|
// so we cannot determine our last state.
|
||||||
@@ -507,16 +514,24 @@ func (db *DB) verifyWAL() (info syncInfo, err error) {
|
|||||||
return info, nil
|
return info, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Compare WAL headers. If mismatched then the real WAL has been restarted.
|
// Compare WAL headers. Ensure that checkpoint sequences match and the
|
||||||
// We'll need to start a new shadow WAL during the sync.
|
// header is the same or that we have moved to the next sequence. If we have
|
||||||
|
// out-of-order or skipped checkpoint sequences then we should start a new
|
||||||
|
// generation.
|
||||||
if hdr0, err := readWALHeader(db.WALPath()); err != nil {
|
if hdr0, err := readWALHeader(db.WALPath()); err != nil {
|
||||||
return info, fmt.Errorf("cannot read wal header: %w", err)
|
return info, fmt.Errorf("cannot read wal header: %w", err)
|
||||||
} else if hdr1, err := readWALHeader(info.shadowWALPath); err != nil {
|
} else if hdr1, err := readWALHeader(info.shadowWALPath); err != nil {
|
||||||
return info, fmt.Errorf("cannot read shadow wal header: %w", err)
|
return info, fmt.Errorf("cannot read shadow wal header: %w", err)
|
||||||
} else {
|
} else if seq0, seq1 := readCheckpointSeqNo(hdr0), readCheckpointSeqNo(hdr1); seq0 == seq1 && !bytes.Equal(hdr0, hdr1) {
|
||||||
|
return info, fmt.Errorf("wal header mismatch: chkpt=%d", seq0)
|
||||||
|
} else if seq0+1 == seq1 {
|
||||||
info.restart = !bytes.Equal(hdr0, hdr1)
|
info.restart = !bytes.Equal(hdr0, hdr1)
|
||||||
|
} else if seq0 != seq1 {
|
||||||
|
return info, fmt.Errorf("non-contiguous checkpoint sequence: real=%d shadow=%d", seq0, seq1)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TODO: Handle checkpoint sequence number rollover.
|
||||||
|
|
||||||
// Verify last page synced still matches.
|
// Verify last page synced still matches.
|
||||||
if info.shadowWALSize > WALHeaderSize {
|
if info.shadowWALSize > WALHeaderSize {
|
||||||
offset := info.shadowWALSize - int64(db.pageSize+WALFrameHeaderSize)
|
offset := info.shadowWALSize - int64(db.pageSize+WALFrameHeaderSize)
|
||||||
|
|||||||
@@ -68,6 +68,10 @@ func readWALHeader(filename string) ([]byte, error) {
|
|||||||
return buf[:n], err
|
return buf[:n], err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func readCheckpointSeqNo(hdr []byte) uint32 {
|
||||||
|
return binary.BigEndian.Uint32(hdr[12:])
|
||||||
|
}
|
||||||
|
|
||||||
// readFileAt reads a slice from a file.
|
// readFileAt reads a slice from a file.
|
||||||
func readFileAt(filename string, offset, n int64) ([]byte, error) {
|
func readFileAt(filename string, offset, n int64) ([]byte, error) {
|
||||||
f, err := os.Open(filename)
|
f, err := os.Open(filename)
|
||||||
|
|||||||
Reference in New Issue
Block a user