diff --git a/cmd/litestream/main.go b/cmd/litestream/main.go index 490ba18..a02d862 100644 --- a/cmd/litestream/main.go +++ b/cmd/litestream/main.go @@ -266,10 +266,11 @@ func ReadConfigFile(filename string, expandEnv bool) (_ Config, err error) { // DBConfig represents the configuration for a single database. type DBConfig struct { - Path string `yaml:"path"` - CheckpointInterval *time.Duration `yaml:"checkpoint-interval"` - MinCheckpointPageN *int `yaml:"min-checkpoint-page-count"` - MaxCheckpointPageN *int `yaml:"max-checkpoint-page-count"` + Path string `yaml:"path"` + MonitorDelayInterval *time.Duration `yaml:"monitor-delay-interval"` + CheckpointInterval *time.Duration `yaml:"checkpoint-interval"` + MinCheckpointPageN *int `yaml:"min-checkpoint-page-count"` + MaxCheckpointPageN *int `yaml:"max-checkpoint-page-count"` Replicas []*ReplicaConfig `yaml:"replicas"` } @@ -289,6 +290,9 @@ func NewDBFromConfigWithPath(dbc *DBConfig, path string) (*litestream.DB, error) db := litestream.NewDB(path) // Override default database settings if specified in configuration. + if dbc.MonitorDelayInterval != nil { + db.MonitorDelayInterval = *dbc.MonitorDelayInterval + } if dbc.CheckpointInterval != nil { db.CheckpointInterval = *dbc.CheckpointInterval } diff --git a/db.go b/db.go index 7821f62..ff81c4e 100644 --- a/db.go +++ b/db.go @@ -27,15 +27,13 @@ import ( // Default DB settings. const ( - DefaultCheckpointInterval = 1 * time.Minute + DefaultMonitorDelayInterval = 10 * time.Millisecond + DefaultCheckpointInterval = 1 * time.Minute + DefaultMinCheckpointPageN = 1000 DefaultMaxCheckpointPageN = 10000 ) -// MonitorDelayInterval is the time Litestream will wait after receiving a file -// change notification before processing the WAL file for changes. -const MonitorDelayInterval = 10 * time.Millisecond - // MaxIndex is the maximum possible WAL index. // If this index is reached then a new generation will be started. const MaxIndex = 0x7FFFFFFF @@ -98,6 +96,11 @@ type DB struct { // unbounded if there are always read transactions occurring. MaxCheckpointPageN int + // Time after receiving change notification before reading next WAL segment. + // Used for batching changes into fewer files instead of every transaction + // creating its own file. + MonitorDelayInterval time.Duration + // Time between automatic checkpoints in the WAL. This is done to allow // more fine-grained WAL files so that restores can be performed with // better precision. @@ -118,9 +121,10 @@ func NewDB(path string) *DB { itrs: make(map[*FileWALSegmentIterator]struct{}), - MinCheckpointPageN: DefaultMinCheckpointPageN, - MaxCheckpointPageN: DefaultMaxCheckpointPageN, - CheckpointInterval: DefaultCheckpointInterval, + MinCheckpointPageN: DefaultMinCheckpointPageN, + MaxCheckpointPageN: DefaultMaxCheckpointPageN, + MonitorDelayInterval: DefaultMonitorDelayInterval, + CheckpointInterval: DefaultCheckpointInterval, Logger: log.New(LogWriter, fmt.Sprintf("%s: ", logPrefixPath(path)), LogFlags), } @@ -1497,8 +1501,12 @@ func (db *DB) execCheckpoint(mode string) (err error) { // monitor runs in a separate goroutine and monitors the database & WAL. func (db *DB) monitor() { - timer := time.NewTimer(MonitorDelayInterval) - defer timer.Stop() + var timer *time.Timer + + if db.MonitorDelayInterval > 0 { + timer := time.NewTimer(db.MonitorDelayInterval) + defer timer.Stop() + } for { // Wait for a file change notification from the file system. @@ -1509,8 +1517,10 @@ func (db *DB) monitor() { } // Wait for small delay before processing changes. - timer.Reset(MonitorDelayInterval) - <-timer.C + if timer != nil { + timer.Reset(db.MonitorDelayInterval) + <-timer.C + } // Clear any additional change notifications that occurred during delay. select { diff --git a/integration/cmd_test.go b/integration/cmd_test.go index a663f9c..a70ad53 100644 --- a/integration/cmd_test.go +++ b/integration/cmd_test.go @@ -215,6 +215,55 @@ func TestCmd_Replicate_ResumeWithNewGeneration(t *testing.T) { restoreAndVerify(t, ctx, env, filepath.Join(testDir, "litestream.yml"), filepath.Join(tempDir, "db")) } +// Ensure the monitor interval can be turned off. +func TestCmd_Replicate_NoMonitorDelayInterval(t *testing.T) { + ctx := context.Background() + testDir, tempDir := filepath.Join("testdata", "replicate", "no-monitor-delay-interval"), t.TempDir() + env := []string{"LITESTREAM_TEMPDIR=" + tempDir} + + cmd, stdout, _ := commandContext(ctx, env, "replicate", "-config", filepath.Join(testDir, "litestream.yml")) + if err := cmd.Start(); err != nil { + t.Fatal(err) + } + + db, err := sql.Open("sqlite3", filepath.Join(tempDir, "db")) + if err != nil { + t.Fatal(err) + } else if _, err := db.ExecContext(ctx, `PRAGMA journal_mode = wal`); err != nil { + t.Fatal(err) + } else if _, err := db.ExecContext(ctx, `CREATE TABLE t (id INTEGER PRIMARY KEY)`); err != nil { + t.Fatal(err) + } + defer db.Close() + + time.Sleep(1 * time.Second) + + // Execute writes periodically. + for i := 0; i < 10; i++ { + t.Logf("[exec] INSERT INTO t (id) VALUES (%d)", i) + if _, err := db.ExecContext(ctx, `INSERT INTO t (id) VALUES (?)`, i); err != nil { + t.Fatal(err) + } + time.Sleep(100 * time.Millisecond) + } + + // Stop & wait for Litestream command. + killLitestreamCmd(t, cmd, stdout) + + // Ensure signal and shutdown are logged. + if s := stdout.String(); !strings.Contains(s, `signal received, litestream shutting down`) { + t.Fatal("missing log output for signal received") + } else if s := stdout.String(); !strings.Contains(s, `litestream shut down`) { + t.Fatal("missing log output for shut down") + } + + // Checkpoint & verify original SQLite database. + if _, err := db.ExecContext(ctx, `PRAGMA wal_checkpoint(TRUNCATE)`); err != nil { + t.Fatal(err) + } + restoreAndVerify(t, ctx, env, filepath.Join(testDir, "litestream.yml"), filepath.Join(tempDir, "db")) +} + // Ensure the default configuration works with heavy write load. func TestCmd_Replicate_HighLoad(t *testing.T) { if testing.Short() { diff --git a/integration/testdata/replicate/high-load/litestream.yml b/integration/testdata/replicate/high-load/litestream.yml index 26fb119..5e11635 100644 --- a/integration/testdata/replicate/high-load/litestream.yml +++ b/integration/testdata/replicate/high-load/litestream.yml @@ -3,5 +3,4 @@ dbs: replicas: - path: $LITESTREAM_TEMPDIR/replica - monitor-interval: 100ms max-checkpoint-page-count: 20 diff --git a/integration/testdata/replicate/no-monitor-delay-interval/litestream.yml b/integration/testdata/replicate/no-monitor-delay-interval/litestream.yml new file mode 100644 index 0000000..e597b31 --- /dev/null +++ b/integration/testdata/replicate/no-monitor-delay-interval/litestream.yml @@ -0,0 +1,5 @@ +dbs: + - path: $LITESTREAM_TEMPDIR/db + monitor-delay-interval: 0 + replicas: + - path: $LITESTREAM_TEMPDIR/replica diff --git a/integration/testdata/replicate/ok/litestream.yml b/integration/testdata/replicate/ok/litestream.yml index 26fb119..5e11635 100644 --- a/integration/testdata/replicate/ok/litestream.yml +++ b/integration/testdata/replicate/ok/litestream.yml @@ -3,5 +3,4 @@ dbs: replicas: - path: $LITESTREAM_TEMPDIR/replica - monitor-interval: 100ms max-checkpoint-page-count: 20 diff --git a/integration/testdata/replicate/resume/litestream.yml b/integration/testdata/replicate/resume/litestream.yml index 0bdd84e..494ece0 100644 --- a/integration/testdata/replicate/resume/litestream.yml +++ b/integration/testdata/replicate/resume/litestream.yml @@ -3,5 +3,4 @@ dbs: replicas: - path: $LITESTREAM_TEMPDIR/replica - monitor-interval: 100ms max-checkpoint-page-count: 10