Refactor DB.checkpoint() to accept any mode.

This was originally meant to add a TRUNCATE checkpoint before starting
a new generation, however, there is a write lock that blocks the
checkpoint and it's more complicated to roll it back and attempt the
truncation.
This commit is contained in:
Ben Johnson
2020-12-29 17:02:33 -07:00
parent 42a33cccf4
commit d4891f33da
2 changed files with 33 additions and 18 deletions

43
db.go
View File

@@ -495,11 +495,12 @@ func (db *DB) Sync() (err error) {
// If WAL size is great than max threshold, force checkpoint. // If WAL size is great than max threshold, force checkpoint.
// If WAL size is greater than min threshold, attempt checkpoint. // If WAL size is greater than min threshold, attempt checkpoint.
var checkpoint, forceCheckpoint bool var checkpoint bool
checkpointMode := CheckpointModePassive
if newWALSize >= calcWALSize(db.pageSize, db.MinCheckpointPageN) { if newWALSize >= calcWALSize(db.pageSize, db.MinCheckpointPageN) {
checkpoint, forceCheckpoint = true, false checkpoint = true
} else if db.MaxCheckpointPageN > 0 && newWALSize >= calcWALSize(db.pageSize, db.MaxCheckpointPageN) { } else if db.MaxCheckpointPageN > 0 && newWALSize >= calcWALSize(db.pageSize, db.MaxCheckpointPageN) {
checkpoint, forceCheckpoint = true, true checkpoint, checkpointMode = true, CheckpointModeRestart
} }
// Release write lock before checkpointing & exiting. // Release write lock before checkpointing & exiting.
@@ -511,8 +512,8 @@ func (db *DB) Sync() (err error) {
if checkpoint { if checkpoint {
changed = true changed = true
if err := db.checkpoint(info, forceCheckpoint); err != nil { if err := db.checkpointAndInit(info, checkpointMode); err != nil {
return fmt.Errorf("checkpoint: force=%v err=%w", forceCheckpoint, err) return fmt.Errorf("checkpoint: mode=%v err=%w", checkpointMode, err)
} }
} }
@@ -909,13 +910,7 @@ func readLastChecksumFrom(f *os.File, pageSize int) (uint32, uint32, error) {
} }
// checkpoint performs a checkpoint on the WAL file. // checkpoint performs a checkpoint on the WAL file.
func (db *DB) checkpoint(info syncInfo, force bool) error { func (db *DB) checkpoint(mode string) error {
// Read WAL header before checkpoint to check if it has been restarted.
hdr, err := readWALHeader(db.WALPath())
if err != nil {
return err
}
// Ensure the read lock has been removed before issuing a checkpoint. // Ensure the read lock has been removed before issuing a checkpoint.
// We defer the re-acquire to ensure it occurs even on an early return. // We defer the re-acquire to ensure it occurs even on an early return.
if err := db.releaseReadLock(); err != nil { if err := db.releaseReadLock(); err != nil {
@@ -929,23 +924,35 @@ func (db *DB) checkpoint(info syncInfo, force bool) error {
// forcing the checkpoint and restarting the WAL. // forcing the checkpoint and restarting the WAL.
// //
// See: https://www.sqlite.org/pragma.html#pragma_wal_checkpoint // See: https://www.sqlite.org/pragma.html#pragma_wal_checkpoint
rawsql := `PRAGMA wal_checkpoint;` rawsql := `PRAGMA wal_checkpoint(` + mode + `);`
if force {
rawsql = `PRAGMA wal_checkpoint(RESTART);`
}
var row [3]int var row [3]int
if err := db.db.QueryRow(rawsql).Scan(&row[0], &row[1], &row[2]); err != nil { if err := db.db.QueryRow(rawsql).Scan(&row[0], &row[1], &row[2]); err != nil {
return err return err
} }
log.Printf("%s: checkpoint: force=%v (%d,%d,%d)", db.path, force, row[0], row[1], row[2]) log.Printf("%s: checkpoint: mode=%v (%d,%d,%d)", db.path, mode, row[0], row[1], row[2])
// Reacquire the read lock immediately after the checkpoint. // Reacquire the read lock immediately after the checkpoint.
if err := db.acquireReadLock(); err != nil { if err := db.acquireReadLock(); err != nil {
return fmt.Errorf("release read lock: %w", err) return fmt.Errorf("release read lock: %w", err)
} }
return nil
}
if _, err = db.db.Exec(`INSERT INTO _litestream_seq (id, seq) VALUES (1, 1) ON CONFLICT (id) DO UPDATE SET seq = seq + 1`); err != nil { // checkpointAndInit performs a checkpoint on the WAL file and initializes a
// new shadow WAL file.
func (db *DB) checkpointAndInit(info syncInfo, mode string) error {
// Read WAL header before checkpoint to check if it has been restarted.
hdr, err := readWALHeader(db.WALPath())
if err != nil {
return err
}
// Execute checkpoint and immediately issue a write to the WAL to ensure
// a new page is written.
if err := db.checkpoint(mode); err != nil {
return err
} else if _, err = db.db.Exec(`INSERT INTO _litestream_seq (id, seq) VALUES (1, 1) ON CONFLICT (id) DO UPDATE SET seq = seq + 1`); err != nil {
return err return err
} }

View File

@@ -27,6 +27,14 @@ const (
GenerationNameLen = 16 GenerationNameLen = 16
) )
// SQLite checkpoint modes.
const (
CheckpointModePassive = "PASSIVE"
CheckpointModeFull = "FULL"
CheckpointModeRestart = "RESTART"
CheckpointModeTruncate = "TRUNCATE"
)
// Litestream errors. // Litestream errors.
var ( var (
ErrNoSnapshots = errors.New("no snapshots available") ErrNoSnapshots = errors.New("no snapshots available")