From 7f81890bae116d6e71eecbe82c9945914960504a Mon Sep 17 00:00:00 2001 From: Ben Johnson Date: Sat, 6 Feb 2021 07:21:48 -0700 Subject: [PATCH] Fix shadow wal corruption on stalled validation This commit fixes a timing bug that occurs in a specific scenario where the shadow wal sync stalls because of an s3 validation and the catch up write to the shadow wal is large enough to allow a window between WAL reads and the final copy. The file copy has been replaced by direct writes of the frame buffer to the shadow to ensure that every validated byte is exactly what is being written to the shadow wal. The one downside to this change is that the frame buffer will grow with the transaction size so it will use additional heap. This can be replaced by a spill-to-disk implementation but this should work well in the short term. --- db.go | 69 +++++++++++++++++++++++++++++---------------------- litestream.go | 4 +-- replica.go | 49 +++++++++++++++++++++++++++++++++--- 3 files changed, 86 insertions(+), 36 deletions(-) diff --git a/db.go b/db.go index c9f3b35..65feea4 100644 --- a/db.go +++ b/db.go @@ -1025,66 +1025,65 @@ func (db *DB) copyToShadowWAL(filename string) (newSize int64, err error) { return 0, fmt.Errorf("last checksum: %w", err) } - // Seek to correct position on both files. + // Seek to correct position on both the real WAL and the shadow WAL. if _, err := r.Seek(origSize, io.SeekStart); err != nil { - return 0, fmt.Errorf("wal seek: %w", err) + return 0, fmt.Errorf("real wal seek: %w", err) + } else if _, err := w.Seek(origSize, io.SeekStart); err != nil { + return 0, fmt.Errorf("shadow wal seek: %w", err) } // Read through WAL from last position to find the page of the last // committed transaction. 
- tmpSz := origSize + frame := make([]byte, db.pageSize+WALFrameHeaderSize) + var buf bytes.Buffer + offset := origSize lastCommitSize := origSize - buf := make([]byte, db.pageSize+WALFrameHeaderSize) for { - Tracef("%s: copy-shadow: %s @ %d", db.path, filename, tmpSz) - // Read next page from WAL file. - if _, err := io.ReadFull(r, buf); err == io.EOF || err == io.ErrUnexpectedEOF { - Tracef("%s: copy-shadow: break %s", db.path, err) + if _, err := io.ReadFull(r, frame); err == io.EOF || err == io.ErrUnexpectedEOF { + Tracef("%s: copy-shadow: break %s @ %d; err=%s", db.path, filename, offset, err) break // end of file or partial page } else if err != nil { return 0, fmt.Errorf("read wal: %w", err) } // Read frame salt & compare to header salt. Stop reading on mismatch. - salt0 := binary.BigEndian.Uint32(buf[8:]) - salt1 := binary.BigEndian.Uint32(buf[12:]) + salt0 := binary.BigEndian.Uint32(frame[8:]) + salt1 := binary.BigEndian.Uint32(frame[12:]) if salt0 != hsalt0 || salt1 != hsalt1 { Tracef("%s: copy-shadow: break: salt mismatch", db.path) break } // Verify checksum of page is valid. - fchksum0 := binary.BigEndian.Uint32(buf[16:]) - fchksum1 := binary.BigEndian.Uint32(buf[20:]) - chksum0, chksum1 = Checksum(bo, chksum0, chksum1, buf[:8]) // frame header - chksum0, chksum1 = Checksum(bo, chksum0, chksum1, buf[24:]) // frame data + fchksum0 := binary.BigEndian.Uint32(frame[16:]) + fchksum1 := binary.BigEndian.Uint32(frame[20:]) + chksum0, chksum1 = Checksum(bo, chksum0, chksum1, frame[:8]) // frame header + chksum0, chksum1 = Checksum(bo, chksum0, chksum1, frame[24:]) // frame data if chksum0 != fchksum0 || chksum1 != fchksum1 { - log.Printf("copy shadow: checksum mismatch, skipping: offset=%d (%x,%x) != (%x,%x)", tmpSz, chksum0, chksum1, fchksum0, fchksum1) + log.Printf("copy shadow: checksum mismatch, skipping: offset=%d (%x,%x) != (%x,%x)", offset, chksum0, chksum1, fchksum0, fchksum1) break } // Add page to the new size of the shadow WAL. 
- tmpSz += int64(len(buf)) + buf.Write(frame) - // Mark commit record. - newDBSize := binary.BigEndian.Uint32(buf[4:]) + Tracef("%s: copy-shadow: ok %s offset=%d salt=%x %x", db.path, filename, offset, salt0, salt1) + offset += int64(len(frame)) + + // Flush to shadow WAL if commit record. + newDBSize := binary.BigEndian.Uint32(frame[4:]) if newDBSize != 0 { - lastCommitSize = tmpSz + if _, err := buf.WriteTo(w); err != nil { + return 0, fmt.Errorf("write shadow wal: %w", err) + } + buf.Reset() + lastCommitSize = offset } } - // Seek to correct position on both files. - if _, err := r.Seek(origSize, io.SeekStart); err != nil { - return 0, fmt.Errorf("wal seek: %w", err) - } else if _, err := w.Seek(origSize, io.SeekStart); err != nil { - return 0, fmt.Errorf("shadow wal seek: %w", err) - } - - // Copy bytes, sync & close. - if _, err := io.CopyN(w, r, lastCommitSize-origSize); err != nil { - return 0, err - } else if err := w.Sync(); err != nil { + // Sync & close. + if err := w.Sync(); err != nil { return 0, err } else if err := w.Close(); err != nil { return 0, err @@ -1107,6 +1106,8 @@ func (db *DB) ShadowWALReader(pos Pos) (r *ShadowWALReader, err error) { return nil, err } else if r.N() > 0 { return r, nil + } else if err := r.Close(); err != nil { // no data, close, try next + return nil, err } // Otherwise attempt to read the start of the next WAL file. @@ -1180,6 +1181,9 @@ type ShadowWALReader struct { pos Pos } +// Name returns the filename of the underlying file. +func (r *ShadowWALReader) Name() string { return r.f.Name() } + // Close closes the underlying WAL file handle. func (r *ShadowWALReader) Close() error { return r.f.Close() } @@ -1297,6 +1301,11 @@ func (db *DB) checkpointAndInit(generation, mode string) error { return err } + // Copy shadow WAL before checkpoint to copy as much as possible. 
+ if _, err := db.copyToShadowWAL(shadowWALPath); err != nil { + return fmt.Errorf("cannot copy to end of shadow wal before checkpoint: %w", err) + } + // Execute checkpoint and immediately issue a write to the WAL to ensure // a new page is written. if err := db.checkpoint(mode); err != nil { diff --git a/litestream.go b/litestream.go index f887372..edca37a 100644 --- a/litestream.go +++ b/litestream.go @@ -95,9 +95,9 @@ type Pos struct { // String returns a string representation. func (p Pos) String() string { if p.IsZero() { - return "<>" + return "" } - return fmt.Sprintf("<%s,%08x,%d>", p.Generation, p.Index, p.Offset) + return fmt.Sprintf("%s/%08x:%d", p.Generation, p.Index, p.Offset) } // IsZero returns true if p is the zero value. diff --git a/replica.go b/replica.go index 8987485..f9b2726 100644 --- a/replica.go +++ b/replica.go @@ -2,6 +2,7 @@ package litestream import ( "context" + "encoding/binary" "fmt" "io" "io/ioutil" @@ -598,6 +599,8 @@ func (r *FileReplica) Sync(ctx context.Context) (err error) { } generation := dpos.Generation + Tracef("%s(%s): replica sync: db.pos=%s", r.db.Path(), r.Name(), dpos) + // Create snapshot if no snapshots exist for generation. if n, err := r.snapshotN(generation); err != nil { return err @@ -617,6 +620,7 @@ func (r *FileReplica) Sync(ctx context.Context) (err error) { return fmt.Errorf("cannot determine replica position: %s", err) } + Tracef("%s(%s): replica sync: calc new pos: %s", r.db.Path(), r.Name(), pos) r.mu.Lock() r.pos = pos r.mu.Unlock() @@ -669,10 +673,47 @@ func (r *FileReplica) syncWAL(ctx context.Context) (err error) { return err } - n, err := io.Copy(w, rd) - r.walBytesCounter.Add(float64(n)) - if err != nil { - return err + // Copy header if at offset zero. 
+ var psalt uint64 // previous salt value + if pos := rd.Pos(); pos.Offset == 0 { + buf := make([]byte, WALHeaderSize) + if _, err := io.ReadFull(rd, buf); err != nil { + return err + } + + psalt = binary.BigEndian.Uint64(buf[16:24]) + + n, err := w.Write(buf) + if err != nil { + return err + } + r.walBytesCounter.Add(float64(n)) + } + + // Copy frames. + for { + pos := rd.Pos() + assert(pos.Offset == frameAlign(pos.Offset, r.db.pageSize), "shadow wal reader not frame aligned") + + buf := make([]byte, WALFrameHeaderSize+r.db.pageSize) + if _, err := io.ReadFull(rd, buf); err == io.EOF { + break + } else if err != nil { + return err + } + + // Verify salt matches the previous frame/header read. + salt := binary.BigEndian.Uint64(buf[8:16]) + if psalt != 0 && psalt != salt { + return fmt.Errorf("replica salt mismatch: %s", filepath.Base(filename)) + } + psalt = salt + + n, err := w.Write(buf) + if err != nil { + return err + } + r.walBytesCounter.Add(float64(n)) } if err := w.Sync(); err != nil {