diff --git a/db.go b/db.go index c9f3b35..65feea4 100644 --- a/db.go +++ b/db.go @@ -1025,66 +1025,65 @@ func (db *DB) copyToShadowWAL(filename string) (newSize int64, err error) { return 0, fmt.Errorf("last checksum: %w", err) } - // Seek to correct position on both files. + // Seek to correct position on real wal. if _, err := r.Seek(origSize, io.SeekStart); err != nil { - return 0, fmt.Errorf("wal seek: %w", err) + return 0, fmt.Errorf("real wal seek: %w", err) + } else if _, err := w.Seek(origSize, io.SeekStart); err != nil { + return 0, fmt.Errorf("shadow wal seek: %w", err) } // Read through WAL from last position to find the page of the last // committed transaction. - tmpSz := origSize + frame := make([]byte, db.pageSize+WALFrameHeaderSize) + var buf bytes.Buffer + offset := origSize lastCommitSize := origSize - buf := make([]byte, db.pageSize+WALFrameHeaderSize) for { - Tracef("%s: copy-shadow: %s @ %d", db.path, filename, tmpSz) - // Read next page from WAL file. - if _, err := io.ReadFull(r, buf); err == io.EOF || err == io.ErrUnexpectedEOF { - Tracef("%s: copy-shadow: break %s", db.path, err) + if _, err := io.ReadFull(r, frame); err == io.EOF || err == io.ErrUnexpectedEOF { + Tracef("%s: copy-shadow: break %s @ %d; err=%s", db.path, filename, offset, err) break // end of file or partial page } else if err != nil { return 0, fmt.Errorf("read wal: %w", err) } // Read frame salt & compare to header salt. Stop reading on mismatch. - salt0 := binary.BigEndian.Uint32(buf[8:]) - salt1 := binary.BigEndian.Uint32(buf[12:]) + salt0 := binary.BigEndian.Uint32(frame[8:]) + salt1 := binary.BigEndian.Uint32(frame[12:]) if salt0 != hsalt0 || salt1 != hsalt1 { Tracef("%s: copy-shadow: break: salt mismatch", db.path) break } // Verify checksum of page is valid. - fchksum0 := binary.BigEndian.Uint32(buf[16:]) - fchksum1 := binary.BigEndian.Uint32(buf[20:]) - chksum0, chksum1 = Checksum(bo, chksum0, chksum1, buf[:8]) // frame header - chksum0, chksum1 = Checksum(bo, chksum0, chksum1, buf[24:]) // frame data + fchksum0 := binary.BigEndian.Uint32(frame[16:]) + fchksum1 := binary.BigEndian.Uint32(frame[20:]) + chksum0, chksum1 = Checksum(bo, chksum0, chksum1, frame[:8]) // frame header + chksum0, chksum1 = Checksum(bo, chksum0, chksum1, frame[24:]) // frame data if chksum0 != fchksum0 || chksum1 != fchksum1 { - log.Printf("copy shadow: checksum mismatch, skipping: offset=%d (%x,%x) != (%x,%x)", tmpSz, chksum0, chksum1, fchksum0, fchksum1) + log.Printf("copy shadow: checksum mismatch, skipping: offset=%d (%x,%x) != (%x,%x)", offset, chksum0, chksum1, fchksum0, fchksum1) break } // Add page to the new size of the shadow WAL. - tmpSz += int64(len(buf)) + buf.Write(frame) - // Mark commit record. - newDBSize := binary.BigEndian.Uint32(buf[4:]) + Tracef("%s: copy-shadow: ok %s offset=%d salt=%x %x", db.path, filename, offset, salt0, salt1) + offset += int64(len(frame)) + + // Flush to shadow WAL if commit record. + newDBSize := binary.BigEndian.Uint32(frame[4:]) if newDBSize != 0 { - lastCommitSize = tmpSz + if _, err := buf.WriteTo(w); err != nil { + return 0, fmt.Errorf("write shadow wal: %w", err) + } + buf.Reset() + lastCommitSize = offset } } - // Seek to correct position on both files. - if _, err := r.Seek(origSize, io.SeekStart); err != nil { - return 0, fmt.Errorf("wal seek: %w", err) - } else if _, err := w.Seek(origSize, io.SeekStart); err != nil { - return 0, fmt.Errorf("shadow wal seek: %w", err) - } - - // Copy bytes, sync & close. - if _, err := io.CopyN(w, r, lastCommitSize-origSize); err != nil { - return 0, err - } else if err := w.Sync(); err != nil { + // Sync & close. + if err := w.Sync(); err != nil { return 0, err } else if err := w.Close(); err != nil { return 0, err @@ -1107,6 +1106,8 @@ func (db *DB) ShadowWALReader(pos Pos) (r *ShadowWALReader, err error) { return nil, err } else if r.N() > 0 { return r, nil + } else if err := r.Close(); err != nil { // no data, close, try next + return nil, err } // Otherwise attempt to read the start of the next WAL file. @@ -1180,6 +1181,9 @@ type ShadowWALReader struct { pos Pos } +// Name returns the filename of the underlying file. +func (r *ShadowWALReader) Name() string { return r.f.Name() } + // Close closes the underlying WAL file handle. func (r *ShadowWALReader) Close() error { return r.f.Close() } @@ -1297,6 +1301,11 @@ func (db *DB) checkpointAndInit(generation, mode string) error { return err } + // Copy shadow WAL before checkpoint to copy as much as possible. + if _, err := db.copyToShadowWAL(shadowWALPath); err != nil { + return fmt.Errorf("cannot copy to end of shadow wal before checkpoint: %w", err) + } + // Execute checkpoint and immediately issue a write to the WAL to ensure // a new page is written. if err := db.checkpoint(mode); err != nil { diff --git a/litestream.go b/litestream.go index f887372..edca37a 100644 --- a/litestream.go +++ b/litestream.go @@ -95,9 +95,9 @@ type Pos struct { // String returns a string representation. func (p Pos) String() string { if p.IsZero() { - return "<>" + return "" } - return fmt.Sprintf("<%s,%08x,%d>", p.Generation, p.Index, p.Offset) + return fmt.Sprintf("%s/%08x:%d", p.Generation, p.Index, p.Offset) } // IsZero returns true if p is the zero value. diff --git a/replica.go b/replica.go index 8987485..f9b2726 100644 --- a/replica.go +++ b/replica.go @@ -2,6 +2,7 @@ package litestream import ( "context" + "encoding/binary" "fmt" "io" "io/ioutil" @@ -598,6 +599,8 @@ func (r *FileReplica) Sync(ctx context.Context) (err error) { } generation := dpos.Generation + Tracef("%s(%s): replica sync: db.pos=%s", r.db.Path(), r.Name(), dpos) + // Create snapshot if no snapshots exist for generation. if n, err := r.snapshotN(generation); err != nil { return err @@ -617,6 +620,7 @@ func (r *FileReplica) Sync(ctx context.Context) (err error) { return fmt.Errorf("cannot determine replica position: %s", err) } + Tracef("%s(%s): replica sync: calc new pos: %s", r.db.Path(), r.Name(), pos) r.mu.Lock() r.pos = pos r.mu.Unlock() @@ -669,10 +673,47 @@ func (r *FileReplica) syncWAL(ctx context.Context) (err error) { return err } - n, err := io.Copy(w, rd) - r.walBytesCounter.Add(float64(n)) - if err != nil { - return err + // Copy header if at offset zero. + var psalt uint64 // previous salt value + if pos := rd.Pos(); pos.Offset == 0 { + buf := make([]byte, WALHeaderSize) + if _, err := io.ReadFull(rd, buf); err != nil { + return err + } + + psalt = binary.BigEndian.Uint64(buf[16:24]) + + n, err := w.Write(buf) + if err != nil { + return err + } + r.walBytesCounter.Add(float64(n)) + } + + // Copy frames. + for { + pos := rd.Pos() + assert(pos.Offset == frameAlign(pos.Offset, r.db.pageSize), "shadow wal reader not frame aligned") + + buf := make([]byte, WALFrameHeaderSize+r.db.pageSize) + if _, err := io.ReadFull(rd, buf); err == io.EOF { + break + } else if err != nil { + return err + } + + // Verify salt matches the previous frame/header read. + salt := binary.BigEndian.Uint64(buf[8:16]) + if psalt != 0 && psalt != salt { + return fmt.Errorf("replica salt mismatch: %s", filepath.Base(filename)) + } + psalt = salt + + n, err := w.Write(buf) + if err != nil { + return err + } + r.walBytesCounter.Add(float64(n)) } if err := w.Sync(); err != nil {