diff --git a/db.go b/db.go index 46379b3..538a292 100644 --- a/db.go +++ b/db.go @@ -1,6 +1,7 @@ package litestream import ( + "bytes" "context" "fmt" "io" @@ -34,8 +35,9 @@ type DB struct { // Tracks WAL state. walHeader *sqlite.WALHeader - processedWALByteN int64 // bytes copied to shadow WAL - pendingWALByteN int64 // bytes pending copy to shadow WAL + processedWALByteN int64 // bytes copied to shadow WAL + pendingWALByteN int64 // bytes pending copy to shadow WAL + checksum uint64 // running checksum on real WAL ctx context.Context cancel func() @@ -84,6 +86,48 @@ func (db *DB) ActiveShadowWALPath() string { return filepath.Join(db.ShadowWALDir(), ActiveWALName) } +// LastShadowWALPath returns the active shadow WAL or the last snapshotted shadow WAL path. +// Returns an empty string if no shadow WAL files exist. +func (db *DB) LastShadowWALPath() (string, error) { + // Return active shadow WAL if it exists. + if _, err := os.Stat(db.ActiveShadowWALPath()); err == nil { + return db.ActiveShadowWALPath(), nil + } else if !os.IsNotExist(err) { + return "", nil + } + + // Otherwise search for the largest shadow WAL file. + f, err := os.Open(db.ShadowWALDir()) + if os.IsNotExist(err) { + return "", nil + } else if err != nil { + return "", err + } + defer f.Close() + + var filename string + for { + fis, err := f.Readdir(512) + if err != nil { + return "", err + } + + for _, fi := range fis { + if !strings.HasSuffix(fi.Name(), WALExt) { + continue + } else if filename == "" || fi.Name() > filename { + filename = fi.Name() + } + } + } + + // Return an error if there is no shadow WAL files. + if filename == "" { + return "", os.ErrNotExist + } + return filepath.Join(db.ShadowWALDir(), filename), nil +} + // LogPath returns the path to the internal log directory. func (db *DB) LogPath() string { return filepath.Join(db.MetaPath(), LogFilename) @@ -163,49 +207,7 @@ func (db *DB) recover() error { // recoverRealWALOnly copies the real WAL to the active shadow WAL. func (db *DB) recoverRealWALOnly() error { db.logger.Printf("recovering: real WAL only") - - // Open real WAL to read from. - r, err := os.Open(db.WALPath()) - if err != nil { - return fmt.Errorf("cannot open wal: %w", err) - } - defer r.Close() - - // Read header from real WAL. - var hdr sqlite.WALHeader - if _, err := hdr.ReadFrom(r); os.IsNotExist(err) { - return nil - } else if err != nil { - return err - } - - // Create a new shadow WAL file. - w, err := db.createActiveShadowWAL(hdr) - if err != nil { - return fmt.Errorf("cannot create active shadow wal: %w", err) - } - defer w.Close() - - // Read from real WAL and copy to shadow WAL. - buf := make([]byte, hdr.PageSize) - for { - // Read frame header & data from real WAL. - var fhdr sqlite.WALFrameHeader - if _, err := fhdr.ReadFrom(r); err != nil { - return fmt.Errorf("cannot read frame header: %w", err) - } else if _, err := io.ReadFull(r, buf); err != nil { - return fmt.Errorf("cannot read frame: %w", err) - } - - // Copy to the shadow WAL. - if _, err := fhdr.WriteTo(w); err != nil { - return fmt.Errorf("cannot write frame to shadow: %w", err) - } else if _, err := w.Write(buf); err != nil { - return fmt.Errorf("cannot write frame to shadow: %w", err) - } - } - - return nil + return db.sync() } // walExists returns true if the real WAL exists. @@ -238,32 +240,6 @@ func (db *DB) shadowWALExists() (bool, error) { } } -// createActiveShadowWAL creates a new shadow WAL file with the given header. -func (db *DB) createActiveShadowWAL(hdr sqlite.WALHeader) (f *os.File, err error) { - if f, err = os.OpenFile(db.ActiveShadowWALPath(), os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0600); err != nil { - return nil, err - } - - // Attempt to clean up if shadow WAL creation fails. - defer func() { - if err != nil { - f.Close() - os.Remove(f.Name()) - } - }() - - // Clear some fields from header that we won't use for the shadow WAL. - hdr = ClearShadowWALHeader(hdr) - - // Write header & save it to the DB to ensure new WAL header writes match. - if _, err := hdr.WriteTo(f); err != nil { - return nil, err - } - db.walHeader = &hdr - - return f, nil -} - // recoverShadowWALOnly verifies the last page in the shadow WAL matches the // contents of the database page. func (db *DB) recoverShadowWALOnly() error { @@ -274,7 +250,131 @@ func (db *DB) recoverShadowWALOnly() error { // recoverRealAndShadowWALs verifies the last page of the real & shadow WALs match. func (db *DB) recoverRealAndShadowWALs() error { db.logger.Printf("recovering: real & shadow WAL") - panic("TODO") + + // Read WAL header from shadow WAL. + lastShadowWALPath, err := db.LastShadowWALPath() + if err != nil { + return fmt.Errorf("cannot find last shadow wal path: %w", err) + } + hdr, err := readFileWALHeader(lastShadowWALPath) + if err != nil { + return fmt.Errorf("cannot read last shadow wal header: %w", err) + } + db.walHeader = &hdr + + // Read last pages from shadow WAL & real WAL and ensure they match. + if fhdr0, data0, err := readLastWALPage(lastShadowWALPath); err != nil { + return fmt.Errorf("cannot read last shadow wal page: %w", err) + } else if fhdr1, data1, err := readLastWALPage(db.WALPath()); err != nil { + return fmt.Errorf("cannot read last shadow wal page: %w", err) + } else if fhdr0 != fhdr1 { + return fmt.Errorf("last frame header mismatch: %#v != %#v", fhdr0, fhdr1) + } else if !bytes.Equal(data0, data1) { + return fmt.Errorf("last frame data mismatch") + } else { + db.checksum = fhdr1.Checksum + } + + // Update position within real WAL. + fi, err := os.Stat(db.WALPath()) + if err != nil { + return fmt.Errorf("cannot stat wal: %w", err) + } + db.processedWALByteN = fi.Size() + db.pendingWALByteN = 0 + + return nil +} + +// Sync synchronizes the real WAL to the shadow WAL. +func (db *DB) Sync() error { + db.mu.Lock() + defer db.mu.Unlock() + return db.sync() +} + +func (db *DB) sync() (err error) { + db.logger.Printf("sync: begin") + defer func() { + db.logger.Printf("sync: end: err=%v", err) + }() + + // Open real WAL to read from. + r, err := os.Open(db.WALPath()) + if err != nil { + return fmt.Errorf("cannot open wal: %w", err) + } else if _, err := r.Seek(db.processedWALByteN, io.SeekStart); err != nil { + return fmt.Errorf("cannot seek wal: %w", err) + } + defer r.Close() + + // TODO: Verify all frames are valid & committed before copy. + + // Read header if we are at the beginning of the WAL. + if db.processedWALByteN == 0 { + var hdr sqlite.WALHeader + if _, err := hdr.ReadFrom(r); err != nil { + return fmt.Errorf("cannot read wal header: %w", err) + } + + // Save checksum to verify later pages in WAL. + db.checksum = hdr.Checksum + + // Clear out salt & checksum from header for shadow WAL. + hdr = ClearShadowWALHeader(hdr) + db.walHeader = &hdr + } + + // Open shadow WAL to copy to. + w, err := os.OpenFile(db.ActiveShadowWALPath(), os.O_RDWR|os.O_CREATE|os.O_APPEND, 0600) + if err != nil { + return fmt.Errorf("cannot open active shadow wal: %w", err) + } + defer w.Close() + + // If we are at the start of a new shadow WAL, write a header. + if n, err := w.Seek(0, io.SeekCurrent); err != nil { + return fmt.Errorf("cannot seek shadow wal: %w", err) + } else if n == 0 { + db.logger.Printf("sync: new shadow wal, writing header: magic=%x page-size=%d", db.walHeader.Magic, db.walHeader.PageSize) + if _, err := db.walHeader.WriteTo(w); err != nil { + return fmt.Errorf("cannot write shadow wal header: %w", err) + } + } + + // Read from real WAL and copy to shadow WAL. + buf := make([]byte, db.walHeader.PageSize) + for db.pendingWALByteN != 0 { + // Read frame header & data from real WAL. + var fhdr sqlite.WALFrameHeader + if _, err := fhdr.ReadFrom(r); err == io.EOF { + break + } else if err != nil { + return fmt.Errorf("cannot read frame header: %w", err) + } else if _, err := io.ReadFull(r, buf); err != nil { + return fmt.Errorf("cannot read frame: %w", err) + } + + db.logger.Printf("sync: copy frame: pgno=%d pageN=%d salt=%x checksum=%x", + fhdr.Pgno, + fhdr.PageN, + fhdr.Salt, + fhdr.Checksum, + ) + + // Copy to the shadow WAL. + if _, err := fhdr.WriteTo(w); err != nil { + return fmt.Errorf("cannot write frame to shadow: %w", err) + } else if _, err := w.Write(buf); err != nil { + return fmt.Errorf("cannot write frame to shadow: %w", err) + } + + byteN := int64(sqlite.WALFrameHeaderSize + len(buf)) + db.processedWALByteN += byteN + db.pendingWALByteN -= byteN + } + + return nil } // Close stops management of the database. @@ -340,6 +440,7 @@ func (db *DB) setHeader(page []byte) { func (db *DB) AddPendingWALByteN(n int64) { db.mu.Lock() defer db.mu.Unlock() + db.logger.Printf("write: n=%d pending=%d", n, db.pendingWALByteN) db.pendingWALByteN += n } @@ -366,3 +467,52 @@ func ClearShadowWALHeader(hdr sqlite.WALHeader) sqlite.WALHeader { hdr.Checksum = 0 return hdr } + +// readLastWALPage reads the last frame header & data from a WAL file. +func readLastWALPage(path string) (fhdr sqlite.WALFrameHeader, data []byte, err error) { + f, err := os.Open(path) + if err != nil { + return fhdr, data, err + } + defer f.Close() + + // Determine WAL file size. + fi, err := f.Stat() + if err != nil { + return fhdr, data, err + } + + // Read WAL header to determine page size. + var hdr sqlite.WALHeader + if _, err := hdr.ReadFrom(f); err != nil { + return fhdr, data, fmt.Errorf("cannot read wal header: %w", err) + } + + // WAL file size must be divisible by frame size (minus the header). + if (fi.Size()-sqlite.WALHeaderSize)%(sqlite.WALFrameHeaderSize+int64(hdr.PageSize)) != 0 { + return fhdr, data, fmt.Errorf("partial wal record: path=%s sz=%d", path, fi.Size()) + } + + // Seek to last frame and read header & data. + data = make([]byte, hdr.PageSize) + if _, err := f.Seek(sqlite.WALFrameHeaderSize+int64(hdr.PageSize), io.SeekStart); err != nil { + return fhdr, data, fmt.Errorf("cannot seek: %w", err) + } else if _, err := fhdr.ReadFrom(f); err != nil { + return fhdr, data, fmt.Errorf("cannot read frame header: %w", err) + } else if _, err := io.ReadFull(f, data); err != nil { + return fhdr, data, fmt.Errorf("cannot read frame data: %w", err) + } + return fhdr, data, nil +} + +// readFileWALHeader reads the WAL header from file. +func readFileWALHeader(path string) (hdr sqlite.WALHeader, err error) { + f, err := os.Open(path) + if err != nil { + return hdr, err + } + defer f.Close() + + _, err = hdr.ReadFrom(f) + return hdr, err +} diff --git a/handle.go b/handle.go index eaf1531..a0009a5 100644 --- a/handle.go +++ b/handle.go @@ -37,6 +37,9 @@ func NewHandle(n *Node, f *os.File) *Handle { // Release closes the underlying file descriptor. func (h *Handle) Release(ctx context.Context, req *fuse.ReleaseRequest) (err error) { + if err := h.node.Sync(); err != nil { + return err + } return h.f.Close() } @@ -90,6 +93,9 @@ func (h *Handle) Write(ctx context.Context, req *fuse.WriteRequest, resp *fuse.W // Flush is called when a file handle is synced to disk. Implements fs.HandleFlusher. func (h *Handle) Flush(ctx context.Context, req *fuse.FlushRequest) (err error) { + if err := h.node.Sync(); err != nil { + return err + } return h.f.Sync() } diff --git a/node.go b/node.go index 5abb289..4672bfe 100644 --- a/node.go +++ b/node.go @@ -56,6 +56,11 @@ func (n *Node) srcpath() string { return filepath.Join(n.fs.TargetPath, n.path) } +// IsWAL returns true if node path has a "-wal" suffix. +func (n *Node) IsWAL() bool { + return strings.HasSuffix(n.path, sqlite.WALSuffix) +} + // DB returns the DB object associated with the node, if any. // If node points to a "-wal" file then the associated DB is returned. func (n *Node) DB() *DB { @@ -65,6 +70,25 @@ func (n *Node) DB() *DB { return n.fs.DB(n.path) } +// Sync synchronizes the data to the shadow WAL if this node is the WAL. +func (n *Node) Sync() (err error) { + println("dbg/node.sync") + // Ignore if this is not the WAL. + if !n.IsWAL() { + println("dbg/node.sync.notwal", n.path) + return nil + } + + // Ignore if the node is not a managed db. + db := n.DB() + if db == nil { + println("dbg/node.sync.notmanaged", n.path) + return nil + } + + return db.Sync() +} + func (n *Node) Attr(ctx context.Context, a *fuse.Attr) (err error) { fi, err := os.Stat(n.srcpath()) if err != nil { @@ -312,6 +336,11 @@ func (n *Node) Mknod(ctx context.Context, req *fuse.MknodRequest) (_ fs.Node, er } func (n *Node) Fsync(ctx context.Context, req *fuse.FsyncRequest) (err error) { + // Synchronize to shadow WAL. + if err := n.Sync(); err != nil { + return err + } + f, err := os.Open(n.srcpath()) if err != nil { return err