Merge pull request #37 from benbjohnson/fix-shadow-write
Fix shadow wal corruption on stalled validation
This commit is contained in:
69
db.go
69
db.go
@@ -1025,66 +1025,65 @@ func (db *DB) copyToShadowWAL(filename string) (newSize int64, err error) {
|
||||
return 0, fmt.Errorf("last checksum: %w", err)
|
||||
}
|
||||
|
||||
// Seek to correct position on both files.
|
||||
// Seek to correct position on real wal.
|
||||
if _, err := r.Seek(origSize, io.SeekStart); err != nil {
|
||||
return 0, fmt.Errorf("wal seek: %w", err)
|
||||
return 0, fmt.Errorf("real wal seek: %w", err)
|
||||
} else if _, err := w.Seek(origSize, io.SeekStart); err != nil {
|
||||
return 0, fmt.Errorf("shadow wal seek: %w", err)
|
||||
}
|
||||
|
||||
// Read through WAL from last position to find the page of the last
|
||||
// committed transaction.
|
||||
tmpSz := origSize
|
||||
frame := make([]byte, db.pageSize+WALFrameHeaderSize)
|
||||
var buf bytes.Buffer
|
||||
offset := origSize
|
||||
lastCommitSize := origSize
|
||||
buf := make([]byte, db.pageSize+WALFrameHeaderSize)
|
||||
for {
|
||||
Tracef("%s: copy-shadow: %s @ %d", db.path, filename, tmpSz)
|
||||
|
||||
// Read next page from WAL file.
|
||||
if _, err := io.ReadFull(r, buf); err == io.EOF || err == io.ErrUnexpectedEOF {
|
||||
Tracef("%s: copy-shadow: break %s", db.path, err)
|
||||
if _, err := io.ReadFull(r, frame); err == io.EOF || err == io.ErrUnexpectedEOF {
|
||||
Tracef("%s: copy-shadow: break %s @ %d; err=%s", db.path, filename, offset, err)
|
||||
break // end of file or partial page
|
||||
} else if err != nil {
|
||||
return 0, fmt.Errorf("read wal: %w", err)
|
||||
}
|
||||
|
||||
// Read frame salt & compare to header salt. Stop reading on mismatch.
|
||||
salt0 := binary.BigEndian.Uint32(buf[8:])
|
||||
salt1 := binary.BigEndian.Uint32(buf[12:])
|
||||
salt0 := binary.BigEndian.Uint32(frame[8:])
|
||||
salt1 := binary.BigEndian.Uint32(frame[12:])
|
||||
if salt0 != hsalt0 || salt1 != hsalt1 {
|
||||
Tracef("%s: copy-shadow: break: salt mismatch", db.path)
|
||||
break
|
||||
}
|
||||
|
||||
// Verify checksum of page is valid.
|
||||
fchksum0 := binary.BigEndian.Uint32(buf[16:])
|
||||
fchksum1 := binary.BigEndian.Uint32(buf[20:])
|
||||
chksum0, chksum1 = Checksum(bo, chksum0, chksum1, buf[:8]) // frame header
|
||||
chksum0, chksum1 = Checksum(bo, chksum0, chksum1, buf[24:]) // frame data
|
||||
fchksum0 := binary.BigEndian.Uint32(frame[16:])
|
||||
fchksum1 := binary.BigEndian.Uint32(frame[20:])
|
||||
chksum0, chksum1 = Checksum(bo, chksum0, chksum1, frame[:8]) // frame header
|
||||
chksum0, chksum1 = Checksum(bo, chksum0, chksum1, frame[24:]) // frame data
|
||||
if chksum0 != fchksum0 || chksum1 != fchksum1 {
|
||||
log.Printf("copy shadow: checksum mismatch, skipping: offset=%d (%x,%x) != (%x,%x)", tmpSz, chksum0, chksum1, fchksum0, fchksum1)
|
||||
log.Printf("copy shadow: checksum mismatch, skipping: offset=%d (%x,%x) != (%x,%x)", offset, chksum0, chksum1, fchksum0, fchksum1)
|
||||
break
|
||||
}
|
||||
|
||||
// Add page to the new size of the shadow WAL.
|
||||
tmpSz += int64(len(buf))
|
||||
buf.Write(frame)
|
||||
|
||||
// Mark commit record.
|
||||
newDBSize := binary.BigEndian.Uint32(buf[4:])
|
||||
Tracef("%s: copy-shadow: ok %s offset=%d salt=%x %x", db.path, filename, offset, salt0, salt1)
|
||||
offset += int64(len(frame))
|
||||
|
||||
// Flush to shadow WAL if commit record.
|
||||
newDBSize := binary.BigEndian.Uint32(frame[4:])
|
||||
if newDBSize != 0 {
|
||||
lastCommitSize = tmpSz
|
||||
if _, err := buf.WriteTo(w); err != nil {
|
||||
return 0, fmt.Errorf("write shadow wal: %w", err)
|
||||
}
|
||||
buf.Reset()
|
||||
lastCommitSize = offset
|
||||
}
|
||||
}
|
||||
|
||||
// Seek to correct position on both files.
|
||||
if _, err := r.Seek(origSize, io.SeekStart); err != nil {
|
||||
return 0, fmt.Errorf("wal seek: %w", err)
|
||||
} else if _, err := w.Seek(origSize, io.SeekStart); err != nil {
|
||||
return 0, fmt.Errorf("shadow wal seek: %w", err)
|
||||
}
|
||||
|
||||
// Copy bytes, sync & close.
|
||||
if _, err := io.CopyN(w, r, lastCommitSize-origSize); err != nil {
|
||||
return 0, err
|
||||
} else if err := w.Sync(); err != nil {
|
||||
// Sync & close.
|
||||
if err := w.Sync(); err != nil {
|
||||
return 0, err
|
||||
} else if err := w.Close(); err != nil {
|
||||
return 0, err
|
||||
@@ -1107,6 +1106,8 @@ func (db *DB) ShadowWALReader(pos Pos) (r *ShadowWALReader, err error) {
|
||||
return nil, err
|
||||
} else if r.N() > 0 {
|
||||
return r, nil
|
||||
} else if err := r.Close(); err != nil { // no data, close, try next
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Otherwise attempt to read the start of the next WAL file.
|
||||
@@ -1180,6 +1181,9 @@ type ShadowWALReader struct {
|
||||
pos Pos
|
||||
}
|
||||
|
||||
// Name returns the filename of the underlying file.
|
||||
func (r *ShadowWALReader) Name() string { return r.f.Name() }
|
||||
|
||||
// Close closes the underlying WAL file handle.
|
||||
func (r *ShadowWALReader) Close() error { return r.f.Close() }
|
||||
|
||||
@@ -1297,6 +1301,11 @@ func (db *DB) checkpointAndInit(generation, mode string) error {
|
||||
return err
|
||||
}
|
||||
|
||||
// Copy shadow WAL before checkpoint to copy as much as possible.
|
||||
if _, err := db.copyToShadowWAL(shadowWALPath); err != nil {
|
||||
return fmt.Errorf("cannot copy to end of shadow wal before checkpoint: %w", err)
|
||||
}
|
||||
|
||||
// Execute checkpoint and immediately issue a write to the WAL to ensure
|
||||
// a new page is written.
|
||||
if err := db.checkpoint(mode); err != nil {
|
||||
|
||||
@@ -95,9 +95,9 @@ type Pos struct {
|
||||
// String returns a string representation.
|
||||
func (p Pos) String() string {
|
||||
if p.IsZero() {
|
||||
return "<>"
|
||||
return ""
|
||||
}
|
||||
return fmt.Sprintf("<%s,%08x,%d>", p.Generation, p.Index, p.Offset)
|
||||
return fmt.Sprintf("%s/%08x:%d", p.Generation, p.Index, p.Offset)
|
||||
}
|
||||
|
||||
// IsZero returns true if p is the zero value.
|
||||
|
||||
49
replica.go
49
replica.go
@@ -2,6 +2,7 @@ package litestream
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/binary"
|
||||
"fmt"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
@@ -598,6 +599,8 @@ func (r *FileReplica) Sync(ctx context.Context) (err error) {
|
||||
}
|
||||
generation := dpos.Generation
|
||||
|
||||
Tracef("%s(%s): replica sync: db.pos=%s", r.db.Path(), r.Name(), dpos)
|
||||
|
||||
// Create snapshot if no snapshots exist for generation.
|
||||
if n, err := r.snapshotN(generation); err != nil {
|
||||
return err
|
||||
@@ -617,6 +620,7 @@ func (r *FileReplica) Sync(ctx context.Context) (err error) {
|
||||
return fmt.Errorf("cannot determine replica position: %s", err)
|
||||
}
|
||||
|
||||
Tracef("%s(%s): replica sync: calc new pos: %s", r.db.Path(), r.Name(), pos)
|
||||
r.mu.Lock()
|
||||
r.pos = pos
|
||||
r.mu.Unlock()
|
||||
@@ -669,10 +673,47 @@ func (r *FileReplica) syncWAL(ctx context.Context) (err error) {
|
||||
return err
|
||||
}
|
||||
|
||||
n, err := io.Copy(w, rd)
|
||||
r.walBytesCounter.Add(float64(n))
|
||||
if err != nil {
|
||||
return err
|
||||
// Copy header if at offset zero.
|
||||
var psalt uint64 // previous salt value
|
||||
if pos := rd.Pos(); pos.Offset == 0 {
|
||||
buf := make([]byte, WALHeaderSize)
|
||||
if _, err := io.ReadFull(rd, buf); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
psalt = binary.BigEndian.Uint64(buf[16:24])
|
||||
|
||||
n, err := w.Write(buf)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
r.walBytesCounter.Add(float64(n))
|
||||
}
|
||||
|
||||
// Copy frames.
|
||||
for {
|
||||
pos := rd.Pos()
|
||||
assert(pos.Offset == frameAlign(pos.Offset, r.db.pageSize), "shadow wal reader not frame aligned")
|
||||
|
||||
buf := make([]byte, WALFrameHeaderSize+r.db.pageSize)
|
||||
if _, err := io.ReadFull(rd, buf); err == io.EOF {
|
||||
break
|
||||
} else if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Verify salt matches the previous frame/header read.
|
||||
salt := binary.BigEndian.Uint64(buf[8:16])
|
||||
if psalt != 0 && psalt != salt {
|
||||
return fmt.Errorf("replica salt mismatch: %s", filepath.Base(filename))
|
||||
}
|
||||
psalt = salt
|
||||
|
||||
n, err := w.Write(buf)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
r.walBytesCounter.Add(float64(n))
|
||||
}
|
||||
|
||||
if err := w.Sync(); err != nil {
|
||||
|
||||
Reference in New Issue
Block a user