From a8387ed6f9e159c619feb1b7d56fbbd812e7e598 Mon Sep 17 00:00:00 2001 From: Ben Johnson Date: Sat, 19 Dec 2020 08:56:44 -0700 Subject: [PATCH] Add read lock --- README.md | 13 ++ cmd/litestream/config.go | 1 + cmd/litestream/main.go | 8 +- db.go | 344 +++++++++++++++++++++++++++++++++++++-- replicator.go | 44 ++++- 5 files changed, 390 insertions(+), 20 deletions(-) diff --git a/README.md b/README.md index 56db82e..1e2717f 100644 --- a/README.md +++ b/README.md @@ -2,3 +2,16 @@ litestream ========== Streaming replication for SQLite. + + +## Questions + +- How to avoid WAL checkpointing on close? + + +## Notes + +```sql +-- Disable autocheckpointing. +PRAGMA wal_autocheckpoint = 0 +``` \ No newline at end of file diff --git a/cmd/litestream/config.go b/cmd/litestream/config.go index ed3ea62..8c8108a 100644 --- a/cmd/litestream/config.go +++ b/cmd/litestream/config.go @@ -54,5 +54,6 @@ type DBConfig struct { type ReplicatorConfig struct { Type string `yaml:"type"` // "file", "s3" + Name string `yaml:"name"` // name of replicator, optional. Path string `yaml:"path"` // used for file replicators } diff --git a/cmd/litestream/main.go b/cmd/litestream/main.go index af6aafa..a19a81e 100644 --- a/cmd/litestream/main.go +++ b/cmd/litestream/main.go @@ -5,6 +5,7 @@ import ( "errors" "flag" "fmt" + "log" "os" "os/signal" @@ -82,6 +83,9 @@ func (m *Main) ParseFlags(ctx context.Context, args []string) (err error) { return err } + // Initialize log. + log.SetFlags(0) + // Load configuration. if m.ConfigPath == "" { return errors.New("-config required") @@ -145,13 +149,13 @@ func (m *Main) createFileReplicator(db *litestream.DB, config *ReplicatorConfig) if config.Path == "" { return nil, fmt.Errorf("file replicator path require for db %q", db.Path()) } - return litestream.NewFileReplicator(db, config.Path), nil + return litestream.NewFileReplicator(db, config.Name, config.Path), nil } // Close closes all open databases. func (m *Main) Close() (err error) { for _, db := range m.DBs { - if e := db.Close(); e != nil { + if e := db.SoftClose(); e != nil { fmt.Printf("error closing db: path=%s err=%s\n", db.Path(), e) if err == nil { err = e diff --git a/db.go b/db.go index a36d2cd..e533e20 100644 --- a/db.go +++ b/db.go @@ -3,19 +3,29 @@ package litestream import ( "context" "database/sql" + "encoding/hex" + "errors" "fmt" + "io" + "io/ioutil" "log" + "math/rand" "os" "path/filepath" + "strings" "sync" "time" ) +var ErrNoGeneration = errors.New("litestream: no generation") + const ( MetaDirSuffix = "-litestream" WALDirName = "wal" WALExt = ".wal" + + GenerationNameLen = 16 ) const ( @@ -25,8 +35,9 @@ const ( // DB represents a managed instance of a SQLite database in the file system. type DB struct { mu sync.Mutex - path string - db *sql.DB + path string // part to database + db *sql.DB // target database + rtx *sql.Tx // long running read transaction ctx context.Context cancel func() @@ -55,12 +66,60 @@ func (db *DB) Path() string { return db.path } +// WALPath returns the path to the database's WAL file. +func (db *DB) WALPath() string { + return db.path + "-wal" +} + // MetaPath returns the path to the database metadata. func (db *DB) MetaPath() string { dir, file := filepath.Split(db.path) return filepath.Join(dir, "."+file+MetaDirSuffix) } +// GenerationNamePath returns the path of the name of the current generation. +func (db *DB) GenerationNamePath() string { + return filepath.Join(db.MetaPath(), "generation") +} + +// GenerationPath returns the path of a single generation. +func (db *DB) GenerationPath(generation string) string { + return filepath.Join(db.MetaPath(), "generations", generation) +} + +// ShadowWALPath returns the path of a single shadow WAL file. +func (db *DB) ShadowWALPath(generation string, index int) string { + assert(index >= 0, "shadow wal index cannot be negative") + return filepath.Join(db.GenerationPath(generation), "wal", fmt.Sprintf("%016x", index)+".wal") +} + +// CurrentShadowWALPath returns the path to the last shadow WAL in a generation. +func (db *DB) CurrentShadowWALPath(generation string) (string, error) { + // TODO: Cache current shadow WAL path. + dir := filepath.Join(db.GenerationPath(generation), "wal") + fis, err := ioutil.ReadDir(dir) + if err != nil { + return "", err + } + + // Find highest wal file. + var max string + for _, fi := range fis { + if !strings.HasSuffix(fi.Name(), WALExt) { + continue + } + if max == "" || fi.Name() > max { + max = fi.Name() + } + } + + // Return error if we found no WAL files. + if max == "" { + return "", fmt.Errorf("no wal files found in %q", dir) + } + return filepath.Join(dir, max), nil +} + func (db *DB) Open() (err error) { db.mu.Lock() defer db.mu.Unlock() @@ -72,6 +131,17 @@ func (db *DB) Open() (err error) { return fmt.Errorf("enable wal: %w", err) } + // Create a lock table to force write locks during sync. + if _, err := db.db.Exec(`CREATE TABLE IF NOT EXISTS _litestream_lock (id INTEGER);`); err != nil { + return fmt.Errorf("enable wal: %w", err) + } + + // Start a long-running read transaction to prevent other transactions + // from checkpointing. + if err := db.acquireReadLock(); err != nil { + return fmt.Errorf("acquire read lock: %w", err) + } + // Ensure meta directory structure exists. if err := os.MkdirAll(db.MetaPath(), 0700); err != nil { return err @@ -83,30 +153,278 @@ func (db *DB) Open() (err error) { return nil } -// Close disconnects from the database. +// Close releases the read lock & closes the database. This method should only +// be called by tests as it causes the underlying database to be checkpointed. func (db *DB) Close() (err error) { - db.cancel() - db.wg.Wait() + if e := db.SoftClose(); e != nil && err == nil { + err = e + } if db.db != nil { - err = db.db.Close() + if e := db.db.Close(); e != nil && err == nil { + err = e + } } return err } +// SoftClose closes everything but the underlying db connection. This method +// is available because the binary needs to avoid closing the database on exit +// to prevent autocheckpointing. +func (db *DB) SoftClose() (err error) { + db.cancel() + db.wg.Wait() + + if db.rtx != nil { + if e := db.releaseReadLock(); e != nil && err == nil { + err = e + } + } + return err +} + +// acquireReadLock begins a read transaction on the database to prevent checkpointing. +func (db *DB) acquireReadLock() error { + if db.rtx != nil { + return nil + } + + // Start long running read-transaction to prevent checkpoints. + tx, err := db.db.Begin() + if err != nil { + return err + } + + // Disable autocheckpointing on this connection. + if _, err := tx.ExecContext(db.ctx, `PRAGMA wal_autocheckpoint = 0;`); err != nil { + tx.Rollback() + return err + } + + // Track transaction so we can release it later before checkpoint. + db.rtx = tx + return nil +} + +// releaseReadLock rolls back the long-running read transaction. +func (db *DB) releaseReadLock() error { + // Ignore if we do not have a read lock. + if db.rtx == nil { + return nil + } + + // Rollback & clear read transaction. + err := db.rtx.Rollback() + db.rtx = nil + return err +} + +// CurrentGeneration returns the name of the generation saved to the "generation" +// file in the meta data directory. Returns ErrNoGeneration if none exists. +func (db *DB) CurrentGeneration() (string, error) { + buf, err := ioutil.ReadFile(db.GenerationNamePath()) + if os.IsNotExist(err) { + return "", ErrNoGeneration + } else if err != nil { + return "", err + } + + // TODO: Verify if generation directory exists. If not, delete. + + generation := strings.TrimSpace(string(buf)) + if len(generation) != GenerationNameLen { + return "", ErrNoGeneration + } + return generation, nil +} + +// createGeneration starts a new generation by creating the generation +// directory, snapshotting to each replicator, and updating the current +// generation name. +func (db *DB) createGeneration() (string, error) { + // Generate random generation hex name. + buf := make([]byte, GenerationNameLen/2) + _, _ = rand.New(rand.NewSource(time.Now().UnixNano())).Read(buf) + generation := hex.EncodeToString(buf) + + // Generate new directory. + dir := filepath.Join(db.MetaPath(), "generations", generation) + if err := os.MkdirAll(dir, 0700); err != nil { + return "", err + } + + // Copy to shadow WAL. + if err := db.copyInitialWAL(generation); err != nil { + return "", fmt.Errorf("copy initial wal: %w", err) + } + + // Atomically write generation name as current generation. + generationNamePath := db.GenerationNamePath() + if err := ioutil.WriteFile(generationNamePath+".tmp", []byte(generation+"\n"), 0600); err != nil { + return "", fmt.Errorf("write generation temp file: %w", err) + } else if err := os.Rename(generationNamePath+".tmp", generationNamePath); err != nil { + return "", fmt.Errorf("rename generation file: %w", err) + } + + // Issue snapshot by each replicator. + for _, r := range db.Replicators { + if err := r.BeginSnapshot(db.ctx); err != nil { + return "", fmt.Errorf("cannot snapshot %q replicator: %s", r.Name(), err) + } + } + + return generation, nil +} + +// copyInitialWAL copies the full WAL file to the initial shadow WAL path. +func (db *DB) copyInitialWAL(generation string) error { + shadowWALPath := db.ShadowWALPath(generation, 0) + if err := os.MkdirAll(filepath.Dir(shadowWALPath), 0700); err != nil { + return err + } + + // Open the initial shadow WAL file for writing. + w, err := os.OpenFile(shadowWALPath, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0600) + if err != nil { + return err + } + defer w.Close() + + // Open the database's WAL file for reading. + r, err := os.Open(db.WALPath()) + if err != nil { + return err + } + defer r.Close() + + // Copy & sync. + if _, err := io.Copy(w, r); err != nil { + return err + } else if err := w.Sync(); err != nil { + return err + } else if err := w.Close(); err != nil { + return err + } + return nil +} + // Sync copies pending data from the WAL to the shadow WAL. -func (db *DB) Sync() error { - // TODO: Obtain write lock on database. - // TODO: Start new generation if no generations exist. - // TODO: Fetch latest generation. - // TODO: Compare header on shadow WAL with real WAL. On mismatch, start new generation. - // TODO: Copy pending data from real WAL to shadow WAL. +func (db *DB) Sync() (err error) { + // TODO: Lock DB while syncing? + + // Start a transaction. This will be promoted immediately after. + tx, err := db.db.Begin() + if err != nil { + return fmt.Errorf("begin: %w", err) + } + + // Ensure write transaction rolls back before returning. + defer func() { + if e := tx.Rollback(); e != nil && err == nil { + err = e + } + }() + + // Insert into the lock table to promote to a write tx. The lock table + // insert will never actually occur because our tx will be rolled back, + // however, it will ensure our tx grabs the write lock. Unfortunately, + // we can't call "BEGIN IMMEDIATE" as we are already in a transaction. + if _, err := tx.ExecContext(db.ctx, `INSERT INTO _litestream_lock (id) VALUES (1);`); err != nil { + return fmt.Errorf("_litestream_lock: %w", err) + } + + // Disable the autocheckpoint. + if _, err := tx.ExecContext(db.ctx, `PRAGMA wal_autocheckpoint = 0;`); err != nil { + return fmt.Errorf("disable autocheckpoint: %w", err) + } + + // Look up existing generation or start a new one. + generation, err := db.CurrentGeneration() + if err == ErrNoGeneration { + if generation, err = db.createGeneration(); err != nil { + return fmt.Errorf("create generation: %w", err) + } + } else if err != nil { + return fmt.Errorf("cannot find current generation: %w", err) + } + + // Synchronize real WAL with current shadow WAL. + if err := db.syncWAL(generation); err != nil { + return fmt.Errorf("sync wal: %w", err) + } // TODO: If WAL size is greater than min threshold, attempt checkpoint: https://www.sqlite.org/pragma.html#pragma_wal_checkpoint // TODO: If WAL size is great than max threshold, force checkpoint. // TODO: Release write lock on database. // TODO: On checkpoint, write new page and start new shadow WAL. + + return nil +} + +// syncWAL copies pending bytes from the real WAL to the shadow WAL. +func (db *DB) syncWAL(generation string) error { + // Determine total bytes of real WAL. + fi, err := os.Stat(db.WALPath()) + if err != nil { + return err + } + walSize := fi.Size() + + // Open shadow WAL to copy append to. + shadowWALPath, err := db.CurrentShadowWALPath(generation) + if err != nil { + return fmt.Errorf("cannot determine shadow WAL: %w", err) + } + + // TODO: Compare WAL headers. + + // Determine shadow WAL current size. + fi, err = os.Stat(shadowWALPath) + if err != nil { + return err + } + shadowWALSize := fi.Size() + + // Ensure we have pending bytes to write. + // TODO: Verify pending bytes is divisble by (pageSize+headerSize)? + pendingN := walSize - shadowWALSize + if pendingN < 0 { + panic("shadow wal larger than real wal") // TODO: Handle gracefully + } else if pendingN == 0 { + return nil // wals match, exit + } + + // TODO: Verify last page copied matches. + + // Open handles for the shadow WAL & real WAL. + w, err := os.OpenFile(shadowWALPath, os.O_RDWR, 0600) + if err != nil { + return err + } + defer w.Close() + + r, err := os.Open(db.WALPath()) + if err != nil { + return err + } + defer r.Close() + + // Seek to the correct position for each file. + if _, err := r.Seek(shadowWALSize, io.SeekStart); err != nil { + return fmt.Errorf("wal seek: %w", err) + } else if _, err := w.Seek(shadowWALSize, io.SeekStart); err != nil { + return fmt.Errorf("shadow wal seek: %w", err) + } + + // Copy and sync. + if _, err := io.CopyN(w, r, pendingN); err != nil { + return fmt.Errorf("copy shadow wal error: %w", err) + } else if err := w.Sync(); err != nil { + return fmt.Errorf("shadow wal sync: %w", err) + } else if err := w.Close(); err != nil { + return fmt.Errorf("shadow wal close: %w", err) + } return nil } @@ -123,7 +441,7 @@ func (db *DB) monitor() { } // Sync the database to the shadow WAL. - if err := db.Sync(); err != nil { + if err := db.Sync(); err != nil && !errors.Is(err, context.Canceled) { log.Printf("sync error: path=%s err=%s", db.path, err) } diff --git a/replicator.go b/replicator.go index 36d382c..2656557 100644 --- a/replicator.go +++ b/replicator.go @@ -1,18 +1,52 @@ package litestream +import ( + "context" +) + +// Replicator represents a method for replicating the snapshot & WAL data to +// a remote destination. type Replicator interface { + Name() string + Type() string + BeginSnapshot(ctx context.Context) error } +var _ Replicator = (*FileReplicator)(nil) + // FileReplicator is a replicator that replicates a DB to a local file path. type FileReplicator struct { - db *DB // source database - dst string // destination path + db *DB // source database + name string // replicator name, optional + dst string // destination path } // NewFileReplicator returns a new instance of FileReplicator. -func NewFileReplicator(db *DB, dst string) *FileReplicator { +func NewFileReplicator(db *DB, name, dst string) *FileReplicator { return &FileReplicator{ - db: db, - dst: dst, + db: db, + name: name, + dst: dst, } } + +// Name returns the name of the replicator. Returns the type if no name set. +func (r *FileReplicator) Name() string { + if r.name != "" { + return r.name + } + return r.Type() +} + +// Type returns the type of replicator. +func (r *FileReplicator) Type() string { + return "file" +} + +// +func (r *FileReplicator) BeginSnapshot(ctx context.Context) error { + // TODO: Set snapshotting state to true. + // TODO: Read current generation. + // TODO: Copy database to destination. + return nil +}