Add DB sync tests

This commit is contained in:
Ben Johnson
2021-01-05 13:59:16 -07:00
parent 979cabcdb9
commit f4d055916a
2 changed files with 336 additions and 33 deletions

40
db.go
View File

@@ -219,7 +219,7 @@ func (db *DB) Pos() (Pos, error) {
return Pos{}, err
}
return Pos{Generation: generation, Index: index, Offset: fi.Size()}, nil
return Pos{Generation: generation, Index: index, Offset: frameAlign(fi.Size(), db.pageSize)}, nil
}
// Notify returns a channel that closes when the shadow WAL changes.
@@ -764,16 +764,16 @@ func (db *DB) verify() (info syncInfo, err error) {
// Open shadow WAL to copy append to.
info.shadowWALPath, err = db.CurrentShadowWALPath(info.generation)
if info.shadowWALPath == "" {
info.reason = "no shadow wal"
return info, nil
} else if err != nil {
if err != nil {
return info, fmt.Errorf("cannot determine shadow WAL: %w", err)
}
// Determine shadow WAL current size.
fi, err = os.Stat(info.shadowWALPath)
if err != nil {
if os.IsNotExist(err) {
info.reason = "no shadow wal"
return info, nil
} else if err != nil {
return info, err
}
info.shadowWALSize = frameAlign(fi.Size(), db.pageSize)
@@ -781,7 +781,8 @@ func (db *DB) verify() (info syncInfo, err error) {
// Truncate shadow WAL if there is a partial page.
// Exit if shadow WAL does not contain a full header.
if info.shadowWALSize < WALHeaderSize {
return info, fmt.Errorf("short shadow wal: %s", info.shadowWALPath)
info.reason = "short shadow wal"
return info, nil
}
// If shadow WAL is larger than real WAL then the WAL has been truncated
@@ -907,6 +908,7 @@ func (db *DB) copyToShadowWAL(filename string) (newSize int64, err error) {
if err != nil {
return 0, err
}
origSize := frameAlign(fi.Size(), db.pageSize)
// Read shadow WAL header to determine byte order for checksum & salt.
hdr := make([]byte, WALHeaderSize)
@@ -928,16 +930,13 @@ func (db *DB) copyToShadowWAL(filename string) (newSize int64, err error) {
}
// Seek to correct position on both files.
if _, err := r.Seek(fi.Size(), io.SeekStart); err != nil {
if _, err := r.Seek(origSize, io.SeekStart); err != nil {
return 0, fmt.Errorf("wal seek: %w", err)
} else if _, err := w.Seek(fi.Size(), io.SeekStart); err != nil {
} else if _, err := w.Seek(origSize, io.SeekStart); err != nil {
return 0, fmt.Errorf("shadow wal seek: %w", err)
}
// TODO: Optimize to use bufio on reader & writer to minimize syscalls.
// Loop over each page, verify checksum, & copy to writer.
origSize := fi.Size()
newSize = origSize
buf := make([]byte, db.pageSize+WALFrameHeaderSize)
for {
@@ -1103,8 +1102,8 @@ func readLastChecksumFrom(f *os.File, pageSize int) (uint32, uint32, error) {
offset := int64(WALHeaderChecksumOffset)
if fi, err := f.Stat(); err != nil {
return 0, 0, err
} else if fi.Size() > WALHeaderSize {
offset = fi.Size() - int64(pageSize) - WALFrameHeaderSize + WALFrameHeaderChecksumOffset
} else if sz := frameAlign(fi.Size(), pageSize); fi.Size() > WALHeaderSize {
offset = sz - int64(pageSize) - WALFrameHeaderSize + WALFrameHeaderChecksumOffset
}
// Read big endian checksum.
@@ -1117,8 +1116,19 @@ func readLastChecksumFrom(f *os.File, pageSize int) (uint32, uint32, error) {
return binary.BigEndian.Uint32(b[0:]), binary.BigEndian.Uint32(b[4:]), nil
}
// checkpoint performs a checkpoint on the WAL file.
// Checkpoint performs a checkpoint on the WAL file.
func (db *DB) Checkpoint(mode string) (err error) {
db.mu.Lock()
defer db.mu.Unlock()
return db.checkpoint(mode)
}
func (db *DB) checkpoint(mode string) (err error) {
// Ignore if there is no underlying database.
if db.db == nil {
return nil
}
// Track checkpoint metrics.
t := time.Now()
defer func() {