From 6f8cd5a9c4872fba47bdcd2380581f240c0505bb Mon Sep 17 00:00:00 2001
From: Ben Johnson
Date: Fri, 18 Feb 2022 14:26:07 -0700
Subject: [PATCH] Configurable monitor-delay-interval

The `monitor-delay-interval` option has been added to the DB config so
that users can change how long Litestream waits to check the WAL after
a file change notification occurs. A larger interval batches changes
into fewer, larger shadow WAL files; a smaller one reduces or
eliminates the delay in propagating changes during read replication.
Setting the interval to zero or less disables the delay entirely.
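For example, a configuration like the following (the paths and the
500ms value are illustrative) widens the batching window for one
database and disables the delay for another:

    dbs:
      # A longer delay batches more changes into each shadow WAL file.
      - path: /var/lib/app/batched.db          # illustrative path
        monitor-delay-interval: 500ms
        replicas:
          - path: /mnt/backup/batched
      # Zero (or a negative value) disables the post-notification delay.
      - path: /var/lib/app/low-latency.db      # illustrative path
        monitor-delay-interval: 0
        replicas:
          - path: /mnt/backup/low-latency

When the option is omitted, the default remains 10ms
(`DefaultMonitorDelayInterval`), matching the previously hard-coded
constant.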
---
 cmd/litestream/main.go                        | 12 +++--
 db.go                                         | 34 ++++++++-----
 integration/cmd_test.go                       | 49 +++++++++++++++++++
 .../replicate/high-load/litestream.yml        |  1 -
 .../no-monitor-delay-interval/litestream.yml  |  5 ++
 .../testdata/replicate/ok/litestream.yml      |  1 -
 .../testdata/replicate/resume/litestream.yml  |  1 -
 7 files changed, 84 insertions(+), 19 deletions(-)
 create mode 100644 integration/testdata/replicate/no-monitor-delay-interval/litestream.yml

diff --git a/cmd/litestream/main.go b/cmd/litestream/main.go
index 490ba18..a02d862 100644
--- a/cmd/litestream/main.go
+++ b/cmd/litestream/main.go
@@ -266,10 +266,11 @@ func ReadConfigFile(filename string, expandEnv bool) (_ Config, err error) {
 
 // DBConfig represents the configuration for a single database.
 type DBConfig struct {
-    Path               string         `yaml:"path"`
-    CheckpointInterval *time.Duration `yaml:"checkpoint-interval"`
-    MinCheckpointPageN *int           `yaml:"min-checkpoint-page-count"`
-    MaxCheckpointPageN *int           `yaml:"max-checkpoint-page-count"`
+    Path                 string         `yaml:"path"`
+    MonitorDelayInterval *time.Duration `yaml:"monitor-delay-interval"`
+    CheckpointInterval   *time.Duration `yaml:"checkpoint-interval"`
+    MinCheckpointPageN   *int           `yaml:"min-checkpoint-page-count"`
+    MaxCheckpointPageN   *int           `yaml:"max-checkpoint-page-count"`
 
     Replicas []*ReplicaConfig `yaml:"replicas"`
 }
@@ -289,6 +290,9 @@ func NewDBFromConfigWithPath(dbc *DBConfig, path string) (*litestream.DB, error)
     db := litestream.NewDB(path)
 
     // Override default database settings if specified in configuration.
+    if dbc.MonitorDelayInterval != nil {
+        db.MonitorDelayInterval = *dbc.MonitorDelayInterval
+    }
     if dbc.CheckpointInterval != nil {
         db.CheckpointInterval = *dbc.CheckpointInterval
     }
diff --git a/db.go b/db.go
index 7821f62..ff81c4e 100644
--- a/db.go
+++ b/db.go
@@ -27,15 +27,13 @@ import (
 
 // Default DB settings.
 const (
-    DefaultCheckpointInterval = 1 * time.Minute
+    DefaultMonitorDelayInterval = 10 * time.Millisecond
+    DefaultCheckpointInterval   = 1 * time.Minute
+
     DefaultMinCheckpointPageN = 1000
     DefaultMaxCheckpointPageN = 10000
 )
 
-// MonitorDelayInterval is the time Litestream will wait after receiving a file
-// change notification before processing the WAL file for changes.
-const MonitorDelayInterval = 10 * time.Millisecond
-
 // MaxIndex is the maximum possible WAL index.
 // If this index is reached then a new generation will be started.
 const MaxIndex = 0x7FFFFFFF
@@ -98,6 +96,11 @@ type DB struct {
     // unbounded if there are always read transactions occurring.
     MaxCheckpointPageN int
 
+    // Time after receiving a change notification before reading the next WAL
+    // segment. Used for batching changes into fewer files instead of every
+    // transaction creating its own file.
+    MonitorDelayInterval time.Duration
+
     // Time between automatic checkpoints in the WAL. This is done to allow
     // more fine-grained WAL files so that restores can be performed with
     // better precision.
@@ -118,9 +121,10 @@ func NewDB(path string) *DB {
         itrs: make(map[*FileWALSegmentIterator]struct{}),
 
-        MinCheckpointPageN: DefaultMinCheckpointPageN,
-        MaxCheckpointPageN: DefaultMaxCheckpointPageN,
-        CheckpointInterval: DefaultCheckpointInterval,
+        MinCheckpointPageN:   DefaultMinCheckpointPageN,
+        MaxCheckpointPageN:   DefaultMaxCheckpointPageN,
+        MonitorDelayInterval: DefaultMonitorDelayInterval,
+        CheckpointInterval:   DefaultCheckpointInterval,
 
         Logger: log.New(LogWriter, fmt.Sprintf("%s: ", logPrefixPath(path)), LogFlags),
     }
@@ -1497,8 +1501,12 @@ func (db *DB) execCheckpoint(mode string) (err error) {
 
 // monitor runs in a separate goroutine and monitors the database & WAL.
 func (db *DB) monitor() {
-    timer := time.NewTimer(MonitorDelayInterval)
-    defer timer.Stop()
+    var timer *time.Timer
+
+    if db.MonitorDelayInterval > 0 {
+        timer = time.NewTimer(db.MonitorDelayInterval)
+        defer timer.Stop()
+    }
 
     for {
         // Wait for a file change notification from the file system.
@@ -1509,8 +1517,10 @@ func (db *DB) monitor() {
         }
 
         // Wait for small delay before processing changes.
-        timer.Reset(MonitorDelayInterval)
-        <-timer.C
+        if timer != nil {
+            timer.Reset(db.MonitorDelayInterval)
+            <-timer.C
+        }
 
         // Clear any additional change notifications that occurred during delay.
         select {
diff --git a/integration/cmd_test.go b/integration/cmd_test.go
index a663f9c..a70ad53 100644
--- a/integration/cmd_test.go
+++ b/integration/cmd_test.go
@@ -215,6 +215,55 @@ func TestCmd_Replicate_ResumeWithNewGeneration(t *testing.T) {
     restoreAndVerify(t, ctx, env, filepath.Join(testDir, "litestream.yml"), filepath.Join(tempDir, "db"))
 }
 
+// Ensure the monitor delay interval can be turned off.
+func TestCmd_Replicate_NoMonitorDelayInterval(t *testing.T) {
+    ctx := context.Background()
+    testDir, tempDir := filepath.Join("testdata", "replicate", "no-monitor-delay-interval"), t.TempDir()
+    env := []string{"LITESTREAM_TEMPDIR=" + tempDir}
+
+    cmd, stdout, _ := commandContext(ctx, env, "replicate", "-config", filepath.Join(testDir, "litestream.yml"))
+    if err := cmd.Start(); err != nil {
+        t.Fatal(err)
+    }
+
+    db, err := sql.Open("sqlite3", filepath.Join(tempDir, "db"))
+    if err != nil {
+        t.Fatal(err)
+    } else if _, err := db.ExecContext(ctx, `PRAGMA journal_mode = wal`); err != nil {
+        t.Fatal(err)
+    } else if _, err := db.ExecContext(ctx, `CREATE TABLE t (id INTEGER PRIMARY KEY)`); err != nil {
+        t.Fatal(err)
+    }
+    defer db.Close()
+
+    time.Sleep(1 * time.Second)
+
+    // Execute writes periodically.
+    for i := 0; i < 10; i++ {
+        t.Logf("[exec] INSERT INTO t (id) VALUES (%d)", i)
+        if _, err := db.ExecContext(ctx, `INSERT INTO t (id) VALUES (?)`, i); err != nil {
+            t.Fatal(err)
+        }
+        time.Sleep(100 * time.Millisecond)
+    }
+
+    // Stop & wait for Litestream command.
+    killLitestreamCmd(t, cmd, stdout)
+
+    // Ensure signal and shutdown are logged.
+    if s := stdout.String(); !strings.Contains(s, `signal received, litestream shutting down`) {
+        t.Fatal("missing log output for signal received")
+    } else if s := stdout.String(); !strings.Contains(s, `litestream shut down`) {
+        t.Fatal("missing log output for shut down")
+    }
+
+    // Checkpoint & verify original SQLite database.
+    if _, err := db.ExecContext(ctx, `PRAGMA wal_checkpoint(TRUNCATE)`); err != nil {
+        t.Fatal(err)
+    }
+    restoreAndVerify(t, ctx, env, filepath.Join(testDir, "litestream.yml"), filepath.Join(tempDir, "db"))
+}
+
 // Ensure the default configuration works with heavy write load.
 func TestCmd_Replicate_HighLoad(t *testing.T) {
     if testing.Short() {
diff --git a/integration/testdata/replicate/high-load/litestream.yml b/integration/testdata/replicate/high-load/litestream.yml
index 26fb119..5e11635 100644
--- a/integration/testdata/replicate/high-load/litestream.yml
+++ b/integration/testdata/replicate/high-load/litestream.yml
@@ -3,5 +3,4 @@ dbs:
     replicas:
       - path: $LITESTREAM_TEMPDIR/replica
 
-    monitor-interval: 100ms
     max-checkpoint-page-count: 20
diff --git a/integration/testdata/replicate/no-monitor-delay-interval/litestream.yml b/integration/testdata/replicate/no-monitor-delay-interval/litestream.yml
new file mode 100644
index 0000000..e597b31
--- /dev/null
+++ b/integration/testdata/replicate/no-monitor-delay-interval/litestream.yml
@@ -0,0 +1,5 @@
+dbs:
+  - path: $LITESTREAM_TEMPDIR/db
+    monitor-delay-interval: 0
+    replicas:
+      - path: $LITESTREAM_TEMPDIR/replica
diff --git a/integration/testdata/replicate/ok/litestream.yml b/integration/testdata/replicate/ok/litestream.yml
index 26fb119..5e11635 100644
--- a/integration/testdata/replicate/ok/litestream.yml
+++ b/integration/testdata/replicate/ok/litestream.yml
@@ -3,5 +3,4 @@ dbs:
     replicas:
       - path: $LITESTREAM_TEMPDIR/replica
 
-    monitor-interval: 100ms
     max-checkpoint-page-count: 20
diff --git a/integration/testdata/replicate/resume/litestream.yml b/integration/testdata/replicate/resume/litestream.yml
index 0bdd84e..494ece0 100644
--- a/integration/testdata/replicate/resume/litestream.yml
+++ b/integration/testdata/replicate/resume/litestream.yml
@@ -3,5 +3,4 @@ dbs:
     replicas:
       - path: $LITESTREAM_TEMPDIR/replica
 
-    monitor-interval: 100ms
     max-checkpoint-page-count: 10