Add checkpointing.

This commit is contained in:
Ben Johnson
2020-12-20 08:01:47 -07:00
parent a8387ed6f9
commit f4819efbeb

132
db.go
View File

@@ -28,8 +28,10 @@ const (
GenerationNameLen = 16
)
// Default DB settings.
const (
DefaultMonitorInterval = 1 * time.Second
DefaultMinCheckpointPageN = 1000
)
// DB represents a managed instance of a SQLite database in the file system.
@@ -38,11 +40,25 @@ type DB struct {
path string // part to database
db *sql.DB // target database
rtx *sql.Tx // long running read transaction
pageSize int // page size, in bytes
ctx context.Context
cancel func()
wg sync.WaitGroup
// Minimum threshold of WAL size, in pages, before a passive checkpoint.
// A passive checkpoint will attempt a checkpoint but fail if there are
// active transactions occurring at the same time.
MinCheckpointPageN int
// Maximum threshold of WAL size, in pages, before a forced checkpoint.
// A forced checkpoint will block new transactions and wait for existing
// transactions to finish before issuing a checkpoint and resetting the WAL.
//
// If zero, no checkpoints are forced. This can cause the WAL to grow
// unbounded if there are always read transactions occurring.
MaxCheckpointPageN int
// List of replicators for the database.
// Must be set before calling Open().
Replicators []Replicator
@@ -55,6 +71,7 @@ type DB struct {
func NewDB(path string) *DB {
db := &DB{
path: path,
MinCheckpointPageN: DefaultMinCheckpointPageN,
MonitorInterval: DefaultMonitorInterval,
}
db.ctx, db.cancel = context.WithCancel(context.Background())
@@ -142,6 +159,13 @@ func (db *DB) Open() (err error) {
return fmt.Errorf("acquire read lock: %w", err)
}
// Read page size.
if err := db.db.QueryRow(`PRAGMA page_size;`).Scan(&db.pageSize); err != nil {
return fmt.Errorf("read page size: %w", err)
} else if db.pageSize <= 0 {
return fmt.Errorf("invalid db page size: %d", db.pageSize)
}
// Ensure meta directory structure exists.
if err := os.MkdirAll(db.MetaPath(), 0700); err != nil {
return err
@@ -320,7 +344,7 @@ func (db *DB) Sync() (err error) {
// Ensure write transaction rolls back before returning.
defer func() {
if e := tx.Rollback(); e != nil && err == nil {
if e := rollback(tx); e != nil && err == nil {
err = e
}
}()
@@ -349,32 +373,79 @@ func (db *DB) Sync() (err error) {
}
// Synchronize real WAL with current shadow WAL.
if err := db.syncWAL(generation); err != nil {
newWALSize, err := db.syncWAL(generation)
if err != nil {
return fmt.Errorf("sync wal: %w", err)
}
// TODO: If WAL size is greater than min threshold, attempt checkpoint: https://www.sqlite.org/pragma.html#pragma_wal_checkpoint
// TODO: If WAL size is great than max threshold, force checkpoint.
// TODO: Release write lock on database.
// If WAL size is great than max threshold, force checkpoint.
// If WAL size is greater than min threshold, attempt checkpoint.
var checkpoint, forceCheckpoint bool
if newWALSize >= calcWALSize(db.pageSize, db.MinCheckpointPageN) {
checkpoint, forceCheckpoint = true, false
} else if db.MaxCheckpointPageN > 0 && newWALSize >= calcWALSize(db.pageSize, db.MaxCheckpointPageN) {
checkpoint, forceCheckpoint = true, true
}
// TODO: On checkpoint, write new page and start new shadow WAL.
// Release write lock before checkpointing & exiting.
if err := tx.Rollback(); err != nil {
return fmt.Errorf("rollback write tx: %w", err)
}
// Issue the checkpoint.
if checkpoint {
if err := db.checkpoint(forceCheckpoint); err != nil {
return fmt.Errorf("checkpoint: force=%v err=%w", err)
}
}
return nil
}
// checkpoint performs a checkpoint on the WAL file.
func (db *DB) checkpoint(force bool) error {
// Ensure the read lock has been removed before issuing a checkpoint.
// We defer the re-acquire to ensure it occurs even on an early return.
if err := db.releaseReadLock(); err != nil {
return fmt.Errorf("release read lock: %w", err)
}
defer db.acquireReadLock()
// A non-forced checkpoint is issued as "PASSIVE". This will only checkpoint
// if there are not pending transactions. A forced checkpoint ("RESTART")
// will wait for pending transactions to end & block new transactions before
// forcing the checkpoint and restarting the WAL.
//
// See: https://www.sqlite.org/pragma.html#pragma_wal_checkpoint
rawsql := `PRAGMA wal_checkpoint;`
if force {
rawsql = `PRAGMA wal_checkpoint(RESTART);`
}
if _, err := db.db.Exec(rawsql); err != nil {
return err
}
// Reacquire the read lock immediately after the checkpoint.
if err := db.acquireReadLock(); err != nil {
return fmt.Errorf("release read lock: %w", err)
}
return nil
}
// syncWAL copies pending bytes from the real WAL to the shadow WAL.
func (db *DB) syncWAL(generation string) error {
func (db *DB) syncWAL(generation string) (newSize int64, err error) {
// Determine total bytes of real WAL.
fi, err := os.Stat(db.WALPath())
if err != nil {
return err
return 0, err
}
walSize := fi.Size()
// Open shadow WAL to copy append to.
shadowWALPath, err := db.CurrentShadowWALPath(generation)
if err != nil {
return fmt.Errorf("cannot determine shadow WAL: %w", err)
return 0, fmt.Errorf("cannot determine shadow WAL: %w", err)
}
// TODO: Compare WAL headers.
@@ -382,7 +453,7 @@ func (db *DB) syncWAL(generation string) error {
// Determine shadow WAL current size.
fi, err = os.Stat(shadowWALPath)
if err != nil {
return err
return 0, err
}
shadowWALSize := fi.Size()
@@ -392,7 +463,7 @@ func (db *DB) syncWAL(generation string) error {
if pendingN < 0 {
panic("shadow wal larger than real wal") // TODO: Handle gracefully
} else if pendingN == 0 {
return nil // wals match, exit
return shadowWALSize, nil // wals match, exit
}
// TODO: Verify last page copied matches.
@@ -400,32 +471,32 @@ func (db *DB) syncWAL(generation string) error {
// Open handles for the shadow WAL & real WAL.
w, err := os.OpenFile(shadowWALPath, os.O_RDWR, 0600)
if err != nil {
return err
return 0, err
}
defer w.Close()
r, err := os.Open(db.WALPath())
if err != nil {
return err
return 0, err
}
defer r.Close()
// Seek to the correct position for each file.
if _, err := r.Seek(shadowWALSize, io.SeekStart); err != nil {
return fmt.Errorf("wal seek: %w", err)
return 0, fmt.Errorf("wal seek: %w", err)
} else if _, err := w.Seek(shadowWALSize, io.SeekStart); err != nil {
return fmt.Errorf("shadow wal seek: %w", err)
return 0, fmt.Errorf("shadow wal seek: %w", err)
}
// Copy and sync.
if _, err := io.CopyN(w, r, pendingN); err != nil {
return fmt.Errorf("copy shadow wal error: %w", err)
return 0, fmt.Errorf("copy shadow wal error: %w", err)
} else if err := w.Sync(); err != nil {
return fmt.Errorf("shadow wal sync: %w", err)
return 0, fmt.Errorf("shadow wal sync: %w", err)
} else if err := w.Close(); err != nil {
return fmt.Errorf("shadow wal close: %w", err)
return 0, fmt.Errorf("shadow wal close: %w", err)
}
return nil
return walSize, nil
}
// monitor runs in a separate goroutine and monitors the database & WAL.
@@ -442,7 +513,7 @@ func (db *DB) monitor() {
// Sync the database to the shadow WAL.
if err := db.Sync(); err != nil && !errors.Is(err, context.Canceled) {
log.Printf("sync error: path=%s err=%s", db.path, err)
log.Printf("%s: sync error: %s", db.path, err)
}
// If context closed, exit after final sync.
@@ -451,3 +522,24 @@ func (db *DB) monitor() {
}
}
}
const (
// WALHeaderSize is the size of the WAL header, in bytes.
WALHeaderSize = 32
// WALFrameHeaderSize is the size of the WAL frame header, in bytes.
WALFrameHeaderSize = 24
)
// calcWALSize returns the size of the WAL, in bytes, for a given number of pages.
func calcWALSize(pageSize int, n int) int64 {
return int64(WALHeaderSize + ((WALFrameHeaderSize + pageSize) * n))
}
// rollback rolls back tx. Ignores already-rolled-back errors.
func rollback(tx *sql.Tx) error {
if err := tx.Rollback(); err != nil && !strings.Contains(err.Error(), `transaction has already been committed or rolled back`) {
return err
}
return nil
}