From d964e4199a65b907feb350163fed8f193cda1015 Mon Sep 17 00:00:00 2001 From: Ben Johnson Date: Fri, 6 Nov 2020 12:33:01 -0700 Subject: [PATCH] Validate sqlite header --- db.go | 105 +++++++++++++++++++++--- doc/DESIGN.md | 45 ++++++----- file_system.go | 7 ++ handle.go | 115 +++++++++------------------ litestream.go | 38 --------- node.go | 22 ++++- wal.go => sqlite/sqlite.go | 48 ++++++++++- wal_test.go => sqlite/sqlite_test.go | 4 +- 8 files changed, 235 insertions(+), 149 deletions(-) rename wal.go => sqlite/sqlite.go (73%) rename wal_test.go => sqlite/sqlite_test.go (97%) diff --git a/db.go b/db.go index 927f73f..a598353 100644 --- a/db.go +++ b/db.go @@ -2,10 +2,14 @@ package litestream import ( "context" + "io" + "log" "os" "path/filepath" "strings" "sync" + + "github.com/benbjohnson/litestream/sqlite" ) const ( @@ -18,12 +22,23 @@ const ( // DB represents an instance of a managed SQLite database in the file system. type DB struct { + mu sync.Mutex path string - inTx bool // currently in transaction + + isHeaderValid bool // true if meta page contains SQLITE3 header + isWALEnabled bool // true if file format version specifies WAL + + // Tracks offset of WAL data. + processedWALByteN int64 // bytes copied to shadow WAL + pendingWALByteN int64 // bytes pending copy to shadow WAL ctx context.Context cancel func() wg sync.WaitGroup + + // Database-specific logger + logFile *os.File + logger *log.Logger } // NewDB returns a new instance of DB for a given path. @@ -38,39 +53,107 @@ func (db *DB) Path() string { return db.path } -// MetaPath returns the path to the database metadata. -func (db *DB) MetaPath() string { +// InternalMetaPath returns the path to the database metadata. +func (db *DB) InternalMetaPath() string { dir, file := filepath.Split(db.path) - return filepath.Join(dir, "."+file+MetaDirSuffix) + return filepath.Join(db.node.fs.TargetPath, dir, "."+file+MetaDirSuffix) } -// WALPath returns the path to the internal WAL directory. 
-func (db *DB) WALPath() string { +// InternalWALPath returns the path to the internal WAL directory. +func (db *DB) InternalWALPath() string { return filepath.Join(db.MetaPath(), WALDirName) } -// LogPath returns the path to the internal log directory. -func (db *DB) LogPath() string { +// InternalLogPath returns the path to the internal log directory. +func (db *DB) InternalLogPath() string { return filepath.Join(db.MetaPath(), LogFilename) } // Open loads the configuration file -func (db *DB) Open() error { - // Ensure meta directory exists. +func (db *DB) Open() (err error) { + db.mu.Lock() + defer db.mu.Unlock() + + // Ensure meta directory structure exists. if err := os.MkdirAll(db.MetaPath(), 0600); err != nil { return err + } else if err := os.MkdirAll(db.WALPath(), 0600); err != nil { + return err + } + + // Initialize per-db logger. + if db.logFile, err = os.OpenFile(db.LogPath(), os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0600); err != nil { + return err + } + db.logger = log.New(db.logFile, "", log.LstdFlags) + + // If database file exists, read & set the header. + if err := db.readHeader(); err != nil { + db.setHeader(nil) // invalidate header + db.logger.Printf("cannot read db header: %s", err) } return nil } // Close stops management of the database. -func (db *DB) Close() error { +func (db *DB) Close() (err error) { db.cancel() db.wg.Wait() + + // Close per-db log file. + if e := db.logFile.Close(); e != nil && err == nil { + err = e + } + return err +} + +// readHeader reads the SQLite header and sets the initial DB flags. +func (db *DB) readHeader() error { + f, err := os.Open(db.path) + if err != nil { + return err + } + defer f.Close() + + hdr := make([]byte, sqlite.HeaderSize) + if _, err := io.ReadFull(f, hdr); err != nil { + return err + } + + db.setHeader(hdr) return nil } +// Valid returns true if there is a valid, WAL-enabled SQLite database on-disk. 
+func (db *DB) Valid() bool { + db.mu.Lock() + defer db.mu.Unlock() + return db.valid() +} + +func (db *DB) valid() bool { + return db.isHeaderValid && db.isWALEnabled +} + +// SetHeader checks if the page has a valid header & uses a WAL. +func (db *DB) SetHeader(page []byte) { + db.mu.Lock() + defer db.mu.Unlock() + db.setHeader(page) +} + +func (db *DB) setHeader(page []byte) { + db.isHeaderValid = sqlite.IsValidHeader(page) + db.isWALEnabled = sqlite.IsWALEnabled(page) +} + +func (db *DB) AddPendingWALByteN(n int64) { + db.mu.Lock() + defer db.mu.Unlock() + db.pendingWALByteN += n +} + // IsMetaDir returns true if base in path is hidden and ends in "-litestream". func IsMetaDir(path string) bool { base := filepath.Base(path) diff --git a/doc/DESIGN.md b/doc/DESIGN.md index e8ab835..d7070c8 100644 --- a/doc/DESIGN.md +++ b/doc/DESIGN.md @@ -11,31 +11,40 @@ to construct a persistent write-ahead log that can be replicated. ``` dir/ - db # SQLite database - db-wal # SQLite WAL - db.litestream # per-db configuration + db # SQLite database + db-wal # SQLite WAL + db.litestream # per-db configuration .db-litestream/ - log # recent event log - stat # per-db Prometheus statistics - snapshot # stores snapshot number (e.g. 
0000000000000001) - wal/ # each WAL file contains pages in flush interval - active # active WAL file exists until flush; renamed - 0000000000000001.wal.gz # flushed, compressed WAL files - 0000000000000002.wal.gz + log # recent event log + stat # per-db Prometheus statistics + generation # current generation number + wal/ # each WAL file contains pages in flush interval + active.wal # active WAL file exists until flush; renamed + 000000000000001.wal.gz # flushed, compressed WAL files + 000000000000002.wal.gz ``` ### Remote (S3) ``` bkt/ - db/ # database path - 0000000000000001/ # snapshot directory - snapshot # full db snapshot - 0000000000000001.wal.gz # compressed WAL file - 0000000000000002.wal.gz - 0000000000000002/ - snapshot - 0000000000000001-0000000000000003.tar.gz + db/ # database path + 00000001/ # snapshot directory + snapshot # full db snapshot + 000000000000001.wal.gz # compressed WAL file + 000000000000002.wal.gz + 00000002/ + snapshot/ + 000000000000000.snapshot + scheduled/ + daily/ + 20000101T000000Z-000000000000023.snapshot + 20000102T000000Z-000000000000036.snapshot + monthly/ + 20000101T000000Z-000000000000023.snapshot + + wal/ + 000000000000001.wal.gz ``` diff --git a/file_system.go b/file_system.go index 7882d1a..a4d5b3d 100644 --- a/file_system.go +++ b/file_system.go @@ -69,6 +69,13 @@ func (f *FileSystem) Open() error { }) } +// DB returns the DB object associated with path. +func (f *FileSystem) DB(path string) *DB { + f.mu.RLock() + defer f.mu.RUnlock() + return f.dbs[path] +} + // OpenDB initializes a DB for a given path. func (f *FileSystem) OpenDB(path string) error { f.mu.Lock() diff --git a/handle.go b/handle.go index 13d1ef9..c713378 100644 --- a/handle.go +++ b/handle.go @@ -10,6 +10,7 @@ import ( "bazil.org/fuse" "bazil.org/fuse/fs" + "github.com/benbjohnson/litestream/sqlite" ) var _ fs.HandleFlusher = (*Handle)(nil) @@ -25,7 +26,13 @@ var _ fs.HandleWriter = (*Handle)(nil) // Handle represents a FUSE file handle. 
type Handle struct { - f *os.File + node *Node + f *os.File +} + +// NewHandle returns a new instance of Handle. +func NewHandle(n *Node, f *os.File) *Handle { + return &Handle{node: n, f: f} } // Release closes the underlying file descriptor. @@ -49,8 +56,36 @@ func (h *Handle) Write(ctx context.Context, req *fuse.WriteRequest, resp *fuse.W log.Printf("write: name=%s offset=%d n=%d", h.f.Name(), req.Offset, len(req.Data)) println(HexDump(req.Data)) - resp.Size, err = h.f.WriteAt(req.Data, req.Offset) - return err + if resp.Size, err = h.f.WriteAt(req.Data, req.Offset); err != nil { + // TODO: Invalidate node DB state. + return err + } + + // Check if the handle references a managed database. + db := h.node.DB() + if db == nil { + return nil + } + + // If this is the DB file, update the DB state based on the header. + if !sqlite.IsWALPath(h.node.Path()) { + // TODO: Header write could theoretically occur anywhere in first 100 bytes. + // If updating the header page, first validate it. + if req.Offset == 0 { + db.SetHeader(req.Data) + } + return nil + } + + // Ignore if the DB is not in a valid state (header + wal enabled). + if !db.Valid() { + return nil + } + + // Otherwise this is the WAL file, so record the written bytes as pending WAL data. + db.AddPendingWALByteN(int64(len(req.Data))) + + return nil } // Flush is called when a file handle is synced to disk. Implements fs.HandleFlusher. @@ -75,77 +110,3 @@ func (h *Handle) ReadDirAll(ctx context.Context) (ents []fuse.Dirent, err error) sort.Slice(ents, func(i, j int) bool { return ents[i].Name < ents[j].Name }) return ents, nil } - -/* -// Lock tries to acquire a lock on a byte range of the node. If a -// conflicting lock is already held, returns syscall.EAGAIN. 
-func (h *Handle) Lock(ctx context.Context, req *fuse.LockRequest) error { - log.Printf("dbg/lock %p %s -- %#v", h, h.f.Name(), req.Lock) - if err := syscall.FcntlFlock(h.f.Fd(), unix.F_OFD_SETLK, &syscall.Flock_t{ - Type: int16(req.Lock.Type), - Whence: io.SeekStart, - Start: int64(req.Lock.Start), - Len: int64(req.Lock.End) - int64(req.Lock.Start), - }); err != nil { - return err - } - - return nil -} - -// LockWait acquires a lock on a byte range of the node, waiting -// until the lock can be obtained (or context is canceled). -func (h *Handle) LockWait(ctx context.Context, req *fuse.LockWaitRequest) error { - log.Printf("dbg/lockwait %p %s -- %#v", h, h.f.Name(), req.Lock) - return syscall.FcntlFlock(h.f.Fd(), unix.F_OFD_SETLKW, &syscall.Flock_t{ - Type: int16(req.Lock.Type), - Whence: io.SeekStart, - Start: int64(req.Lock.Start), - Len: int64(req.Lock.End) - int64(req.Lock.Start), - }) -} - -// Unlock releases the lock on a byte range of the node. Locks can -// be released also implicitly, see HandleFlockLocker and -// HandlePOSIXLocker. -func (h *Handle) Unlock(ctx context.Context, req *fuse.UnlockRequest) error { - log.Printf("dbg/unlk %p %s -- %#v", h, h.f.Name(), req.Lock) - return syscall.FcntlFlock(h.f.Fd(), unix.F_OFD_SETLK, &syscall.Flock_t{ - Type: int16(req.Lock.Type), - Whence: io.SeekStart, - Start: int64(req.Lock.Start), - Len: int64(req.Lock.End) - int64(req.Lock.Start), - }) - -} - -// QueryLock returns the current state of locks held for the byte -// range of the node. -// -// See QueryLockRequest for details on how to respond. -// -// To simplify implementing this method, resp.Lock is prefilled to -// have Lock.Type F_UNLCK, and the whole struct should be -// overwritten for in case of conflicting locks. 
-func (h *Handle) QueryLock(ctx context.Context, req *fuse.QueryLockRequest, resp *fuse.QueryLockResponse) error { - log.Printf("dbg/querylock %p %s -- %#v", h, h.f.Name(), req.Lock) - - flock_t := syscall.Flock_t{ - Type: int16(req.Lock.Type), - Whence: io.SeekStart, - Start: int64(req.Lock.Start), - Len: int64(req.Lock.End) - int64(req.Lock.Start), - } - if err := syscall.FcntlFlock(h.f.Fd(), unix.F_OFD_GETLK, &flock_t); err != nil { - return err - } - - resp.Lock = fuse.FileLock{ - Type: fuse.LockType(flock_t.Type), - Start: uint64(flock_t.Start), - End: uint64(flock_t.Start + flock_t.Len), - PID: flock_t.Pid, - } - return nil -} -*/ diff --git a/litestream.go b/litestream.go index 9cfb3ef..0bc64b9 100644 --- a/litestream.go +++ b/litestream.go @@ -1,48 +1,10 @@ package litestream import ( - "encoding/binary" "encoding/hex" - "io" "strings" ) -// Magic number specified at the beginning of WAL files. -const ( - MagicLittleEndian = 0x377f0682 - MagicBigEndian = 0x377f0683 -) - -const ( - WriteVersionOffset = 18 - ReadVersionOffset = 19 -) - -// ReadVersion returns the SQLite write & read version. -// Returns 1 for legacy & 2 for WAL. -func ReadVersion(b []byte) (writeVersion, readVersion uint8, err error) { - if len(b) < ReadVersionOffset { - return 0, 0, io.ErrUnexpectedEOF - } - return b[WriteVersionOffset], b[ReadVersionOffset], nil -} - -// Checksum computes a running checksum over a byte slice. -func Checksum(bo binary.ByteOrder, s uint64, b []byte) (_ uint64, err error) { - // Ensure byte slice length is divisible by 8. - if len(b)%8 != 0 { - return 0, ErrChecksumMisaligned - } - - // Iterate over 8-byte units and compute checksum. - s0, s1 := uint32(s>>32), uint32(s&0xFFFFFFFF) - for i := 0; i < len(b); i += 8 { - s0 += bo.Uint32(b[i:]) + s1 - s1 += bo.Uint32(b[i+4:]) + s0 - } - return uint64(s0)<<32 | uint64(s1), nil -} - // HexDump returns hexdump output but with duplicate lines removed. 
func HexDump(b []byte) string { const prefixN = len("00000000") diff --git a/node.go b/node.go index 75a39aa..54d4e55 100644 --- a/node.go +++ b/node.go @@ -5,11 +5,14 @@ import ( "io/ioutil" "os" "path/filepath" + "strings" + "sync" "syscall" "time" "bazil.org/fuse" "bazil.org/fuse/fs" + "github.com/benbjohnson/litestream/sqlite" ) var _ fs.Node = (*Node)(nil) @@ -33,6 +36,7 @@ var _ fs.NodeSymlinker = (*Node)(nil) // Node represents a file or directory in the file system. type Node struct { + mu sync.RWMutex fs *FileSystem // base filesystem path string // path within file system } @@ -41,10 +45,24 @@ func NewNode(fs *FileSystem, path string) *Node { return &Node{fs: fs, path: path} } +// Path returns the path the node was initialized with. +func (n *Node) Path() string { + return n.path +} + func (n *Node) srcpath() string { return filepath.Join(n.fs.TargetPath, n.path) } +// DB returns the DB object associated with the node, if any. +// If node points to a "-wal" file then the associated DB is returned. +func (n *Node) DB() *DB { + if strings.HasSuffix(n.path, sqlite.WALSuffix) { + return n.fs.DB(strings.TrimSuffix(n.path, sqlite.WALSuffix)) + } + return n.fs.DB(n.path) +} + func (n *Node) Attr(ctx context.Context, a *fuse.Attr) (err error) { fi, err := os.Stat(n.srcpath()) if err != nil { @@ -252,7 +270,7 @@ func (n *Node) Open(ctx context.Context, req *fuse.OpenRequest, resp *fuse.OpenR if err != nil { return nil, err } - return &Handle{f: f}, nil + return NewHandle(n, f), nil } // Create creates a new directory entry in the receiver, which must be a directory. 
@@ -261,7 +279,7 @@ func (n *Node) Create(ctx context.Context, req *fuse.CreateRequest, resp *fuse.C if err != nil { return nil, nil, err } - return NewNode(n.fs, filepath.Join(n.path, req.Name)), &Handle{f: f}, nil + return NewNode(n.fs, filepath.Join(n.path, req.Name)), NewHandle(NewNode(n.fs, filepath.Join(n.path, req.Name)), f), nil } func (n *Node) Rename(ctx context.Context, req *fuse.RenameRequest, _newDir fs.Node) (err error) { diff --git a/wal.go b/sqlite/sqlite.go similarity index 73% rename from wal.go rename to sqlite/sqlite.go index b24d367..c22c8a4 100644 --- a/wal.go +++ b/sqlite/sqlite.go @@ -1,9 +1,11 @@ -package litestream +package sqlite import ( + "bytes" "encoding/binary" "errors" "io" + "strings" ) // TODO: Pages can be written multiple times before 3.11.0 (https://sqlite.org/releaselog/3_11_0.html) @@ -20,6 +22,50 @@ var ( ErrChecksumMisaligned = errors.New("checksum input misaligned") ) +// HeaderSize is the size of a SQLite 3 database header, in bytes. +const HeaderSize = 100 + +// WALSuffix is the suffix appended to the end of SQLite WAL path names. +const WALSuffix = "-wal" + +// Magic number specified at the beginning of WAL files. +const ( + MagicLittleEndian = 0x377f0682 + MagicBigEndian = 0x377f0683 +) + +// IsWALPath returns true if path ends with WALSuffix. +func IsWALPath(path string) bool { + return strings.HasSuffix(path, WALSuffix) +} + +// IsValidHeader returns true if page contains the standard SQLITE3 header. +func IsValidHeader(page []byte) bool { + return bytes.HasPrefix(page, []byte("SQLite format 3\x00")) +} + +// IsWALEnabled returns true if header page has the file format read & write +// version set to 2 (which indicates WAL). +func IsWALEnabled(page []byte) bool { + return len(page) >= 20 && page[18] == 2 && page[19] == 2 +} + +// Checksum computes a running checksum over a byte slice. +func Checksum(bo binary.ByteOrder, s uint64, b []byte) (_ uint64, err error) { + // Ensure byte slice length is divisible by 8. 
+ if len(b)%8 != 0 { + return 0, ErrChecksumMisaligned + } + + // Iterate over 8-byte units and compute checksum. + s0, s1 := uint32(s>>32), uint32(s&0xFFFFFFFF) + for i := 0; i < len(b); i += 8 { + s0 += bo.Uint32(b[i:]) + s1 + s1 += bo.Uint32(b[i+4:]) + s0 + } + return uint64(s0)<<32 | uint64(s1), nil +} + // WALHeaderSize is the size of the WAL header, in bytes. const WALHeaderSize = 32 diff --git a/wal_test.go b/sqlite/sqlite_test.go similarity index 97% rename from wal_test.go rename to sqlite/sqlite_test.go index af491de..e875114 100644 --- a/wal_test.go +++ b/sqlite/sqlite_test.go @@ -1,11 +1,11 @@ -package litestream_test +package sqlite_test import ( "io" "path/filepath" "testing" - "github.com/benbjohnson/litestream" + "github.com/benbjohnson/litestream/sqlite" ) func TestWALHeader_MarshalTo(t *testing.T) {