From 031a526b9ad007bf46821ae935315b6a0503ae67 Mon Sep 17 00:00:00 2001 From: Ben Johnson Date: Thu, 21 Jan 2021 12:44:11 -0700 Subject: [PATCH] Only copy committed WAL pages --- db.go | 49 ++++++++++++++++++++++++++++++------------------- 1 file changed, 30 insertions(+), 19 deletions(-) diff --git a/db.go b/db.go index 01a6ff0..1bccdb7 100644 --- a/db.go +++ b/db.go @@ -976,22 +976,22 @@ func (db *DB) copyToShadowWAL(filename string) (newSize int64, err error) { // Seek to correct position on both files. if _, err := r.Seek(origSize, io.SeekStart); err != nil { return 0, fmt.Errorf("wal seek: %w", err) - } else if _, err := w.Seek(origSize, io.SeekStart); err != nil { - return 0, fmt.Errorf("shadow wal seek: %w", err) } - // Loop over each page, verify checksum, & copy to writer. - newSize = origSize + // Read through WAL from last position to find the page of the last + // committed transaction. + tmpSz := origSize + lastCommitSize := origSize buf := make([]byte, db.pageSize+WALFrameHeaderSize) for { - Tracef("%s: copy-shadow: %s @ %d", db.path, filename, newSize) + Tracef("%s: copy-shadow: %s @ %d", db.path, filename, tmpSz) // Read next page from WAL file. if _, err := io.ReadFull(r, buf); err == io.EOF || err == io.ErrUnexpectedEOF { Tracef("%s: copy-shadow: break %s", db.path, err) break // end of file or partial page } else if err != nil { - return newSize, fmt.Errorf("read wal: %w", err) + return 0, fmt.Errorf("read wal: %w", err) } // Read frame salt & compare to header salt. Stop reading on mismatch. @@ -1008,29 +1008,39 @@ func (db *DB) copyToShadowWAL(filename string) (newSize int64, err error) { chksum0, chksum1 = Checksum(bo, chksum0, chksum1, buf[:8]) // frame header chksum0, chksum1 = Checksum(bo, chksum0, chksum1, buf[24:]) // frame data if chksum0 != fchksum0 || chksum1 != fchksum1 { - return newSize, fmt.Errorf("checksum mismatch: offset=%d (%x,%x) != (%x,%x)", newSize, chksum0, chksum1, fchksum0, fchksum1) - } - - // Write frame to shadow WAL. - if _, err := w.Write(buf); err != nil { - return newSize, err + return 0, fmt.Errorf("checksum mismatch: offset=%d (%x,%x) != (%x,%x)", tmpSz, chksum0, chksum1, fchksum0, fchksum1) } // Add page to the new size of the shadow WAL. - newSize += int64(len(buf)) + tmpSz += int64(len(buf)) + + // Mark commit record. + newDBSize := binary.BigEndian.Uint32(buf[4:]) + if newDBSize != 0 { + lastCommitSize = tmpSz + } } - // Sync & close writer. - if err := w.Sync(); err != nil { - return newSize, err + // Seek to correct position on both files. + if _, err := r.Seek(origSize, io.SeekStart); err != nil { + return 0, fmt.Errorf("wal seek: %w", err) + } else if _, err := w.Seek(origSize, io.SeekStart); err != nil { + return 0, fmt.Errorf("shadow wal seek: %w", err) + } + + // Copy bytes, sync & close. + if _, err := io.CopyN(w, r, lastCommitSize-origSize); err != nil { + return 0, err + } else if err := w.Sync(); err != nil { + return 0, err } else if err := w.Close(); err != nil { - return newSize, err + return 0, err } // Track total number of bytes written to WAL. - db.totalWALBytesCounter.Add(float64(newSize - origSize)) + db.totalWALBytesCounter.Add(float64(lastCommitSize - origSize)) - return newSize, nil + return lastCommitSize, nil } // ShadowWALReader opens a reader for a shadow WAL file at a given position. @@ -1341,6 +1351,7 @@ func (db *DB) Restore(ctx context.Context, opt RestoreOptions) error { if err != nil { return fmt.Errorf("cannot find max wal index for restore: %w", err) } + log.Printf("%s(%s): starting restore: generation %08x, index %08x-%08x", db.path, r.Name(), generation, minWALIndex, maxWALIndex) // Initialize starting position. pos := Pos{Generation: generation, Index: minWALIndex}