Only copy committed WAL pages
This commit is contained in:
49
db.go
49
db.go
@@ -976,22 +976,22 @@ func (db *DB) copyToShadowWAL(filename string) (newSize int64, err error) {
|
|||||||
// Seek to correct position on both files.
|
// Seek to correct position on both files.
|
||||||
if _, err := r.Seek(origSize, io.SeekStart); err != nil {
|
if _, err := r.Seek(origSize, io.SeekStart); err != nil {
|
||||||
return 0, fmt.Errorf("wal seek: %w", err)
|
return 0, fmt.Errorf("wal seek: %w", err)
|
||||||
} else if _, err := w.Seek(origSize, io.SeekStart); err != nil {
|
|
||||||
return 0, fmt.Errorf("shadow wal seek: %w", err)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Loop over each page, verify checksum, & copy to writer.
|
// Read through WAL from last position to find the page of the last
|
||||||
newSize = origSize
|
// committed transaction.
|
||||||
|
tmpSz := origSize
|
||||||
|
lastCommitSize := origSize
|
||||||
buf := make([]byte, db.pageSize+WALFrameHeaderSize)
|
buf := make([]byte, db.pageSize+WALFrameHeaderSize)
|
||||||
for {
|
for {
|
||||||
Tracef("%s: copy-shadow: %s @ %d", db.path, filename, newSize)
|
Tracef("%s: copy-shadow: %s @ %d", db.path, filename, tmpSz)
|
||||||
|
|
||||||
// Read next page from WAL file.
|
// Read next page from WAL file.
|
||||||
if _, err := io.ReadFull(r, buf); err == io.EOF || err == io.ErrUnexpectedEOF {
|
if _, err := io.ReadFull(r, buf); err == io.EOF || err == io.ErrUnexpectedEOF {
|
||||||
Tracef("%s: copy-shadow: break %s", db.path, err)
|
Tracef("%s: copy-shadow: break %s", db.path, err)
|
||||||
break // end of file or partial page
|
break // end of file or partial page
|
||||||
} else if err != nil {
|
} else if err != nil {
|
||||||
return newSize, fmt.Errorf("read wal: %w", err)
|
return 0, fmt.Errorf("read wal: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Read frame salt & compare to header salt. Stop reading on mismatch.
|
// Read frame salt & compare to header salt. Stop reading on mismatch.
|
||||||
@@ -1008,29 +1008,39 @@ func (db *DB) copyToShadowWAL(filename string) (newSize int64, err error) {
|
|||||||
chksum0, chksum1 = Checksum(bo, chksum0, chksum1, buf[:8]) // frame header
|
chksum0, chksum1 = Checksum(bo, chksum0, chksum1, buf[:8]) // frame header
|
||||||
chksum0, chksum1 = Checksum(bo, chksum0, chksum1, buf[24:]) // frame data
|
chksum0, chksum1 = Checksum(bo, chksum0, chksum1, buf[24:]) // frame data
|
||||||
if chksum0 != fchksum0 || chksum1 != fchksum1 {
|
if chksum0 != fchksum0 || chksum1 != fchksum1 {
|
||||||
return newSize, fmt.Errorf("checksum mismatch: offset=%d (%x,%x) != (%x,%x)", newSize, chksum0, chksum1, fchksum0, fchksum1)
|
return 0, fmt.Errorf("checksum mismatch: offset=%d (%x,%x) != (%x,%x)", tmpSz, chksum0, chksum1, fchksum0, fchksum1)
|
||||||
}
|
|
||||||
|
|
||||||
// Write frame to shadow WAL.
|
|
||||||
if _, err := w.Write(buf); err != nil {
|
|
||||||
return newSize, err
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Add page to the new size of the shadow WAL.
|
// Add page to the new size of the shadow WAL.
|
||||||
newSize += int64(len(buf))
|
tmpSz += int64(len(buf))
|
||||||
|
|
||||||
|
// Mark commit record.
|
||||||
|
newDBSize := binary.BigEndian.Uint32(buf[4:])
|
||||||
|
if newDBSize != 0 {
|
||||||
|
lastCommitSize = tmpSz
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Sync & close writer.
|
// Seek to correct position on both files.
|
||||||
if err := w.Sync(); err != nil {
|
if _, err := r.Seek(origSize, io.SeekStart); err != nil {
|
||||||
return newSize, err
|
return 0, fmt.Errorf("wal seek: %w", err)
|
||||||
|
} else if _, err := w.Seek(origSize, io.SeekStart); err != nil {
|
||||||
|
return 0, fmt.Errorf("shadow wal seek: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Copy bytes, sync & close.
|
||||||
|
if _, err := io.CopyN(w, r, lastCommitSize-origSize); err != nil {
|
||||||
|
return 0, err
|
||||||
|
} else if err := w.Sync(); err != nil {
|
||||||
|
return 0, err
|
||||||
} else if err := w.Close(); err != nil {
|
} else if err := w.Close(); err != nil {
|
||||||
return newSize, err
|
return 0, err
|
||||||
}
|
}
|
||||||
|
|
||||||
// Track total number of bytes written to WAL.
|
// Track total number of bytes written to WAL.
|
||||||
db.totalWALBytesCounter.Add(float64(newSize - origSize))
|
db.totalWALBytesCounter.Add(float64(lastCommitSize - origSize))
|
||||||
|
|
||||||
return newSize, nil
|
return lastCommitSize, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// ShadowWALReader opens a reader for a shadow WAL file at a given position.
|
// ShadowWALReader opens a reader for a shadow WAL file at a given position.
|
||||||
@@ -1341,6 +1351,7 @@ func (db *DB) Restore(ctx context.Context, opt RestoreOptions) error {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("cannot find max wal index for restore: %w", err)
|
return fmt.Errorf("cannot find max wal index for restore: %w", err)
|
||||||
}
|
}
|
||||||
|
log.Printf("%s(%s): starting restore: generation %08x, index %08x-%08x", db.path, r.Name(), generation, minWALIndex, maxWALIndex)
|
||||||
|
|
||||||
// Initialize starting position.
|
// Initialize starting position.
|
||||||
pos := Pos{Generation: generation, Index: minWALIndex}
|
pos := Pos{Generation: generation, Index: minWALIndex}
|
||||||
|
|||||||
Reference in New Issue
Block a user