From d4891f33da21850458d4432d9885403c5f9e5c73 Mon Sep 17 00:00:00 2001 From: Ben Johnson Date: Tue, 29 Dec 2020 17:02:33 -0700 Subject: [PATCH] Refactor DB.checkpoint() to accept any mode. This was originally meant to add a TRUNCATE checkpoint before starting a new generation, however, there is a write lock that blocks the checkpoint and it's more complicated to roll it back and attempt the truncation. --- db.go | 43 +++++++++++++++++++++++++------------------ litestream.go | 8 ++++++++ 2 files changed, 33 insertions(+), 18 deletions(-) diff --git a/db.go b/db.go index 20bbefb..f02ea46 100644 --- a/db.go +++ b/db.go @@ -495,11 +495,12 @@ func (db *DB) Sync() (err error) { // If WAL size is great than max threshold, force checkpoint. // If WAL size is greater than min threshold, attempt checkpoint. - var checkpoint, forceCheckpoint bool + var checkpoint bool + checkpointMode := CheckpointModePassive if newWALSize >= calcWALSize(db.pageSize, db.MinCheckpointPageN) { - checkpoint, forceCheckpoint = true, false + checkpoint = true } else if db.MaxCheckpointPageN > 0 && newWALSize >= calcWALSize(db.pageSize, db.MaxCheckpointPageN) { - checkpoint, forceCheckpoint = true, true + checkpoint, checkpointMode = true, CheckpointModeRestart } // Release write lock before checkpointing & exiting. @@ -511,8 +512,8 @@ func (db *DB) Sync() (err error) { if checkpoint { changed = true - if err := db.checkpoint(info, forceCheckpoint); err != nil { - return fmt.Errorf("checkpoint: force=%v err=%w", forceCheckpoint, err) + if err := db.checkpointAndInit(info, checkpointMode); err != nil { + return fmt.Errorf("checkpoint: mode=%v err=%w", checkpointMode, err) } } @@ -909,13 +910,7 @@ func readLastChecksumFrom(f *os.File, pageSize int) (uint32, uint32, error) { } // checkpoint performs a checkpoint on the WAL file. -func (db *DB) checkpoint(info syncInfo, force bool) error { - // Read WAL header before checkpoint to check if it has been restarted. - hdr, err := readWALHeader(db.WALPath()) - if err != nil { - return err - } - +func (db *DB) checkpoint(mode string) error { // Ensure the read lock has been removed before issuing a checkpoint. // We defer the re-acquire to ensure it occurs even on an early return. if err := db.releaseReadLock(); err != nil { @@ -929,23 +924,35 @@ func (db *DB) checkpoint(info syncInfo, force bool) error { // forcing the checkpoint and restarting the WAL. // // See: https://www.sqlite.org/pragma.html#pragma_wal_checkpoint - rawsql := `PRAGMA wal_checkpoint;` - if force { - rawsql = `PRAGMA wal_checkpoint(RESTART);` - } + rawsql := `PRAGMA wal_checkpoint(` + mode + `);` var row [3]int if err := db.db.QueryRow(rawsql).Scan(&row[0], &row[1], &row[2]); err != nil { return err } - log.Printf("%s: checkpoint: force=%v (%d,%d,%d)", db.path, force, row[0], row[1], row[2]) + log.Printf("%s: checkpoint: mode=%v (%d,%d,%d)", db.path, mode, row[0], row[1], row[2]) // Reacquire the read lock immediately after the checkpoint. if err := db.acquireReadLock(); err != nil { return fmt.Errorf("release read lock: %w", err) } + return nil +} - if _, err = db.db.Exec(`INSERT INTO _litestream_seq (id, seq) VALUES (1, 1) ON CONFLICT (id) DO UPDATE SET seq = seq + 1`); err != nil { +// checkpointAndInit performs a checkpoint on the WAL file and initializes a +// new shadow WAL file. +func (db *DB) checkpointAndInit(info syncInfo, mode string) error { + // Read WAL header before checkpoint to check if it has been restarted. + hdr, err := readWALHeader(db.WALPath()) + if err != nil { + return err + } + + // Execute checkpoint and immediately issue a write to the WAL to ensure + // a new page is written. + if err := db.checkpoint(mode); err != nil { + return err + } else if _, err = db.db.Exec(`INSERT INTO _litestream_seq (id, seq) VALUES (1, 1) ON CONFLICT (id) DO UPDATE SET seq = seq + 1`); err != nil { return err } diff --git a/litestream.go b/litestream.go index d41a7d1..d9783d5 100644 --- a/litestream.go +++ b/litestream.go @@ -27,6 +27,14 @@ const ( GenerationNameLen = 16 ) +// SQLite checkpoint modes. +const ( + CheckpointModePassive = "PASSIVE" + CheckpointModeFull = "FULL" + CheckpointModeRestart = "RESTART" + CheckpointModeTruncate = "TRUNCATE" +) + // Litestream errors. var ( ErrNoSnapshots = errors.New("no snapshots available")