From fe9ab5c51703508c32b9ffd86bce49477526116f Mon Sep 17 00:00:00 2001 From: Toni Spets Date: Fri, 20 Oct 2023 03:27:51 +0300 Subject: [PATCH] Force truncation checkpoint if WAL becomes runaway (#473) --- db.go | 86 ++++++++++++++++++++++++++++++++++++----------------------- 1 file changed, 53 insertions(+), 33 deletions(-) diff --git a/db.go b/db.go index 6aef612..ba914e1 100644 --- a/db.go +++ b/db.go @@ -31,6 +31,7 @@ const ( DefaultCheckpointInterval = 1 * time.Minute DefaultMinCheckpointPageN = 1000 DefaultMaxCheckpointPageN = 10000 + DefaultTruncatePageN = 500000 ) // MaxIndex is the maximum possible WAL index. @@ -85,6 +86,16 @@ type DB struct { // unbounded if there are always read transactions occurring. MaxCheckpointPageN int + // Threshold of WAL size, in pages, before a forced truncation checkpoint. + // A forced truncation checkpoint will block new transactions and wait for + // existing transactions to finish before issuing a checkpoint and + // truncating the WAL. + // + // If zero, no truncates are forced. This can cause the WAL to grow + // unbounded if there's a sudden spike of changes between other + // checkpoints. + TruncatePageN int + // Time between automatic checkpoints in the WAL. This is done to allow // more fine-grained WAL files so that restores can be performed with // better precision. @@ -112,6 +123,7 @@ func NewDB(path string) *DB { MinCheckpointPageN: DefaultMinCheckpointPageN, MaxCheckpointPageN: DefaultMaxCheckpointPageN, + TruncatePageN: DefaultTruncatePageN, CheckpointInterval: DefaultCheckpointInterval, MonitorInterval: DefaultMonitorInterval, Logger: slog.With("db", path), @@ -750,7 +762,7 @@ func (db *DB) Sync(ctx context.Context) (err error) { } // Synchronize real WAL with current shadow WAL. - newWALSize, err := db.syncWAL(info) + origWALSize, newWALSize, err := db.syncWAL(info) if err != nil { return fmt.Errorf("sync wal: %w", err) } @@ -759,7 +771,9 @@ func (db *DB) Sync(ctx context.Context) (err error) { // If WAL size is greater than min threshold, attempt checkpoint. var checkpoint bool checkpointMode := CheckpointModePassive - if db.MaxCheckpointPageN > 0 && newWALSize >= calcWALSize(db.pageSize, db.MaxCheckpointPageN) { + if db.TruncatePageN > 0 && origWALSize >= calcWALSize(db.pageSize, db.TruncatePageN) { + checkpoint, checkpointMode = true, CheckpointModeTruncate + } else if db.MaxCheckpointPageN > 0 && newWALSize >= calcWALSize(db.pageSize, db.MaxCheckpointPageN) { checkpoint, checkpointMode = true, CheckpointModeRestart } else if newWALSize >= calcWALSize(db.pageSize, db.MinCheckpointPageN) { checkpoint = true @@ -918,29 +932,29 @@ type syncInfo struct { } // syncWAL copies pending bytes from the real WAL to the shadow WAL. -func (db *DB) syncWAL(info syncInfo) (newSize int64, err error) { +func (db *DB) syncWAL(info syncInfo) (origSize int64, newSize int64, err error) { // Copy WAL starting from end of shadow WAL. Exit if no new shadow WAL needed. - newSize, err = db.copyToShadowWAL(info.shadowWALPath) + origSize, newSize, err = db.copyToShadowWAL(info.shadowWALPath) if err != nil { - return newSize, fmt.Errorf("cannot copy to shadow wal: %w", err) + return origSize, newSize, fmt.Errorf("cannot copy to shadow wal: %w", err) } else if !info.restart { - return newSize, nil // If no restart required, exit. + return origSize, newSize, nil // If no restart required, exit. } // Parse index of current shadow WAL file. dir, base := filepath.Split(info.shadowWALPath) index, err := ParseWALPath(base) if err != nil { - return 0, fmt.Errorf("cannot parse shadow wal filename: %s", base) + return 0, 0, fmt.Errorf("cannot parse shadow wal filename: %s", base) } // Start a new shadow WAL file with next index. newShadowWALPath := filepath.Join(dir, FormatWALPath(index+1)) newSize, err = db.initShadowWALFile(newShadowWALPath) if err != nil { - return 0, fmt.Errorf("cannot init shadow wal file: name=%s err=%w", newShadowWALPath, err) + return 0, 0, fmt.Errorf("cannot init shadow wal file: name=%s err=%w", newShadowWALPath, err) } - return newSize, nil + return origSize, newSize, nil } func (db *DB) initShadowWALFile(filename string) (int64, error) { @@ -976,52 +990,58 @@ func (db *DB) initShadowWALFile(filename string) (int64, error) { _ = os.Chown(filename, uid, gid) // Copy as much shadow WAL as available. - newSize, err := db.copyToShadowWAL(filename) + _, newSize, err := db.copyToShadowWAL(filename) if err != nil { return 0, fmt.Errorf("cannot copy to new shadow wal: %w", err) } return newSize, nil } -func (db *DB) copyToShadowWAL(filename string) (newSize int64, err error) { +func (db *DB) copyToShadowWAL(filename string) (origWalSize int64, newSize int64, err error) { logger := db.Logger.With("filename", filename) logger.Debug("copy-shadow") r, err := os.Open(db.WALPath()) if err != nil { - return 0, err + return 0, 0, err } defer r.Close() + fi, err := r.Stat() + if err != nil { + return 0, 0, err + } + origWalSize = frameAlign(fi.Size(), db.pageSize) + w, err := os.OpenFile(filename, os.O_RDWR, 0666) if err != nil { - return 0, err + return 0, 0, err } defer w.Close() - fi, err := w.Stat() + fi, err = w.Stat() if err != nil { - return 0, err + return 0, 0, err } origSize := frameAlign(fi.Size(), db.pageSize) // Read shadow WAL header to determine byte order for checksum & salt. hdr := make([]byte, WALHeaderSize) if _, err := io.ReadFull(w, hdr); err != nil { - return 0, fmt.Errorf("read header: %w", err) + return 0, 0, fmt.Errorf("read header: %w", err) } hsalt0 := binary.BigEndian.Uint32(hdr[16:]) hsalt1 := binary.BigEndian.Uint32(hdr[20:]) bo, err := headerByteOrder(hdr) if err != nil { - return 0, err + return 0, 0, err } // Read previous checksum. chksum0, chksum1, err := readLastChecksumFrom(w, db.pageSize) if err != nil { - return 0, fmt.Errorf("last checksum: %w", err) + return 0, 0, fmt.Errorf("last checksum: %w", err) } // Write to a temporary shadow file. @@ -1030,15 +1050,15 @@ func (db *DB) copyToShadowWAL(filename string) (newSize int64, err error) { f, err := internal.CreateFile(tempFilename, db.fileInfo) if err != nil { - return 0, fmt.Errorf("create temp file: %w", err) + return 0, 0, fmt.Errorf("create temp file: %w", err) } defer f.Close() // Seek to correct position on real wal. if _, err := r.Seek(origSize, io.SeekStart); err != nil { - return 0, fmt.Errorf("real wal seek: %w", err) + return 0, 0, fmt.Errorf("real wal seek: %w", err) } else if _, err := w.Seek(origSize, io.SeekStart); err != nil { - return 0, fmt.Errorf("shadow wal seek: %w", err) + return 0, 0, fmt.Errorf("shadow wal seek: %w", err) } // Read through WAL from last position to find the page of the last @@ -1052,7 +1072,7 @@ func (db *DB) copyToShadowWAL(filename string) (newSize int64, err error) { logger.Debug("copy-shadow: break", "offset", offset, "error", err) break // end of file or partial page } else if err != nil { - return 0, fmt.Errorf("read wal: %w", err) + return 0, 0, fmt.Errorf("read wal: %w", err) } // Read frame salt & compare to header salt. Stop reading on mismatch. @@ -1075,7 +1095,7 @@ func (db *DB) copyToShadowWAL(filename string) (newSize int64, err error) { // Write page to temporary WAL file. if _, err := f.Write(frame); err != nil { - return 0, fmt.Errorf("write temp shadow wal: %w", err) + return 0, 0, fmt.Errorf("write temp shadow wal: %w", err) } logger.Debug("copy-shadow: ok", "offset", offset, "salt", fmt.Sprintf("%x %x", salt0, salt1)) @@ -1090,39 +1110,39 @@ func (db *DB) copyToShadowWAL(filename string) (newSize int64, err error) { // If no WAL writes found, exit. if origSize == lastCommitSize { - return origSize, nil + return origSize, lastCommitSize, nil } walByteN := lastCommitSize - origSize // Move to beginning of temporary file. if _, err := f.Seek(0, io.SeekStart); err != nil { - return 0, fmt.Errorf("temp file seek: %w", err) + return 0, 0, fmt.Errorf("temp file seek: %w", err) } // Copy from temporary file to shadow WAL. if _, err := io.Copy(w, &io.LimitedReader{R: f, N: walByteN}); err != nil { - return 0, fmt.Errorf("write shadow file: %w", err) + return 0, 0, fmt.Errorf("write shadow file: %w", err) } // Close & remove temporary file. if err := f.Close(); err != nil { - return 0, err + return 0, 0, err } else if err := os.Remove(tempFilename); err != nil { - return 0, err + return 0, 0, err } // Sync & close shadow WAL. if err := w.Sync(); err != nil { - return 0, err + return 0, 0, err } else if err := w.Close(); err != nil { - return 0, err + return 0, 0, err } // Track total number of bytes written to WAL. db.totalWALBytesCounter.Add(float64(walByteN)) - return lastCommitSize, nil + return origWalSize, lastCommitSize, nil } // ShadowWALReader opens a reader for a shadow WAL file at a given position. @@ -1297,7 +1317,7 @@ func (db *DB) checkpoint(ctx context.Context, generation, mode string) error { } // Copy shadow WAL before checkpoint to copy as much as possible. - if _, err := db.copyToShadowWAL(shadowWALPath); err != nil { + if _, _, err := db.copyToShadowWAL(shadowWALPath); err != nil { return fmt.Errorf("cannot copy to end of shadow wal before checkpoint: %w", err) } @@ -1332,7 +1352,7 @@ func (db *DB) checkpoint(ctx context.Context, generation, mode string) error { } // Copy the end of the previous WAL before starting a new shadow WAL. - if _, err := db.copyToShadowWAL(shadowWALPath); err != nil { + if _, _, err := db.copyToShadowWAL(shadowWALPath); err != nil { return fmt.Errorf("cannot copy to end of shadow wal: %w", err) }