Force truncation checkpoint if WAL becomes runaway (#473)
This commit is contained in:
86
db.go
86
db.go
@@ -31,6 +31,7 @@ const (
|
|||||||
DefaultCheckpointInterval = 1 * time.Minute
|
DefaultCheckpointInterval = 1 * time.Minute
|
||||||
DefaultMinCheckpointPageN = 1000
|
DefaultMinCheckpointPageN = 1000
|
||||||
DefaultMaxCheckpointPageN = 10000
|
DefaultMaxCheckpointPageN = 10000
|
||||||
|
DefaultTruncatePageN = 500000
|
||||||
)
|
)
|
||||||
|
|
||||||
// MaxIndex is the maximum possible WAL index.
|
// MaxIndex is the maximum possible WAL index.
|
||||||
@@ -85,6 +86,16 @@ type DB struct {
|
|||||||
// unbounded if there are always read transactions occurring.
|
// unbounded if there are always read transactions occurring.
|
||||||
MaxCheckpointPageN int
|
MaxCheckpointPageN int
|
||||||
|
|
||||||
|
// Threshold of WAL size, in pages, before a forced truncation checkpoint.
|
||||||
|
// A forced truncation checkpoint will block new transactions and wait for
|
||||||
|
// existing transactions to finish before issuing a checkpoint and
|
||||||
|
// truncating the WAL.
|
||||||
|
//
|
||||||
|
// If zero, no truncates are forced. This can cause the WAL to grow
|
||||||
|
// unbounded if there's a sudden spike of changes between other
|
||||||
|
// checkpoints.
|
||||||
|
TruncatePageN int
|
||||||
|
|
||||||
// Time between automatic checkpoints in the WAL. This is done to allow
|
// Time between automatic checkpoints in the WAL. This is done to allow
|
||||||
// more fine-grained WAL files so that restores can be performed with
|
// more fine-grained WAL files so that restores can be performed with
|
||||||
// better precision.
|
// better precision.
|
||||||
@@ -112,6 +123,7 @@ func NewDB(path string) *DB {
|
|||||||
|
|
||||||
MinCheckpointPageN: DefaultMinCheckpointPageN,
|
MinCheckpointPageN: DefaultMinCheckpointPageN,
|
||||||
MaxCheckpointPageN: DefaultMaxCheckpointPageN,
|
MaxCheckpointPageN: DefaultMaxCheckpointPageN,
|
||||||
|
TruncatePageN: DefaultTruncatePageN,
|
||||||
CheckpointInterval: DefaultCheckpointInterval,
|
CheckpointInterval: DefaultCheckpointInterval,
|
||||||
MonitorInterval: DefaultMonitorInterval,
|
MonitorInterval: DefaultMonitorInterval,
|
||||||
Logger: slog.With("db", path),
|
Logger: slog.With("db", path),
|
||||||
@@ -750,7 +762,7 @@ func (db *DB) Sync(ctx context.Context) (err error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Synchronize real WAL with current shadow WAL.
|
// Synchronize real WAL with current shadow WAL.
|
||||||
newWALSize, err := db.syncWAL(info)
|
origWALSize, newWALSize, err := db.syncWAL(info)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("sync wal: %w", err)
|
return fmt.Errorf("sync wal: %w", err)
|
||||||
}
|
}
|
||||||
@@ -759,7 +771,9 @@ func (db *DB) Sync(ctx context.Context) (err error) {
|
|||||||
// If WAL size is greater than min threshold, attempt checkpoint.
|
// If WAL size is greater than min threshold, attempt checkpoint.
|
||||||
var checkpoint bool
|
var checkpoint bool
|
||||||
checkpointMode := CheckpointModePassive
|
checkpointMode := CheckpointModePassive
|
||||||
if db.MaxCheckpointPageN > 0 && newWALSize >= calcWALSize(db.pageSize, db.MaxCheckpointPageN) {
|
if db.TruncatePageN > 0 && origWALSize >= calcWALSize(db.pageSize, db.TruncatePageN) {
|
||||||
|
checkpoint, checkpointMode = true, CheckpointModeTruncate
|
||||||
|
} else if db.MaxCheckpointPageN > 0 && newWALSize >= calcWALSize(db.pageSize, db.MaxCheckpointPageN) {
|
||||||
checkpoint, checkpointMode = true, CheckpointModeRestart
|
checkpoint, checkpointMode = true, CheckpointModeRestart
|
||||||
} else if newWALSize >= calcWALSize(db.pageSize, db.MinCheckpointPageN) {
|
} else if newWALSize >= calcWALSize(db.pageSize, db.MinCheckpointPageN) {
|
||||||
checkpoint = true
|
checkpoint = true
|
||||||
@@ -918,29 +932,29 @@ type syncInfo struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// syncWAL copies pending bytes from the real WAL to the shadow WAL.
|
// syncWAL copies pending bytes from the real WAL to the shadow WAL.
|
||||||
func (db *DB) syncWAL(info syncInfo) (newSize int64, err error) {
|
func (db *DB) syncWAL(info syncInfo) (origSize int64, newSize int64, err error) {
|
||||||
// Copy WAL starting from end of shadow WAL. Exit if no new shadow WAL needed.
|
// Copy WAL starting from end of shadow WAL. Exit if no new shadow WAL needed.
|
||||||
newSize, err = db.copyToShadowWAL(info.shadowWALPath)
|
origSize, newSize, err = db.copyToShadowWAL(info.shadowWALPath)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return newSize, fmt.Errorf("cannot copy to shadow wal: %w", err)
|
return origSize, newSize, fmt.Errorf("cannot copy to shadow wal: %w", err)
|
||||||
} else if !info.restart {
|
} else if !info.restart {
|
||||||
return newSize, nil // If no restart required, exit.
|
return origSize, newSize, nil // If no restart required, exit.
|
||||||
}
|
}
|
||||||
|
|
||||||
// Parse index of current shadow WAL file.
|
// Parse index of current shadow WAL file.
|
||||||
dir, base := filepath.Split(info.shadowWALPath)
|
dir, base := filepath.Split(info.shadowWALPath)
|
||||||
index, err := ParseWALPath(base)
|
index, err := ParseWALPath(base)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, fmt.Errorf("cannot parse shadow wal filename: %s", base)
|
return 0, 0, fmt.Errorf("cannot parse shadow wal filename: %s", base)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Start a new shadow WAL file with next index.
|
// Start a new shadow WAL file with next index.
|
||||||
newShadowWALPath := filepath.Join(dir, FormatWALPath(index+1))
|
newShadowWALPath := filepath.Join(dir, FormatWALPath(index+1))
|
||||||
newSize, err = db.initShadowWALFile(newShadowWALPath)
|
newSize, err = db.initShadowWALFile(newShadowWALPath)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, fmt.Errorf("cannot init shadow wal file: name=%s err=%w", newShadowWALPath, err)
|
return 0, 0, fmt.Errorf("cannot init shadow wal file: name=%s err=%w", newShadowWALPath, err)
|
||||||
}
|
}
|
||||||
return newSize, nil
|
return origSize, newSize, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (db *DB) initShadowWALFile(filename string) (int64, error) {
|
func (db *DB) initShadowWALFile(filename string) (int64, error) {
|
||||||
@@ -976,52 +990,58 @@ func (db *DB) initShadowWALFile(filename string) (int64, error) {
|
|||||||
_ = os.Chown(filename, uid, gid)
|
_ = os.Chown(filename, uid, gid)
|
||||||
|
|
||||||
// Copy as much shadow WAL as available.
|
// Copy as much shadow WAL as available.
|
||||||
newSize, err := db.copyToShadowWAL(filename)
|
_, newSize, err := db.copyToShadowWAL(filename)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, fmt.Errorf("cannot copy to new shadow wal: %w", err)
|
return 0, fmt.Errorf("cannot copy to new shadow wal: %w", err)
|
||||||
}
|
}
|
||||||
return newSize, nil
|
return newSize, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (db *DB) copyToShadowWAL(filename string) (newSize int64, err error) {
|
func (db *DB) copyToShadowWAL(filename string) (origWalSize int64, newSize int64, err error) {
|
||||||
logger := db.Logger.With("filename", filename)
|
logger := db.Logger.With("filename", filename)
|
||||||
logger.Debug("copy-shadow")
|
logger.Debug("copy-shadow")
|
||||||
|
|
||||||
r, err := os.Open(db.WALPath())
|
r, err := os.Open(db.WALPath())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, err
|
return 0, 0, err
|
||||||
}
|
}
|
||||||
defer r.Close()
|
defer r.Close()
|
||||||
|
|
||||||
|
fi, err := r.Stat()
|
||||||
|
if err != nil {
|
||||||
|
return 0, 0, err
|
||||||
|
}
|
||||||
|
origWalSize = frameAlign(fi.Size(), db.pageSize)
|
||||||
|
|
||||||
w, err := os.OpenFile(filename, os.O_RDWR, 0666)
|
w, err := os.OpenFile(filename, os.O_RDWR, 0666)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, err
|
return 0, 0, err
|
||||||
}
|
}
|
||||||
defer w.Close()
|
defer w.Close()
|
||||||
|
|
||||||
fi, err := w.Stat()
|
fi, err = w.Stat()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, err
|
return 0, 0, err
|
||||||
}
|
}
|
||||||
origSize := frameAlign(fi.Size(), db.pageSize)
|
origSize := frameAlign(fi.Size(), db.pageSize)
|
||||||
|
|
||||||
// Read shadow WAL header to determine byte order for checksum & salt.
|
// Read shadow WAL header to determine byte order for checksum & salt.
|
||||||
hdr := make([]byte, WALHeaderSize)
|
hdr := make([]byte, WALHeaderSize)
|
||||||
if _, err := io.ReadFull(w, hdr); err != nil {
|
if _, err := io.ReadFull(w, hdr); err != nil {
|
||||||
return 0, fmt.Errorf("read header: %w", err)
|
return 0, 0, fmt.Errorf("read header: %w", err)
|
||||||
}
|
}
|
||||||
hsalt0 := binary.BigEndian.Uint32(hdr[16:])
|
hsalt0 := binary.BigEndian.Uint32(hdr[16:])
|
||||||
hsalt1 := binary.BigEndian.Uint32(hdr[20:])
|
hsalt1 := binary.BigEndian.Uint32(hdr[20:])
|
||||||
|
|
||||||
bo, err := headerByteOrder(hdr)
|
bo, err := headerByteOrder(hdr)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, err
|
return 0, 0, err
|
||||||
}
|
}
|
||||||
|
|
||||||
// Read previous checksum.
|
// Read previous checksum.
|
||||||
chksum0, chksum1, err := readLastChecksumFrom(w, db.pageSize)
|
chksum0, chksum1, err := readLastChecksumFrom(w, db.pageSize)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, fmt.Errorf("last checksum: %w", err)
|
return 0, 0, fmt.Errorf("last checksum: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Write to a temporary shadow file.
|
// Write to a temporary shadow file.
|
||||||
@@ -1030,15 +1050,15 @@ func (db *DB) copyToShadowWAL(filename string) (newSize int64, err error) {
|
|||||||
|
|
||||||
f, err := internal.CreateFile(tempFilename, db.fileInfo)
|
f, err := internal.CreateFile(tempFilename, db.fileInfo)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, fmt.Errorf("create temp file: %w", err)
|
return 0, 0, fmt.Errorf("create temp file: %w", err)
|
||||||
}
|
}
|
||||||
defer f.Close()
|
defer f.Close()
|
||||||
|
|
||||||
// Seek to correct position on real wal.
|
// Seek to correct position on real wal.
|
||||||
if _, err := r.Seek(origSize, io.SeekStart); err != nil {
|
if _, err := r.Seek(origSize, io.SeekStart); err != nil {
|
||||||
return 0, fmt.Errorf("real wal seek: %w", err)
|
return 0, 0, fmt.Errorf("real wal seek: %w", err)
|
||||||
} else if _, err := w.Seek(origSize, io.SeekStart); err != nil {
|
} else if _, err := w.Seek(origSize, io.SeekStart); err != nil {
|
||||||
return 0, fmt.Errorf("shadow wal seek: %w", err)
|
return 0, 0, fmt.Errorf("shadow wal seek: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Read through WAL from last position to find the page of the last
|
// Read through WAL from last position to find the page of the last
|
||||||
@@ -1052,7 +1072,7 @@ func (db *DB) copyToShadowWAL(filename string) (newSize int64, err error) {
|
|||||||
logger.Debug("copy-shadow: break", "offset", offset, "error", err)
|
logger.Debug("copy-shadow: break", "offset", offset, "error", err)
|
||||||
break // end of file or partial page
|
break // end of file or partial page
|
||||||
} else if err != nil {
|
} else if err != nil {
|
||||||
return 0, fmt.Errorf("read wal: %w", err)
|
return 0, 0, fmt.Errorf("read wal: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Read frame salt & compare to header salt. Stop reading on mismatch.
|
// Read frame salt & compare to header salt. Stop reading on mismatch.
|
||||||
@@ -1075,7 +1095,7 @@ func (db *DB) copyToShadowWAL(filename string) (newSize int64, err error) {
|
|||||||
|
|
||||||
// Write page to temporary WAL file.
|
// Write page to temporary WAL file.
|
||||||
if _, err := f.Write(frame); err != nil {
|
if _, err := f.Write(frame); err != nil {
|
||||||
return 0, fmt.Errorf("write temp shadow wal: %w", err)
|
return 0, 0, fmt.Errorf("write temp shadow wal: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
logger.Debug("copy-shadow: ok", "offset", offset, "salt", fmt.Sprintf("%x %x", salt0, salt1))
|
logger.Debug("copy-shadow: ok", "offset", offset, "salt", fmt.Sprintf("%x %x", salt0, salt1))
|
||||||
@@ -1090,39 +1110,39 @@ func (db *DB) copyToShadowWAL(filename string) (newSize int64, err error) {
|
|||||||
|
|
||||||
// If no WAL writes found, exit.
|
// If no WAL writes found, exit.
|
||||||
if origSize == lastCommitSize {
|
if origSize == lastCommitSize {
|
||||||
return origSize, nil
|
return origSize, lastCommitSize, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
walByteN := lastCommitSize - origSize
|
walByteN := lastCommitSize - origSize
|
||||||
|
|
||||||
// Move to beginning of temporary file.
|
// Move to beginning of temporary file.
|
||||||
if _, err := f.Seek(0, io.SeekStart); err != nil {
|
if _, err := f.Seek(0, io.SeekStart); err != nil {
|
||||||
return 0, fmt.Errorf("temp file seek: %w", err)
|
return 0, 0, fmt.Errorf("temp file seek: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Copy from temporary file to shadow WAL.
|
// Copy from temporary file to shadow WAL.
|
||||||
if _, err := io.Copy(w, &io.LimitedReader{R: f, N: walByteN}); err != nil {
|
if _, err := io.Copy(w, &io.LimitedReader{R: f, N: walByteN}); err != nil {
|
||||||
return 0, fmt.Errorf("write shadow file: %w", err)
|
return 0, 0, fmt.Errorf("write shadow file: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Close & remove temporary file.
|
// Close & remove temporary file.
|
||||||
if err := f.Close(); err != nil {
|
if err := f.Close(); err != nil {
|
||||||
return 0, err
|
return 0, 0, err
|
||||||
} else if err := os.Remove(tempFilename); err != nil {
|
} else if err := os.Remove(tempFilename); err != nil {
|
||||||
return 0, err
|
return 0, 0, err
|
||||||
}
|
}
|
||||||
|
|
||||||
// Sync & close shadow WAL.
|
// Sync & close shadow WAL.
|
||||||
if err := w.Sync(); err != nil {
|
if err := w.Sync(); err != nil {
|
||||||
return 0, err
|
return 0, 0, err
|
||||||
} else if err := w.Close(); err != nil {
|
} else if err := w.Close(); err != nil {
|
||||||
return 0, err
|
return 0, 0, err
|
||||||
}
|
}
|
||||||
|
|
||||||
// Track total number of bytes written to WAL.
|
// Track total number of bytes written to WAL.
|
||||||
db.totalWALBytesCounter.Add(float64(walByteN))
|
db.totalWALBytesCounter.Add(float64(walByteN))
|
||||||
|
|
||||||
return lastCommitSize, nil
|
return origWalSize, lastCommitSize, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// ShadowWALReader opens a reader for a shadow WAL file at a given position.
|
// ShadowWALReader opens a reader for a shadow WAL file at a given position.
|
||||||
@@ -1297,7 +1317,7 @@ func (db *DB) checkpoint(ctx context.Context, generation, mode string) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Copy shadow WAL before checkpoint to copy as much as possible.
|
// Copy shadow WAL before checkpoint to copy as much as possible.
|
||||||
if _, err := db.copyToShadowWAL(shadowWALPath); err != nil {
|
if _, _, err := db.copyToShadowWAL(shadowWALPath); err != nil {
|
||||||
return fmt.Errorf("cannot copy to end of shadow wal before checkpoint: %w", err)
|
return fmt.Errorf("cannot copy to end of shadow wal before checkpoint: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -1332,7 +1352,7 @@ func (db *DB) checkpoint(ctx context.Context, generation, mode string) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Copy the end of the previous WAL before starting a new shadow WAL.
|
// Copy the end of the previous WAL before starting a new shadow WAL.
|
||||||
if _, err := db.copyToShadowWAL(shadowWALPath); err != nil {
|
if _, _, err := db.copyToShadowWAL(shadowWALPath); err != nil {
|
||||||
return fmt.Errorf("cannot copy to end of shadow wal: %w", err)
|
return fmt.Errorf("cannot copy to end of shadow wal: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user