|
|
|
|
@@ -17,6 +17,7 @@ import (
|
|
|
|
|
"sort"
|
|
|
|
|
"strings"
|
|
|
|
|
"sync"
|
|
|
|
|
"syscall"
|
|
|
|
|
"time"
|
|
|
|
|
|
|
|
|
|
"github.com/benbjohnson/litestream/internal"
|
|
|
|
|
@@ -1747,23 +1748,34 @@ func (db *DB) stream(ctx context.Context) error {
|
|
|
|
|
|
|
|
|
|
// streamSnapshot reads the snapshot into the WAL and applies it to the main database.
|
|
|
|
|
func (db *DB) streamSnapshot(ctx context.Context, hdr *StreamRecordHeader, r io.Reader) error {
|
|
|
|
|
// Truncate WAL file.
|
|
|
|
|
if _, err := db.db.ExecContext(ctx, `PRAGMA wal_checkpoint(TRUNCATE)`); err != nil {
|
|
|
|
|
return fmt.Errorf("truncate: %w", err)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Determine total page count.
|
|
|
|
|
pageN := int(hdr.Size / int64(db.pageSize))
|
|
|
|
|
|
|
|
|
|
ww := NewWALWriter(db.WALPath(), db.fileMode, db.pageSize)
|
|
|
|
|
if err := ww.Open(); err != nil {
|
|
|
|
|
return fmt.Errorf("open wal writer: %w", err)
|
|
|
|
|
// Open database file.
|
|
|
|
|
f, err := os.OpenFile(db.path, os.O_RDWR, 0666)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return fmt.Errorf("open db file: %w", err)
|
|
|
|
|
}
|
|
|
|
|
defer func() { _ = ww.Close() }()
|
|
|
|
|
defer f.Close()
|
|
|
|
|
|
|
|
|
|
if err := ww.WriteHeader(); err != nil {
|
|
|
|
|
return fmt.Errorf("write wal header: %w", err)
|
|
|
|
|
// Open shm file for locking.
|
|
|
|
|
shmFile, err := os.OpenFile(db.SHMPath(), os.O_RDWR, 0666)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return fmt.Errorf("open shm file: %w", err)
|
|
|
|
|
}
|
|
|
|
|
defer shmFile.Close()
|
|
|
|
|
|
|
|
|
|
// Obtain WAL checkpoint lock.
|
|
|
|
|
if err := setLkw(shmFile, syscall.F_WRLCK, WAL_CKPT_LOCK_OFFSET, 1); err != nil {
|
|
|
|
|
return fmt.Errorf("cannot obtain wal checkpoint lock: %w", err)
|
|
|
|
|
}
|
|
|
|
|
defer func() { _ = setLkw(shmFile, syscall.F_UNLCK, WAL_CKPT_LOCK_OFFSET, 1) }()
|
|
|
|
|
|
|
|
|
|
// Obtain WAL write lock.
|
|
|
|
|
if err := setLkw(shmFile, syscall.F_WRLCK, WAL_WRITE_LOCK_OFFSET, 1); err != nil {
|
|
|
|
|
return fmt.Errorf("cannot obtain wal write lock: %w", err)
|
|
|
|
|
}
|
|
|
|
|
defer func() { _ = setLkw(shmFile, syscall.F_UNLCK, WAL_WRITE_LOCK_OFFSET, 1) }()
|
|
|
|
|
|
|
|
|
|
// Iterate over pages
|
|
|
|
|
buf := make([]byte, db.pageSize)
|
|
|
|
|
@@ -1775,26 +1787,18 @@ func (db *DB) streamSnapshot(ctx context.Context, hdr *StreamRecordHeader, r io.
|
|
|
|
|
return fmt.Errorf("read snapshot page %d: %w", pgno, err)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Issue a commit flag when the last page is reached.
|
|
|
|
|
var commit uint32
|
|
|
|
|
// Copy page to database file.
|
|
|
|
|
offset := int64(pgno-1) * int64(db.pageSize)
|
|
|
|
|
if _, err := f.WriteAt(buf, offset); err != nil {
|
|
|
|
|
return fmt.Errorf("copy to db: pgno=%d err=%w", pgno, err)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Truncate database to final size.
|
|
|
|
|
if pgno == uint32(pageN) {
|
|
|
|
|
commit = uint32(pageN)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Write page into WAL frame.
|
|
|
|
|
if err := ww.WriteFrame(pgno, commit, buf); err != nil {
|
|
|
|
|
return fmt.Errorf("write wal frame: %w", err)
|
|
|
|
|
if err := f.Truncate(int64(pageN) * int64(db.pageSize)); err != nil {
|
|
|
|
|
return fmt.Errorf("truncate db: commit=%d err=%w", pageN, err)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Close WAL file writer.
|
|
|
|
|
if err := ww.Close(); err != nil {
|
|
|
|
|
return fmt.Errorf("close wal writer: %w", err)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Invalidate WAL index.
|
|
|
|
|
if err := invalidateSHMFile(db.path); err != nil {
|
|
|
|
|
return fmt.Errorf("invalidate shm file: %w", err)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Write position to file so other processes can read it.
|
|
|
|
|
@@ -1819,44 +1823,63 @@ func (db *DB) streamWALSegment(ctx context.Context, hdr *StreamRecordHeader, r i
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
ww := NewWALWriter(db.WALPath(), db.fileMode, db.pageSize)
|
|
|
|
|
if err := ww.Open(); err != nil {
|
|
|
|
|
return fmt.Errorf("open wal writer: %w", err)
|
|
|
|
|
// Open database file.
|
|
|
|
|
f, err := os.OpenFile(db.path, os.O_RDWR, 0666)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return fmt.Errorf("open db file: %w", err)
|
|
|
|
|
}
|
|
|
|
|
defer func() { _ = ww.Close() }()
|
|
|
|
|
defer f.Close()
|
|
|
|
|
|
|
|
|
|
if err := ww.WriteHeader(); err != nil {
|
|
|
|
|
return fmt.Errorf("write wal header: %w", err)
|
|
|
|
|
// Open shm file for locking.
|
|
|
|
|
shmFile, err := os.OpenFile(db.SHMPath(), os.O_RDWR, 0666)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return fmt.Errorf("open shm file: %w", err)
|
|
|
|
|
}
|
|
|
|
|
defer shmFile.Close()
|
|
|
|
|
|
|
|
|
|
// Obtain WAL checkpoint lock.
|
|
|
|
|
if err := setLkw(shmFile, syscall.F_WRLCK, WAL_CKPT_LOCK_OFFSET, 1); err != nil {
|
|
|
|
|
return fmt.Errorf("cannot obtain wal checkpoint lock: %w", err)
|
|
|
|
|
}
|
|
|
|
|
defer func() { _ = setLkw(shmFile, syscall.F_UNLCK, WAL_CKPT_LOCK_OFFSET, 1) }()
|
|
|
|
|
|
|
|
|
|
// Obtain WAL write lock.
|
|
|
|
|
if err := setLkw(shmFile, syscall.F_WRLCK, WAL_WRITE_LOCK_OFFSET, 1); err != nil {
|
|
|
|
|
return fmt.Errorf("cannot obtain wal write lock: %w", err)
|
|
|
|
|
}
|
|
|
|
|
defer func() { _ = setLkw(shmFile, syscall.F_UNLCK, WAL_WRITE_LOCK_OFFSET, 1) }()
|
|
|
|
|
|
|
|
|
|
// Iterate over incoming WAL pages.
|
|
|
|
|
buf := make([]byte, WALFrameHeaderSize+db.pageSize)
|
|
|
|
|
for i := 0; ; i++ {
|
|
|
|
|
// Read snapshot page into a buffer.
|
|
|
|
|
if _, err := io.ReadFull(zr, buf); err == io.EOF {
|
|
|
|
|
if n, err := io.ReadFull(zr, buf); err == io.EOF {
|
|
|
|
|
break
|
|
|
|
|
} else if err != nil {
|
|
|
|
|
return fmt.Errorf("read wal frame %d: %w", i, err)
|
|
|
|
|
return fmt.Errorf("read wal frame: i=%d n=%d err=%w", i, n, err)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Read page number & commit field.
|
|
|
|
|
pgno := binary.BigEndian.Uint32(buf[0:])
|
|
|
|
|
commit := binary.BigEndian.Uint32(buf[4:])
|
|
|
|
|
|
|
|
|
|
// Write page into WAL frame.
|
|
|
|
|
if err := ww.WriteFrame(pgno, commit, buf[WALFrameHeaderSize:]); err != nil {
|
|
|
|
|
return fmt.Errorf("write wal frame: %w", err)
|
|
|
|
|
// Copy page to database file.
|
|
|
|
|
offset := int64(pgno-1) * int64(db.pageSize)
|
|
|
|
|
if _, err := f.WriteAt(buf[WALFrameHeaderSize:], offset); err != nil {
|
|
|
|
|
return fmt.Errorf("copy to db: pgno=%d err=%w", pgno, err)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Truncate database, if commit specified.
|
|
|
|
|
if commit != 0 {
|
|
|
|
|
if err := f.Truncate(int64(commit) * int64(db.pageSize)); err != nil {
|
|
|
|
|
return fmt.Errorf("truncate db: commit=%d err=%w", commit, err)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Close WAL file writer.
|
|
|
|
|
if err := ww.Close(); err != nil {
|
|
|
|
|
return fmt.Errorf("close wal writer: %w", err)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Invalidate WAL index.
|
|
|
|
|
if err := invalidateSHMFile(db.path); err != nil {
|
|
|
|
|
return fmt.Errorf("invalidate shm file: %w", err)
|
|
|
|
|
// Close database file writer.
|
|
|
|
|
if err := f.Close(); err != nil {
|
|
|
|
|
return fmt.Errorf("close db writer: %w", err)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Write position to file so other processes can read it.
|
|
|
|
|
@@ -2016,51 +2039,21 @@ func logPrefixPath(path string) string {
|
|
|
|
|
return path
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// invalidateSHMFile clears the iVersion field of the -shm file in order that
|
|
|
|
|
// the next transaction will rebuild it.
|
|
|
|
|
func invalidateSHMFile(dbPath string) error {
|
|
|
|
|
db, err := sql.Open("sqlite3", dbPath)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return fmt.Errorf("reopen db: %w", err)
|
|
|
|
|
}
|
|
|
|
|
defer func() { _ = db.Close() }()
|
|
|
|
|
|
|
|
|
|
if _, err := db.Exec(`PRAGMA wal_checkpoint(PASSIVE)`); err != nil {
|
|
|
|
|
return fmt.Errorf("passive checkpoint: %w", err)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
f, err := os.OpenFile(dbPath+"-shm", os.O_RDWR, 0666)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return fmt.Errorf("open shm index: %w", err)
|
|
|
|
|
}
|
|
|
|
|
defer f.Close()
|
|
|
|
|
|
|
|
|
|
buf := make([]byte, WALIndexHeaderSize)
|
|
|
|
|
if _, err := io.ReadFull(f, buf); err != nil {
|
|
|
|
|
return fmt.Errorf("read shm index: %w", err)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Invalidate "isInit" fields.
|
|
|
|
|
buf[12], buf[60] = 0, 0
|
|
|
|
|
|
|
|
|
|
// Rewrite header.
|
|
|
|
|
if _, err := f.Seek(0, io.SeekStart); err != nil {
|
|
|
|
|
return fmt.Errorf("seek shm index: %w", err)
|
|
|
|
|
} else if _, err := f.Write(buf); err != nil {
|
|
|
|
|
return fmt.Errorf("overwrite shm index: %w", err)
|
|
|
|
|
} else if err := f.Close(); err != nil {
|
|
|
|
|
return fmt.Errorf("close shm index: %w", err)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Truncate WAL file again.
|
|
|
|
|
var row [3]int
|
|
|
|
|
if err := db.QueryRow(`PRAGMA wal_checkpoint(TRUNCATE)`).Scan(&row[0], &row[1], &row[2]); err != nil {
|
|
|
|
|
return fmt.Errorf("truncate: %w", err)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// A marker error to indicate that a restart checkpoint could not verify
|
|
|
|
|
// continuity between WAL indices and a new generation should be started.
|
|
|
|
|
var errRestartGeneration = errors.New("restart generation")
|
|
|
|
|
|
|
|
|
|
const (
|
|
|
|
|
WAL_WRITE_LOCK_OFFSET = 120
|
|
|
|
|
WAL_CKPT_LOCK_OFFSET = 121
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
// setLkw is a helper function for calling fcntl for file locking.
|
|
|
|
|
func setLkw(f *os.File, typ int16, start, len int64) error {
|
|
|
|
|
return syscall.FcntlFlock(f.Fd(), syscall.F_SETLKW, &syscall.Flock_t{
|
|
|
|
|
Start: start,
|
|
|
|
|
Len: len,
|
|
|
|
|
Type: typ,
|
|
|
|
|
Whence: io.SeekStart,
|
|
|
|
|
})
|
|
|
|
|
}
|
|
|
|
|
|