Add recovery for 'real WAL only'.

This commit is contained in:
Ben Johnson
2020-11-06 16:30:37 -07:00
parent d964e4199a
commit 2941a2433f
7 changed files with 327 additions and 29 deletions

220
db.go
View File

@@ -2,6 +2,7 @@ package litestream
import ( import (
"context" "context"
"fmt"
"io" "io"
"log" "log"
"os" "os"
@@ -16,19 +17,23 @@ const (
MetaDirSuffix = "-litestream" MetaDirSuffix = "-litestream"
ConfigSuffix = ".litestream" ConfigSuffix = ".litestream"
WALDirName = "wal" WALDirName = "wal"
LogFilename = "log" WALExt = ".wal"
ActiveWALName = "active.wal"
LogFilename = "log"
) )
// DB represents an instance of a managed SQLite database in the file system. // DB represents an instance of a managed SQLite database in the file system.
type DB struct { type DB struct {
mu sync.Mutex mu sync.Mutex
fs *FileSystem
path string path string
isHeaderValid bool // true if meta page contains SQLITE3 header isHeaderValid bool // true if meta page contains SQLITE3 header
isWALEnabled bool // true if file format version specifies WAL isWALEnabled bool // true if file format version specifies WAL
// Tracks offset of WAL data. // Tracks WAL state.
walHeader *sqlite.WALHeader
processedWALByteN int64 // bytes copied to shadow WAL processedWALByteN int64 // bytes copied to shadow WAL
pendingWALByteN int64 // bytes pending copy to shadow WAL pendingWALByteN int64 // bytes pending copy to shadow WAL
@@ -42,8 +47,8 @@ type DB struct {
} }
// NewDB returns a new instance of DB for a given path. // NewDB returns a new instance of DB for a given path.
func NewDB(path string) *DB { func NewDB(fs *FileSystem, path string) *DB {
db := &DB{path: path} db := &DB{fs: fs, path: path}
db.ctx, db.cancel = context.WithCancel(context.Background()) db.ctx, db.cancel = context.WithCancel(context.Background())
return db return db
} }
@@ -53,19 +58,29 @@ func (db *DB) Path() string {
return db.path return db.path
} }
// InternalMetaPath returns the path to the database metadata. // WALPath returns the full path to the real WAL.
func (db *DB) InternalMetaPath() string { func (db *DB) WALPath() string {
dir, file := filepath.Split(db.path) return filepath.Join(db.fs.TargetPath, db.path+"-wal")
return filepath.Join(db.node.fs.TargetPath, dir, "."+file+MetaDirSuffix)
} }
// InternalWALPath returns the path to the internal WAL directory. // MetaPath returns the path to the database metadata.
func (db *DB) InternalWALPath() string { func (db *DB) MetaPath() string {
dir, file := filepath.Split(db.path)
return filepath.Join(db.fs.TargetPath, dir, "."+file+MetaDirSuffix)
}
// ShadowWALDir returns the path to the internal WAL directory.
func (db *DB) ShadowWALDir() string {
return filepath.Join(db.MetaPath(), WALDirName) return filepath.Join(db.MetaPath(), WALDirName)
} }
// InternalLogPath returns the path to the internal log directory. // ActiveShadowWALPath returns the path to the internal active WAL file.
func (db *DB) InternalLogPath() string { func (db *DB) ActiveShadowWALPath() string {
return filepath.Join(db.ShadowWALDir(), ActiveWALName)
}
// LogPath returns the path to the internal log directory.
func (db *DB) LogPath() string {
return filepath.Join(db.MetaPath(), LogFilename) return filepath.Join(db.MetaPath(), LogFilename)
} }
@@ -75,9 +90,9 @@ func (db *DB) Open() (err error) {
defer db.mu.Unlock() defer db.mu.Unlock()
// Ensure meta directory structure exists. // Ensure meta directory structure exists.
if err := os.MkdirAll(db.MetaPath(), 0600); err != nil { if err := os.MkdirAll(db.MetaPath(), 0700); err != nil {
return err return err
} else if err := os.MkdirAll(db.WALPath(), 0600); err != nil { } else if err := os.MkdirAll(db.ShadowWALDir(), 0700); err != nil {
return err return err
} }
@@ -87,15 +102,171 @@ func (db *DB) Open() (err error) {
} }
db.logger = log.New(db.logFile, "", log.LstdFlags) db.logger = log.New(db.logFile, "", log.LstdFlags)
db.logger.Printf("open: %s", db.path)
// If database file exists, read & set the header. // If database file exists, read & set the header.
if err := db.readHeader(); err != nil { if err := db.readHeader(); os.IsNotExist(err) {
db.setHeader(nil) // invalidate header for missing file
} else if err != nil {
db.setHeader(nil) // invalidate header db.setHeader(nil) // invalidate header
db.logger.Printf("cannot read db header: %s", err) db.logger.Printf("cannot read db header: %s", err)
} }
// If WAL is enabled & WAL file exists, attempt to recover.
if db.isWALEnabled {
if err := db.recoverWAL(); err != nil {
return fmt.Errorf("recover wal: %w", err)
}
}
return nil return nil
} }
// recoverWAL reconciles the real WAL with the shadow WAL when the database
// is opened.
//
// It first checks whether a shadow WAL exists, then whether the real WAL
// exists, and dispatches on the combination:
//
//   - both exist:       recoverRealAndShadowWALs
//   - only real exists: recoverRealWALOnly
//   - only shadow:      recoverShadowWALOnly
//   - neither:          nothing to do; no pages have been written since the
//     journal mode was set to "wal", so wait for the first WAL write.
//
// An error from either existence check is wrapped and returned.
func (db *DB) recoverWAL() error {
	shadowFound, err := db.shadowWALExists()
	if err != nil {
		return fmt.Errorf("check shadow wal: %w", err)
	}

	realFound, err := db.walExists()
	if err != nil {
		return fmt.Errorf("check real wal: %w", err)
	}

	switch {
	case realFound && shadowFound:
		return db.recoverRealAndShadowWALs()
	case realFound:
		return db.recoverRealWALOnly()
	case shadowFound:
		return db.recoverShadowWALOnly()
	default:
		// Neither WAL exists: no-op, wait for first WAL write.
		return nil
	}
}
// recoverRealWALOnly copies the real WAL to the active shadow WAL.
//
// The real WAL header is read and handed to createActiveShadowWAL, which
// creates (truncating) the active shadow WAL file and writes the header to
// it. Every frame (frame header plus one page of hdr.PageSize bytes) is then
// copied from the real WAL to the shadow WAL until the real WAL is exhausted.
//
// A zero-length real WAL has nothing to recover and is treated as a no-op.
// An error is returned if the real WAL cannot be opened or read, if it ends
// in the middle of a frame, or if the shadow WAL cannot be created, written,
// or closed.
func (db *DB) recoverRealWALOnly() error {
	// Open real WAL to read from.
	r, err := os.Open(db.WALPath())
	if err != nil {
		return fmt.Errorf("cannot open wal: %w", err)
	}
	defer r.Close()

	// Read header from real WAL. ReadFrom surfaces io.ReadFull's error
	// unchanged, and io.ReadFull reports io.EOF only when zero bytes were
	// read, i.e. the WAL file is empty.
	var hdr sqlite.WALHeader
	if _, err := hdr.ReadFrom(r); err == io.EOF {
		return nil
	} else if err != nil {
		return fmt.Errorf("cannot read wal header: %w", err)
	}

	// Create a new shadow WAL file.
	w, err := db.createActiveShadowWAL(hdr)
	if err != nil {
		return fmt.Errorf("cannot create active shadow wal: %w", err)
	}
	// Covers the error paths below; on success the file is closed explicitly
	// and this second Close is a harmless no-op error.
	defer w.Close()

	// Read from real WAL and copy to shadow WAL.
	buf := make([]byte, hdr.PageSize)
	for {
		// Read frame header & data from real WAL. A clean io.EOF at a frame
		// boundary means every frame has been copied.
		var fhdr sqlite.WALFrameHeader
		if _, err := fhdr.ReadFrom(r); err == io.EOF {
			break
		} else if err != nil {
			return fmt.Errorf("cannot read frame header: %w", err)
		} else if _, err := io.ReadFull(r, buf); err != nil {
			return fmt.Errorf("cannot read frame: %w", err)
		}

		// Copy to the shadow WAL.
		if _, err := fhdr.WriteTo(w); err != nil {
			return fmt.Errorf("cannot write frame to shadow: %w", err)
		} else if _, err := w.Write(buf); err != nil {
			return fmt.Errorf("cannot write frame to shadow: %w", err)
		}
	}

	// Surface close errors: a failed close can mean buffered data was lost.
	if err := w.Close(); err != nil {
		return fmt.Errorf("cannot close active shadow wal: %w", err)
	}
	return nil
}
// walExists reports whether the real WAL file is present at WALPath.
// A "not exist" stat error yields (false, nil); any other stat error is
// returned to the caller.
func (db *DB) walExists() (bool, error) {
	_, statErr := os.Stat(db.WALPath())
	switch {
	case statErr == nil:
		return true, nil
	case os.IsNotExist(statErr):
		return false, nil
	default:
		return false, statErr
	}
}
// shadowWALExists returns true if the shadow WAL exists, i.e. the shadow WAL
// directory contains at least one entry whose name ends in WALExt.
//
// An error is returned if the directory cannot be opened or read.
func (db *DB) shadowWALExists() (bool, error) {
	f, err := os.Open(db.ShadowWALDir())
	if err != nil {
		return false, err
	}
	// Release the directory handle on every return path.
	defer f.Close()

	// Read directory entries one at a time until we find a WAL file.
	// With n > 0, Readdir returns a non-nil error whenever it returns an
	// empty slice, so fis[0] is safe once err has been checked.
	for {
		fis, err := f.Readdir(1)
		if err == io.EOF {
			return false, nil
		} else if err != nil {
			return false, err
		} else if strings.HasSuffix(fis[0].Name(), WALExt) {
			return true, nil
		}
	}
}
// createActiveShadowWAL creates a new shadow WAL file with the given header.
//
// The file at ActiveShadowWALPath is created (or truncated) with mode 0600.
// The header is passed through ClearShadowWALHeader before being written,
// and the cleared header is saved on db.walHeader so that later WAL header
// writes can be matched against it.
//
// On success the open file is returned and the caller is responsible for
// closing it. If writing the header fails, the file is closed and removed
// on a best-effort basis and the write error is returned.
func (db *DB) createActiveShadowWAL(hdr sqlite.WALHeader) (f *os.File, err error) {
	// Use a dedicated local for the handle: "return nil, err" overwrites the
	// named result f before any deferred cleanup could use it.
	file, err := os.OpenFile(db.ActiveShadowWALPath(), os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0600)
	if err != nil {
		return nil, err
	}

	// Clear some fields from header that we won't use for the shadow WAL.
	hdr = ClearShadowWALHeader(hdr)

	// Write header & save it to the DB to ensure new WAL header writes match.
	if _, err := hdr.WriteTo(file); err != nil {
		// Best-effort cleanup; the write error is the one worth reporting.
		_ = file.Close()
		_ = os.Remove(file.Name())
		return nil, err
	}
	db.walHeader = &hdr

	return file, nil
}
// recoverShadowWALOnly verifies the last page in the shadow WAL matches the
// contents of the database page.
//
// TODO: Not yet implemented. It returns an error rather than panicking so
// that a database with a leftover shadow WAL fails to open instead of
// crashing the whole process.
func (db *DB) recoverShadowWALOnly() error {
	return fmt.Errorf("recover shadow wal only: not implemented")
}
// recoverRealAndShadowWALs verifies the last page of the real & shadow WALs match.
//
// TODO: Not yet implemented. It returns an error rather than panicking so
// that a database with both WALs present fails to open instead of crashing
// the whole process.
func (db *DB) recoverRealAndShadowWALs() error {
	return fmt.Errorf("recover real and shadow wals: not implemented")
}
// Close stops management of the database. // Close stops management of the database.
func (db *DB) Close() (err error) { func (db *DB) Close() (err error) {
db.cancel() db.cancel()
@@ -146,6 +317,14 @@ func (db *DB) SetHeader(page []byte) {
func (db *DB) setHeader(page []byte) { func (db *DB) setHeader(page []byte) {
db.isHeaderValid = sqlite.IsValidHeader(page) db.isHeaderValid = sqlite.IsValidHeader(page)
db.isWALEnabled = sqlite.IsWALEnabled(page) db.isWALEnabled = sqlite.IsWALEnabled(page)
// Clear WAL data if WAL is disabled.
if !db.isWALEnabled {
db.walHeader = nil
db.processedWALByteN = 0
db.pendingWALByteN = 0
}
db.logger.Printf("header: valid=%v wal=%v", db.isHeaderValid, db.isWALEnabled)
} }
func (db *DB) AddPendingWALByteN(n int64) { func (db *DB) AddPendingWALByteN(n int64) {
@@ -168,3 +347,12 @@ func IsConfigPath(path string) bool {
func ConfigPathToDBPath(path string) string { func ConfigPathToDBPath(path string) string {
return strings.TrimSuffix(path, ConfigSuffix) return strings.TrimSuffix(path, ConfigSuffix)
} }
// ClearShadowWALHeader returns a copy of hdr with its checkpoint sequence
// number, salt, and checksum zeroed. The shadow WAL never overwrites itself,
// so these fields are unused there. hdr is received by value, so the
// caller's header is not modified.
func ClearShadowWALHeader(hdr sqlite.WALHeader) sqlite.WALHeader {
	hdr.CheckpointSeqNo, hdr.Salt, hdr.Checksum = 0, 0, 0
	return hdr
}

32
doc/NOTES.md Normal file
View File

@@ -0,0 +1,32 @@
NOTES
=====
## RECOVERY
### REAL WAL EXISTS, SHADOW EXISTS
Scenario: Unclean close by application process.
Action: Verify last page from both match.
### REAL WAL DOESN'T EXIST, SHADOW EXISTS
Scenario: Application closed cleanly & removed WAL.
Action: Verify last page of shadow matches database page.
### REAL WAL EXISTS, SHADOW DOESN'T EXIST
Scenario: Application wrote WAL; system crashed before shadow written/sync'd.
Action: Start new generation.
### REAL WAL DOESN'T EXIST, SHADOW DOESN'T EXIST
Scenario: No writes have occurred since the DB was switched to WAL mode.
Action: Nothing to recover. Wait for first WAL write.

View File

@@ -56,7 +56,7 @@ func (f *FileSystem) Open() error {
// Initialize a DB object based on the config path. // Initialize a DB object based on the config path.
// The database doesn't need to exist. It will be tracked when created. // The database doesn't need to exist. It will be tracked when created.
db := NewDB(rel) db := NewDB(f, rel)
if err := db.Open(); err != nil { if err := db.Open(); err != nil {
log.Printf("cannot open db %q: %s", rel, err) log.Printf("cannot open db %q: %s", rel, err)
return nil return nil
@@ -73,7 +73,9 @@ func (f *FileSystem) Open() error {
func (f *FileSystem) DB(path string) *DB { func (f *FileSystem) DB(path string) *DB {
f.mu.RLock() f.mu.RLock()
defer f.mu.RUnlock() defer f.mu.RUnlock()
return f.dbs[path] db := f.dbs[path]
println("dbg/fs.db?", path, db != nil)
return db
} }
// OpenDB initializes a DB for a given path. // OpenDB initializes a DB for a given path.
@@ -84,7 +86,7 @@ func (f *FileSystem) OpenDB(path string) error {
} }
func (f *FileSystem) openDB(path string) error { func (f *FileSystem) openDB(path string) error {
db := NewDB(path) db := NewDB(f, path)
if err := db.Open(); err != nil { if err := db.Open(); err != nil {
return err return err
} }

View File

@@ -101,10 +101,15 @@ func (h *Handle) ReadDirAll(ctx context.Context) (ents []fuse.Dirent, err error)
} }
// Convert FileInfo objects to FUSE directory entries. // Convert FileInfo objects to FUSE directory entries.
ents = make([]fuse.Dirent, len(fis)) ents = make([]fuse.Dirent, 0, len(fis))
for i, fi := range fis { for _, fi := range fis {
// Skip any meta directories.
if IsMetaDir(fi.Name()) {
continue
}
statt := fi.Sys().(*syscall.Stat_t) statt := fi.Sys().(*syscall.Stat_t)
ents[i] = fuse.Dirent{Inode: statt.Ino, Name: fi.Name()} ents = append(ents, fuse.Dirent{Inode: statt.Ino, Name: fi.Name()})
} }
sort.Slice(ents, func(i, j int) bool { return ents[i].Name < ents[j].Name }) sort.Slice(ents, func(i, j int) bool { return ents[i].Name < ents[j].Name })

View File

@@ -39,3 +39,9 @@ func trimPrefixN(s string, n int) string {
} }
return s[n:] return s[n:]
} }
// assert panics with "assertion failed: " followed by message when
// condition is false. It does nothing when condition is true.
func assert(condition bool, message string) {
	if condition {
		return
	}
	panic("assertion failed: " + message)
}

39
node.go
View File

@@ -42,6 +42,8 @@ type Node struct {
} }
func NewNode(fs *FileSystem, path string) *Node { func NewNode(fs *FileSystem, path string) *Node {
assert(fs != nil, "node file system required")
assert(path != "", "node path required")
return &Node{fs: fs, path: path} return &Node{fs: fs, path: path}
} }
@@ -57,6 +59,7 @@ func (n *Node) srcpath() string {
// DB returns the DB object associated with the node, if any. // DB returns the DB object associated with the node, if any.
// If node points to a "-wal" file then the associated DB is returned. // If node points to a "-wal" file then the associated DB is returned.
func (n *Node) DB() *DB { func (n *Node) DB() *DB {
println("dbg/node.db", n.path, n.fs != nil, n.path == "")
if strings.HasPrefix(n.path, sqlite.WALSuffix) { if strings.HasPrefix(n.path, sqlite.WALSuffix) {
return n.fs.DB(strings.TrimSuffix(n.path, sqlite.WALSuffix)) return n.fs.DB(strings.TrimSuffix(n.path, sqlite.WALSuffix))
} }
@@ -100,6 +103,12 @@ func (n *Node) Attr(ctx context.Context, a *fuse.Attr) (err error) {
// Lookup need not to handle the names "." and "..". // Lookup need not to handle the names "." and "..".
func (n *Node) Lookup(ctx context.Context, name string) (_ fs.Node, err error) { func (n *Node) Lookup(ctx context.Context, name string) (_ fs.Node, err error) {
path := filepath.Join(n.path, name) path := filepath.Join(n.path, name)
// Meta directories should not be visible.
if IsMetaDir(path) {
return nil, syscall.ENOENT
}
srcpath := filepath.Join(n.fs.TargetPath, path) srcpath := filepath.Join(n.fs.TargetPath, path)
if _, err := os.Stat(srcpath); os.IsNotExist(err) { if _, err := os.Stat(srcpath); os.IsNotExist(err) {
return nil, syscall.ENOENT return nil, syscall.ENOENT
@@ -113,11 +122,17 @@ func (n *Node) ReadDirAll(ctx context.Context) (ents []fuse.Dirent, err error) {
return nil, err return nil, err
} }
ents = make([]fuse.Dirent, len(fis)) ents = make([]fuse.Dirent, 0, len(fis))
for i, fi := range fis { for _, fi := range fis {
// Skip any meta directories.
if IsMetaDir(fi.Name()) {
continue
}
statt := fi.Sys().(*syscall.Stat_t) statt := fi.Sys().(*syscall.Stat_t)
ents[i] = fuse.Dirent{Inode: statt.Ino, Name: fi.Name()} ents = append(ents, fuse.Dirent{Inode: statt.Ino, Name: fi.Name()})
} }
return ents, nil return ents, nil
} }
@@ -237,10 +252,18 @@ func (n *Node) Link(ctx context.Context, req *fuse.LinkRequest, _old fs.Node) (_
// the receiver, which must be a directory. The entry to be removed // the receiver, which must be a directory. The entry to be removed
// may correspond to a file (unlink) or to a directory (rmdir). // may correspond to a file (unlink) or to a directory (rmdir).
func (n *Node) Remove(ctx context.Context, req *fuse.RemoveRequest) (err error) { func (n *Node) Remove(ctx context.Context, req *fuse.RemoveRequest) (err error) {
if req.Dir { path := filepath.Join(n.srcpath(), req.Name)
return syscall.Rmdir(filepath.Join(n.srcpath(), req.Name)) if IsMetaDir(path) {
return syscall.ENOENT
} }
return syscall.Unlink(filepath.Join(n.srcpath(), req.Name))
if req.Dir {
return syscall.Rmdir(path)
}
// TODO: Clear db header.
return syscall.Unlink(path)
} }
// Access checks whether the calling context has permission for // Access checks whether the calling context has permission for
@@ -270,6 +293,7 @@ func (n *Node) Open(ctx context.Context, req *fuse.OpenRequest, resp *fuse.OpenR
if err != nil { if err != nil {
return nil, err return nil, err
} }
println("dbg/open")
return NewHandle(n, f), nil return NewHandle(n, f), nil
} }
@@ -279,7 +303,8 @@ func (n *Node) Create(ctx context.Context, req *fuse.CreateRequest, resp *fuse.C
if err != nil { if err != nil {
return nil, nil, err return nil, nil, err
} }
return NewNode(n.fs, filepath.Join(n.path, req.Name)), NewHandle(n, f), nil nn := NewNode(n.fs, filepath.Join(n.path, req.Name))
return nn, NewHandle(nn, f), nil
} }
func (n *Node) Rename(ctx context.Context, req *fuse.RenameRequest, _newDir fs.Node) (err error) { func (n *Node) Rename(ctx context.Context, req *fuse.RenameRequest, _newDir fs.Node) (err error) {

View File

@@ -95,6 +95,26 @@ func (hdr WALHeader) ByteOrder() binary.ByteOrder {
} }
} }
// ReadFrom fills hdr by reading WALHeaderSize bytes from r and decoding them
// with Unmarshal. It returns the number of bytes read. If the full header
// cannot be read, the io.ReadFull error is returned and hdr is not decoded.
func (hdr *WALHeader) ReadFrom(r io.Reader) (n int64, err error) {
	buf := make([]byte, WALHeaderSize)

	read, err := io.ReadFull(r, buf)
	n = int64(read)
	if err != nil {
		return n, err
	}
	return n, hdr.Unmarshal(buf)
}
// WriteTo encodes hdr into a WALHeaderSize-byte buffer with MarshalTo and
// writes it to w. It returns the number of bytes written. If encoding fails,
// nothing is written and the encoding error is returned.
func (hdr *WALHeader) WriteTo(w io.Writer) (n int64, err error) {
	buf := make([]byte, WALHeaderSize)
	if err = hdr.MarshalTo(buf); err != nil {
		return 0, err
	}

	written, err := w.Write(buf)
	return int64(written), err
}
// MarshalTo encodes the header to b. // MarshalTo encodes the header to b.
// Returns io.ErrShortWrite if len(b) is less than WALHeaderSize. // Returns io.ErrShortWrite if len(b) is less than WALHeaderSize.
func (hdr *WALHeader) MarshalTo(b []byte) error { func (hdr *WALHeader) MarshalTo(b []byte) error {
@@ -146,6 +166,26 @@ func (hdr *WALFrameHeader) IsCommit() bool {
return hdr.PageN != 0 return hdr.PageN != 0
} }
// ReadFrom fills hdr by reading WALFrameHeaderSize bytes from r and decoding
// them with Unmarshal. It returns the number of bytes read. If the full
// frame header cannot be read, the io.ReadFull error is returned and hdr is
// not decoded.
func (hdr *WALFrameHeader) ReadFrom(r io.Reader) (n int64, err error) {
	buf := make([]byte, WALFrameHeaderSize)

	read, err := io.ReadFull(r, buf)
	n = int64(read)
	if err != nil {
		return n, err
	}
	return n, hdr.Unmarshal(buf)
}
// WriteTo encodes hdr into a WALFrameHeaderSize-byte buffer with MarshalTo
// and writes it to w. It returns the number of bytes written. If encoding
// fails, nothing is written and the encoding error is returned.
func (hdr *WALFrameHeader) WriteTo(w io.Writer) (n int64, err error) {
	buf := make([]byte, WALFrameHeaderSize)
	if err = hdr.MarshalTo(buf); err != nil {
		return 0, err
	}

	written, err := w.Write(buf)
	return int64(written), err
}
// MarshalTo encodes the frame header to b. // MarshalTo encodes the frame header to b.
// Returns io.ErrShortWrite if len(b) is less than WALHeaderSize. // Returns io.ErrShortWrite if len(b) is less than WALHeaderSize.
func (hdr *WALFrameHeader) MarshalTo(b []byte) error { func (hdr *WALFrameHeader) MarshalTo(b []byte) error {