Remove streaming replication implementation
This commit is contained in:
404
db.go
404
db.go
@@ -54,9 +54,6 @@ type DB struct {
|
||||
pageSize int // page size, in bytes
|
||||
notifyCh chan struct{} // notifies DB of changes
|
||||
|
||||
// Iterators used to stream new WAL changes to replicas
|
||||
itrs map[*FileWALSegmentIterator]struct{}
|
||||
|
||||
// Cached salt & checksum from current shadow header.
|
||||
hdr []byte
|
||||
frame []byte
|
||||
@@ -85,11 +82,6 @@ type DB struct {
|
||||
checkpointErrorNCounterVec *prometheus.CounterVec
|
||||
checkpointSecondsCounterVec *prometheus.CounterVec
|
||||
|
||||
// Client used to receive live, upstream changes. If specified, then
|
||||
// DB should be used as read-only as local changes will conflict with
|
||||
// upstream changes.
|
||||
StreamClient StreamClient
|
||||
|
||||
// Minimum threshold of WAL size, in pages, before a passive checkpoint.
|
||||
// A passive checkpoint will attempt a checkpoint but fail if there are
|
||||
// active transactions occurring at the same time.
|
||||
@@ -130,8 +122,6 @@ func NewDB(path string) *DB {
|
||||
path: path,
|
||||
notifyCh: make(chan struct{}, 1),
|
||||
|
||||
itrs: make(map[*FileWALSegmentIterator]struct{}),
|
||||
|
||||
MinCheckpointPageN: DefaultMinCheckpointPageN,
|
||||
MaxCheckpointPageN: DefaultMaxCheckpointPageN,
|
||||
ShadowRetentionN: DefaultShadowRetentionN,
|
||||
@@ -275,7 +265,7 @@ func (db *DB) invalidatePos(ctx context.Context) error {
|
||||
}
|
||||
|
||||
// Iterate over all segments to find the last one.
|
||||
itr, err := db.walSegments(context.Background(), generation, false)
|
||||
itr, err := db.walSegments(context.Background(), generation)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -422,9 +412,7 @@ func (db *DB) Open() (err error) {
|
||||
return fmt.Errorf("cannot remove tmp files: %w", err)
|
||||
}
|
||||
|
||||
// If an upstream client is specified, then we should simply stream changes
|
||||
// into the database. If it is not specified, then we should monitor the
|
||||
// database for local changes and replicate them out.
|
||||
// Continually monitor local changes in a separate goroutine.
|
||||
db.g.Go(func() error { return db.monitor(db.ctx) })
|
||||
|
||||
return nil
|
||||
@@ -466,14 +454,6 @@ func (db *DB) Close() (err error) {
|
||||
}
|
||||
}
|
||||
|
||||
// Remove all iterators.
|
||||
db.mu.Lock()
|
||||
for itr := range db.itrs {
|
||||
itr.SetErr(ErrDBClosed)
|
||||
delete(db.itrs, itr)
|
||||
}
|
||||
db.mu.Unlock()
|
||||
|
||||
// Release the read lock to allow other applications to handle checkpointing.
|
||||
if db.rtx != nil {
|
||||
if e := db.releaseReadLock(); e != nil && err == nil {
|
||||
@@ -645,74 +625,6 @@ func (db *DB) init() (err error) {
|
||||
return nil
|
||||
}
|
||||
|
||||
// initReplica initializes a new database file as a replica of an upstream database.
|
||||
func (db *DB) initReplica(pageSize int) (err error) {
|
||||
// Exit if already initialized.
|
||||
if db.db != nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Obtain permissions for parent directory.
|
||||
fi, err := os.Stat(filepath.Dir(db.path))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
db.dirMode = fi.Mode()
|
||||
|
||||
dsn := db.path
|
||||
dsn += fmt.Sprintf("?_busy_timeout=%d", BusyTimeout.Milliseconds())
|
||||
|
||||
// Connect to SQLite database. Use the driver registered with a hook to
|
||||
// prevent WAL files from being removed.
|
||||
if db.db, err = sql.Open("litestream-sqlite3", dsn); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Initialize database file if it doesn't exist. It doesn't matter what we
|
||||
// store in it as it will be erased by the replication. We just need to
|
||||
// ensure a WAL file is created and there is at least a page in the database.
|
||||
if _, err := os.Stat(db.path); os.IsNotExist(err) {
|
||||
if _, err := db.db.ExecContext(db.ctx, fmt.Sprintf(`PRAGMA page_size = %d`, pageSize)); err != nil {
|
||||
return fmt.Errorf("set page size: %w", err)
|
||||
}
|
||||
|
||||
var mode string
|
||||
if err := db.db.QueryRow(`PRAGMA journal_mode = wal`).Scan(&mode); err != nil {
|
||||
return err
|
||||
} else if mode != "wal" {
|
||||
return fmt.Errorf("enable wal failed, mode=%q", mode)
|
||||
}
|
||||
|
||||
if _, err := db.db.ExecContext(db.ctx, `CREATE TABLE IF NOT EXISTS _litestream (id INTEGER)`); err != nil {
|
||||
return fmt.Errorf("create _litestream table: %w", err)
|
||||
} else if _, err := db.db.ExecContext(db.ctx, `PRAGMA wal_checkpoint(TRUNCATE)`); err != nil {
|
||||
return fmt.Errorf("create _litestream table: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
// Obtain file info once we know the database exists.
|
||||
fi, err = os.Stat(db.path)
|
||||
if err != nil {
|
||||
return fmt.Errorf("init file stat: %w", err)
|
||||
}
|
||||
db.fileMode = fi.Mode()
|
||||
db.uid, db.gid = internal.Fileinfo(fi)
|
||||
|
||||
// Verify page size matches.
|
||||
if err := db.db.QueryRowContext(db.ctx, `PRAGMA page_size;`).Scan(&db.pageSize); err != nil {
|
||||
return fmt.Errorf("read page size: %w", err)
|
||||
} else if db.pageSize != pageSize {
|
||||
return fmt.Errorf("page size mismatch: %d <> %d", db.pageSize, pageSize)
|
||||
}
|
||||
|
||||
// Ensure meta directory structure exists.
|
||||
if err := internal.MkdirAll(db.MetaPath(), db.dirMode, db.uid, db.gid); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (db *DB) clearGeneration(ctx context.Context) error {
|
||||
if err := os.Remove(db.GenerationNamePath()); err != nil && !os.IsNotExist(err) {
|
||||
return err
|
||||
@@ -927,11 +839,6 @@ func (db *DB) createGeneration(ctx context.Context) (string, error) {
|
||||
|
||||
// Sync copies pending data from the WAL to the shadow WAL.
|
||||
func (db *DB) Sync(ctx context.Context) error {
|
||||
if db.StreamClient != nil {
|
||||
db.Logger.Printf("using upstream client, skipping sync")
|
||||
return nil
|
||||
}
|
||||
|
||||
const retryN = 5
|
||||
|
||||
for i := 0; i < retryN; i++ {
|
||||
@@ -1386,8 +1293,7 @@ func (db *DB) writeWALSegment(ctx context.Context, pos Pos, rd io.Reader) error
|
||||
}
|
||||
defer f.Close()
|
||||
|
||||
n, err := io.Copy(f, rd)
|
||||
if err != nil {
|
||||
if _, err := io.Copy(f, rd); err != nil {
|
||||
return err
|
||||
} else if err := f.Sync(); err != nil {
|
||||
return err
|
||||
@@ -1405,50 +1311,9 @@ func (db *DB) writeWALSegment(ctx context.Context, pos Pos, rd io.Reader) error
|
||||
return fmt.Errorf("write position file: %w", err)
|
||||
}
|
||||
|
||||
// Generate segment info used to notify managed iterators.
|
||||
info := WALSegmentInfo{
|
||||
Generation: pos.Generation,
|
||||
Index: pos.Index,
|
||||
Offset: pos.Offset,
|
||||
Size: n,
|
||||
CreatedAt: time.Now(),
|
||||
}
|
||||
|
||||
// Notify all managed segment iterators.
|
||||
for itr := range db.itrs {
|
||||
// Notify iterators of generation change.
|
||||
if itr.Generation() != pos.Generation {
|
||||
itr.SetErr(ErrGenerationChanged)
|
||||
delete(db.itrs, itr)
|
||||
continue
|
||||
}
|
||||
|
||||
// Attempt to append segment to end of iterator.
|
||||
// On error, mark it on the iterator and remove from future notifications.
|
||||
if err := itr.Append(info); err != nil {
|
||||
itr.SetErr(fmt.Errorf("cannot append wal segment: %w", err))
|
||||
delete(db.itrs, itr)
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// readPositionFile reads the position from the position file.
|
||||
func (db *DB) readPositionFile() (Pos, error) {
|
||||
buf, err := os.ReadFile(db.PositionPath())
|
||||
if os.IsNotExist(err) {
|
||||
return Pos{}, nil
|
||||
} else if err != nil {
|
||||
return Pos{}, err
|
||||
}
|
||||
|
||||
// Treat invalid format as a non-existent file so we return an empty position.
|
||||
pos, _ := ParsePos(strings.TrimSpace(string(buf)))
|
||||
return pos, nil
|
||||
}
|
||||
|
||||
// writePositionFile writes pos as the current position.
|
||||
func (db *DB) writePositionFile(pos Pos) error {
|
||||
return internal.WriteFile(db.PositionPath(), []byte(pos.String()+"\n"), db.fileMode, db.uid, db.gid)
|
||||
@@ -1458,10 +1323,10 @@ func (db *DB) writePositionFile(pos Pos) error {
|
||||
func (db *DB) WALSegments(ctx context.Context, generation string) (*FileWALSegmentIterator, error) {
|
||||
db.mu.Lock()
|
||||
defer db.mu.Unlock()
|
||||
return db.walSegments(ctx, generation, true)
|
||||
return db.walSegments(ctx, generation)
|
||||
}
|
||||
|
||||
func (db *DB) walSegments(ctx context.Context, generation string, managed bool) (*FileWALSegmentIterator, error) {
|
||||
func (db *DB) walSegments(ctx context.Context, generation string) (*FileWALSegmentIterator, error) {
|
||||
ents, err := os.ReadDir(db.ShadowWALDir(generation))
|
||||
if os.IsNotExist(err) {
|
||||
return NewFileWALSegmentIterator(db.ShadowWALDir(generation), generation, nil), nil
|
||||
@@ -1481,27 +1346,7 @@ func (db *DB) walSegments(ctx context.Context, generation string, managed bool)
|
||||
|
||||
sort.Ints(indexes)
|
||||
|
||||
itr := NewFileWALSegmentIterator(db.ShadowWALDir(generation), generation, indexes)
|
||||
|
||||
// Managed iterators will have new segments pushed to them.
|
||||
if managed {
|
||||
itr.closeFunc = func() error {
|
||||
return db.CloseWALSegmentIterator(itr)
|
||||
}
|
||||
|
||||
db.itrs[itr] = struct{}{}
|
||||
}
|
||||
|
||||
return itr, nil
|
||||
}
|
||||
|
||||
// CloseWALSegmentIterator removes itr from the list of managed iterators.
|
||||
func (db *DB) CloseWALSegmentIterator(itr *FileWALSegmentIterator) error {
|
||||
db.mu.Lock()
|
||||
defer db.mu.Unlock()
|
||||
|
||||
delete(db.itrs, itr)
|
||||
return nil
|
||||
return NewFileWALSegmentIterator(db.ShadowWALDir(generation), generation, indexes), nil
|
||||
}
|
||||
|
||||
// SQLite WAL constants
|
||||
@@ -1645,15 +1490,8 @@ func (db *DB) execCheckpoint(mode string) (err error) {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (db *DB) monitor(ctx context.Context) error {
|
||||
if db.StreamClient != nil {
|
||||
return db.monitorUpstream(ctx)
|
||||
}
|
||||
return db.monitorLocal(ctx)
|
||||
}
|
||||
|
||||
// monitor runs in a separate goroutine and monitors the local database & WAL.
|
||||
func (db *DB) monitorLocal(ctx context.Context) error {
|
||||
func (db *DB) monitor(ctx context.Context) error {
|
||||
var timer *time.Timer
|
||||
if db.MonitorDelayInterval > 0 {
|
||||
timer = time.NewTimer(db.MonitorDelayInterval)
|
||||
@@ -1686,189 +1524,6 @@ func (db *DB) monitorLocal(ctx context.Context) error {
|
||||
}
|
||||
}
|
||||
|
||||
// monitorUpstream runs in a separate goroutine and streams data into the local DB.
|
||||
func (db *DB) monitorUpstream(ctx context.Context) error {
|
||||
for {
|
||||
if err := db.stream(ctx); err != nil {
|
||||
if ctx.Err() != nil {
|
||||
return nil
|
||||
}
|
||||
db.Logger.Printf("stream error, retrying: %s", err)
|
||||
}
|
||||
|
||||
// Delay before retrying stream.
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return nil
|
||||
case <-time.After(1 * time.Second):
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// stream initializes the local database and continuously streams new upstream data.
|
||||
func (db *DB) stream(ctx context.Context) error {
|
||||
pos, err := db.readPositionFile()
|
||||
if err != nil {
|
||||
return fmt.Errorf("read position file: %w", err)
|
||||
}
|
||||
|
||||
// Continuously stream and apply records from client.
|
||||
sr, err := db.StreamClient.Stream(ctx, pos)
|
||||
if err != nil {
|
||||
return fmt.Errorf("stream connect: %w", err)
|
||||
}
|
||||
defer sr.Close()
|
||||
|
||||
// Initialize the database and create it if it doesn't exist.
|
||||
if err := db.initReplica(sr.PageSize()); err != nil {
|
||||
return fmt.Errorf("init replica: %w", err)
|
||||
}
|
||||
|
||||
for {
|
||||
hdr, err := sr.Next()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
switch hdr.Type {
|
||||
case StreamRecordTypeSnapshot:
|
||||
if err := db.streamSnapshot(ctx, hdr, sr); err != nil {
|
||||
return fmt.Errorf("snapshot: %w", err)
|
||||
}
|
||||
case StreamRecordTypeWALSegment:
|
||||
if err := db.streamWALSegment(ctx, hdr, sr); err != nil {
|
||||
return fmt.Errorf("wal segment: %w", err)
|
||||
}
|
||||
default:
|
||||
return fmt.Errorf("invalid stream record type: 0x%02x", hdr.Type)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// streamSnapshot reads the snapshot into the WAL and applies it to the main database.
|
||||
func (db *DB) streamSnapshot(ctx context.Context, hdr *StreamRecordHeader, r io.Reader) error {
|
||||
// Truncate WAL file.
|
||||
if _, err := db.db.ExecContext(ctx, `PRAGMA wal_checkpoint(TRUNCATE)`); err != nil {
|
||||
return fmt.Errorf("truncate: %w", err)
|
||||
}
|
||||
|
||||
// Determine total page count.
|
||||
pageN := int(hdr.Size / int64(db.pageSize))
|
||||
|
||||
ww := NewWALWriter(db.WALPath(), db.fileMode, db.pageSize)
|
||||
if err := ww.Open(); err != nil {
|
||||
return fmt.Errorf("open wal writer: %w", err)
|
||||
}
|
||||
defer func() { _ = ww.Close() }()
|
||||
|
||||
if err := ww.WriteHeader(); err != nil {
|
||||
return fmt.Errorf("write wal header: %w", err)
|
||||
}
|
||||
|
||||
// Iterate over pages
|
||||
buf := make([]byte, db.pageSize)
|
||||
for pgno := uint32(1); ; pgno++ {
|
||||
// Read snapshot page into a buffer.
|
||||
if _, err := io.ReadFull(r, buf); err == io.EOF {
|
||||
break
|
||||
} else if err != nil {
|
||||
return fmt.Errorf("read snapshot page %d: %w", pgno, err)
|
||||
}
|
||||
|
||||
// Issue a commit flag when the last page is reached.
|
||||
var commit uint32
|
||||
if pgno == uint32(pageN) {
|
||||
commit = uint32(pageN)
|
||||
}
|
||||
|
||||
// Write page into WAL frame.
|
||||
if err := ww.WriteFrame(pgno, commit, buf); err != nil {
|
||||
return fmt.Errorf("write wal frame: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
// Close WAL file writer.
|
||||
if err := ww.Close(); err != nil {
|
||||
return fmt.Errorf("close wal writer: %w", err)
|
||||
}
|
||||
|
||||
// Invalidate WAL index.
|
||||
if err := invalidateSHMFile(db.path); err != nil {
|
||||
return fmt.Errorf("invalidate shm file: %w", err)
|
||||
}
|
||||
|
||||
// Write position to file so other processes can read it.
|
||||
if err := db.writePositionFile(hdr.Pos()); err != nil {
|
||||
return fmt.Errorf("write position file: %w", err)
|
||||
}
|
||||
|
||||
db.Logger.Printf("snapshot applied")
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// streamWALSegment rewrites a WAL segment into the local WAL and applies it to the main database.
|
||||
func (db *DB) streamWALSegment(ctx context.Context, hdr *StreamRecordHeader, r io.Reader) error {
|
||||
// Decompress incoming segment
|
||||
zr := lz4.NewReader(r)
|
||||
|
||||
// Drop WAL header if starting from offset zero.
|
||||
if hdr.Offset == 0 {
|
||||
if _, err := io.CopyN(io.Discard, zr, WALHeaderSize); err != nil {
|
||||
return fmt.Errorf("read wal header: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
ww := NewWALWriter(db.WALPath(), db.fileMode, db.pageSize)
|
||||
if err := ww.Open(); err != nil {
|
||||
return fmt.Errorf("open wal writer: %w", err)
|
||||
}
|
||||
defer func() { _ = ww.Close() }()
|
||||
|
||||
if err := ww.WriteHeader(); err != nil {
|
||||
return fmt.Errorf("write wal header: %w", err)
|
||||
}
|
||||
|
||||
// Iterate over incoming WAL pages.
|
||||
buf := make([]byte, WALFrameHeaderSize+db.pageSize)
|
||||
for i := 0; ; i++ {
|
||||
// Read snapshot page into a buffer.
|
||||
if _, err := io.ReadFull(zr, buf); err == io.EOF {
|
||||
break
|
||||
} else if err != nil {
|
||||
return fmt.Errorf("read wal frame %d: %w", i, err)
|
||||
}
|
||||
|
||||
// Read page number & commit field.
|
||||
pgno := binary.BigEndian.Uint32(buf[0:])
|
||||
commit := binary.BigEndian.Uint32(buf[4:])
|
||||
|
||||
// Write page into WAL frame.
|
||||
if err := ww.WriteFrame(pgno, commit, buf[WALFrameHeaderSize:]); err != nil {
|
||||
return fmt.Errorf("write wal frame: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
// Close WAL file writer.
|
||||
if err := ww.Close(); err != nil {
|
||||
return fmt.Errorf("close wal writer: %w", err)
|
||||
}
|
||||
|
||||
// Invalidate WAL index.
|
||||
if err := invalidateSHMFile(db.path); err != nil {
|
||||
return fmt.Errorf("invalidate shm file: %w", err)
|
||||
}
|
||||
|
||||
// Write position to file so other processes can read it.
|
||||
if err := db.writePositionFile(hdr.Pos()); err != nil {
|
||||
return fmt.Errorf("write position file: %w", err)
|
||||
}
|
||||
|
||||
db.Logger.Printf("wal segment applied: %s", hdr.Pos().String())
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// ApplyWAL performs a truncating checkpoint on the given database.
|
||||
func ApplyWAL(ctx context.Context, dbPath, walPath string) error {
|
||||
// Copy WAL file from its staging path to the correct "-wal" location.
|
||||
@@ -2016,51 +1671,6 @@ func logPrefixPath(path string) string {
|
||||
return path
|
||||
}
|
||||
|
||||
// invalidateSHMFile clears the iVersion field of the -shm file in order that
|
||||
// the next transaction will rebuild it.
|
||||
func invalidateSHMFile(dbPath string) error {
|
||||
db, err := sql.Open("sqlite3", dbPath)
|
||||
if err != nil {
|
||||
return fmt.Errorf("reopen db: %w", err)
|
||||
}
|
||||
defer func() { _ = db.Close() }()
|
||||
|
||||
if _, err := db.Exec(`PRAGMA wal_checkpoint(PASSIVE)`); err != nil {
|
||||
return fmt.Errorf("passive checkpoint: %w", err)
|
||||
}
|
||||
|
||||
f, err := os.OpenFile(dbPath+"-shm", os.O_RDWR, 0666)
|
||||
if err != nil {
|
||||
return fmt.Errorf("open shm index: %w", err)
|
||||
}
|
||||
defer f.Close()
|
||||
|
||||
buf := make([]byte, WALIndexHeaderSize)
|
||||
if _, err := io.ReadFull(f, buf); err != nil {
|
||||
return fmt.Errorf("read shm index: %w", err)
|
||||
}
|
||||
|
||||
// Invalidate "isInit" fields.
|
||||
buf[12], buf[60] = 0, 0
|
||||
|
||||
// Rewrite header.
|
||||
if _, err := f.Seek(0, io.SeekStart); err != nil {
|
||||
return fmt.Errorf("seek shm index: %w", err)
|
||||
} else if _, err := f.Write(buf); err != nil {
|
||||
return fmt.Errorf("overwrite shm index: %w", err)
|
||||
} else if err := f.Close(); err != nil {
|
||||
return fmt.Errorf("close shm index: %w", err)
|
||||
}
|
||||
|
||||
// Truncate WAL file again.
|
||||
var row [3]int
|
||||
if err := db.QueryRow(`PRAGMA wal_checkpoint(TRUNCATE)`).Scan(&row[0], &row[1], &row[2]); err != nil {
|
||||
return fmt.Errorf("truncate: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// A marker error to indicate that a restart checkpoint could not verify
|
||||
// continuity between WAL indices and a new generation should be started.
|
||||
var errRestartGeneration = errors.New("restart generation")
|
||||
|
||||
Reference in New Issue
Block a user