Add minimum shadow WAL retention

Ben Johnson
2022-04-04 21:18:05 -06:00
parent 44662022fa
commit f53857e1ad
8 changed files with 131 additions and 10 deletions

View File

@@ -289,6 +289,7 @@ type DBConfig struct {
     CheckpointInterval *time.Duration `yaml:"checkpoint-interval"`
     MinCheckpointPageN *int           `yaml:"min-checkpoint-page-count"`
     MaxCheckpointPageN *int           `yaml:"max-checkpoint-page-count"`
+    ShadowRetentionN   *int           `yaml:"shadow-retention-count"`
     Replicas           []*ReplicaConfig `yaml:"replicas"`
 }
@@ -330,6 +331,9 @@ func NewDBFromConfigWithPath(dbc *DBConfig, path string) (*litestream.DB, error)
     if dbc.MaxCheckpointPageN != nil {
         db.MaxCheckpointPageN = *dbc.MaxCheckpointPageN
     }
+    if dbc.ShadowRetentionN != nil {
+        db.ShadowRetentionN = *dbc.ShadowRetentionN
+    }

     // Instantiate and attach replicas.
     for _, rc := range dbc.Replicas {
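The config field is a pointer so that an omitted key falls back to the compiled-in default while an explicit value, even zero, overrides it. A minimal sketch of that pattern, assuming gopkg.in/yaml.v2; the dbConfig type and the literal default below are illustrative stand-ins, not Litestream's actual types:

package main

import (
    "fmt"

    "gopkg.in/yaml.v2"
)

// dbConfig stands in for the DBConfig field above: the YAML field is a *int
// so an absent key leaves the default untouched, while any explicit value
// (including 0) overrides it.
type dbConfig struct {
    ShadowRetentionN *int `yaml:"shadow-retention-count"`
}

func main() {
    retention := 32 // stands in for DefaultShadowRetentionN

    var cfg dbConfig
    if err := yaml.Unmarshal([]byte("shadow-retention-count: 3\n"), &cfg); err != nil {
        panic(err)
    }
    if cfg.ShadowRetentionN != nil {
        retention = *cfg.ShadowRetentionN // same nil-check pattern as NewDBFromConfigWithPath
    }
    fmt.Println(retention) // prints 3; with an empty config it would stay 32
}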

db.go (27 changes)
View File

@@ -33,6 +33,7 @@ const (
     DefaultMinCheckpointPageN = 1000
     DefaultMaxCheckpointPageN = 10000
+    DefaultShadowRetentionN   = 32
 )

 // MaxIndex is the maximum possible WAL index.
@@ -102,6 +103,10 @@ type DB struct {
     // unbounded if there are always read transactions occurring.
     MaxCheckpointPageN int

+    // Number of shadow WAL indexes to retain. This keeps files long enough for
+    // live replicas to retrieve the data but allows files to eventually be removed.
+    ShadowRetentionN int
+
     // Time after receiving change notification before reading next WAL segment.
     // Used for batching changes into fewer files instead of every transaction
     // creating its own file.
@@ -129,6 +134,7 @@ func NewDB(path string) *DB {
     MinCheckpointPageN:   DefaultMinCheckpointPageN,
     MaxCheckpointPageN:   DefaultMaxCheckpointPageN,
+    ShadowRetentionN:     DefaultShadowRetentionN,

     MonitorDelayInterval: DefaultMonitorDelayInterval,
     CheckpointInterval:   DefaultCheckpointInterval,
@@ -778,21 +784,26 @@ func (db *DB) cleanWAL(ctx context.Context) error {
     generation, err := db.CurrentGeneration()
     if err != nil {
         return fmt.Errorf("current generation: %w", err)
+    } else if generation == "" {
+        return nil
     }

     // Determine lowest index that's been replicated to all replicas.
-    minIndex := -1
+    minReplicaIndex := -1
     for _, r := range db.Replicas {
         pos := r.Pos().Truncate()
         if pos.Generation != generation {
             continue // different generation, skip
-        } else if minIndex == -1 || pos.Index < minIndex {
-            minIndex = pos.Index
+        } else if minReplicaIndex == -1 || pos.Index < minReplicaIndex {
+            minReplicaIndex = pos.Index
         }
     }

-    // Skip if our lowest position is too small.
-    if minIndex <= 0 {
+    // Retain a certain number of WAL indexes since the current position.
+    minRetentionIndex := db.pos.Index - db.ShadowRetentionN
+
+    // Skip if we have replicas but none have replicated this generation yet.
+    if len(db.Replicas) > 0 && minReplicaIndex <= 0 {
         return nil
     }
@@ -807,8 +818,10 @@ func (db *DB) cleanWAL(ctx context.Context) error {
     index, err := ParseIndex(ent.Name())
     if err != nil {
         continue
-    } else if index >= minIndex {
-        continue // not below min, skip
+    } else if len(db.Replicas) > 0 && index >= minReplicaIndex {
+        continue // not replicated yet, skip
+    } else if index >= minRetentionIndex {
+        continue // retain certain number of indexes, skip
     }

     if err := os.RemoveAll(filepath.Join(dir, FormatIndex(index))); err != nil {
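The cleanup rule above combines two lower bounds: a shadow WAL index directory is removed only when every replica on the current generation has replicated past it and it has fallen out of the ShadowRetentionN-sized window behind the database's current position. A standalone sketch of that decision, with illustrative parameter names rather than the actual DB methods:

package main

import "fmt"

// removable mirrors the checks in cleanWAL above, pulled out of the DB type.
// minReplicaIndex is the lowest index replicated by all replicas on the
// current generation, currentIndex stands in for db.pos.Index, and
// retentionN for db.ShadowRetentionN.
func removable(index, minReplicaIndex, currentIndex, retentionN, replicaCount int) bool {
    minRetentionIndex := currentIndex - retentionN
    if replicaCount > 0 && index >= minReplicaIndex {
        return false // not replicated yet, keep
    }
    if index >= minRetentionIndex {
        return false // still inside the retention window, keep
    }
    return true
}

func main() {
    // Database at index 100 with the default retention of 32 (window starts at 68)
    // and one replica that has replicated up to index 60:
    fmt.Println(removable(50, 60, 100, 32, 1)) // true: replicated and outside the window
    fmt.Println(removable(65, 60, 100, 32, 1)) // false: the replica has not reached it yet
    fmt.Println(removable(70, 80, 100, 32, 1)) // false: replicated, but still retained
}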

View File

@@ -455,9 +455,102 @@ func TestCmd_Replicate_HTTP(t *testing.T) {
 }

 // Ensure a database can recover when disconnected from HTTP.
-func TestCmd_Replicate_HTTP_Recovery(t *testing.T) {
+func TestCmd_Replicate_HTTP_PartialRecovery(t *testing.T) {
     ctx := context.Background()
-    testDir, tempDir := filepath.Join("testdata", "replicate", "http-recovery"), t.TempDir()
+    testDir, tempDir := filepath.Join("testdata", "replicate", "http-partial-recovery"), t.TempDir()
+
+    if err := os.Mkdir(filepath.Join(tempDir, "0"), 0777); err != nil {
+        t.Fatal(err)
+    } else if err := os.Mkdir(filepath.Join(tempDir, "1"), 0777); err != nil {
+        t.Fatal(err)
+    }
+
+    env0 := []string{"LITESTREAM_TEMPDIR=" + tempDir}
+    env1 := []string{"LITESTREAM_TEMPDIR=" + tempDir, "LITESTREAM_UPSTREAM_URL=http://localhost:10002"}
+
+    cmd0, stdout0, _ := commandContext(ctx, env0, "replicate", "-config", filepath.Join(testDir, "litestream.0.yml"))
+    if err := cmd0.Start(); err != nil {
+        t.Fatal(err)
+    }
+
+    cmd1, stdout1, _ := commandContext(ctx, env1, "replicate", "-config", filepath.Join(testDir, "litestream.1.yml"))
+    if err := cmd1.Start(); err != nil {
+        t.Fatal(err)
+    }
+
+    db0, err := sql.Open("sqlite3", filepath.Join(tempDir, "0", "db"))
+    if err != nil {
+        t.Fatal(err)
+    } else if _, err := db0.ExecContext(ctx, `PRAGMA journal_mode = wal`); err != nil {
+        t.Fatal(err)
+    } else if _, err := db0.ExecContext(ctx, `CREATE TABLE t (id INTEGER PRIMARY KEY)`); err != nil {
+        t.Fatal(err)
+    }
+    defer db0.Close()
+
+    var index int
+    insertAndWait := func() {
+        index++
+        t.Logf("[exec] INSERT INTO t (id) VALUES (%d)", index)
+        if _, err := db0.ExecContext(ctx, `INSERT INTO t (id) VALUES (?)`, index); err != nil {
+            t.Fatal(err)
+        }
+        time.Sleep(100 * time.Millisecond)
+    }
+
+    // Execute writes periodically.
+    for i := 0; i < 50; i++ {
+        insertAndWait()
+    }
+
+    // Kill the replica.
+    t.Logf("Killing replica...")
+    killLitestreamCmd(t, cmd1, stdout1)
+    t.Logf("Replica killed")
+
+    // Keep writing.
+    for i := 0; i < 25; i++ {
+        insertAndWait()
+    }
+
+    // Restart replica.
+    t.Logf("Restarting replica...")
+    cmd1, stdout1, _ = commandContext(ctx, env1, "replicate", "-config", filepath.Join(testDir, "litestream.1.yml"))
+    if err := cmd1.Start(); err != nil {
+        t.Fatal(err)
+    }
+    t.Logf("Replica restarted")
+
+    // Continue writing...
+    for i := 0; i < 25; i++ {
+        insertAndWait()
+    }
+
+    // Wait for replica to catch up.
+    time.Sleep(1 * time.Second)
+
+    // Verify count in replica table.
+    db1, err := sql.Open("sqlite3", filepath.Join(tempDir, "1", "db"))
+    if err != nil {
+        t.Fatal(err)
+    }
+    defer db1.Close()
+
+    var n int
+    if err := db1.QueryRowContext(ctx, `SELECT COUNT(*) FROM t`).Scan(&n); err != nil {
+        t.Fatal(err)
+    } else if got, want := n, 100; got != want {
+        t.Fatalf("replica count=%d, want %d", got, want)
+    }
+
+    // Stop & wait for Litestream command.
+    killLitestreamCmd(t, cmd1, stdout1) // kill
+    killLitestreamCmd(t, cmd0, stdout0)
+}
+
+// Ensure a database can recover when disconnected from HTTP but when last index
+// is no longer available.
+func TestCmd_Replicate_HTTP_FullRecovery(t *testing.T) {
+    ctx := context.Background()
+    testDir, tempDir := filepath.Join("testdata", "replicate", "http-full-recovery"), t.TempDir()
+
     if err := os.Mkdir(filepath.Join(tempDir, "0"), 0777); err != nil {
         t.Fatal(err)
     } else if err := os.Mkdir(filepath.Join(tempDir, "1"), 0777); err != nil {

View File

@@ -0,0 +1,6 @@
+addr: :10002
+
+dbs:
+  - path: $LITESTREAM_TEMPDIR/0/db
+    max-checkpoint-page-count: 5
+    shadow-retention-count: 3

View File

@@ -0,0 +1,5 @@
+dbs:
+  - path: $LITESTREAM_TEMPDIR/1/db
+    upstream:
+      url: "$LITESTREAM_UPSTREAM_URL"
+      path: "$LITESTREAM_TEMPDIR/0/db"

View File

@@ -585,7 +585,7 @@ func (hdr *StreamRecordHeader) UnmarshalBinary(data []byte) error {
     }

     hdr.Type = int(binary.BigEndian.Uint32(data[0:4]))
     hdr.Flags = int(binary.BigEndian.Uint32(data[4:8]))
-    hdr.Generation = fmt.Sprintf("%16x", binary.BigEndian.Uint64(data[8:16]))
+    hdr.Generation = fmt.Sprintf("%016x", binary.BigEndian.Uint64(data[8:16]))
     hdr.Index = int(binary.BigEndian.Uint64(data[16:24]))
     hdr.Offset = int64(binary.BigEndian.Uint64(data[24:32]))
     hdr.Size = int64(binary.BigEndian.Uint64(data[32:40]))
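The last hunk fixes the Generation formatting verb: "%16x" pads the hex value with spaces, while "%016x" zero-pads it, matching the fixed-width 16-character hex generation identifiers used elsewhere. A small stdlib-only demonstration of the difference:

package main

import (
    "encoding/binary"
    "fmt"
)

func main() {
    // Simulate the generation bytes of a stream record header whose numeric
    // value has leading zero bytes.
    var data [8]byte
    binary.BigEndian.PutUint64(data[:], 0xdeadbeef)

    v := binary.BigEndian.Uint64(data[:])
    fmt.Printf("%q\n", fmt.Sprintf("%16x", v))  // "        deadbeef" (space-padded)
    fmt.Printf("%q\n", fmt.Sprintf("%016x", v)) // "00000000deadbeef" (zero-padded)
}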