From f53857e1ada85b0b5b61230aadc1ceae053b44fc Mon Sep 17 00:00:00 2001
From: Ben Johnson
Date: Mon, 4 Apr 2022 21:18:05 -0600
Subject: [PATCH] Add minimum shadow WAL retention

---
 cmd/litestream/main.go                      |  4 +
 db.go                                       | 27 ++++--
 integration/cmd_test.go                     | 97 ++++++++++++++++++-
 .../http-full-recovery/litestream.0.yml     |  6 ++
 .../litestream.1.yml                        |  0
 .../litestream.0.yml                        |  0
 .../http-partial-recovery/litestream.1.yml  |  5 +
 litestream.go                               |  2 +-
 8 files changed, 131 insertions(+), 10 deletions(-)
 create mode 100644 integration/testdata/replicate/http-full-recovery/litestream.0.yml
 rename integration/testdata/replicate/{http-recovery => http-full-recovery}/litestream.1.yml (100%)
 rename integration/testdata/replicate/{http-recovery => http-partial-recovery}/litestream.0.yml (100%)
 create mode 100644 integration/testdata/replicate/http-partial-recovery/litestream.1.yml

diff --git a/cmd/litestream/main.go b/cmd/litestream/main.go
index d6d145e..3cf5a0e 100644
--- a/cmd/litestream/main.go
+++ b/cmd/litestream/main.go
@@ -289,6 +289,7 @@ type DBConfig struct {
 	CheckpointInterval *time.Duration `yaml:"checkpoint-interval"`
 	MinCheckpointPageN *int           `yaml:"min-checkpoint-page-count"`
 	MaxCheckpointPageN *int           `yaml:"max-checkpoint-page-count"`
+	ShadowRetentionN   *int           `yaml:"shadow-retention-count"`
 
 	Replicas []*ReplicaConfig `yaml:"replicas"`
 }
@@ -330,6 +331,9 @@ func NewDBFromConfigWithPath(dbc *DBConfig, path string) (*litestream.DB, error)
 	if dbc.MaxCheckpointPageN != nil {
 		db.MaxCheckpointPageN = *dbc.MaxCheckpointPageN
 	}
+	if dbc.ShadowRetentionN != nil {
+		db.ShadowRetentionN = *dbc.ShadowRetentionN
+	}
 
 	// Instantiate and attach replicas.
 	for _, rc := range dbc.Replicas {
diff --git a/db.go b/db.go
index b244698..160cd47 100644
--- a/db.go
+++ b/db.go
@@ -33,6 +33,7 @@ const (
 
 	DefaultMinCheckpointPageN = 1000
 	DefaultMaxCheckpointPageN = 10000
+	DefaultShadowRetentionN   = 32
 )
 
 // MaxIndex is the maximum possible WAL index.
@@ -102,6 +103,10 @@ type DB struct {
 	// unbounded if there are always read transactions occurring.
 	MaxCheckpointPageN int
 
+	// Number of shadow WAL indexes to retain. This keeps files long enough for
+	// live replicas to retrieve the data but allows files to eventually be removed.
+	ShadowRetentionN int
+
 	// Time after receiving change notification before reading next WAL segment.
 	// Used for batching changes into fewer files instead of every transaction
 	// creating its own file.
@@ -129,6 +134,7 @@ func NewDB(path string) *DB {
 
 		MinCheckpointPageN: DefaultMinCheckpointPageN,
 		MaxCheckpointPageN: DefaultMaxCheckpointPageN,
+		ShadowRetentionN:   DefaultShadowRetentionN,
 
 		MonitorDelayInterval: DefaultMonitorDelayInterval,
 		CheckpointInterval:   DefaultCheckpointInterval,
@@ -778,21 +784,26 @@ func (db *DB) cleanWAL(ctx context.Context) error {
 	generation, err := db.CurrentGeneration()
 	if err != nil {
 		return fmt.Errorf("current generation: %w", err)
+	} else if generation == "" {
+		return nil
 	}
 
 	// Determine lowest index that's been replicated to all replicas.
-	minIndex := -1
+	minReplicaIndex := -1
 	for _, r := range db.Replicas {
 		pos := r.Pos().Truncate()
 		if pos.Generation != generation {
 			continue // different generation, skip
-		} else if minIndex == -1 || pos.Index < minIndex {
-			minIndex = pos.Index
+		} else if minReplicaIndex == -1 || pos.Index < minReplicaIndex {
+			minReplicaIndex = pos.Index
 		}
 	}
 
-	// Skip if our lowest position is too small.
-	if minIndex <= 0 {
+	// Retain a certain number of WAL indexes behind the current position.
+	minRetentionIndex := db.pos.Index - db.ShadowRetentionN
+
+	// Skip if we have replicas but none have replicated this generation yet.
+	if len(db.Replicas) > 0 && minReplicaIndex <= 0 {
 		return nil
 	}
 
@@ -807,8 +818,10 @@ func (db *DB) cleanWAL(ctx context.Context) error {
 		index, err := ParseIndex(ent.Name())
 		if err != nil {
 			continue
-		} else if index >= minIndex {
-			continue // not below min, skip
+		} else if len(db.Replicas) > 0 && index >= minReplicaIndex {
+			continue // not replicated yet, skip
+		} else if index >= minRetentionIndex {
+			continue // within the retention window, skip
 		}
 
 		if err := os.RemoveAll(filepath.Join(dir, FormatIndex(index))); err != nil {
diff --git a/integration/cmd_test.go b/integration/cmd_test.go
index 96b8558..62ee336 100644
--- a/integration/cmd_test.go
+++ b/integration/cmd_test.go
@@ -455,9 +455,102 @@ func TestCmd_Replicate_HTTP(t *testing.T) {
 }
 
 // Ensure a database can recover when disconnected from HTTP.
-func TestCmd_Replicate_HTTP_Recovery(t *testing.T) {
+func TestCmd_Replicate_HTTP_PartialRecovery(t *testing.T) {
 	ctx := context.Background()
-	testDir, tempDir := filepath.Join("testdata", "replicate", "http-recovery"), t.TempDir()
+	testDir, tempDir := filepath.Join("testdata", "replicate", "http-partial-recovery"), t.TempDir()
+	if err := os.Mkdir(filepath.Join(tempDir, "0"), 0777); err != nil {
+		t.Fatal(err)
+	} else if err := os.Mkdir(filepath.Join(tempDir, "1"), 0777); err != nil {
+		t.Fatal(err)
+	}
+
+	env0 := []string{"LITESTREAM_TEMPDIR=" + tempDir}
+	env1 := []string{"LITESTREAM_TEMPDIR=" + tempDir, "LITESTREAM_UPSTREAM_URL=http://localhost:10002"}
+
+	cmd0, stdout0, _ := commandContext(ctx, env0, "replicate", "-config", filepath.Join(testDir, "litestream.0.yml"))
+	if err := cmd0.Start(); err != nil {
+		t.Fatal(err)
+	}
+	cmd1, stdout1, _ := commandContext(ctx, env1, "replicate", "-config", filepath.Join(testDir, "litestream.1.yml"))
+	if err := cmd1.Start(); err != nil {
+		t.Fatal(err)
+	}
+
+	db0, err := sql.Open("sqlite3", filepath.Join(tempDir, "0", "db"))
+	if err != nil {
+		t.Fatal(err)
+	} else if _, err := db0.ExecContext(ctx, `PRAGMA journal_mode = wal`); err != nil {
+		t.Fatal(err)
+	} else if _, err := db0.ExecContext(ctx, `CREATE TABLE t (id INTEGER PRIMARY KEY)`); err != nil {
+		t.Fatal(err)
+	}
+	defer db0.Close()
+
+	var index int
+	insertAndWait := func() {
+		index++
+		t.Logf("[exec] INSERT INTO t (id) VALUES (%d)", index)
+		if _, err := db0.ExecContext(ctx, `INSERT INTO t (id) VALUES (?)`, index); err != nil {
+			t.Fatal(err)
+		}
+		time.Sleep(100 * time.Millisecond)
+	}
+
+	// Execute writes periodically.
+	for i := 0; i < 50; i++ {
+		insertAndWait()
+	}
+
+	// Kill the replica.
+	t.Logf("Killing replica...")
+	killLitestreamCmd(t, cmd1, stdout1)
+	t.Logf("Replica killed")
+
+	// Keep writing.
+	for i := 0; i < 25; i++ {
+		insertAndWait()
+	}
+
+	// Restart replica.
+	t.Logf("Restarting replica...")
+	cmd1, stdout1, _ = commandContext(ctx, env1, "replicate", "-config", filepath.Join(testDir, "litestream.1.yml"))
+	if err := cmd1.Start(); err != nil {
+		t.Fatal(err)
+	}
+	t.Logf("Replica restarted")
+
+	// Continue writing...
+	for i := 0; i < 25; i++ {
+		insertAndWait()
+	}
+
+	// Wait for replica to catch up.
+	time.Sleep(1 * time.Second)
+
+	// Verify count in replica table.
+	db1, err := sql.Open("sqlite3", filepath.Join(tempDir, "1", "db"))
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer db1.Close()
+
+	var n int
+	if err := db1.QueryRowContext(ctx, `SELECT COUNT(*) FROM t`).Scan(&n); err != nil {
+		t.Fatal(err)
+	} else if got, want := n, 100; got != want {
+		t.Fatalf("replica count=%d, want %d", got, want)
+	}
+
+	// Stop & wait for Litestream commands.
+	killLitestreamCmd(t, cmd1, stdout1)
+	killLitestreamCmd(t, cmd0, stdout0)
+}
+
+// Ensure a database can recover when disconnected from HTTP and its last
+// replicated index is no longer available in the shadow WAL.
+func TestCmd_Replicate_HTTP_FullRecovery(t *testing.T) {
+	ctx := context.Background()
+	testDir, tempDir := filepath.Join("testdata", "replicate", "http-full-recovery"), t.TempDir()
 	if err := os.Mkdir(filepath.Join(tempDir, "0"), 0777); err != nil {
 		t.Fatal(err)
 	} else if err := os.Mkdir(filepath.Join(tempDir, "1"), 0777); err != nil {
diff --git a/integration/testdata/replicate/http-full-recovery/litestream.0.yml b/integration/testdata/replicate/http-full-recovery/litestream.0.yml
new file mode 100644
index 0000000..88dea07
--- /dev/null
+++ b/integration/testdata/replicate/http-full-recovery/litestream.0.yml
@@ -0,0 +1,6 @@
+addr: :10002
+
+dbs:
+  - path: $LITESTREAM_TEMPDIR/0/db
+    max-checkpoint-page-count: 5
+    shadow-retention-count: 3
diff --git a/integration/testdata/replicate/http-recovery/litestream.1.yml b/integration/testdata/replicate/http-full-recovery/litestream.1.yml
similarity index 100%
rename from integration/testdata/replicate/http-recovery/litestream.1.yml
rename to integration/testdata/replicate/http-full-recovery/litestream.1.yml
diff --git a/integration/testdata/replicate/http-recovery/litestream.0.yml b/integration/testdata/replicate/http-partial-recovery/litestream.0.yml
similarity index 100%
rename from integration/testdata/replicate/http-recovery/litestream.0.yml
rename to integration/testdata/replicate/http-partial-recovery/litestream.0.yml
diff --git a/integration/testdata/replicate/http-partial-recovery/litestream.1.yml b/integration/testdata/replicate/http-partial-recovery/litestream.1.yml
new file mode 100644
index 0000000..e973570
--- /dev/null
+++ b/integration/testdata/replicate/http-partial-recovery/litestream.1.yml
@@ -0,0 +1,5 @@
+dbs:
+  - path: $LITESTREAM_TEMPDIR/1/db
+    upstream:
+      url: "$LITESTREAM_UPSTREAM_URL"
+      path: "$LITESTREAM_TEMPDIR/0/db"
diff --git a/litestream.go b/litestream.go
index 0e9509c..b6ca7f0 100644
--- a/litestream.go
+++ b/litestream.go
@@ -585,7 +585,7 @@ func (hdr *StreamRecordHeader) UnmarshalBinary(data []byte) error {
 	}
 	hdr.Type = int(binary.BigEndian.Uint32(data[0:4]))
 	hdr.Flags = int(binary.BigEndian.Uint32(data[4:8]))
-	hdr.Generation = fmt.Sprintf("%16x", binary.BigEndian.Uint64(data[8:16]))
+	hdr.Generation = fmt.Sprintf("%016x", binary.BigEndian.Uint64(data[8:16]))
 	hdr.Index = int(binary.BigEndian.Uint64(data[16:24]))
 	hdr.Offset = int64(binary.BigEndian.Uint64(data[24:32]))
 	hdr.Size = int64(binary.BigEndian.Uint64(data[32:40]))
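
---

The pruning rule added to cleanWAL removes a shadow WAL index only when it is
both replicated to every replica and older than the retention window of
ShadowRetentionN indexes behind the current position. A minimal standalone
sketch of that decision follows; the helper name shouldRemove and its
parameters are illustrative, not part of this patch:

	package main

	import "fmt"

	// shouldRemove mirrors the two checks in (*DB).cleanWAL: an index
	// survives if any replica may still need it, or if it falls inside the
	// retention window behind the current WAL position.
	func shouldRemove(walIndex, curIndex, retentionN, minReplicaIndex int, hasReplicas bool) bool {
		if hasReplicas && walIndex >= minReplicaIndex {
			return false // not replicated to all replicas yet
		}
		if walIndex >= curIndex-retentionN {
			return false // inside the retention window
		}
		return true
	}

	func main() {
		// Current position 100, default retention 32, slowest replica at 95.
		fmt.Println(shouldRemove(50, 100, 32, 95, true)) // true: replicated and outside the window
		fmt.Println(shouldRemove(90, 100, 32, 95, true)) // false: inside the retention window
		fmt.Println(shouldRemove(97, 100, 32, 95, true)) // false: slowest replica still needs it
	}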
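
The new shadow-retention-count key is set per database, as the
http-full-recovery fixture above demonstrates with a deliberately small value.
A sketch of an ordinary config using it (the path is illustrative; omitting
the key keeps the default of 32, DefaultShadowRetentionN):

	dbs:
	  - path: /var/lib/app/db
	    shadow-retention-count: 64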
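
The litestream.go change is a formatting fix: %16x pads the generation with
spaces to a width of 16 while %016x zero-pads it, so small values previously
produced names with leading spaces rather than a valid 16-hex-digit
generation string. For example:

	fmt.Sprintf("%16x", uint64(0xab))  // "              ab"
	fmt.Sprintf("%016x", uint64(0xab)) // "00000000000000ab"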