diff --git a/db.go b/db.go index a78a961..50ec011 100644 --- a/db.go +++ b/db.go @@ -441,7 +441,7 @@ func (db *DB) Sync() (err error) { // Issue the checkpoint. if checkpoint { - if err := db.checkpoint(forceCheckpoint); err != nil { + if err := db.checkpoint(info, forceCheckpoint); err != nil { return fmt.Errorf("checkpoint: force=%v err=%w", err) } } @@ -514,20 +514,13 @@ func (db *DB) verify() (info syncInfo, err error) { return info, nil } - // Compare WAL headers. Ensure that checkpoint sequences match and the - // header is the same or that we have moved to the next sequence. If we have - // out-of-order or skipped checkpoint sequences then we should start a new - // generation. + // Compare WAL headers. Start a new shadow WAL if they are mismatched. if hdr0, err := readWALHeader(db.WALPath()); err != nil { return info, fmt.Errorf("cannot read wal header: %w", err) } else if hdr1, err := readWALHeader(info.shadowWALPath); err != nil { return info, fmt.Errorf("cannot read shadow wal header: %w", err) - } else if seq0, seq1 := readCheckpointSeqNo(hdr0), readCheckpointSeqNo(hdr1); seq0 == seq1 && !bytes.Equal(hdr0, hdr1) { - return info, fmt.Errorf("wal header mismatch: chkpt=%d", seq0) - } else if seq0+1 == seq1 { + } else if !bytes.Equal(hdr0, hdr1) { info.restart = !bytes.Equal(hdr0, hdr1) - } else if seq0 != seq1 { - return info, fmt.Errorf("non-contiguous checkpoint sequence: real=%d shadow=%d", seq0, seq1) } // TODO: Handle checkpoint sequence number rollover. @@ -730,7 +723,13 @@ func readLastChecksumFrom(f *os.File, pageSize int) (uint32, uint32, error) { } // checkpoint performs a checkpoint on the WAL file. -func (db *DB) checkpoint(force bool) error { +func (db *DB) checkpoint(info syncInfo, force bool) error { + // Read WAL header before checkpoint to check if it has been restarted. + hdr, err := readWALHeader(db.WALPath()) + if err != nil { + return err + } + // Ensure the read lock has been removed before issuing a checkpoint. // We defer the re-acquire to ensure it occurs even on an early return. if err := db.releaseReadLock(); err != nil { @@ -759,6 +758,31 @@ func (db *DB) checkpoint(force bool) error { if err := db.acquireReadLock(); err != nil { return fmt.Errorf("release read lock: %w", err) } + + if _, err = db.db.Exec(`INSERT INTO _litestream_seq (id, seq) VALUES (1, 1) ON CONFLICT (id) DO UPDATE SET seq = seq + 1`); err != nil { + return err + } + + // If WAL hasn't been restarted, exit. + if other, err := readWALHeader(db.WALPath()); err != nil { + return err + } else if bytes.Equal(hdr, other) { + return nil + } + + // Parse index of current shadow WAL file. + dir, base := filepath.Split(info.shadowWALPath) + index, err := ParseWALFilename(base) + if err != nil { + return fmt.Errorf("cannot parse shadow wal filename: %s", base) + } + + // Start a new shadow WAL file with next index. + newShadowWALPath := filepath.Join(dir, FormatWALFilename(index+1)) + if err := db.initShadowWALFile(newShadowWALPath); err != nil { + return fmt.Errorf("cannot init shadow wal file: name=%s err=%w", newShadowWALPath, err) + } + return nil } diff --git a/litestream.go b/litestream.go index 053fdce..0272c8d 100644 --- a/litestream.go +++ b/litestream.go @@ -99,7 +99,7 @@ func ParseWALFilename(name string) (index int, err error) { func FormatWALFilename(index int) string { assert(index >= 0, "wal index must be non-negative") - return fmt.Sprintf("%016d%s", index, WALExt) + return fmt.Sprintf("%016x%s", index, WALExt) } // HexDump returns hexdump output but with duplicate lines removed.