Implement live read replication

This commit adds an http server and client for streaming snapshots
and WAL pages from an upstream Litestream primary to a read-only
replica.
This commit is contained in:
Ben Johnson
2022-02-19 07:46:01 -07:00
parent 4898fc2fc1
commit a090706421
19 changed files with 1241 additions and 57 deletions

408
db.go
View File

@@ -23,6 +23,7 @@ import (
"github.com/pierrec/lz4/v4"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"golang.org/x/sync/errgroup"
)
// Default DB settings.
@@ -68,7 +69,7 @@ type DB struct {
ctx context.Context
cancel func()
wg sync.WaitGroup
g errgroup.Group
// Metrics
dbSizeGauge prometheus.Gauge
@@ -83,6 +84,11 @@ type DB struct {
checkpointErrorNCounterVec *prometheus.CounterVec
checkpointSecondsCounterVec *prometheus.CounterVec
// Client used to receive live, upstream changes. If specified, then
// DB should be used as read-only as local changes will conflict with
// upstream changes.
StreamClient StreamClient
// Minimum threshold of WAL size, in pages, before a passive checkpoint.
// A passive checkpoint will attempt a checkpoint but fail if there are
// active transactions occurring at the same time.
@@ -161,6 +167,11 @@ func (db *DB) WALPath() string {
return db.path + "-wal"
}
// SHMPath returns the path to the database's shared memory file.
func (db *DB) SHMPath() string {
return db.path + "-shm"
}
// MetaPath returns the path to the database metadata.
func (db *DB) MetaPath() string {
dir, file := filepath.Split(db.path)
@@ -179,6 +190,12 @@ func (db *DB) GenerationPath(generation string) string {
return filepath.Join(db.MetaPath(), "generations", generation)
}
// PositionPath returns the path of the file that stores the current position.
// This file is only used to communicate state to external processes.
func (db *DB) PositionPath() string {
return filepath.Join(db.MetaPath(), "position")
}
// ShadowWALDir returns the path of the shadow wal directory.
// Panics if generation is blank.
func (db *DB) ShadowWALDir(generation string) string {
@@ -399,9 +416,10 @@ func (db *DB) Open() (err error) {
return fmt.Errorf("cannot remove tmp files: %w", err)
}
// Start monitoring SQLite database in a separate goroutine.
db.wg.Add(1)
go func() { defer db.wg.Done(); db.monitor() }()
// If an upstream client is specified, then we should simply stream changes
// into the database. If it is not specified, then we should monitor the
// database for local changes and replicate them out.
db.g.Go(func() error { return db.monitor(db.ctx) })
return nil
}
@@ -410,7 +428,9 @@ func (db *DB) Open() (err error) {
// and closes the database.
func (db *DB) Close() (err error) {
db.cancel()
db.wg.Wait()
if e := db.g.Wait(); e != nil && err == nil {
err = e
}
// Start a new context for shutdown since we canceled the DB context.
ctx := context.Background()
@@ -484,8 +504,8 @@ func (db *DB) UpdatedAt() (time.Time, error) {
return t, nil
}
// init initializes the connection to the database.
// Skipped if already initialized or if the database file does not exist.
// init initializes the connection to the database. Skipped if already
// initialized or if the database file does not exist.
func (db *DB) init() (err error) {
// Exit if already initialized.
if db.db != nil {
@@ -493,17 +513,15 @@ func (db *DB) init() (err error) {
}
// Exit if no database file exists.
fi, err := os.Stat(db.path)
if os.IsNotExist(err) {
if _, err := os.Stat(db.path); os.IsNotExist(err) {
return nil
} else if err != nil {
return err
}
db.fileMode = fi.Mode()
db.uid, db.gid = internal.Fileinfo(fi)
// Obtain permissions for parent directory.
if fi, err = os.Stat(filepath.Dir(db.path)); err != nil {
fi, err := os.Stat(filepath.Dir(db.path))
if err != nil {
return err
}
db.dirMode = fi.Mode()
@@ -517,22 +535,6 @@ func (db *DB) init() (err error) {
return err
}
// Open long-running database file descriptor. Required for non-OFD locks.
if db.f, err = os.Open(db.path); err != nil {
return fmt.Errorf("open db file descriptor: %w", err)
}
// Ensure database is closed if init fails.
// Initialization can retry on next sync.
defer func() {
if err != nil {
_ = db.releaseReadLock()
db.db.Close()
db.f.Close()
db.db, db.f = nil, nil
}
}()
// Enable WAL and ensure it is set. New mode should be returned on success:
// https://www.sqlite.org/pragma.html#pragma_journal_mode
var mode string
@@ -559,6 +561,30 @@ func (db *DB) init() (err error) {
return fmt.Errorf("create _litestream_lock table: %w", err)
}
// Open long-running database file descriptor. Required for non-OFD locks.
if db.f, err = os.Open(db.path); err != nil {
return fmt.Errorf("open db file descriptor: %w", err)
}
// Ensure database is closed if init fails.
// Initialization can retry on next sync.
defer func() {
if err != nil {
_ = db.releaseReadLock()
db.db.Close()
db.f.Close()
db.db, db.f = nil, nil
}
}()
// Obtain file info once we know the database exists.
fi, err = os.Stat(db.path)
if err != nil {
return fmt.Errorf("init file stat: %w", err)
}
db.fileMode = fi.Mode()
db.uid, db.gid = internal.Fileinfo(fi)
// Start a long-running read transaction to prevent other transactions
// from checkpointing.
if err := db.acquireReadLock(); err != nil {
@@ -603,6 +629,76 @@ func (db *DB) init() (err error) {
return nil
}
// initReplica initializes a new database file as a replica of an upstream database.
func (db *DB) initReplica(pageSize int) (err error) {
// Exit if already initialized.
if db.db != nil {
return nil
}
// Obtain permissions for parent directory.
fi, err := os.Stat(filepath.Dir(db.path))
if err != nil {
return err
}
db.dirMode = fi.Mode()
dsn := db.path
dsn += fmt.Sprintf("?_busy_timeout=%d", BusyTimeout.Milliseconds())
// Connect to SQLite database. Use the driver registered with a hook to
// prevent WAL files from being removed.
if db.db, err = sql.Open("litestream-sqlite3", dsn); err != nil {
return err
}
// Initialize database file if it doesn't exist. It doesn't matter what we
// store in it as it will be erased by the replication. We just need to
// ensure a WAL file is created and there is at least a page in the database.
if _, err := os.Stat(db.path); os.IsNotExist(err) {
if _, err := db.db.ExecContext(db.ctx, fmt.Sprintf(`PRAGMA page_size = %d`, pageSize)); err != nil {
return fmt.Errorf("set page size: %w", err)
}
var mode string
if err := db.db.QueryRow(`PRAGMA journal_mode = wal`).Scan(&mode); err != nil {
return err
} else if mode != "wal" {
return fmt.Errorf("enable wal failed, mode=%q", mode)
}
// TODO: Set page size.
if _, err := db.db.ExecContext(db.ctx, `CREATE TABLE IF NOT EXISTS _litestream (id INTEGER)`); err != nil {
return fmt.Errorf("create _litestream table: %w", err)
} else if _, err := db.db.ExecContext(db.ctx, `PRAGMA wal_checkpoint(TRUNCATE)`); err != nil {
return fmt.Errorf("create _litestream table: %w", err)
}
}
// Obtain file info once we know the database exists.
fi, err = os.Stat(db.path)
if err != nil {
return fmt.Errorf("init file stat: %w", err)
}
db.fileMode = fi.Mode()
db.uid, db.gid = internal.Fileinfo(fi)
// Verify page size matches.
if err := db.db.QueryRowContext(db.ctx, `PRAGMA page_size;`).Scan(&db.pageSize); err != nil {
return fmt.Errorf("read page size: %w", err)
} else if db.pageSize != pageSize {
return fmt.Errorf("page size mismatch: %d <> %d", db.pageSize, pageSize)
}
// Ensure meta directory structure exists.
if err := internal.MkdirAll(db.MetaPath(), db.dirMode, db.uid, db.gid); err != nil {
return err
}
return nil
}
func (db *DB) clearGeneration(ctx context.Context) error {
if err := os.Remove(db.GenerationNamePath()); err != nil && !os.IsNotExist(err) {
return err
@@ -1278,6 +1374,11 @@ func (db *DB) writeWALSegment(ctx context.Context, pos Pos, rd io.Reader) error
return err
}
// Write position to file so other processes can read it.
if err := db.writePositionFile(pos); err != nil {
return fmt.Errorf("write position file: %w", err)
}
// Generate
info := WALSegmentInfo{
Generation: pos.Generation,
@@ -1308,6 +1409,11 @@ func (db *DB) writeWALSegment(ctx context.Context, pos Pos, rd io.Reader) error
return nil
}
// writePositionFile writes pos as the current position.
func (db *DB) writePositionFile(pos Pos) error {
return internal.WriteFile(db.PositionPath(), []byte(pos.String()+"\n"), db.fileMode, db.uid, db.gid)
}
// WALSegments returns an iterator over all available WAL files for a generation.
func (db *DB) WALSegments(ctx context.Context, generation string) (*FileWALSegmentIterator, error) {
db.mu.Lock()
@@ -1499,20 +1605,26 @@ func (db *DB) execCheckpoint(mode string) (err error) {
return nil
}
// monitor runs in a separate goroutine and monitors the database & WAL.
func (db *DB) monitor() {
var timer *time.Timer
func (db *DB) monitor(ctx context.Context) error {
if db.StreamClient != nil {
return db.monitorUpstream(ctx)
}
return db.monitorLocal(ctx)
}
// monitor runs in a separate goroutine and monitors the local database & WAL.
func (db *DB) monitorLocal(ctx context.Context) error {
var timer *time.Timer
if db.MonitorDelayInterval > 0 {
timer := time.NewTimer(db.MonitorDelayInterval)
timer = time.NewTimer(db.MonitorDelayInterval)
defer timer.Stop()
}
for {
// Wait for a file change notification from the file system.
select {
case <-db.ctx.Done():
return
case <-ctx.Done():
return nil
case <-db.notifyCh:
}
@@ -1528,12 +1640,193 @@ func (db *DB) monitor() {
default:
}
if err := db.Sync(db.ctx); err != nil && !errors.Is(err, context.Canceled) {
if err := db.Sync(ctx); err != nil && !errors.Is(err, context.Canceled) {
db.Logger.Printf("sync error: %s", err)
}
}
}
// monitorUpstream runs in a separate goroutine and streams data into the local DB.
func (db *DB) monitorUpstream(ctx context.Context) error {
for {
if err := db.stream(ctx); err != nil {
if ctx.Err() != nil {
return nil
}
db.Logger.Printf("stream error, retrying: %s", err)
}
// Delay before retrying stream.
select {
case <-ctx.Done():
return nil
case <-time.After(1 * time.Second):
}
}
}
// stream initializes the local database and continuously streams new upstream data.
func (db *DB) stream(ctx context.Context) error {
// Continuously stream and apply records from client.
sr, err := db.StreamClient.Stream(ctx)
if err != nil {
return fmt.Errorf("stream connect: %w", err)
}
defer sr.Close()
// TODO: Determine page size of upstream database before creating local.
const pageSize = 4096
// Initialize the database and create it if it doesn't exist.
if err := db.initReplica(pageSize); err != nil {
return fmt.Errorf("init replica: %w", err)
}
for {
hdr, err := sr.Next()
if err != nil {
return err
}
switch hdr.Type {
case StreamRecordTypeSnapshot:
if err := db.streamSnapshot(ctx, hdr, sr); err != nil {
return fmt.Errorf("snapshot: %w", err)
}
case StreamRecordTypeWALSegment:
if err := db.streamWALSegment(ctx, hdr, sr); err != nil {
return fmt.Errorf("wal segment: %w", err)
}
default:
return fmt.Errorf("invalid stream record type: 0x%02x", hdr.Type)
}
}
}
// streamSnapshot reads the snapshot into the WAL and applies it to the main database.
func (db *DB) streamSnapshot(ctx context.Context, hdr *StreamRecordHeader, r io.Reader) error {
// Truncate WAL file.
if _, err := db.db.ExecContext(ctx, `PRAGMA wal_checkpoint(TRUNCATE)`); err != nil {
return fmt.Errorf("truncate: %w", err)
}
// Determine total page count.
pageN := int(hdr.Size / int64(db.pageSize))
ww := NewWALWriter(db.WALPath(), db.fileMode, db.pageSize)
if err := ww.Open(); err != nil {
return fmt.Errorf("open wal writer: %w", err)
}
defer func() { _ = ww.Close() }()
if err := ww.WriteHeader(); err != nil {
return fmt.Errorf("write wal header: %w", err)
}
// Iterate over pages
buf := make([]byte, db.pageSize)
for pgno := uint32(1); ; pgno++ {
// Read snapshot page into a buffer.
if _, err := io.ReadFull(r, buf); err == io.EOF {
break
} else if err != nil {
return fmt.Errorf("read snapshot page %d: %w", pgno, err)
}
// Issue a commit flag when the last page is reached.
var commit uint32
if pgno == uint32(pageN) {
commit = uint32(pageN)
}
// Write page into WAL frame.
if err := ww.WriteFrame(pgno, commit, buf); err != nil {
return fmt.Errorf("write wal frame: %w", err)
}
}
// Close WAL file writer.
if err := ww.Close(); err != nil {
return fmt.Errorf("close wal writer: %w", err)
}
// Invalidate WAL index.
if err := invalidateSHMFile(db.path); err != nil {
return fmt.Errorf("invalidate shm file: %w", err)
}
// Write position to file so other processes can read it.
if err := db.writePositionFile(hdr.Pos()); err != nil {
return fmt.Errorf("write position file: %w", err)
}
db.Logger.Printf("snapshot applied")
return nil
}
// streamWALSegment rewrites a WAL segment into the local WAL and applies it to the main database.
func (db *DB) streamWALSegment(ctx context.Context, hdr *StreamRecordHeader, r io.Reader) error {
// Decompress incoming segment
zr := lz4.NewReader(r)
// Drop WAL header if starting from offset zero.
if hdr.Offset == 0 {
if _, err := io.CopyN(io.Discard, zr, WALHeaderSize); err != nil {
return fmt.Errorf("read wal header: %w", err)
}
}
ww := NewWALWriter(db.WALPath(), db.fileMode, db.pageSize)
if err := ww.Open(); err != nil {
return fmt.Errorf("open wal writer: %w", err)
}
defer func() { _ = ww.Close() }()
if err := ww.WriteHeader(); err != nil {
return fmt.Errorf("write wal header: %w", err)
}
// Iterate over incoming WAL pages.
buf := make([]byte, WALFrameHeaderSize+db.pageSize)
for i := 0; ; i++ {
// Read snapshot page into a buffer.
if _, err := io.ReadFull(zr, buf); err == io.EOF {
break
} else if err != nil {
return fmt.Errorf("read wal frame %d: %w", i, err)
}
// Read page number & commit field.
pgno := binary.BigEndian.Uint32(buf[0:])
commit := binary.BigEndian.Uint32(buf[4:])
// Write page into WAL frame.
if err := ww.WriteFrame(pgno, commit, buf[WALFrameHeaderSize:]); err != nil {
return fmt.Errorf("write wal frame: %w", err)
}
}
// Close WAL file writer.
if err := ww.Close(); err != nil {
return fmt.Errorf("close wal writer: %w", err)
}
// Invalidate WAL index.
if err := invalidateSHMFile(db.path); err != nil {
return fmt.Errorf("invalidate shm file: %w", err)
}
// Write position to file so other processes can read it.
if err := db.writePositionFile(hdr.Pos()); err != nil {
return fmt.Errorf("write position file: %w", err)
}
db.Logger.Printf("wal segment applied: %s", hdr.Pos().String())
return nil
}
// ApplyWAL performs a truncating checkpoint on the given database.
func ApplyWAL(ctx context.Context, dbPath, walPath string) error {
// Copy WAL file from it's staging path to the correct "-wal" location.
@@ -1681,6 +1974,51 @@ func logPrefixPath(path string) string {
return path
}
// invalidateSHMFile clears the iVersion field of the -shm file in order that
// the next transaction will rebuild it.
func invalidateSHMFile(dbPath string) error {
db, err := sql.Open("sqlite3", dbPath)
if err != nil {
return fmt.Errorf("reopen db: %w", err)
}
defer func() { _ = db.Close() }()
if _, err := db.Exec(`PRAGMA wal_checkpoint(PASSIVE)`); err != nil {
return fmt.Errorf("passive checkpoint: %w", err)
}
f, err := os.OpenFile(dbPath+"-shm", os.O_RDWR, 0666)
if err != nil {
return fmt.Errorf("open shm index: %w", err)
}
defer f.Close()
buf := make([]byte, WALIndexHeaderSize)
if _, err := io.ReadFull(f, buf); err != nil {
return fmt.Errorf("read shm index: %w", err)
}
// Invalidate "isInit" fields.
buf[12], buf[60] = 0, 0
// Rewrite header.
if _, err := f.Seek(0, io.SeekStart); err != nil {
return fmt.Errorf("seek shm index: %w", err)
} else if _, err := f.Write(buf); err != nil {
return fmt.Errorf("overwrite shm index: %w", err)
} else if err := f.Close(); err != nil {
return fmt.Errorf("close shm index: %w", err)
}
// Truncate WAL file again.
var row [3]int
if err := db.QueryRow(`PRAGMA wal_checkpoint(TRUNCATE)`).Scan(&row[0], &row[1], &row[2]); err != nil {
return fmt.Errorf("truncate: %w", err)
}
return nil
}
// A marker error to indicate that a restart checkpoint could not verify
// continuity between WAL indices and a new generation should be started.
var errRestartGeneration = errors.New("restart generation")