From 2941a2433fb4bb209165206fed62c12add1a5844 Mon Sep 17 00:00:00 2001 From: Ben Johnson Date: Fri, 6 Nov 2020 16:30:37 -0700 Subject: [PATCH] Add recovery for 'real WAL only'. --- db.go | 220 +++++++++++++++++++++++++++++++++++++++++++---- doc/NOTES.md | 32 +++++++ file_system.go | 8 +- handle.go | 11 ++- litestream.go | 6 ++ node.go | 39 +++++++-- sqlite/sqlite.go | 40 +++++++++ 7 files changed, 327 insertions(+), 29 deletions(-) create mode 100644 doc/NOTES.md diff --git a/db.go b/db.go index a598353..dc8f926 100644 --- a/db.go +++ b/db.go @@ -2,6 +2,7 @@ package litestream import ( "context" + "fmt" "io" "log" "os" @@ -16,19 +17,23 @@ const ( MetaDirSuffix = "-litestream" ConfigSuffix = ".litestream" - WALDirName = "wal" - LogFilename = "log" + WALDirName = "wal" + WALExt = ".wal" + ActiveWALName = "active.wal" + LogFilename = "log" ) // DB represents an instance of a managed SQLite database in the file system. type DB struct { mu sync.Mutex + fs *FileSystem path string isHeaderValid bool // true if meta page contains SQLITE3 header isWALEnabled bool // true if file format version specifies WAL - // Tracks offset of WAL data. + // Tracks WAL state. + walHeader *sqlite.WALHeader processedWALByteN int64 // bytes copied to shadow WAL pendingWALByteN int64 // bytes pending copy to shadow WAL @@ -42,8 +47,8 @@ type DB struct { } // NewDB returns a new instance of DB for a given path. -func NewDB(path string) *DB { - db := &DB{path: path} +func NewDB(fs *FileSystem, path string) *DB { + db := &DB{fs: fs, path: path} db.ctx, db.cancel = context.WithCancel(context.Background()) return db } @@ -53,19 +58,29 @@ func (db *DB) Path() string { return db.path } -// InternalMetaPath returns the path to the database metadata. -func (db *DB) InternalMetaPath() string { - dir, file := filepath.Split(db.path) - return filepath.Join(db.node.fs.TargetPath, dir, "."+file+MetaDirSuffix) +// WALPath returns the full path to the real WAL. 
+func (db *DB) WALPath() string { + return filepath.Join(db.fs.TargetPath, db.path+"-wal") } -// InternalWALPath returns the path to the internal WAL directory. -func (db *DB) InternalWALPath() string { +// MetaPath returns the path to the database metadata. +func (db *DB) MetaPath() string { + dir, file := filepath.Split(db.path) + return filepath.Join(db.fs.TargetPath, dir, "."+file+MetaDirSuffix) +} + +// ShadowWALDir returns the path to the internal WAL directory. +func (db *DB) ShadowWALDir() string { return filepath.Join(db.MetaPath(), WALDirName) } -// InternalLogPath returns the path to the internal log directory. -func (db *DB) InternalLogPath() string { +// ActiveShadowWALPath returns the path to the internal active WAL file. +func (db *DB) ActiveShadowWALPath() string { + return filepath.Join(db.ShadowWALDir(), ActiveWALName) +} + +// LogPath returns the path to the internal log directory. +func (db *DB) LogPath() string { return filepath.Join(db.MetaPath(), LogFilename) } @@ -75,9 +90,9 @@ func (db *DB) Open() (err error) { defer db.mu.Unlock() // Ensure meta directory structure exists. - if err := os.MkdirAll(db.MetaPath(), 0600); err != nil { + if err := os.MkdirAll(db.MetaPath(), 0700); err != nil { return err - } else if err := os.MkdirAll(db.WALPath(), 0600); err != nil { + } else if err := os.MkdirAll(db.ShadowWALDir(), 0700); err != nil { return err } @@ -87,15 +102,171 @@ func (db *DB) Open() (err error) { } db.logger = log.New(db.logFile, "", log.LstdFlags) + db.logger.Printf("open: %s", db.path) + // If database file exists, read & set the header. - if err := db.readHeader(); err != nil { + if err := db.readHeader(); os.IsNotExist(err) { + db.setHeader(nil) // invalidate header for missing file + } else if err != nil { db.setHeader(nil) // invalidate header db.logger.Printf("cannot read db header: %s", err) } + // If WAL is enabled & WAL file exists, attempt to recover. 
+ if db.isWALEnabled { + if err := db.recoverWAL(); err != nil { + return fmt.Errorf("recover wal: %w", err) + } + } + return nil } +func (db *DB) recoverWAL() error { + // Check for the existence of the real & shadow WAL. + // We need to sync up between the two. + hasShadowWAL, err := db.shadowWALExists() + if err != nil { + return fmt.Errorf("check shadow wal: %w", err) + } + hasRealWAL, err := db.walExists() + if err != nil { + return fmt.Errorf("check real wal: %w", err) + } + + // Neither the real WAL nor the shadow WAL exists, so no pages have been written + // since the DB's journal mode has been set to "wal". In this case, do + // nothing and wait for the first WAL write to occur. + if !hasShadowWAL && !hasRealWAL { + return nil + } + + if hasRealWAL { + if hasShadowWAL { + return db.recoverRealAndShadowWALs() + } + return db.recoverRealWALOnly() + } + + if hasShadowWAL { + return db.recoverShadowWALOnly() + } + return nil // no-op, wait for first WAL write +} + +// recoverRealWALOnly copies the real WAL to the active shadow WAL. +func (db *DB) recoverRealWALOnly() error { + // Open real WAL to read from. + r, err := os.Open(db.WALPath()) + if err != nil { + return fmt.Errorf("cannot open wal: %w", err) + } + defer r.Close() + + // Read header from real WAL. + var hdr sqlite.WALHeader + if _, err := hdr.ReadFrom(r); os.IsNotExist(err) { + return nil + } else if err != nil { + return err + } + + // Create a new shadow WAL file. + w, err := db.createActiveShadowWAL(hdr) + if err != nil { + return fmt.Errorf("cannot create active shadow wal: %w", err) + } + defer w.Close() + + // Read from real WAL and copy to shadow WAL. + buf := make([]byte, hdr.PageSize) + for { + // Read frame header & data from real WAL. 
+ var fhdr sqlite.WALFrameHeader + if _, err := fhdr.ReadFrom(r); err != nil { + return fmt.Errorf("cannot read frame header: %w", err) + } else if _, err := io.ReadFull(r, buf); err != nil { + return fmt.Errorf("cannot read frame: %w", err) + } + + // Copy to the shadow WAL. + if _, err := fhdr.WriteTo(w); err != nil { + return fmt.Errorf("cannot write frame to shadow: %w", err) + } else if _, err := w.Write(buf); err != nil { + return fmt.Errorf("cannot write frame to shadow: %w", err) + } + } + + return nil +} + +// walExists returns true if the real WAL exists. +func (db *DB) walExists() (bool, error) { + if _, err := os.Stat(db.WALPath()); os.IsNotExist(err) { + return false, nil + } else if err != nil { + return false, err + } + return true, nil +} + +// shadowWALExists returns true if the shadow WAL exists. +func (db *DB) shadowWALExists() (bool, error) { + f, err := os.Open(db.ShadowWALDir()) + if err != nil { + return false, err + } + + // Read directory entries until we find a WAL file. + for { + fis, err := f.Readdir(1) + if err == io.EOF { + return false, nil + } else if err != nil { + return false, err + } else if strings.HasSuffix(fis[0].Name(), WALExt) { + return true, nil + } + } +} + +// createActiveShadowWAL creates a new shadow WAL file with the given header. +func (db *DB) createActiveShadowWAL(hdr sqlite.WALHeader) (f *os.File, err error) { + if f, err = os.OpenFile(db.ActiveShadowWALPath(), os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0600); err != nil { + return nil, err + } + + // Attempt to clean up if shadow WAL creation fails. + defer func() { + if err != nil { + f.Close() + os.Remove(f.Name()) + } + }() + + // Clear some fields from header that we won't use for the shadow WAL. + hdr = ClearShadowWALHeader(hdr) + + // Write header & save it to the DB to ensure new WAL header writes match. 
+ if _, err := hdr.WriteTo(f); err != nil { + return nil, err + } + db.walHeader = &hdr + + return f, nil +} + +// recoverShadowWALOnly verifies the last page in the shadow WAL matches the +// contents of the database page. +func (db *DB) recoverShadowWALOnly() error { + panic("TODO") +} + +// recoverRealAndShadowWALs verifies the last page of the real & shadow WALs match. +func (db *DB) recoverRealAndShadowWALs() error { + panic("TODO") +} + // Close stops management of the database. func (db *DB) Close() (err error) { db.cancel() @@ -146,6 +317,14 @@ func (db *DB) SetHeader(page []byte) { func (db *DB) setHeader(page []byte) { db.isHeaderValid = sqlite.IsValidHeader(page) db.isWALEnabled = sqlite.IsWALEnabled(page) + + // Clear WAL data if WAL is disabled. + if !db.isWALEnabled { + db.walHeader = nil + db.processedWALByteN = 0 + db.pendingWALByteN = 0 + } + db.logger.Printf("header: valid=%v wal=%v", db.isHeaderValid, db.isWALEnabled) } func (db *DB) AddPendingWALByteN(n int64) { @@ -168,3 +347,12 @@ func IsConfigPath(path string) bool { func ConfigPathToDBPath(path string) string { return strings.TrimSuffix(path, ConfigSuffix) } + +// ClearShadowWALHeader clears the checkpoint, salt, & checksum in the header. +// These fields are unused by the shadow WAL because we don't overwrite the WAL. +func ClearShadowWALHeader(hdr sqlite.WALHeader) sqlite.WALHeader { + hdr.CheckpointSeqNo = 0 + hdr.Salt = 0 + hdr.Checksum = 0 + return hdr +} diff --git a/doc/NOTES.md b/doc/NOTES.md new file mode 100644 index 0000000..692231e --- /dev/null +++ b/doc/NOTES.md @@ -0,0 +1,32 @@ +NOTES +===== + +## RECOVERY + +### REAL WAL EXISTS, SHADOW EXISTS + +Scenario: Unclean close by application process. + +Action: Verify that the last pages of both WALs match. + + +### REAL WAL DOESN'T EXIST, SHADOW EXISTS + +Scenario: Application closed cleanly & removed WAL. + +Action: Verify last page of shadow matches database page. 
+ + +### REAL WAL EXISTS, SHADOW DOESN'T EXIST + +Scenario: Application wrote WAL; system crashed before shadow written/sync'd. + +Action: Start new generation. + + +### REAL WAL DOESN'T EXIST, SHADOW DOESN'T EXIST + +Scenario: No writes have occurred since the DB was switched to WAL mode. + +Action: Nothing to recover. Wait for first WAL write. + diff --git a/file_system.go b/file_system.go index a4d5b3d..41913c7 100644 --- a/file_system.go +++ b/file_system.go @@ -56,7 +56,7 @@ func (f *FileSystem) Open() error { // Initialize a DB object based on the config path. // The database doesn't need to exist. It will be tracked when created. - db := NewDB(rel) + db := NewDB(f, rel) if err := db.Open(); err != nil { log.Printf("cannot open db %q: %s", rel, err) return nil @@ -73,7 +73,9 @@ func (f *FileSystem) Open() error { func (f *FileSystem) DB(path string) *DB { f.mu.RLock() defer f.mu.RUnlock() - return f.dbs[path] + db := f.dbs[path] + println("dbg/fs.db?", path, db != nil) + return db } // OpenDB initializes a DB for a given path. @@ -84,7 +86,7 @@ func (f *FileSystem) OpenDB(path string) error { } func (f *FileSystem) openDB(path string) error { - db := NewDB(path) + db := NewDB(f, path) if err := db.Open(); err != nil { return err } diff --git a/handle.go b/handle.go index c713378..eaf1531 100644 --- a/handle.go +++ b/handle.go @@ -101,10 +101,15 @@ func (h *Handle) ReadDirAll(ctx context.Context) (ents []fuse.Dirent, err error) } // Convert FileInfo objects to FUSE directory entries. - ents = make([]fuse.Dirent, len(fis)) - for i, fi := range fis { + ents = make([]fuse.Dirent, 0, len(fis)) + for _, fi := range fis { + // Skip any meta directories. 
+ if IsMetaDir(fi.Name()) { + continue + } + statt := fi.Sys().(*syscall.Stat_t) - ents[i] = fuse.Dirent{Inode: statt.Ino, Name: fi.Name()} + ents = append(ents, fuse.Dirent{Inode: statt.Ino, Name: fi.Name()}) } sort.Slice(ents, func(i, j int) bool { return ents[i].Name < ents[j].Name }) diff --git a/litestream.go b/litestream.go index 0bc64b9..31b79ab 100644 --- a/litestream.go +++ b/litestream.go @@ -39,3 +39,9 @@ func trimPrefixN(s string, n int) string { } return s[n:] } + +func assert(condition bool, message string) { + if !condition { + panic("assertion failed: " + message) + } +} diff --git a/node.go b/node.go index 54d4e55..359512e 100644 --- a/node.go +++ b/node.go @@ -42,6 +42,8 @@ type Node struct { } func NewNode(fs *FileSystem, path string) *Node { + assert(fs != nil, "node file system required") + assert(path != "", "node path required") return &Node{fs: fs, path: path} } @@ -57,6 +59,7 @@ func (n *Node) srcpath() string { // DB returns the DB object associated with the node, if any. // If node points to a "-wal" file then the associated DB is returned. func (n *Node) DB() *DB { + println("dbg/node.db", n.path, n.fs != nil, n.path == "") if strings.HasPrefix(n.path, sqlite.WALSuffix) { return n.fs.DB(strings.TrimSuffix(n.path, sqlite.WALSuffix)) } @@ -100,6 +103,12 @@ func (n *Node) Attr(ctx context.Context, a *fuse.Attr) (err error) { // Lookup need not to handle the names "." and "..". func (n *Node) Lookup(ctx context.Context, name string) (_ fs.Node, err error) { path := filepath.Join(n.path, name) + + // Meta directories should not be visible. 
+ if IsMetaDir(path) { + return nil, syscall.ENOENT + } + srcpath := filepath.Join(n.fs.TargetPath, path) if _, err := os.Stat(srcpath); os.IsNotExist(err) { return nil, syscall.ENOENT @@ -113,11 +122,17 @@ func (n *Node) ReadDirAll(ctx context.Context) (ents []fuse.Dirent, err error) { return nil, err } - ents = make([]fuse.Dirent, len(fis)) - for i, fi := range fis { + ents = make([]fuse.Dirent, 0, len(fis)) + for _, fi := range fis { + // Skip any meta directories. + if IsMetaDir(fi.Name()) { + continue + } + statt := fi.Sys().(*syscall.Stat_t) - ents[i] = fuse.Dirent{Inode: statt.Ino, Name: fi.Name()} + ents = append(ents, fuse.Dirent{Inode: statt.Ino, Name: fi.Name()}) } + return ents, nil } @@ -237,10 +252,18 @@ func (n *Node) Link(ctx context.Context, req *fuse.LinkRequest, _old fs.Node) (_ // the receiver, which must be a directory. The entry to be removed // may correspond to a file (unlink) or to a directory (rmdir). func (n *Node) Remove(ctx context.Context, req *fuse.RemoveRequest) (err error) { - if req.Dir { - return syscall.Rmdir(filepath.Join(n.srcpath(), req.Name)) + path := filepath.Join(n.srcpath(), req.Name) + if IsMetaDir(path) { + return syscall.ENOENT } - return syscall.Unlink(filepath.Join(n.srcpath(), req.Name)) + + if req.Dir { + return syscall.Rmdir(path) + } + + // TODO: Clear db header. 
+ + return syscall.Unlink(path) } // Access checks whether the calling context has permission for @@ -270,6 +293,7 @@ func (n *Node) Open(ctx context.Context, req *fuse.OpenRequest, resp *fuse.OpenR if err != nil { return nil, err } + println("dbg/open") return NewHandle(n, f), nil } @@ -279,7 +303,8 @@ func (n *Node) Create(ctx context.Context, req *fuse.CreateRequest, resp *fuse.C if err != nil { return nil, nil, err } - return NewNode(n.fs, filepath.Join(n.path, req.Name)), NewHandle(n, f), nil + nn := NewNode(n.fs, filepath.Join(n.path, req.Name)) + return nn, NewHandle(nn, f), nil } func (n *Node) Rename(ctx context.Context, req *fuse.RenameRequest, _newDir fs.Node) (err error) { diff --git a/sqlite/sqlite.go b/sqlite/sqlite.go index c22c8a4..9d265d6 100644 --- a/sqlite/sqlite.go +++ b/sqlite/sqlite.go @@ -95,6 +95,26 @@ func (hdr WALHeader) ByteOrder() binary.ByteOrder { } } +// ReadFrom reads hdr from r. +func (hdr *WALHeader) ReadFrom(r io.Reader) (n int64, err error) { + b := make([]byte, WALHeaderSize) + nn, err := io.ReadFull(r, b) + if n = int64(nn); err != nil { + return n, err + } + return n, hdr.Unmarshal(b) +} + +// WriteTo writes hdr to w. +func (hdr *WALHeader) WriteTo(w io.Writer) (n int64, err error) { + b := make([]byte, WALHeaderSize) + if err := hdr.MarshalTo(b); err != nil { + return 0, err + } + nn, err := w.Write(b) + return int64(nn), err +} + // MarshalTo encodes the header to b. // Returns io.ErrShortWrite if len(b) is less than WALHeaderSize. func (hdr *WALHeader) MarshalTo(b []byte) error { @@ -146,6 +166,26 @@ func (hdr *WALFrameHeader) IsCommit() bool { return hdr.PageN != 0 } +// ReadFrom reads hdr from r. +func (hdr *WALFrameHeader) ReadFrom(r io.Reader) (n int64, err error) { + b := make([]byte, WALFrameHeaderSize) + nn, err := io.ReadFull(r, b) + if n = int64(nn); err != nil { + return n, err + } + return n, hdr.Unmarshal(b) +} + +// WriteTo writes hdr to w. 
+func (hdr *WALFrameHeader) WriteTo(w io.Writer) (n int64, err error) { + b := make([]byte, WALFrameHeaderSize) + if err := hdr.MarshalTo(b); err != nil { + return 0, err + } + nn, err := w.Write(b) + return int64(nn), err +} + // MarshalTo encodes the frame header to b. // Returns io.ErrShortWrite if len(b) is less than WALHeaderSize. func (hdr *WALFrameHeader) MarshalTo(b []byte) error {