Remove built-in validation option
Previously, Litestream had a validator that worked most of the time but also caused some false positives. It is difficult to provide validation from with Litestream without controlling outside processes that can also affect the database. As such, validation has been moved out to the external CI test runner which provides a more consistent validation process.
This commit is contained in:
175
replica.go
175
replica.go
@@ -3,12 +3,10 @@ package litestream
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"hash/crc64"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"log"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"sort"
|
||||
"sync"
|
||||
"time"
|
||||
@@ -118,11 +116,10 @@ func (r *Replica) Start(ctx context.Context) {
|
||||
ctx, r.cancel = context.WithCancel(ctx)
|
||||
|
||||
// Start goroutine to replicate data.
|
||||
r.wg.Add(4)
|
||||
r.wg.Add(3)
|
||||
go func() { defer r.wg.Done(); r.monitor(ctx) }()
|
||||
go func() { defer r.wg.Done(); r.retainer(ctx) }()
|
||||
go func() { defer r.wg.Done(); r.snapshotter(ctx) }()
|
||||
go func() { defer r.wg.Done(); r.validator(ctx) }()
|
||||
}
|
||||
|
||||
// Stop cancels any outstanding replication and blocks until finished.
|
||||
@@ -144,15 +141,6 @@ func (r *Replica) Stop(hard bool) (err error) {
|
||||
return err
|
||||
}
|
||||
|
||||
// logPrefix returns the prefix used when logging from the replica.
|
||||
// This includes the replica name as well as the database path, if available.
|
||||
func (r *Replica) logPrefix() string {
|
||||
if db := r.DB(); db != nil {
|
||||
return fmt.Sprintf("%s(%s): ", db.Path(), r.Name())
|
||||
}
|
||||
return r.Name() + ": "
|
||||
}
|
||||
|
||||
// Sync copies new WAL frames from the shadow WAL to the replica client.
|
||||
func (r *Replica) Sync(ctx context.Context) (err error) {
|
||||
// Clear last position if if an error occurs during sync.
|
||||
@@ -723,160 +711,6 @@ func (r *Replica) snapshotter(ctx context.Context) {
|
||||
}
|
||||
}
|
||||
|
||||
// validator runs in a separate goroutine and handles periodic validation.
|
||||
func (r *Replica) validator(ctx context.Context) {
|
||||
// Initialize counters since validation occurs infrequently.
|
||||
for _, status := range []string{"ok", "error"} {
|
||||
replicaValidationTotalCounterVec.WithLabelValues(r.db.Path(), r.Name(), status).Add(0)
|
||||
}
|
||||
|
||||
// Exit validation if interval is not set.
|
||||
if r.ValidationInterval <= 0 {
|
||||
return
|
||||
}
|
||||
|
||||
ticker := time.NewTicker(r.ValidationInterval)
|
||||
defer ticker.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-ticker.C:
|
||||
if err := r.Validate(ctx); err != nil {
|
||||
r.Logger.Printf("validation error: %s", err)
|
||||
continue
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Validate restores the most recent data from a replica and validates
|
||||
// that the resulting database matches the current database.
|
||||
func (r *Replica) Validate(ctx context.Context) error {
|
||||
db := r.DB()
|
||||
|
||||
// Restore replica to a temporary directory.
|
||||
tmpdir, err := ioutil.TempDir("", "*-litestream")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer os.RemoveAll(tmpdir)
|
||||
|
||||
// Compute checksum of primary database under lock. This prevents a
|
||||
// sync from occurring and the database will not be written.
|
||||
chksum0, pos, err := db.CRC64(ctx)
|
||||
if err != nil {
|
||||
return fmt.Errorf("cannot compute checksum: %w", err)
|
||||
}
|
||||
|
||||
// Wait until replica catches up to position.
|
||||
if err := r.waitForReplica(ctx, pos); err != nil {
|
||||
return fmt.Errorf("cannot wait for replica: %w", err)
|
||||
}
|
||||
|
||||
// Find lastest snapshot that occurs before the index.
|
||||
snapshotIndex, err := FindSnapshotForIndex(ctx, r.client, pos.Generation, pos.Index-1)
|
||||
if err != nil {
|
||||
return fmt.Errorf("cannot find snapshot index: %w", err)
|
||||
}
|
||||
|
||||
restorePath := filepath.Join(tmpdir, "replica")
|
||||
opt := RestoreOptions{
|
||||
Logger: log.New(os.Stderr, "", 0),
|
||||
LogPrefix: r.logPrefix(),
|
||||
}
|
||||
if err := Restore(ctx, r.client, restorePath, pos.Generation, snapshotIndex, pos.Index-1, opt); err != nil {
|
||||
return fmt.Errorf("cannot restore: %w", err)
|
||||
}
|
||||
|
||||
// Open file handle for restored database.
|
||||
// NOTE: This open is ok as the restored database is not managed by litestream.
|
||||
f, err := os.Open(restorePath)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer f.Close()
|
||||
|
||||
// Read entire file into checksum.
|
||||
h := crc64.New(crc64.MakeTable(crc64.ISO))
|
||||
if _, err := io.Copy(h, f); err != nil {
|
||||
return err
|
||||
}
|
||||
chksum1 := h.Sum64()
|
||||
|
||||
status := "ok"
|
||||
mismatch := chksum0 != chksum1
|
||||
if mismatch {
|
||||
status = "mismatch"
|
||||
}
|
||||
r.Logger.Printf("validator: status=%s db=%016x replica=%016x pos=%s", status, chksum0, chksum1, pos)
|
||||
|
||||
// Validate checksums match.
|
||||
if mismatch {
|
||||
replicaValidationTotalCounterVec.WithLabelValues(r.db.Path(), r.Name(), "error").Inc()
|
||||
return ErrChecksumMismatch
|
||||
}
|
||||
|
||||
replicaValidationTotalCounterVec.WithLabelValues(r.db.Path(), r.Name(), "ok").Inc()
|
||||
|
||||
if err := os.RemoveAll(tmpdir); err != nil {
|
||||
return fmt.Errorf("cannot remove temporary validation directory: %w", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// waitForReplica blocks until replica reaches at least the given position.
|
||||
func (r *Replica) waitForReplica(ctx context.Context, pos Pos) error {
|
||||
ticker := time.NewTicker(500 * time.Millisecond)
|
||||
defer ticker.Stop()
|
||||
|
||||
timer := time.NewTicker(10 * time.Second)
|
||||
defer ticker.Stop()
|
||||
|
||||
once := make(chan struct{}, 1)
|
||||
once <- struct{}{}
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
case <-timer.C:
|
||||
return fmt.Errorf("replica wait exceeded timeout")
|
||||
case <-ticker.C:
|
||||
case <-once: // immediate on first check
|
||||
}
|
||||
|
||||
// Obtain current position of replica, check if past target position.
|
||||
curr := r.Pos()
|
||||
if curr.IsZero() {
|
||||
r.Logger.Printf("validator: no replica position available")
|
||||
continue
|
||||
}
|
||||
|
||||
// Exit if the generation has changed while waiting as there will be
|
||||
// no further progress on the old generation.
|
||||
if curr.Generation != pos.Generation {
|
||||
return fmt.Errorf("generation changed")
|
||||
}
|
||||
|
||||
ready := true
|
||||
if curr.Index < pos.Index {
|
||||
ready = false
|
||||
} else if curr.Index == pos.Index && curr.Offset < pos.Offset {
|
||||
ready = false
|
||||
}
|
||||
|
||||
// If not ready, restart loop.
|
||||
if !ready {
|
||||
continue
|
||||
}
|
||||
|
||||
// Current position at or after target position.
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// GenerationCreatedAt returns the earliest creation time of any snapshot.
|
||||
// Returns zero time if no snapshots exist.
|
||||
func (r *Replica) GenerationCreatedAt(ctx context.Context, generation string) (time.Time, error) {
|
||||
@@ -970,11 +804,4 @@ var (
|
||||
Name: "wal_offset",
|
||||
Help: "The current WAL offset",
|
||||
}, []string{"db", "name"})
|
||||
|
||||
replicaValidationTotalCounterVec = promauto.NewCounterVec(prometheus.CounterOpts{
|
||||
Namespace: "litestream",
|
||||
Subsystem: "replica",
|
||||
Name: "validation_total",
|
||||
Help: "The number of validations performed",
|
||||
}, []string{"db", "name", "status"})
|
||||
)
|
||||
|
||||
Reference in New Issue
Block a user