Merge pull request #104 from benbjohnson/remove-sync-lock
Remove SQLite write lock during WAL sync
This commit is contained in:
48
db.go
48
db.go
@@ -733,27 +733,6 @@ func (db *DB) Sync() (err error) {
|
|||||||
return fmt.Errorf("ensure wal exists: %w", err)
|
return fmt.Errorf("ensure wal exists: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Start a transaction. This will be promoted immediately after.
|
|
||||||
tx, err := db.db.Begin()
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("begin: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Ensure write transaction rolls back before returning.
|
|
||||||
defer func() {
|
|
||||||
if e := rollback(tx); e != nil && err == nil {
|
|
||||||
err = e
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
|
|
||||||
// Insert into the lock table to promote to a write tx. The lock table
|
|
||||||
// insert will never actually occur because our tx will be rolled back,
|
|
||||||
// however, it will ensure our tx grabs the write lock. Unfortunately,
|
|
||||||
// we can't call "BEGIN IMMEDIATE" as we are already in a transaction.
|
|
||||||
if _, err := tx.ExecContext(db.ctx, `INSERT INTO _litestream_lock (id) VALUES (1);`); err != nil {
|
|
||||||
return fmt.Errorf("_litestream_lock: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Verify our last sync matches the current state of the WAL.
|
// Verify our last sync matches the current state of the WAL.
|
||||||
// This ensures that we have an existing generation & that the last sync
|
// This ensures that we have an existing generation & that the last sync
|
||||||
// position of the real WAL hasn't been overwritten by another process.
|
// position of the real WAL hasn't been overwritten by another process.
|
||||||
@@ -800,11 +779,6 @@ func (db *DB) Sync() (err error) {
|
|||||||
checkpoint = true
|
checkpoint = true
|
||||||
}
|
}
|
||||||
|
|
||||||
// Release write lock before checkpointing & exiting.
|
|
||||||
if err := tx.Rollback(); err != nil {
|
|
||||||
return fmt.Errorf("rollback write tx: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Issue the checkpoint.
|
// Issue the checkpoint.
|
||||||
if checkpoint {
|
if checkpoint {
|
||||||
changed = true
|
changed = true
|
||||||
@@ -875,7 +849,7 @@ func (db *DB) verify() (info syncInfo, err error) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return info, err
|
return info, err
|
||||||
}
|
}
|
||||||
info.walSize = fi.Size()
|
info.walSize = frameAlign(fi.Size(), db.pageSize)
|
||||||
info.walModTime = fi.ModTime()
|
info.walModTime = fi.ModTime()
|
||||||
db.walSizeGauge.Set(float64(fi.Size()))
|
db.walSizeGauge.Set(float64(fi.Size()))
|
||||||
|
|
||||||
@@ -899,7 +873,6 @@ func (db *DB) verify() (info syncInfo, err error) {
|
|||||||
}
|
}
|
||||||
info.shadowWALSize = frameAlign(fi.Size(), db.pageSize)
|
info.shadowWALSize = frameAlign(fi.Size(), db.pageSize)
|
||||||
|
|
||||||
// Truncate shadow WAL if there is a partial page.
|
|
||||||
// Exit if shadow WAL does not contain a full header.
|
// Exit if shadow WAL does not contain a full header.
|
||||||
if info.shadowWALSize < WALHeaderSize {
|
if info.shadowWALSize < WALHeaderSize {
|
||||||
info.reason = "short shadow wal"
|
info.reason = "short shadow wal"
|
||||||
@@ -1353,6 +1326,21 @@ func (db *DB) checkpointAndInit(generation, mode string) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Start a transaction. This will be promoted immediately after.
|
||||||
|
tx, err := db.db.Begin()
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("begin: %w", err)
|
||||||
|
}
|
||||||
|
defer func() { _ = rollback(tx) }()
|
||||||
|
|
||||||
|
// Insert into the lock table to promote to a write tx. The lock table
|
||||||
|
// insert will never actually occur because our tx will be rolled back,
|
||||||
|
// however, it will ensure our tx grabs the write lock. Unfortunately,
|
||||||
|
// we can't call "BEGIN IMMEDIATE" as we are already in a transaction.
|
||||||
|
if _, err := tx.ExecContext(db.ctx, `INSERT INTO _litestream_lock (id) VALUES (1);`); err != nil {
|
||||||
|
return fmt.Errorf("_litestream_lock: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
// Copy the end of the previous WAL before starting a new shadow WAL.
|
// Copy the end of the previous WAL before starting a new shadow WAL.
|
||||||
if _, err := db.copyToShadowWAL(shadowWALPath); err != nil {
|
if _, err := db.copyToShadowWAL(shadowWALPath); err != nil {
|
||||||
return fmt.Errorf("cannot copy to end of shadow wal: %w", err)
|
return fmt.Errorf("cannot copy to end of shadow wal: %w", err)
|
||||||
@@ -1370,6 +1358,10 @@ func (db *DB) checkpointAndInit(generation, mode string) error {
|
|||||||
return fmt.Errorf("cannot init shadow wal file: name=%s err=%w", newShadowWALPath, err)
|
return fmt.Errorf("cannot init shadow wal file: name=%s err=%w", newShadowWALPath, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Release write lock before checkpointing & exiting.
|
||||||
|
if err := tx.Rollback(); err != nil {
|
||||||
|
return fmt.Errorf("rollback post-checkpoint tx: %w", err)
|
||||||
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user