Add real+shadow recovery
This commit is contained in:
290
db.go
290
db.go
@@ -1,6 +1,7 @@
|
||||
package litestream
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
@@ -36,6 +37,7 @@ type DB struct {
|
||||
walHeader *sqlite.WALHeader
|
||||
processedWALByteN int64 // bytes copied to shadow WAL
|
||||
pendingWALByteN int64 // bytes pending copy to shadow WAL
|
||||
checksum uint64 // running checksum on real WAL
|
||||
|
||||
ctx context.Context
|
||||
cancel func()
|
||||
@@ -84,6 +86,48 @@ func (db *DB) ActiveShadowWALPath() string {
|
||||
return filepath.Join(db.ShadowWALDir(), ActiveWALName)
|
||||
}
|
||||
|
||||
// LastShadowWALPath returns the active shadow WAL or the last snapshotted shadow WAL path.
|
||||
// Returns an empty string if no shadow WAL files exist.
|
||||
func (db *DB) LastShadowWALPath() (string, error) {
|
||||
// Return active shadow WAL if it exists.
|
||||
if _, err := os.Stat(db.ActiveShadowWALPath()); err == nil {
|
||||
return db.ActiveShadowWALPath(), nil
|
||||
} else if !os.IsNotExist(err) {
|
||||
return "", nil
|
||||
}
|
||||
|
||||
// Otherwise search for the largest shadow WAL file.
|
||||
f, err := os.Open(db.ShadowWALDir())
|
||||
if os.IsNotExist(err) {
|
||||
return "", nil
|
||||
} else if err != nil {
|
||||
return "", err
|
||||
}
|
||||
defer f.Close()
|
||||
|
||||
var filename string
|
||||
for {
|
||||
fis, err := f.Readdir(512)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
for _, fi := range fis {
|
||||
if !strings.HasSuffix(fi.Name(), WALExt) {
|
||||
continue
|
||||
} else if filename == "" || fi.Name() > filename {
|
||||
filename = fi.Name()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Return an error if there is no shadow WAL files.
|
||||
if filename == "" {
|
||||
return "", os.ErrNotExist
|
||||
}
|
||||
return filepath.Join(db.ShadowWALDir(), filename), nil
|
||||
}
|
||||
|
||||
// LogPath returns the path to the internal log directory.
|
||||
func (db *DB) LogPath() string {
|
||||
return filepath.Join(db.MetaPath(), LogFilename)
|
||||
@@ -163,49 +207,7 @@ func (db *DB) recover() error {
|
||||
// recoverRealWALOnly copies the real WAL to the active shadow WAL.
|
||||
func (db *DB) recoverRealWALOnly() error {
|
||||
db.logger.Printf("recovering: real WAL only")
|
||||
|
||||
// Open real WAL to read from.
|
||||
r, err := os.Open(db.WALPath())
|
||||
if err != nil {
|
||||
return fmt.Errorf("cannot open wal: %w", err)
|
||||
}
|
||||
defer r.Close()
|
||||
|
||||
// Read header from real WAL.
|
||||
var hdr sqlite.WALHeader
|
||||
if _, err := hdr.ReadFrom(r); os.IsNotExist(err) {
|
||||
return nil
|
||||
} else if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Create a new shadow WAL file.
|
||||
w, err := db.createActiveShadowWAL(hdr)
|
||||
if err != nil {
|
||||
return fmt.Errorf("cannot create active shadow wal: %w", err)
|
||||
}
|
||||
defer w.Close()
|
||||
|
||||
// Read from real WAL and copy to shadow WAL.
|
||||
buf := make([]byte, hdr.PageSize)
|
||||
for {
|
||||
// Read frame header & data from real WAL.
|
||||
var fhdr sqlite.WALFrameHeader
|
||||
if _, err := fhdr.ReadFrom(r); err != nil {
|
||||
return fmt.Errorf("cannot read frame header: %w", err)
|
||||
} else if _, err := io.ReadFull(r, buf); err != nil {
|
||||
return fmt.Errorf("cannot read frame: %w", err)
|
||||
}
|
||||
|
||||
// Copy to the shadow WAL.
|
||||
if _, err := fhdr.WriteTo(w); err != nil {
|
||||
return fmt.Errorf("cannot write frame to shadow: %w", err)
|
||||
} else if _, err := w.Write(buf); err != nil {
|
||||
return fmt.Errorf("cannot write frame to shadow: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
return db.sync()
|
||||
}
|
||||
|
||||
// walExists returns true if the real WAL exists.
|
||||
@@ -238,32 +240,6 @@ func (db *DB) shadowWALExists() (bool, error) {
|
||||
}
|
||||
}
|
||||
|
||||
// createActiveShadowWAL creates a new shadow WAL file with the given header.
|
||||
func (db *DB) createActiveShadowWAL(hdr sqlite.WALHeader) (f *os.File, err error) {
|
||||
if f, err = os.OpenFile(db.ActiveShadowWALPath(), os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0600); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Attempt to clean up if shadow WAL creation fails.
|
||||
defer func() {
|
||||
if err != nil {
|
||||
f.Close()
|
||||
os.Remove(f.Name())
|
||||
}
|
||||
}()
|
||||
|
||||
// Clear some fields from header that we won't use for the shadow WAL.
|
||||
hdr = ClearShadowWALHeader(hdr)
|
||||
|
||||
// Write header & save it to the DB to ensure new WAL header writes match.
|
||||
if _, err := hdr.WriteTo(f); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
db.walHeader = &hdr
|
||||
|
||||
return f, nil
|
||||
}
|
||||
|
||||
// recoverShadowWALOnly verifies the last page in the shadow WAL matches the
|
||||
// contents of the database page.
|
||||
func (db *DB) recoverShadowWALOnly() error {
|
||||
@@ -274,7 +250,131 @@ func (db *DB) recoverShadowWALOnly() error {
|
||||
// recoverRealAndShadowWALs verifies the last page of the real & shadow WALs match.
|
||||
func (db *DB) recoverRealAndShadowWALs() error {
|
||||
db.logger.Printf("recovering: real & shadow WAL")
|
||||
panic("TODO")
|
||||
|
||||
// Read WAL header from shadow WAL.
|
||||
lastShadowWALPath, err := db.LastShadowWALPath()
|
||||
if err != nil {
|
||||
return fmt.Errorf("cannot find last shadow wal path: %w", err)
|
||||
}
|
||||
hdr, err := readFileWALHeader(lastShadowWALPath)
|
||||
if err != nil {
|
||||
return fmt.Errorf("cannot read last shadow wal header: %w", err)
|
||||
}
|
||||
db.walHeader = &hdr
|
||||
|
||||
// Read last pages from shadow WAL & real WAL and ensure they match.
|
||||
if fhdr0, data0, err := readLastWALPage(lastShadowWALPath); err != nil {
|
||||
return fmt.Errorf("cannot read last shadow wal page: %w", err)
|
||||
} else if fhdr1, data1, err := readLastWALPage(db.WALPath()); err != nil {
|
||||
return fmt.Errorf("cannot read last shadow wal page: %w", err)
|
||||
} else if fhdr0 != fhdr1 {
|
||||
return fmt.Errorf("last frame header mismatch: %#v != %#v", fhdr0, fhdr1)
|
||||
} else if !bytes.Equal(data0, data1) {
|
||||
return fmt.Errorf("last frame data mismatch")
|
||||
} else {
|
||||
db.checksum = fhdr1.Checksum
|
||||
}
|
||||
|
||||
// Update position within real WAL.
|
||||
fi, err := os.Stat(db.WALPath())
|
||||
if err != nil {
|
||||
return fmt.Errorf("cannot stat wal: %w", err)
|
||||
}
|
||||
db.processedWALByteN = fi.Size()
|
||||
db.pendingWALByteN = 0
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Sync synchronizes the real WAL to the shadow WAL.
|
||||
func (db *DB) Sync() error {
|
||||
db.mu.Lock()
|
||||
defer db.mu.Unlock()
|
||||
return db.sync()
|
||||
}
|
||||
|
||||
func (db *DB) sync() (err error) {
|
||||
db.logger.Printf("sync: begin")
|
||||
defer func() {
|
||||
db.logger.Printf("sync: end: err=%v", err)
|
||||
}()
|
||||
|
||||
// Open real WAL to read from.
|
||||
r, err := os.Open(db.WALPath())
|
||||
if err != nil {
|
||||
return fmt.Errorf("cannot open wal: %w", err)
|
||||
} else if _, err := r.Seek(db.processedWALByteN, io.SeekStart); err != nil {
|
||||
return fmt.Errorf("cannot seek wal: %w", err)
|
||||
}
|
||||
defer r.Close()
|
||||
|
||||
// TODO: Verify all frames are valid & committed before copy.
|
||||
|
||||
// Read header if we are at the beginning of the WAL.
|
||||
if db.processedWALByteN == 0 {
|
||||
var hdr sqlite.WALHeader
|
||||
if _, err := hdr.ReadFrom(r); err != nil {
|
||||
return fmt.Errorf("cannot read wal header: %w", err)
|
||||
}
|
||||
|
||||
// Save checksum to verify later pages in WAL.
|
||||
db.checksum = hdr.Checksum
|
||||
|
||||
// Clear out salt & checksum from header for shadow WAL.
|
||||
hdr = ClearShadowWALHeader(hdr)
|
||||
db.walHeader = &hdr
|
||||
}
|
||||
|
||||
// Open shadow WAL to copy to.
|
||||
w, err := os.OpenFile(db.ActiveShadowWALPath(), os.O_RDWR|os.O_CREATE|os.O_APPEND, 0600)
|
||||
if err != nil {
|
||||
return fmt.Errorf("cannot open active shadow wal: %w", err)
|
||||
}
|
||||
defer w.Close()
|
||||
|
||||
// If we are at the start of a new shadow WAL, write a header.
|
||||
if n, err := w.Seek(0, io.SeekCurrent); err != nil {
|
||||
return fmt.Errorf("cannot seek shadow wal: %w", err)
|
||||
} else if n == 0 {
|
||||
db.logger.Printf("sync: new shadow wal, writing header: magic=%x page-size=%d", db.walHeader.Magic, db.walHeader.PageSize)
|
||||
if _, err := db.walHeader.WriteTo(w); err != nil {
|
||||
return fmt.Errorf("cannot write shadow wal header: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
// Read from real WAL and copy to shadow WAL.
|
||||
buf := make([]byte, db.walHeader.PageSize)
|
||||
for db.pendingWALByteN != 0 {
|
||||
// Read frame header & data from real WAL.
|
||||
var fhdr sqlite.WALFrameHeader
|
||||
if _, err := fhdr.ReadFrom(r); err == io.EOF {
|
||||
break
|
||||
} else if err != nil {
|
||||
return fmt.Errorf("cannot read frame header: %w", err)
|
||||
} else if _, err := io.ReadFull(r, buf); err != nil {
|
||||
return fmt.Errorf("cannot read frame: %w", err)
|
||||
}
|
||||
|
||||
db.logger.Printf("sync: copy frame: pgno=%d pageN=%d salt=%x checksum=%x",
|
||||
fhdr.Pgno,
|
||||
fhdr.PageN,
|
||||
fhdr.Salt,
|
||||
fhdr.Checksum,
|
||||
)
|
||||
|
||||
// Copy to the shadow WAL.
|
||||
if _, err := fhdr.WriteTo(w); err != nil {
|
||||
return fmt.Errorf("cannot write frame to shadow: %w", err)
|
||||
} else if _, err := w.Write(buf); err != nil {
|
||||
return fmt.Errorf("cannot write frame to shadow: %w", err)
|
||||
}
|
||||
|
||||
byteN := int64(sqlite.WALFrameHeaderSize + len(buf))
|
||||
db.processedWALByteN += byteN
|
||||
db.pendingWALByteN -= byteN
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Close stops management of the database.
|
||||
@@ -340,6 +440,7 @@ func (db *DB) setHeader(page []byte) {
|
||||
func (db *DB) AddPendingWALByteN(n int64) {
|
||||
db.mu.Lock()
|
||||
defer db.mu.Unlock()
|
||||
db.logger.Printf("write: n=%d pending=%d", n, db.pendingWALByteN)
|
||||
db.pendingWALByteN += n
|
||||
}
|
||||
|
||||
@@ -366,3 +467,52 @@ func ClearShadowWALHeader(hdr sqlite.WALHeader) sqlite.WALHeader {
|
||||
hdr.Checksum = 0
|
||||
return hdr
|
||||
}
|
||||
|
||||
// readLastWALPage reads the last frame header & data from a WAL file.
|
||||
func readLastWALPage(path string) (fhdr sqlite.WALFrameHeader, data []byte, err error) {
|
||||
f, err := os.Open(path)
|
||||
if err != nil {
|
||||
return fhdr, data, err
|
||||
}
|
||||
defer f.Close()
|
||||
|
||||
// Determine WAL file size.
|
||||
fi, err := f.Stat()
|
||||
if err != nil {
|
||||
return fhdr, data, err
|
||||
}
|
||||
|
||||
// Read WAL header to determine page size.
|
||||
var hdr sqlite.WALHeader
|
||||
if _, err := hdr.ReadFrom(f); err != nil {
|
||||
return fhdr, data, fmt.Errorf("cannot read wal header: %w", err)
|
||||
}
|
||||
|
||||
// WAL file size must be divisible by frame size (minus the header).
|
||||
if (fi.Size()-sqlite.WALHeaderSize)%(sqlite.WALFrameHeaderSize+int64(hdr.PageSize)) != 0 {
|
||||
return fhdr, data, fmt.Errorf("partial wal record: path=%s sz=%d", path, fi.Size())
|
||||
}
|
||||
|
||||
// Seek to last frame and read header & data.
|
||||
data = make([]byte, hdr.PageSize)
|
||||
if _, err := f.Seek(sqlite.WALFrameHeaderSize+int64(hdr.PageSize), io.SeekStart); err != nil {
|
||||
return fhdr, data, fmt.Errorf("cannot seek: %w", err)
|
||||
} else if _, err := fhdr.ReadFrom(f); err != nil {
|
||||
return fhdr, data, fmt.Errorf("cannot read frame header: %w", err)
|
||||
} else if _, err := io.ReadFull(f, data); err != nil {
|
||||
return fhdr, data, fmt.Errorf("cannot read frame data: %w", err)
|
||||
}
|
||||
return fhdr, data, nil
|
||||
}
|
||||
|
||||
// readFileWALHeader reads the WAL header from file.
|
||||
func readFileWALHeader(path string) (hdr sqlite.WALHeader, err error) {
|
||||
f, err := os.Open(path)
|
||||
if err != nil {
|
||||
return hdr, err
|
||||
}
|
||||
defer f.Close()
|
||||
|
||||
_, err = hdr.ReadFrom(f)
|
||||
return hdr, err
|
||||
}
|
||||
|
||||
@@ -37,6 +37,9 @@ func NewHandle(n *Node, f *os.File) *Handle {
|
||||
|
||||
// Release closes the underlying file descriptor.
|
||||
func (h *Handle) Release(ctx context.Context, req *fuse.ReleaseRequest) (err error) {
|
||||
if err := h.node.Sync(); err != nil {
|
||||
return err
|
||||
}
|
||||
return h.f.Close()
|
||||
}
|
||||
|
||||
@@ -90,6 +93,9 @@ func (h *Handle) Write(ctx context.Context, req *fuse.WriteRequest, resp *fuse.W
|
||||
|
||||
// Flush is called when a file handle is synced to disk. Implements fs.HandleFlusher.
|
||||
func (h *Handle) Flush(ctx context.Context, req *fuse.FlushRequest) (err error) {
|
||||
if err := h.node.Sync(); err != nil {
|
||||
return err
|
||||
}
|
||||
return h.f.Sync()
|
||||
}
|
||||
|
||||
|
||||
29
node.go
29
node.go
@@ -56,6 +56,11 @@ func (n *Node) srcpath() string {
|
||||
return filepath.Join(n.fs.TargetPath, n.path)
|
||||
}
|
||||
|
||||
// IsWAL returns true if node path has a "-wal" suffix.
|
||||
func (n *Node) IsWAL() bool {
|
||||
return strings.HasSuffix(n.path, sqlite.WALSuffix)
|
||||
}
|
||||
|
||||
// DB returns the DB object associated with the node, if any.
|
||||
// If node points to a "-wal" file then the associated DB is returned.
|
||||
func (n *Node) DB() *DB {
|
||||
@@ -65,6 +70,25 @@ func (n *Node) DB() *DB {
|
||||
return n.fs.DB(n.path)
|
||||
}
|
||||
|
||||
// Sync synchronizes the data to the shadow WAL if this node is the WAL.
|
||||
func (n *Node) Sync() (err error) {
|
||||
println("dbg/node.sync")
|
||||
// Ignore if this is not the WAL.
|
||||
if !n.IsWAL() {
|
||||
println("dbg/node.sync.notwal", n.path)
|
||||
return nil
|
||||
}
|
||||
|
||||
// Ignore if the node is not a managed db.
|
||||
db := n.DB()
|
||||
if db == nil {
|
||||
println("dbg/node.sync.notmanaged", n.path)
|
||||
return nil
|
||||
}
|
||||
|
||||
return db.Sync()
|
||||
}
|
||||
|
||||
func (n *Node) Attr(ctx context.Context, a *fuse.Attr) (err error) {
|
||||
fi, err := os.Stat(n.srcpath())
|
||||
if err != nil {
|
||||
@@ -312,6 +336,11 @@ func (n *Node) Mknod(ctx context.Context, req *fuse.MknodRequest) (_ fs.Node, er
|
||||
}
|
||||
|
||||
func (n *Node) Fsync(ctx context.Context, req *fuse.FsyncRequest) (err error) {
|
||||
// Synchronize to shadow WAL.
|
||||
if err := n.Sync(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
f, err := os.Open(n.srcpath())
|
||||
if err != nil {
|
||||
return err
|
||||
|
||||
Reference in New Issue
Block a user