Fix wal rollover
This commit is contained in:
@@ -49,7 +49,7 @@ func main() {
|
||||
}
|
||||
|
||||
// Notify user that initialization is done.
|
||||
fmt.Printf("Initialized with %d databases; replication initialized.\n", len(m.DBs))
|
||||
fmt.Printf("Initialized with %d databases.\n", len(m.DBs))
|
||||
|
||||
// Wait for signal to stop program.
|
||||
<-ctx.Done()
|
||||
|
||||
166
db.go
166
db.go
@@ -27,15 +27,12 @@ const (
|
||||
|
||||
// DB represents a managed instance of a SQLite database in the file system.
|
||||
type DB struct {
|
||||
mu sync.Mutex
|
||||
mu sync.RWMutex
|
||||
path string // path to database
|
||||
db *sql.DB // target database
|
||||
rtx *sql.Tx // long running read transaction
|
||||
pageSize int // page size, in bytes
|
||||
|
||||
byteOrder binary.ByteOrder // determined by WAL header magic
|
||||
salt0, salt1 uint32 // read from WAL header
|
||||
|
||||
ctx context.Context
|
||||
cancel func()
|
||||
wg sync.WaitGroup
|
||||
@@ -132,9 +129,45 @@ func (db *DB) CurrentShadowWALPath(generation string) (string, error) {
|
||||
}
|
||||
|
||||
func (db *DB) Open() (err error) {
|
||||
db.wg.Add(1)
|
||||
go func() { defer db.wg.Done(); db.monitor() }()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Close releases the read lock & closes the database. This method should only
|
||||
// be called by tests as it causes the underlying database to be checkpointed.
|
||||
func (db *DB) Close() (err error) {
|
||||
if e := db.SoftClose(); e != nil && err == nil {
|
||||
err = e
|
||||
}
|
||||
|
||||
if db.db != nil {
|
||||
if e := db.db.Close(); e != nil && err == nil {
|
||||
err = e
|
||||
}
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
// Init initializes the connection to the database.
|
||||
// Skipped if already initialized or if the database file does not exist.
|
||||
func (db *DB) Init() (err error) {
|
||||
db.mu.Lock()
|
||||
defer db.mu.Unlock()
|
||||
|
||||
// Exit if already initialized.
|
||||
if db.db != nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Exit if no database file exists.
|
||||
if _, err := os.Stat(db.path); os.IsNotExist(err) {
|
||||
return nil
|
||||
} else if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Connect to SQLite database & enable WAL.
|
||||
if db.db, err = sql.Open("sqlite3", db.path); err != nil {
|
||||
return err
|
||||
@@ -142,6 +175,11 @@ func (db *DB) Open() (err error) {
|
||||
return fmt.Errorf("enable wal: %w", err)
|
||||
}
|
||||
|
||||
// Disable autocheckpoint.
|
||||
if _, err := db.db.ExecContext(db.ctx, `PRAGMA wal_autocheckpoint = 0;`); err != nil {
|
||||
return fmt.Errorf("disable autocheckpoint: %w", err)
|
||||
}
|
||||
|
||||
// Create a table to force writes to the WAL when empty.
|
||||
// There should only ever be one row with id=1.
|
||||
if _, err := db.db.Exec(`CREATE TABLE IF NOT EXISTS _litestream_seq (id INTEGER PRIMARY KEY, seq INTEGER);`); err != nil {
|
||||
@@ -177,9 +215,6 @@ func (db *DB) Open() (err error) {
|
||||
return fmt.Errorf("clean: %w", err)
|
||||
}
|
||||
|
||||
db.wg.Add(1)
|
||||
go func() { defer db.wg.Done(); db.monitor() }()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -192,7 +227,9 @@ func (db *DB) clean() error {
|
||||
|
||||
dir := filepath.Join(db.MetaPath(), "generations")
|
||||
fis, err := ioutil.ReadDir(dir)
|
||||
if err != nil {
|
||||
if os.IsNotExist(err) {
|
||||
return nil
|
||||
} else if err != nil {
|
||||
return err
|
||||
}
|
||||
for _, fi := range fis {
|
||||
@@ -209,21 +246,6 @@ func (db *DB) clean() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Close releases the read lock & closes the database. This method should only
|
||||
// be called by tests as it causes the underlying database to be checkpointed.
|
||||
func (db *DB) Close() (err error) {
|
||||
if e := db.SoftClose(); e != nil && err == nil {
|
||||
err = e
|
||||
}
|
||||
|
||||
if db.db != nil {
|
||||
if e := db.db.Close(); e != nil && err == nil {
|
||||
err = e
|
||||
}
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
// SoftClose closes everything but the underlying db connection. This method
|
||||
// is available because the binary needs to avoid closing the database on exit
|
||||
// to prevent autocheckpointing.
|
||||
@@ -251,8 +273,8 @@ func (db *DB) acquireReadLock() error {
|
||||
return err
|
||||
}
|
||||
|
||||
// Disable autocheckpointing on this connection.
|
||||
if _, err := tx.ExecContext(db.ctx, `PRAGMA wal_autocheckpoint = 0;`); err != nil {
|
||||
// Execute read query to obtain read lock.
|
||||
if _, err := tx.ExecContext(db.ctx, `SELECT COUNT(1) FROM _litestream_seq;`); err != nil {
|
||||
tx.Rollback()
|
||||
return err
|
||||
}
|
||||
@@ -329,12 +351,23 @@ func (db *DB) createGeneration() (string, error) {
|
||||
}
|
||||
}
|
||||
|
||||
// Remove old generations.
|
||||
if err := db.clean(); err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
return generation, nil
|
||||
}
|
||||
|
||||
// Sync copies pending data from the WAL to the shadow WAL.
|
||||
func (db *DB) Sync() (err error) {
|
||||
// TODO: Lock DB while syncing?
|
||||
db.mu.RLock()
|
||||
defer db.mu.RUnlock()
|
||||
|
||||
// No database exists, exit.
|
||||
if db.db == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
// TODO: Force "-wal" file if it doesn't exist.
|
||||
|
||||
@@ -364,11 +397,6 @@ func (db *DB) Sync() (err error) {
|
||||
return fmt.Errorf("_litestream_lock: %w", err)
|
||||
}
|
||||
|
||||
// Disable the autocheckpoint.
|
||||
if _, err := tx.ExecContext(db.ctx, `PRAGMA wal_autocheckpoint = 0;`); err != nil {
|
||||
return fmt.Errorf("disable autocheckpoint: %w", err)
|
||||
}
|
||||
|
||||
// Verify our last sync matches the current state of the WAL.
|
||||
// This ensures that we have an existing generation & that the last sync
|
||||
// position of the real WAL hasn't been overwritten by another process.
|
||||
@@ -385,7 +413,8 @@ func (db *DB) Sync() (err error) {
|
||||
log.Printf("%s: new generation %q, %s", db.path, info.generation, info.reason)
|
||||
|
||||
// Clear shadow wal info.
|
||||
info.shadowWALPath, info.shadowWALSize = "", 0
|
||||
info.shadowWALPath = db.ShadowWALPath(info.generation, 0)
|
||||
info.shadowWALSize = WALHeaderSize
|
||||
info.restart = false
|
||||
info.reason = ""
|
||||
}
|
||||
@@ -441,7 +470,7 @@ func (db *DB) verifyWAL() (info syncInfo, err error) {
|
||||
if err != nil {
|
||||
return info, fmt.Errorf("cannot find current generation: %w", err)
|
||||
} else if generation == "" {
|
||||
info.reason = "no generation"
|
||||
info.reason = "no generation exists"
|
||||
return info, nil
|
||||
}
|
||||
info.generation = generation
|
||||
@@ -489,6 +518,7 @@ func (db *DB) verifyWAL() (info syncInfo, err error) {
|
||||
}
|
||||
|
||||
// Verify last page synced still matches.
|
||||
if info.shadowWALSize > WALHeaderSize {
|
||||
offset := info.shadowWALSize - int64(db.pageSize+WALFrameHeaderSize)
|
||||
if buf0, err := readFileAt(db.WALPath(), offset, int64(db.pageSize+WALFrameHeaderSize)); err != nil {
|
||||
return info, fmt.Errorf("cannot read last synced wal page: %w", err)
|
||||
@@ -498,6 +528,7 @@ func (db *DB) verifyWAL() (info syncInfo, err error) {
|
||||
info.reason = "wal overwritten by another process"
|
||||
return info, nil
|
||||
}
|
||||
}
|
||||
|
||||
return info, nil
|
||||
}
|
||||
@@ -549,24 +580,15 @@ func (db *DB) initShadowWALFile(filename string) error {
|
||||
}
|
||||
|
||||
// Determine byte order for checksumming from header magic.
|
||||
magic := binary.BigEndian.Uint32(hdr[0:])
|
||||
switch magic {
|
||||
case 0x377f0682:
|
||||
db.byteOrder = binary.LittleEndian
|
||||
case 0x377f0683:
|
||||
db.byteOrder = binary.BigEndian
|
||||
default:
|
||||
return fmt.Errorf("invalid wal header magic: %x", magic)
|
||||
bo, err := headerByteOrder(hdr)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Read header salt.
|
||||
db.salt0 = binary.BigEndian.Uint32(hdr[16:])
|
||||
db.salt1 = binary.BigEndian.Uint32(hdr[20:])
|
||||
|
||||
// Verify checksum.
|
||||
s0 := binary.BigEndian.Uint32(hdr[24:])
|
||||
s1 := binary.BigEndian.Uint32(hdr[28:])
|
||||
if v0, v1 := Checksum(db.byteOrder, 0, 0, hdr[:24]); v0 != s0 || v1 != s1 {
|
||||
if v0, v1 := Checksum(bo, 0, 0, hdr[:24]); v0 != s0 || v1 != s1 {
|
||||
return fmt.Errorf("invalid header checksum: (%x,%x) != (%x,%x)", v0, v1, s0, s1)
|
||||
}
|
||||
|
||||
@@ -595,6 +617,19 @@ func (db *DB) copyToShadowWAL(filename string) (newSize int64, err error) {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
// Read shadow WAL header to determine byte order for checksum & salt.
|
||||
hdr := make([]byte, WALHeaderSize)
|
||||
if _, err := io.ReadFull(w, hdr); err != nil {
|
||||
return 0, fmt.Errorf("read header: %w", err)
|
||||
}
|
||||
hsalt0 := binary.BigEndian.Uint32(hdr[16:])
|
||||
hsalt1 := binary.BigEndian.Uint32(hdr[20:])
|
||||
|
||||
bo, err := headerByteOrder(hdr)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
// Read previous checksum.
|
||||
chksum0, chksum1, err := readLastChecksumFrom(w, db.pageSize)
|
||||
if err != nil {
|
||||
@@ -624,19 +659,24 @@ func (db *DB) copyToShadowWAL(filename string) (newSize int64, err error) {
|
||||
// Read frame salt & compare to header salt. Stop reading on mismatch.
|
||||
salt0 := binary.BigEndian.Uint32(buf[8:])
|
||||
salt1 := binary.BigEndian.Uint32(buf[12:])
|
||||
if salt0 != db.salt0 || salt1 != db.salt1 {
|
||||
if salt0 != hsalt0 || salt1 != hsalt1 {
|
||||
break
|
||||
}
|
||||
|
||||
// Verify checksum of page is valid.
|
||||
fchksum0 := binary.BigEndian.Uint32(buf[16:])
|
||||
fchksum1 := binary.BigEndian.Uint32(buf[20:])
|
||||
chksum0, chksum1 = Checksum(db.byteOrder, chksum0, chksum1, buf[:8]) // frame header
|
||||
chksum0, chksum1 = Checksum(db.byteOrder, chksum0, chksum1, buf[24:]) // frame data
|
||||
chksum0, chksum1 = Checksum(bo, chksum0, chksum1, buf[:8]) // frame header
|
||||
chksum0, chksum1 = Checksum(bo, chksum0, chksum1, buf[24:]) // frame data
|
||||
if chksum0 != fchksum0 || chksum1 != fchksum1 {
|
||||
return newSize, fmt.Errorf("checksum mismatch: offset=%d (%x,%x) != (%x,%x)", newSize, chksum0, chksum1, fchksum0, fchksum1)
|
||||
}
|
||||
|
||||
// Write frame to shadow WAL.
|
||||
if _, err := w.Write(buf); err != nil {
|
||||
return newSize, err
|
||||
}
|
||||
|
||||
// Add page to the new size of the shadow WAL.
|
||||
newSize += int64(len(buf))
|
||||
}
|
||||
@@ -694,9 +734,11 @@ func (db *DB) checkpoint(force bool) error {
|
||||
rawsql = `PRAGMA wal_checkpoint(RESTART);`
|
||||
}
|
||||
|
||||
if _, err := db.db.Exec(rawsql); err != nil {
|
||||
var row [3]int
|
||||
if err := db.db.QueryRow(rawsql).Scan(&row[0], &row[1], &row[2]); err != nil {
|
||||
return err
|
||||
}
|
||||
log.Printf("%s: checkpoint: force=%v (%d,%d,%d)", db.path, force, row[0], row[1], row[2])
|
||||
|
||||
// Reacquire the read lock immediately after the checkpoint.
|
||||
if err := db.acquireReadLock(); err != nil {
|
||||
@@ -714,17 +756,31 @@ func (db *DB) monitor() {
|
||||
// Wait for ticker or context close.
|
||||
select {
|
||||
case <-db.ctx.Done():
|
||||
return
|
||||
case <-ticker.C:
|
||||
}
|
||||
|
||||
// Ensure the database is initialized.
|
||||
if err := db.Init(); err != nil {
|
||||
log.Printf("%s: init error: %s", db.path, err)
|
||||
continue
|
||||
}
|
||||
|
||||
// Sync the database to the shadow WAL.
|
||||
if err := db.Sync(); err != nil && !errors.Is(err, context.Canceled) {
|
||||
log.Printf("%s: sync error: %s", db.path, err)
|
||||
}
|
||||
|
||||
// If context closed, exit after final sync.
|
||||
if db.ctx.Err() != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// headerByteOrder returns the byte order to use for checksumming, as
// indicated by the magic number in the first four bytes of a WAL header.
//
// The magic is always read as a big-endian uint32: 0x377f0682 selects
// little-endian and 0x377f0683 selects big-endian. Any other magic value, or
// a header shorter than four bytes, returns a nil byte order and an error.
func headerByteOrder(hdr []byte) (binary.ByteOrder, error) {
	// Guard against short input; Uint32 panics on fewer than four bytes.
	if len(hdr) < 4 {
		return nil, fmt.Errorf("invalid wal header size: %d", len(hdr))
	}

	switch magic := binary.BigEndian.Uint32(hdr); magic {
	case 0x377f0682:
		return binary.LittleEndian, nil
	case 0x377f0683:
		return binary.BigEndian, nil
	default:
		return nil, fmt.Errorf("invalid wal header magic: %x", magic)
	}
}
|
||||
|
||||
Reference in New Issue
Block a user