Prevent checkpoints during snapshots (#477)
This commit is contained in:
20
db.go
20
db.go
@@ -50,6 +50,7 @@ type DB struct {
|
|||||||
rtx *sql.Tx // long running read transaction
|
rtx *sql.Tx // long running read transaction
|
||||||
pageSize int // page size, in bytes
|
pageSize int // page size, in bytes
|
||||||
notify chan struct{} // closes on WAL change
|
notify chan struct{} // closes on WAL change
|
||||||
|
chkMu sync.Mutex // checkpoint lock
|
||||||
|
|
||||||
fileInfo os.FileInfo // db info cached during init
|
fileInfo os.FileInfo // db info cached during init
|
||||||
dirInfo os.FileInfo // parent dir info cached during init
|
dirInfo os.FileInfo // parent dir info cached during init
|
||||||
@@ -1247,6 +1248,12 @@ func (db *DB) Checkpoint(ctx context.Context, mode string) (err error) {
|
|||||||
// checkpointAndInit performs a checkpoint on the WAL file and initializes a
|
// checkpointAndInit performs a checkpoint on the WAL file and initializes a
|
||||||
// new shadow WAL file.
|
// new shadow WAL file.
|
||||||
func (db *DB) checkpoint(ctx context.Context, generation, mode string) error {
|
func (db *DB) checkpoint(ctx context.Context, generation, mode string) error {
|
||||||
|
// Try getting a checkpoint lock, will fail during snapshots.
|
||||||
|
if !db.chkMu.TryLock() {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
defer db.chkMu.Unlock()
|
||||||
|
|
||||||
shadowWALPath, err := db.CurrentShadowWALPath(generation)
|
shadowWALPath, err := db.CurrentShadowWALPath(generation)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
@@ -1482,6 +1489,19 @@ func (db *DB) CRC64(ctx context.Context) (uint64, Pos, error) {
|
|||||||
return h.Sum64(), pos, nil
|
return h.Sum64(), pos, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// BeginSnapshot takes an internal snapshot lock preventing checkpoints.
|
||||||
|
//
|
||||||
|
// When calling this the caller must also call EndSnapshot() once the snapshot
|
||||||
|
// is finished.
|
||||||
|
func (db *DB) BeginSnapshot() {
|
||||||
|
db.chkMu.Lock()
|
||||||
|
}
|
||||||
|
|
||||||
|
// EndSnapshot releases the internal snapshot lock that prevents checkpoints.
|
||||||
|
func (db *DB) EndSnapshot() {
|
||||||
|
db.chkMu.Unlock()
|
||||||
|
}
|
||||||
|
|
||||||
// DefaultRestoreParallelism is the default parallelism when downloading WAL files.
|
// DefaultRestoreParallelism is the default parallelism when downloading WAL files.
|
||||||
const DefaultRestoreParallelism = 8
|
const DefaultRestoreParallelism = 8
|
||||||
|
|
||||||
|
|||||||
@@ -463,6 +463,10 @@ func (r *Replica) Snapshot(ctx context.Context) (info SnapshotInfo, err error) {
|
|||||||
r.muf.Lock()
|
r.muf.Lock()
|
||||||
defer r.muf.Unlock()
|
defer r.muf.Unlock()
|
||||||
|
|
||||||
|
// Prevent checkpoints during snapshot.
|
||||||
|
r.db.BeginSnapshot()
|
||||||
|
defer r.db.EndSnapshot()
|
||||||
|
|
||||||
// Issue a passive checkpoint to flush any pages to disk before snapshotting.
|
// Issue a passive checkpoint to flush any pages to disk before snapshotting.
|
||||||
if _, err := r.db.db.ExecContext(ctx, `PRAGMA wal_checkpoint(PASSIVE);`); err != nil {
|
if _, err := r.db.db.ExecContext(ctx, `PRAGMA wal_checkpoint(PASSIVE);`); err != nil {
|
||||||
return info, fmt.Errorf("pre-snapshot checkpoint: %w", err)
|
return info, fmt.Errorf("pre-snapshot checkpoint: %w", err)
|
||||||
|
|||||||
Reference in New Issue
Block a user