From b22f3f100d85d6681794066569a143d2f0468063 Mon Sep 17 00:00:00 2001 From: Ben Johnson Date: Tue, 5 Jan 2021 15:48:50 -0700 Subject: [PATCH] Add FileReplica.Sync() unit tests. --- replica.go | 11 ++++++ replica_test.go | 90 +++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 101 insertions(+) create mode 100644 replica_test.go diff --git a/replica.go b/replica.go index 58414a3..a09bb11 100644 --- a/replica.go +++ b/replica.go @@ -89,6 +89,10 @@ type FileReplica struct { wg sync.WaitGroup ctx context.Context cancel func() + + // If true, replica monitors database for changes automatically. + // Set to false if replica is being used synchronously (such as in tests). + MonitorEnabled bool } // NewFileReplica returns a new instance of FileReplica. @@ -98,6 +102,8 @@ func NewFileReplica(db *DB, name, dst string) *FileReplica { name: name, dst: dst, cancel: func() {}, + + MonitorEnabled: true, } } @@ -406,6 +412,11 @@ func (r *FileReplica) WALs(ctx context.Context) ([]*WALInfo, error) { // Start starts replication for a given generation. func (r *FileReplica) Start(ctx context.Context) { + // Ignore if replica is being used sychronously. + if !r.MonitorEnabled { + return + } + // Stop previous replication. r.Stop() diff --git a/replica_test.go b/replica_test.go new file mode 100644 index 0000000..834c485 --- /dev/null +++ b/replica_test.go @@ -0,0 +1,90 @@ +package litestream_test + +import ( + "context" + "testing" + + "github.com/benbjohnson/litestream" +) + +func TestFileReplica_Sync(t *testing.T) { + // Ensure replica can successfully sync after DB has sync'd. + t.Run("InitialSync", func(t *testing.T) { + db, sqldb := MustOpenDBs(t) + defer MustCloseDBs(t, db, sqldb) + r := NewTestFileReplica(t, db) + + // Sync database & then sync replica. + if err := db.Sync(); err != nil { + t.Fatal(err) + } else if err := r.Sync(context.Background()); err != nil { + t.Fatal(err) + } + + // Ensure posistions match. + if pos, err := db.Pos(); err != nil { + t.Fatal(err) + } else if got, want := r.LastPos(), pos; got != want { + t.Fatalf("LastPos()=%v, want %v", got, want) + } + }) + + // Ensure replica can successfully sync multiple times. + t.Run("MultiSync", func(t *testing.T) { + db, sqldb := MustOpenDBs(t) + defer MustCloseDBs(t, db, sqldb) + r := NewTestFileReplica(t, db) + + if _, err := sqldb.Exec(`CREATE TABLE foo (bar TEXT);`); err != nil { + t.Fatal(err) + } + + // Write to the database multiple times and sync after each write. + for i, n := 0, db.MinCheckpointPageN*2; i < n; i++ { + if _, err := sqldb.Exec(`INSERT INTO foo (bar) VALUES ('baz')`); err != nil { + t.Fatal(err) + } + + // Sync periodically. + if i%100 == 0 || i == n-1 { + if err := db.Sync(); err != nil { + t.Fatal(err) + } else if err := r.Sync(context.Background()); err != nil { + t.Fatal(err) + } + } + } + + // Ensure posistions match. + if pos, err := db.Pos(); err != nil { + t.Fatal(err) + } else if got, want := pos.Index, 2; got != want { + t.Fatalf("Index=%v, want %v", got, want) + } else if calcPos, err := r.CalcPos(pos.Generation); err != nil { + t.Fatal(err) + } else if got, want := calcPos, pos; got != want { + t.Fatalf("CalcPos()=%v, want %v", got, want) + } else if got, want := r.LastPos(), pos; got != want { + t.Fatalf("LastPos()=%v, want %v", got, want) + } + }) + + // Ensure replica returns an error if there is no generation available from the DB. + t.Run("ErrNoGeneration", func(t *testing.T) { + db, sqldb := MustOpenDBs(t) + defer MustCloseDBs(t, db, sqldb) + r := NewTestFileReplica(t, db) + + if err := r.Sync(context.Background()); err == nil || err.Error() != `no generation, waiting for data` { + t.Fatal(err) + } + }) +} + +// NewTestFileReplica returns a new replica using a temp directory & with monitoring disabled. +func NewTestFileReplica(tb testing.TB, db *litestream.DB) *litestream.FileReplica { + r := litestream.NewFileReplica(db, "", tb.TempDir()) + r.MonitorEnabled = false + db.Replicas = []litestream.Replica{r} + return r +}