diff --git a/cmd/litestreamd/main.go b/cmd/litestreamd/main.go index 81eb2ca..b9b1d29 100644 --- a/cmd/litestreamd/main.go +++ b/cmd/litestreamd/main.go @@ -49,7 +49,7 @@ func main() { } // Notify user that initialization is done. - fmt.Printf("Initialized with %d databases; replication initialized.\n", len(m.DBs)) + fmt.Printf("Initialized with %d databases.\n", len(m.DBs)) // Wait for signal to stop program. <-ctx.Done() diff --git a/db.go b/db.go index aa1cbe2..394d389 100644 --- a/db.go +++ b/db.go @@ -27,15 +27,12 @@ const ( // DB represents a managed instance of a SQLite database in the file system. type DB struct { - mu sync.Mutex + mu sync.RWMutex path string // part to database db *sql.DB // target database rtx *sql.Tx // long running read transaction pageSize int // page size, in bytes - byteOrder binary.ByteOrder // determined by WAL header magic - salt0, salt1 uint32 // read from WAL header - ctx context.Context cancel func() wg sync.WaitGroup @@ -132,9 +129,45 @@ func (db *DB) CurrentShadowWALPath(generation string) (string, error) { } func (db *DB) Open() (err error) { + db.wg.Add(1) + go func() { defer db.wg.Done(); db.monitor() }() + + return nil +} + +// Close releases the read lock & closes the database. This method should only +// be called by tests as it causes the underlying database to be checkpointed. +func (db *DB) Close() (err error) { + if e := db.SoftClose(); e != nil && err == nil { + err = e + } + + if db.db != nil { + if e := db.db.Close(); e != nil && err == nil { + err = e + } + } + return err +} + +// Init initializes the connection to the database. +// Skipped if already initialized or if the database file does not exist. +func (db *DB) Init() (err error) { db.mu.Lock() defer db.mu.Unlock() + // Exit if already initialized. + if db.db != nil { + return nil + } + + // Exit if no database file exists. + if _, err := os.Stat(db.path); os.IsNotExist(err) { + return nil + } else if err != nil { + return err + } + // Connect to SQLite database & enable WAL. if db.db, err = sql.Open("sqlite3", db.path); err != nil { return err @@ -142,6 +175,11 @@ func (db *DB) Open() (err error) { return fmt.Errorf("enable wal: %w", err) } + // Disable autocheckpoint. + if _, err := db.db.ExecContext(db.ctx, `PRAGMA wal_autocheckpoint = 0;`); err != nil { + return fmt.Errorf("disable autocheckpoint: %w", err) + } + // Create a table to force writes to the WAL when empty. // There should only ever be one row with id=1. if _, err := db.db.Exec(`CREATE TABLE IF NOT EXISTS _litestream_seq (id INTEGER PRIMARY KEY, seq INTEGER);`); err != nil { @@ -177,9 +215,6 @@ func (db *DB) Open() (err error) { return fmt.Errorf("clean: %w", err) } - db.wg.Add(1) - go func() { defer db.wg.Done(); db.monitor() }() - return nil } @@ -192,7 +227,9 @@ func (db *DB) clean() error { dir := filepath.Join(db.MetaPath(), "generations") fis, err := ioutil.ReadDir(dir) - if err != nil { + if os.IsNotExist(err) { + return nil + } else if err != nil { return err } for _, fi := range fis { @@ -209,21 +246,6 @@ func (db *DB) clean() error { return nil } -// Close releases the read lock & closes the database. This method should only -// be called by tests as it causes the underlying database to be checkpointed. -func (db *DB) Close() (err error) { - if e := db.SoftClose(); e != nil && err == nil { - err = e - } - - if db.db != nil { - if e := db.db.Close(); e != nil && err == nil { - err = e - } - } - return err -} - // SoftClose closes everything but the underlying db connection. This method // is available because the binary needs to avoid closing the database on exit // to prevent autocheckpointing. @@ -251,8 +273,8 @@ func (db *DB) acquireReadLock() error { return err } - // Disable autocheckpointing on this connection. - if _, err := tx.ExecContext(db.ctx, `PRAGMA wal_autocheckpoint = 0;`); err != nil { + // Execute read query to obtain read lock. + if _, err := tx.ExecContext(db.ctx, `SELECT COUNT(1) FROM _litestream_seq;`); err != nil { tx.Rollback() return err } @@ -329,12 +351,23 @@ func (db *DB) createGeneration() (string, error) { } } + // Remove old generations. + if err := db.clean(); err != nil { + return "", err + } + return generation, nil } // Sync copies pending data from the WAL to the shadow WAL. func (db *DB) Sync() (err error) { - // TODO: Lock DB while syncing? + db.mu.RLock() + defer db.mu.RUnlock() + + // No database exists, exit. + if db.db == nil { + return nil + } // TODO: Force "-wal" file if it doesn't exist. @@ -364,11 +397,6 @@ func (db *DB) Sync() (err error) { return fmt.Errorf("_litestream_lock: %w", err) } - // Disable the autocheckpoint. - if _, err := tx.ExecContext(db.ctx, `PRAGMA wal_autocheckpoint = 0;`); err != nil { - return fmt.Errorf("disable autocheckpoint: %w", err) - } - // Verify our last sync matches the current state of the WAL. // This ensures that we have an existing generation & that the last sync // position of the real WAL hasn't been overwritten by another process. @@ -385,7 +413,8 @@ func (db *DB) Sync() (err error) { log.Printf("%s: new generation %q, %s", db.path, info.generation, info.reason) // Clear shadow wal info. - info.shadowWALPath, info.shadowWALSize = "", 0 + info.shadowWALPath = db.ShadowWALPath(info.generation, 0) + info.shadowWALSize = WALHeaderSize info.restart = false info.reason = "" } @@ -441,7 +470,7 @@ func (db *DB) verifyWAL() (info syncInfo, err error) { if err != nil { return info, fmt.Errorf("cannot find current generation: %w", err) } else if generation == "" { - info.reason = "no generation" + info.reason = "no generation exists" return info, nil } info.generation = generation @@ -489,14 +518,16 @@ func (db *DB) verifyWAL() (info syncInfo, err error) { } // Verify last page synced still matches. - offset := info.shadowWALSize - int64(db.pageSize+WALFrameHeaderSize) - if buf0, err := readFileAt(db.WALPath(), offset, int64(db.pageSize+WALFrameHeaderSize)); err != nil { - return info, fmt.Errorf("cannot read last synced wal page: %w", err) - } else if buf1, err := readFileAt(info.shadowWALPath, offset, int64(db.pageSize+WALFrameHeaderSize)); err != nil { - return info, fmt.Errorf("cannot read last synced shadow wal page: %w", err) - } else if !bytes.Equal(buf0, buf1) { - info.reason = "wal overwritten by another process" - return info, nil + if info.shadowWALSize > WALHeaderSize { + offset := info.shadowWALSize - int64(db.pageSize+WALFrameHeaderSize) + if buf0, err := readFileAt(db.WALPath(), offset, int64(db.pageSize+WALFrameHeaderSize)); err != nil { + return info, fmt.Errorf("cannot read last synced wal page: %w", err) + } else if buf1, err := readFileAt(info.shadowWALPath, offset, int64(db.pageSize+WALFrameHeaderSize)); err != nil { + return info, fmt.Errorf("cannot read last synced shadow wal page: %w", err) + } else if !bytes.Equal(buf0, buf1) { + info.reason = "wal overwritten by another process" + return info, nil + } } return info, nil @@ -549,24 +580,15 @@ func (db *DB) initShadowWALFile(filename string) error { } // Determine byte order for checksumming from header magic. - magic := binary.BigEndian.Uint32(hdr[0:]) - switch magic { - case 0x377f0682: - db.byteOrder = binary.LittleEndian - case 0x377f0683: - db.byteOrder = binary.BigEndian - default: - return fmt.Errorf("invalid wal header magic: %x", magic) + bo, err := headerByteOrder(hdr) + if err != nil { + return err } - // Read header salt. - db.salt0 = binary.BigEndian.Uint32(hdr[16:]) - db.salt1 = binary.BigEndian.Uint32(hdr[20:]) - // Verify checksum. s0 := binary.BigEndian.Uint32(hdr[24:]) s1 := binary.BigEndian.Uint32(hdr[28:]) - if v0, v1 := Checksum(db.byteOrder, 0, 0, hdr[:24]); v0 != s0 || v1 != s1 { + if v0, v1 := Checksum(bo, 0, 0, hdr[:24]); v0 != s0 || v1 != s1 { return fmt.Errorf("invalid header checksum: (%x,%x) != (%x,%x)", v0, v1, s0, s1) } @@ -595,6 +617,19 @@ func (db *DB) copyToShadowWAL(filename string) (newSize int64, err error) { return 0, err } + // Read shadow WAL header to determine byte order for checksum & salt. + hdr := make([]byte, WALHeaderSize) + if _, err := io.ReadFull(w, hdr); err != nil { + return 0, fmt.Errorf("read header: %w", err) + } + hsalt0 := binary.BigEndian.Uint32(hdr[16:]) + hsalt1 := binary.BigEndian.Uint32(hdr[20:]) + + bo, err := headerByteOrder(hdr) + if err != nil { + return 0, err + } + // Read previous checksum. chksum0, chksum1, err := readLastChecksumFrom(w, db.pageSize) if err != nil { @@ -624,19 +659,24 @@ func (db *DB) copyToShadowWAL(filename string) (newSize int64, err error) { // Read frame salt & compare to header salt. Stop reading on mismatch. salt0 := binary.BigEndian.Uint32(buf[8:]) salt1 := binary.BigEndian.Uint32(buf[12:]) - if salt0 != db.salt0 || salt1 != db.salt1 { + if salt0 != hsalt0 || salt1 != hsalt1 { break } // Verify checksum of page is valid. fchksum0 := binary.BigEndian.Uint32(buf[16:]) fchksum1 := binary.BigEndian.Uint32(buf[20:]) - chksum0, chksum1 = Checksum(db.byteOrder, chksum0, chksum1, buf[:8]) // frame header - chksum0, chksum1 = Checksum(db.byteOrder, chksum0, chksum1, buf[24:]) // frame data + chksum0, chksum1 = Checksum(bo, chksum0, chksum1, buf[:8]) // frame header + chksum0, chksum1 = Checksum(bo, chksum0, chksum1, buf[24:]) // frame data if chksum0 != fchksum0 || chksum1 != fchksum1 { return newSize, fmt.Errorf("checksum mismatch: offset=%d (%x,%x) != (%x,%x)", newSize, chksum0, chksum1, fchksum0, fchksum1) } + // Write frame to shadow WAL. + if _, err := w.Write(buf); err != nil { + return newSize, err + } + // Add page to the new size of the shadow WAL. newSize += int64(len(buf)) } @@ -694,9 +734,11 @@ func (db *DB) checkpoint(force bool) error { rawsql = `PRAGMA wal_checkpoint(RESTART);` } - if _, err := db.db.Exec(rawsql); err != nil { + var row [3]int + if err := db.db.QueryRow(rawsql).Scan(&row[0], &row[1], &row[2]); err != nil { return err } + log.Printf("%s: checkpoint: force=%v (%d,%d,%d)", db.path, force, row[0], row[1], row[2]) // Reacquire the read lock immediately after the checkpoint. if err := db.acquireReadLock(); err != nil { @@ -714,17 +756,31 @@ func (db *DB) monitor() { // Wait for ticker or context close. select { case <-db.ctx.Done(): + return case <-ticker.C: } + // Ensure the database is initialized. + if err := db.Init(); err != nil { + log.Printf("%s: init error: %s", db.path, err) + continue + } + // Sync the database to the shadow WAL. if err := db.Sync(); err != nil && !errors.Is(err, context.Canceled) { log.Printf("%s: sync error: %s", db.path, err) } - - // If context closed, exit after final sync. - if db.ctx.Err() != nil { - return - } + } +} + +func headerByteOrder(hdr []byte) (binary.ByteOrder, error) { + magic := binary.BigEndian.Uint32(hdr[0:]) + switch magic { + case 0x377f0682: + return binary.LittleEndian, nil + case 0x377f0683: + return binary.BigEndian, nil + default: + return nil, fmt.Errorf("invalid wal header magic: %x", magic) } }