Basic file replication working with WAL

This commit is contained in:
Ben Johnson
2020-12-24 13:23:52 -07:00
parent 8d7f5b28a9
commit 341eac268c
3 changed files with 417 additions and 27 deletions

185
db.go
View File

@@ -33,6 +33,8 @@ type DB struct {
rtx *sql.Tx // long running read transaction rtx *sql.Tx // long running read transaction
pageSize int // page size, in bytes pageSize int // page size, in bytes
notify chan struct{} // closes on WAL change
ctx context.Context ctx context.Context
cancel func() cancel func()
wg sync.WaitGroup wg sync.WaitGroup
@@ -62,6 +64,8 @@ type DB struct {
func NewDB(path string) *DB { func NewDB(path string) *DB {
db := &DB{ db := &DB{
path: path, path: path,
notify: make(chan struct{}),
MinCheckpointPageN: DefaultMinCheckpointPageN, MinCheckpointPageN: DefaultMinCheckpointPageN,
MonitorInterval: DefaultMonitorInterval, MonitorInterval: DefaultMonitorInterval,
} }
@@ -128,7 +132,23 @@ func (db *DB) CurrentShadowWALPath(generation string) (string, error) {
return filepath.Join(dir, max), nil return filepath.Join(dir, max), nil
} }
// Notify returns a channel that closes when the shadow WAL changes.
func (db *DB) Notify() <-chan struct{} {
db.mu.RLock()
defer db.mu.RUnlock()
return db.notify
}
// PageSize returns the page size of the underlying database.
// Only valid after database exists & Init() has successfully run.
func (db *DB) PageSize() int {
db.mu.RLock()
defer db.mu.RUnlock()
return db.pageSize
}
func (db *DB) Open() (err error) { func (db *DB) Open() (err error) {
// Start monitoring SQLite database in a separate goroutine.
db.wg.Add(1) db.wg.Add(1)
go func() { defer db.wg.Done(); db.monitor() }() go func() { defer db.wg.Done(); db.monitor() }()
@@ -215,6 +235,11 @@ func (db *DB) Init() (err error) {
return fmt.Errorf("clean: %w", err) return fmt.Errorf("clean: %w", err)
} }
// Start replication.
for _, r := range db.Replicators {
r.Start(db.ctx)
}
return nil return nil
} }
@@ -253,6 +278,11 @@ func (db *DB) SoftClose() (err error) {
db.cancel() db.cancel()
db.wg.Wait() db.wg.Wait()
// Ensure replicators all stop replicating.
for _, r := range db.Replicators {
r.Stop()
}
if db.rtx != nil { if db.rtx != nil {
if e := db.releaseReadLock(); e != nil && err == nil { if e := db.releaseReadLock(); e != nil && err == nil {
err = e err = e
@@ -344,13 +374,6 @@ func (db *DB) createGeneration() (string, error) {
return "", fmt.Errorf("rename generation file: %w", err) return "", fmt.Errorf("rename generation file: %w", err)
} }
// Issue snapshot by each replicator.
for _, r := range db.Replicators {
if err := r.BeginSnapshot(db.ctx); err != nil {
return "", fmt.Errorf("cannot snapshot %q replicator: %s", r.Name(), err)
}
}
// Remove old generations. // Remove old generations.
if err := db.clean(); err != nil { if err := db.clean(); err != nil {
return "", err return "", err
@@ -361,8 +384,8 @@ func (db *DB) createGeneration() (string, error) {
// Sync copies pending data from the WAL to the shadow WAL. // Sync copies pending data from the WAL to the shadow WAL.
func (db *DB) Sync() (err error) { func (db *DB) Sync() (err error) {
db.mu.RLock() db.mu.Lock()
defer db.mu.RUnlock() defer db.mu.Unlock()
// No database exists, exit. // No database exists, exit.
if db.db == nil { if db.db == nil {
@@ -400,12 +423,16 @@ func (db *DB) Sync() (err error) {
// Verify our last sync matches the current state of the WAL. // Verify our last sync matches the current state of the WAL.
// This ensures that we have an existing generation & that the last sync // This ensures that we have an existing generation & that the last sync
// position of the real WAL hasn't been overwritten by another process. // position of the real WAL hasn't been overwritten by another process.
//
// If we are unable to verify the WAL state then we start a new generation.
info, err := db.verify() info, err := db.verify()
if err != nil { if err != nil {
return fmt.Errorf("cannot verify wal state: %w", err) return fmt.Errorf("cannot verify wal state: %w", err)
} else if info.reason != "" { }
// Track if anything in the shadow WAL changes and then notify at the end.
changed := info.walSize != info.shadowWALSize || info.restart || info.reason != ""
// If we are unable to verify the WAL state then we start a new generation.
if info.reason != "" {
// Start new generation & notify user via log message. // Start new generation & notify user via log message.
if info.generation, err = db.createGeneration(); err != nil { if info.generation, err = db.createGeneration(); err != nil {
return fmt.Errorf("create generation: %w", err) return fmt.Errorf("create generation: %w", err)
@@ -417,6 +444,7 @@ func (db *DB) Sync() (err error) {
info.shadowWALSize = WALHeaderSize info.shadowWALSize = WALHeaderSize
info.restart = false info.restart = false
info.reason = "" info.reason = ""
} }
// Synchronize real WAL with current shadow WAL. // Synchronize real WAL with current shadow WAL.
@@ -441,11 +469,19 @@ func (db *DB) Sync() (err error) {
// Issue the checkpoint. // Issue the checkpoint.
if checkpoint { if checkpoint {
changed = true
if err := db.checkpoint(info, forceCheckpoint); err != nil { if err := db.checkpoint(info, forceCheckpoint); err != nil {
return fmt.Errorf("checkpoint: force=%v err=%w", err) return fmt.Errorf("checkpoint: force=%v err=%w", err)
} }
} }
// Notify replicators of WAL changes.
if changed {
close(db.notify)
db.notify = make(chan struct{})
}
return nil return nil
} }
@@ -496,15 +532,12 @@ func (db *DB) verify() (info syncInfo, err error) {
if err != nil { if err != nil {
return info, err return info, err
} }
info.shadowWALSize = fi.Size() info.shadowWALSize = frameAlign(fi.Size(), db.pageSize)
// Truncate shadow WAL if there is a partial page. // Truncate shadow WAL if there is a partial page.
// Exit if shadow WAL does not contain a full header. // Exit if shadow WAL does not contain a full header.
frameSize := int64(db.pageSize) + WALFrameHeaderSize
if info.shadowWALSize < WALHeaderSize { if info.shadowWALSize < WALHeaderSize {
return info, fmt.Errorf("short shadow wal: %s", info.shadowWALPath) return info, fmt.Errorf("short shadow wal: %s", info.shadowWALPath)
} else if (info.shadowWALSize-WALHeaderSize)%frameSize != 0 {
info.shadowWALSize = ((info.shadowWALSize - WALHeaderSize) / frameSize) + WALHeaderSize
} }
// If shadow WAL is larger than real WAL then the WAL has been truncated // If shadow WAL is larger than real WAL then the WAL has been truncated
@@ -523,7 +556,12 @@ func (db *DB) verify() (info syncInfo, err error) {
info.restart = !bytes.Equal(hdr0, hdr1) info.restart = !bytes.Equal(hdr0, hdr1)
} }
// TODO: Handle checkpoint sequence number rollover. // If we only have a header then ensure header matches.
// Otherwise we need to start a new generation.
if info.shadowWALSize == WALHeaderSize && info.restart {
info.reason = "wal header only, mismatched"
return info, nil
}
// Verify last page synced still matches. // Verify last page synced still matches.
if info.shadowWALSize > WALHeaderSize { if info.shadowWALSize > WALHeaderSize {
@@ -699,6 +737,114 @@ func (db *DB) copyToShadowWAL(filename string) (newSize int64, err error) {
return newSize, nil return newSize, nil
} }
// WALReader opens a reader for a shadow WAL file at a given position.
// If the reader is at the end of the file, it attempts to return the next file.
//
// The caller should check Pos() & Size() on the returned reader to check offset.
func (db *DB) WALReader(pos Pos) (r *WALReader, err error) {
// Fetch reader for the requested position. Return if it has data.
r, err = db.walReader(pos)
if err != nil {
return nil, err
} else if r.N() > 0 {
return r, nil
}
// Otherwise attempt to read the start of the next WAL file.
pos.Index, pos.Offset = pos.Index+1, 0
r, err = db.walReader(pos)
if os.IsNotExist(err) {
return nil, io.EOF
}
return r, err
}
// walReader opens a file reader for a shadow WAL file at a given position.
func (db *DB) walReader(pos Pos) (r *WALReader, err error) {
filename := db.ShadowWALPath(pos.Generation, pos.Index)
f, err := os.Open(filename)
if err != nil {
return nil, err
}
// Ensure file is closed if any error occurs.
defer func() {
if err != nil {
r.Close()
}
}()
// Fetch frame-aligned file size and ensure requested offset is not past EOF.
fi, err := f.Stat()
if err != nil {
return nil, err
}
fileSize := frameAlign(fi.Size(), db.pageSize)
if pos.Offset > fileSize {
return nil, fmt.Errorf("wal reader offset too high: %d > %d", pos.Offset, fi.Size())
}
// Move file handle to offset position.
if _, err := f.Seek(pos.Offset, io.SeekStart); err != nil {
return nil, err
}
return &WALReader{
f: f,
n: fileSize - pos.Offset,
pos: pos,
}, nil
}
// frameAlign returns a frame-aligned offset.
// Returns zero if offset is less than the WAL header size.
func frameAlign(offset int64, pageSize int) int64 {
assert(offset >= 0, "frameAlign(): offset must be non-negative")
assert(pageSize >= 0, "frameAlign(): page size must be non-negative")
if offset < WALHeaderSize {
return 0
}
frameSize := WALFrameHeaderSize + int64(pageSize)
frameN := (offset - WALHeaderSize) / frameSize
return (frameN * frameSize) + WALHeaderSize
}
// WALReader represents a reader for a WAL file that tracks WAL position.
type WALReader struct {
f *os.File
n int64
pos Pos
}
// Close closes the underlying WAL file handle.
func (r *WALReader) Close() error { return r.f.Close() }
// N returns the remaining bytes in the reader.
func (r *WALReader) N() int64 { return r.n }
// Pos returns the current WAL position.
func (r *WALReader) Pos() Pos { return r.pos }
// Read reads bytes into p, updates the position, and returns the bytes read.
// Returns io.EOF at the end of the available section of the WAL.
func (r *WALReader) Read(p []byte) (n int, err error) {
if r.n <= 0 {
return 0, io.EOF
}
if int64(len(p)) > r.n {
p = p[0:r.n]
}
n, err = r.f.Read(p)
r.n -= int64(n)
r.pos.Offset += int64(n)
return n, err
}
const WALHeaderChecksumOffset = 24 const WALHeaderChecksumOffset = 24
const WALFrameHeaderChecksumOffset = 16 const WALFrameHeaderChecksumOffset = 16
@@ -770,6 +916,11 @@ func (db *DB) checkpoint(info syncInfo, force bool) error {
return nil return nil
} }
// Copy the end of the previous WAL before starting a new shadow WAL.
if _, err := db.copyToShadowWAL(info.shadowWALPath); err != nil {
return fmt.Errorf("cannot copy to end of shadow wal: %w", err)
}
// Parse index of current shadow WAL file. // Parse index of current shadow WAL file.
dir, base := filepath.Split(info.shadowWALPath) dir, base := filepath.Split(info.shadowWALPath)
index, err := ParseWALFilename(base) index, err := ParseWALFilename(base)

View File

@@ -22,6 +22,26 @@ const (
GenerationNameLen = 16 GenerationNameLen = 16
) )
// Pos is a position in the WAL for a generation.
type Pos struct {
Generation string // generation name
Index int // wal file index
Offset int64 // offset within wal file
}
// String returns a string representation.
func (p Pos) String() string {
if p.IsZero() {
return "<>"
}
return fmt.Sprintf("<%s,%d,%d>", p.Generation, p.Index, p.Offset)
}
// IsZero returns true if p is the zero value.
func (p Pos) IsZero() bool {
return p == (Pos{})
}
// Checksum computes a running SQLite checksum over a byte slice. // Checksum computes a running SQLite checksum over a byte slice.
func Checksum(bo binary.ByteOrder, s0, s1 uint32, b []byte) (uint32, uint32) { func Checksum(bo binary.ByteOrder, s0, s1 uint32, b []byte) (uint32, uint32) {
assert(len(b)%8 == 0, "misaligned checksum byte slice") assert(len(b)%8 == 0, "misaligned checksum byte slice")

View File

@@ -2,6 +2,14 @@ package litestream
import ( import (
"context" "context"
"fmt"
"io"
"io/ioutil"
"log"
"os"
"path/filepath"
"strings"
"sync"
) )
// Replicator represents a method for replicating the snapshot & WAL data to // Replicator represents a method for replicating the snapshot & WAL data to
@@ -9,7 +17,9 @@ import (
type Replicator interface { type Replicator interface {
Name() string Name() string
Type() string Type() string
BeginSnapshot(ctx context.Context) error Snapshotting() bool
Start(ctx context.Context)
Stop()
} }
var _ Replicator = (*FileReplicator)(nil) var _ Replicator = (*FileReplicator)(nil)
@@ -19,6 +29,13 @@ type FileReplicator struct {
db *DB // source database db *DB // source database
name string // replicator name, optional name string // replicator name, optional
dst string // destination path dst string // destination path
mu sync.RWMutex
wg sync.WaitGroup
snapshotting bool // if true, currently copying database
ctx context.Context
cancel func()
} }
// NewFileReplicator returns a new instance of FileReplicator. // NewFileReplicator returns a new instance of FileReplicator.
@@ -27,6 +44,7 @@ func NewFileReplicator(db *DB, name, dst string) *FileReplicator {
db: db, db: db,
name: name, name: name,
dst: dst, dst: dst,
cancel: func() {},
} }
} }
@@ -43,10 +61,211 @@ func (r *FileReplicator) Type() string {
return "file" return "file"
} }
// // SnapshotPath returns the path to a snapshot file.
func (r *FileReplicator) BeginSnapshot(ctx context.Context) error { func (r *FileReplicator) SnapshotPath(generation string, index int) string {
// TODO: Set snapshotting state to true. return filepath.Join(r.dst, "generations", generation, "snapshots", fmt.Sprintf("%016x.snapshot", index))
// TODO: Read current generation. }
// TODO: Copy database to destination.
// WALPath returns the path to a WAL file.
func (r *FileReplicator) WALPath(pos Pos) string {
return filepath.Join(r.dst, "generations", pos.Generation, "wal", fmt.Sprintf("%016x.wal", pos.Index))
}
// Snapshotting returns true if replicator is current snapshotting.
func (r *FileReplicator) Snapshotting() bool {
r.mu.RLock()
defer r.mu.RLock()
return r.snapshotting
}
// Start starts replication for a given generation.
func (r *FileReplicator) Start(ctx context.Context) {
// Stop previous replication.
r.Stop()
r.mu.Lock()
defer r.mu.Unlock()
// Set snapshotting state.
r.snapshotting = true
// Wrap context with cancelation.
ctx, r.cancel = context.WithCancel(ctx)
// Start goroutine to replicate data.
r.wg.Add(1)
go func() { defer r.wg.Done(); r.monitor(ctx) }()
}
// Stop cancels any outstanding replication and blocks until finished.
func (r *FileReplicator) Stop() {
r.cancel()
r.wg.Wait()
}
// monitor runs in a separate goroutine and continuously replicates the DB.
func (r *FileReplicator) monitor(ctx context.Context) {
// Continuously check for new data to replicate.
ch := make(chan struct{})
close(ch)
var notify <-chan struct{} = ch
var pos Pos
var err error
for {
select {
case <-ctx.Done():
return
case <-notify:
}
// Fetch new notify channel before replicating data.
notify = r.db.Notify()
// Determine position, if necessary.
if pos.IsZero() {
if pos, err = r.pos(); err != nil {
log.Printf("%s(%s): cannot determine position: %w", r.db.Path(), r.Name(), err)
continue
} else if pos.IsZero() {
log.Printf("%s(%s): no generation, waiting for data", r.db.Path(), r.Name())
continue
}
}
// Synchronize the shadow wal into the replication directory.
if pos, err = r.sync(ctx, pos); err != nil {
log.Printf("%s(%s): sync error: %w", r.db.Path(), r.Name(), err)
continue
}
}
}
// pos returns the position for the replicator for the current generation.
// Returns a zero value if there is no active generation.
func (r *FileReplicator) pos() (pos Pos, err error) {
// Find the current generation from the DB. Return zero pos if no generation.
generation, err := r.db.CurrentGeneration()
if err != nil {
return pos, err
} else if generation == "" {
return pos, nil // empty position
}
pos.Generation = generation
// Find the max WAL file.
walDir := filepath.Join(r.dst, "generations", generation, "wal")
fis, err := ioutil.ReadDir(walDir)
if os.IsNotExist(err) {
return pos, nil // no replicated wal, start at beginning of generation
} else if err != nil {
return pos, err
}
index := -1
for _, fi := range fis {
if !strings.HasSuffix(fi.Name(), ".wal") {
continue
}
if v, err := ParseWALFilename(filepath.Base(fi.Name())); err != nil {
continue // invalid wal filename
} else if index == -1 || v > index {
index = v
}
}
if index == -1 {
return pos, nil // wal directory exists but no wal files, return beginning pos
}
pos.Index = index
// Determine current offset.
fi, err := os.Stat(filepath.Join(walDir, FormatWALFilename(pos.Index)))
if err != nil {
return pos, err
}
pos.Offset = fi.Size()
return pos, nil
}
// snapshot copies the entire database to the replica path.
func (r *FileReplicator) snapshot(ctx context.Context, pos Pos) error {
rd, err := os.Open(r.db.Path())
if err != nil {
return err
}
defer rd.Close()
snapshotPath := r.SnapshotPath(pos.Generation, pos.Index)
if err := os.MkdirAll(snapshotPath, 0700); err != nil {
return err
}
w, err := os.Create(snapshotPath)
if err != nil {
return err
}
defer w.Close()
if _, err := io.Copy(w, rd); err != nil {
return err
} else if err := w.Sync(); err != nil {
return err
} else if err := w.Close(); err != nil {
return err
}
r.mu.Lock()
r.snapshotting = false
r.mu.Unlock()
return nil return nil
} }
func (r *FileReplicator) sync(ctx context.Context, pos Pos) (_ Pos, err error) {
for {
if pos, err = r.syncNext(ctx, pos); err == io.EOF {
return pos, nil
} else if err != nil {
return pos, err
}
}
}
func (r *FileReplicator) syncNext(ctx context.Context, pos Pos) (_ Pos, err error) {
rd, err := r.db.WALReader(pos)
if err == io.EOF {
return pos, err
} else if err != nil {
return pos, fmt.Errorf("wal reader: %w", err)
}
defer rd.Close()
// Ensure parent directory exists for WAL file.
filename := r.WALPath(rd.Pos())
if err := os.MkdirAll(filepath.Dir(filename), 0700); err != nil {
return pos, err
}
// Create a temporary file to write into so we don't have partial writes.
w, err := os.OpenFile(filename, os.O_RDWR|os.O_CREATE, 0600)
if err != nil {
return pos, err
}
defer w.Close()
// Seek, copy & sync WAL contents.
if _, err := w.Seek(rd.Pos().Offset, io.SeekStart); err != nil {
return pos, err
} else if _, err := io.Copy(w, rd); err != nil {
return pos, err
} else if err := w.Sync(); err != nil {
return pos, err
} else if err := w.Close(); err != nil {
return pos, err
}
// Return ending position of the reader.
return rd.Pos(), nil
}