From f4819efbebcacc85a81e7f6b819b429ff0d3c9bd Mon Sep 17 00:00:00 2001 From: Ben Johnson Date: Sun, 20 Dec 2020 08:01:47 -0700 Subject: [PATCH] Add checkpointing. --- db.go | 146 +++++++++++++++++++++++++++++++++++++++++++++++----------- 1 file changed, 119 insertions(+), 27 deletions(-) diff --git a/db.go b/db.go index e533e20..ad9b3eb 100644 --- a/db.go +++ b/db.go @@ -28,21 +28,37 @@ const ( GenerationNameLen = 16 ) +// Default DB settings. const ( - DefaultMonitorInterval = 1 * time.Second + DefaultMonitorInterval = 1 * time.Second + DefaultMinCheckpointPageN = 1000 ) // DB represents a managed instance of a SQLite database in the file system. type DB struct { - mu sync.Mutex - path string // part to database - db *sql.DB // target database - rtx *sql.Tx // long running read transaction + mu sync.Mutex + path string // path to database + db *sql.DB // target database + rtx *sql.Tx // long running read transaction + pageSize int // page size, in bytes ctx context.Context cancel func() wg sync.WaitGroup + // Minimum threshold of WAL size, in pages, before a passive checkpoint. + // A passive checkpoint will attempt a checkpoint but fail if there are + // active transactions occurring at the same time. + MinCheckpointPageN int + + // Maximum threshold of WAL size, in pages, before a forced checkpoint. + // A forced checkpoint will block new transactions and wait for existing + // transactions to finish before issuing a checkpoint and resetting the WAL. + // + // If zero, no checkpoints are forced. This can cause the WAL to grow + // unbounded if there are always read transactions occurring. + MaxCheckpointPageN int + // List of replicators for the database. // Must be set before calling Open(). Replicators []Replicator @@ -54,8 +70,9 @@ type DB struct { // NewDB returns a new instance of DB for a given path. 
func NewDB(path string) *DB { db := &DB{ - path: path, - MonitorInterval: DefaultMonitorInterval, + path: path, + MinCheckpointPageN: DefaultMinCheckpointPageN, + MonitorInterval: DefaultMonitorInterval, } db.ctx, db.cancel = context.WithCancel(context.Background()) return db @@ -142,6 +159,13 @@ func (db *DB) Open() (err error) { return fmt.Errorf("acquire read lock: %w", err) } + // Read page size. + if err := db.db.QueryRow(`PRAGMA page_size;`).Scan(&db.pageSize); err != nil { + return fmt.Errorf("read page size: %w", err) + } else if db.pageSize <= 0 { + return fmt.Errorf("invalid db page size: %d", db.pageSize) + } + // Ensure meta directory structure exists. if err := os.MkdirAll(db.MetaPath(), 0700); err != nil { return err @@ -320,7 +344,7 @@ func (db *DB) Sync() (err error) { // Ensure write transaction rolls back before returning. defer func() { - if e := tx.Rollback(); e != nil && err == nil { + if e := rollback(tx); e != nil && err == nil { err = e } }() @@ -349,32 +373,79 @@ func (db *DB) Sync() (err error) { } // Synchronize real WAL with current shadow WAL. - if err := db.syncWAL(generation); err != nil { + newWALSize, err := db.syncWAL(generation) + if err != nil { return fmt.Errorf("sync wal: %w", err) } - // TODO: If WAL size is greater than min threshold, attempt checkpoint: https://www.sqlite.org/pragma.html#pragma_wal_checkpoint - // TODO: If WAL size is great than max threshold, force checkpoint. - // TODO: Release write lock on database. + // If WAL size reaches the max threshold, force a checkpoint. This is tested first because the min branch would shadow it whenever max >= min. + // Otherwise, if WAL size reaches the min threshold, attempt a passive checkpoint. + var checkpoint, forceCheckpoint bool + if db.MaxCheckpointPageN > 0 && newWALSize >= calcWALSize(db.pageSize, db.MaxCheckpointPageN) { + checkpoint, forceCheckpoint = true, true + } else if newWALSize >= calcWALSize(db.pageSize, db.MinCheckpointPageN) { + checkpoint, forceCheckpoint = true, false + } - // TODO: On checkpoint, write new page and start new shadow WAL. 
+ // Release write lock before checkpointing & exiting. + if err := tx.Rollback(); err != nil { + return fmt.Errorf("rollback write tx: %w", err) + } + + // Issue the checkpoint. + if checkpoint { + if err := db.checkpoint(forceCheckpoint); err != nil { + return fmt.Errorf("checkpoint: force=%v err=%w", forceCheckpoint, err) + } + } return nil } +// checkpoint performs a checkpoint on the WAL file. If force is true a RESTART checkpoint is issued; otherwise the default (PASSIVE) mode is used. +func (db *DB) checkpoint(force bool) error { + // Ensure the read lock has been removed before issuing a checkpoint. + // We defer the re-acquire to ensure it occurs even on an early return. NOTE(review): it also runs after the explicit re-acquire below; confirm acquireReadLock is idempotent. + if err := db.releaseReadLock(); err != nil { + return fmt.Errorf("release read lock: %w", err) + } + defer db.acquireReadLock() + + // A non-forced checkpoint is issued as "PASSIVE". This checkpoints as many + // frames as possible without waiting on readers or writers. A forced checkpoint ("RESTART") + // will wait for pending transactions to end & block new transactions before + // forcing the checkpoint and restarting the WAL. + // + // See: https://www.sqlite.org/pragma.html#pragma_wal_checkpoint + rawsql := `PRAGMA wal_checkpoint;` + if force { + rawsql = `PRAGMA wal_checkpoint(RESTART);` + } + + if _, err := db.db.Exec(rawsql); err != nil { + return err + } + + // Reacquire the read lock immediately after the checkpoint. + if err := db.acquireReadLock(); err != nil { + return fmt.Errorf("acquire read lock: %w", err) + } + return nil +} + // syncWAL copies pending bytes from the real WAL to the shadow WAL. -func (db *DB) syncWAL(generation string) error { +func (db *DB) syncWAL(generation string) (newSize int64, err error) { // Determine total bytes of real WAL. fi, err := os.Stat(db.WALPath()) if err != nil { - return err + return 0, err } walSize := fi.Size() // Open shadow WAL to copy append to. 
shadowWALPath, err := db.CurrentShadowWALPath(generation) if err != nil { - return fmt.Errorf("cannot determine shadow WAL: %w", err) + return 0, fmt.Errorf("cannot determine shadow WAL: %w", err) } // TODO: Compare WAL headers. @@ -382,7 +453,7 @@ func (db *DB) syncWAL(generation string) error { // Determine shadow WAL current size. fi, err = os.Stat(shadowWALPath) if err != nil { - return err + return 0, err } shadowWALSize := fi.Size() @@ -392,7 +463,7 @@ func (db *DB) syncWAL(generation string) error { if pendingN < 0 { panic("shadow wal larger than real wal") // TODO: Handle gracefully } else if pendingN == 0 { - return nil // wals match, exit + return shadowWALSize, nil // wals match, exit } // TODO: Verify last page copied matches. @@ -400,32 +471,32 @@ func (db *DB) syncWAL(generation string) error { // Open handles for the shadow WAL & real WAL. w, err := os.OpenFile(shadowWALPath, os.O_RDWR, 0600) if err != nil { - return err + return 0, err } defer w.Close() r, err := os.Open(db.WALPath()) if err != nil { - return err + return 0, err } defer r.Close() // Seek to the correct position for each file. if _, err := r.Seek(shadowWALSize, io.SeekStart); err != nil { - return fmt.Errorf("wal seek: %w", err) + return 0, fmt.Errorf("wal seek: %w", err) } else if _, err := w.Seek(shadowWALSize, io.SeekStart); err != nil { - return fmt.Errorf("shadow wal seek: %w", err) + return 0, fmt.Errorf("shadow wal seek: %w", err) } // Copy and sync. 
if _, err := io.CopyN(w, r, pendingN); err != nil { - return fmt.Errorf("copy shadow wal error: %w", err) + return 0, fmt.Errorf("copy shadow wal error: %w", err) } else if err := w.Sync(); err != nil { - return fmt.Errorf("shadow wal sync: %w", err) + return 0, fmt.Errorf("shadow wal sync: %w", err) } else if err := w.Close(); err != nil { - return fmt.Errorf("shadow wal close: %w", err) + return 0, fmt.Errorf("shadow wal close: %w", err) } - return nil + return walSize, nil } // monitor runs in a separate goroutine and monitors the database & WAL. @@ -442,7 +513,7 @@ func (db *DB) monitor() { // Sync the database to the shadow WAL. if err := db.Sync(); err != nil && !errors.Is(err, context.Canceled) { - log.Printf("sync error: path=%s err=%s", db.path, err) + log.Printf("%s: sync error: %s", db.path, err) } // If context closed, exit after final sync. @@ -451,3 +522,24 @@ func (db *DB) monitor() { } } } + +const ( + // WALHeaderSize is the size of the WAL header, in bytes. + WALHeaderSize = 32 + + // WALFrameHeaderSize is the size of the WAL frame header, in bytes. + WALFrameHeaderSize = 24 +) + +// calcWALSize returns the size of the WAL, in bytes, for a given number of pages. The arithmetic is done in int64 so it cannot overflow a 32-bit int. +func calcWALSize(pageSize int, n int) int64 { + return int64(WALHeaderSize) + (int64(WALFrameHeaderSize)+int64(pageSize))*int64(n) +} + +// rollback rolls back tx. Ignores sql.ErrTxDone, i.e. a tx that was already committed or rolled back. +func rollback(tx *sql.Tx) error { + if err := tx.Rollback(); err != nil && !errors.Is(err, sql.ErrTxDone) { + return err + } + return nil +}