From dc3da53c76301d349e9ddfd8c1ca397d473c38c9 Mon Sep 17 00:00:00 2001 From: Ben Johnson Date: Thu, 22 Oct 2020 14:53:14 -0600 Subject: [PATCH] Add WALHeader & WALFrameHeader --- db.go | 84 ++++++++++++++++++++++++++++++++++ file_system.go | 112 +++++++++++++++++++++++++++++++++++++++++++++ go.mod | 3 +- go.sum | 3 ++ handle.go | 5 ++ litestream.go | 19 ++++++++ wal.go | 121 +++++++++++++++++++++++++++++++++++++++++++++++++ wal_test.go | 92 +++++++++++++++++++++++++++++++++++++ 8 files changed, 438 insertions(+), 1 deletion(-) create mode 100644 db.go create mode 100644 litestream.go create mode 100644 wal.go create mode 100644 wal_test.go diff --git a/db.go b/db.go new file mode 100644 index 0000000..2325583 --- /dev/null +++ b/db.go @@ -0,0 +1,84 @@ +package litestream + +import ( + "context" + "path/filepath" + "sync" +) + +const ( + MetaDirSuffix = "-litestream" + + WALDirName = "wal" + LogFilename = "log" +) + +// Mode represents the journaling mode of a DB. +type Mode int + +const ( + ModeEmpty = Mode(iota + 1) + ModeJournal + ModeWAL +) + +// DB represents an instance of a managed SQLite database in the file system. +type DB struct { + path string + mode Mode // method of writing to DB + inTx bool // currently in transaction + + walFile *WALFile // active wal segment + + ctx context.Context + cancel func() + wg sync.WaitGroup +} + +// NewDB returns a new instance of DB for a given path. +func NewDB(path string) *DB { + db := &DB{path: path} + db.ctx, db.cancel = context.WithCancel(context.Background()) + return db +} + +// Path returns the path to the database. +func (db *DB) Path() string { + return db.path +} + +// MetaPath returns the path to the database metadata. +func (db *DB) MetaPath() string { + dir, file := filepath.Split(db.path) + return filepath.Join(dir, "."+file+MetaDirSuffix) +} + +// WALPath returns the path to the internal WAL directory. 
+func (db *DB) WALPath() string {
+	return filepath.Join(db.MetaPath(), WALDirName)
+}
+
+// LogPath returns the path to the internal log file.
+func (db *DB) LogPath() string {
+	return filepath.Join(db.MetaPath(), LogFilename)
+}
+
+// Open prepares the database for management. Currently a no-op placeholder.
+func (db *DB) Open() error {
+	// TODO: Ensure sidecar directory structure exists.
+	// TODO: Read WAL segments.
+	return nil
+}
+
+// Close stops management of the database.
+func (db *DB) Close() error {
+	db.cancel()
+	db.wg.Wait()
+	// TODO: Close WAL segments.
+	return nil
+}
+
+// ActiveWALFile returns the active WAL file.
+func (db *DB) ActiveWALFile() *WALFile {
+	return db.walFile
+}
diff --git a/file_system.go b/file_system.go
index c0f5edd..8f41f8e 100644
--- a/file_system.go
+++ b/file_system.go
@@ -1,7 +1,17 @@
 package litestream
 
 import (
+	"fmt"
+	"io/ioutil"
+	"path/filepath"
+	"sync"
+
 	"bazil.org/fuse/fs"
+	"github.com/pelletier/go-toml"
+)
+
+const (
+	ConfigName = "litestream.config"
 )
 
 var _ fs.FS = (*FileSystem)(nil)
@@ -9,10 +19,112 @@ var _ fs.FS = (*FileSystem)(nil)
 // FileSystem represents the file system that is mounted.
 // It returns a root node that represents the root directory.
 type FileSystem struct {
+	mu     sync.RWMutex
+	dbs    map[string]*DB // databases by path
+	config Config         // configuration file
+
+	// Filepath to the root of the source directory.
 	SourcePath string
 }
 
+// NewFileSystem returns a FileSystem with the default configuration and an
+// initialized database map, so load can register databases without panicking.
+func NewFileSystem() *FileSystem {
+	return &FileSystem{dbs: make(map[string]*DB), config: DefaultConfig()}
+}
+
+// ConfigPath returns the path to the file system config file.
+func (f *FileSystem) ConfigPath() string {
+	return filepath.Join(f.SourcePath, ConfigName)
+}
+
+// Open initializes the file system and finds all managed database files.
+func (f *FileSystem) Open() error {
+	f.mu.Lock()
+	defer f.mu.Unlock()
+	return f.load()
+}
+
+// load reads the config file, closes existing DBs, and opens a DB per glob match.
+func (f *FileSystem) load() error {
+	// Read configuration file.
+ config := DefaultConfig() + if buf, err := ioutil.ReadFile(f.ConfigPath()); err != nil { + return err + } else if err := toml.Unmarshal(buf, &config); err != nil { + return fmt.Errorf("unmarshal(): cannot read config file: %w", err) + } + f.config = config + + // Close dbs opened under previous configuration. + if err := f.closeDBs(); err != nil { + return fmt.Errorf("load(): cannot close db: %w", err) + } + + // Search for matching DB files. + filenames, err := filepath.Glob(config.Pattern) + if err != nil { + return fmt.Errorf("load(): cannot glob: %w", err) + } + + // Loop over matching files and create a DB for them. + for _, filename := range filenames { + db := NewDB(filename) + if err := db.Open(); err != nil { + return err + } + f.dbs[db.Path()] = db + } + + return nil +} + +// Close closes the file system and flushes all managed database files. +func (f *FileSystem) Close() (err error) { + f.mu.Lock() + defer f.mu.Unlock() + return f.closeDBs() +} + +func (f *FileSystem) closeDBs() (err error) { + for key, db := range f.dbs { + if e := db.Close(); e != nil && err == nil { + err = e + } + delete(f.dbs, key) + } + return err +} + // Root returns the file system root node. func (f *FileSystem) Root() (fs.Node, error) { return &Node{fs: f}, nil } + +// Config represents the configuration file for the file system. +type Config struct { + Pattern string `toml:"pattern"` // glob pattern + ReadOnly bool `toml:"read-only"` // if true, expose only read access via FUSE + RecoverFrom string `toml:"recover-from"` // http URL, S3, etc. + + HTTP HTTPConfig `toml:"http"` + S3 S3Config `toml:"s3"` +} + +// DefaultConfig returns the default configuration. +func DefaultConfig() Config { + return Config{} +} + +// S3Config represents the configuration for replicating to/from an S3-compatible store. 
+type S3Config struct { + AccessKeyID string `toml:"access-key-id"` // AWS access key + SecretAccessKey string `toml:"secret-access-key"` // AWS secret access key +} + +// HTTPConfig represents the configuration for exposing data via HTTP. +type HTTPConfig struct { + Addr string `toml:"addr"` // bind address + CertFile string `toml:"cert-file"` // TLS certificate path + KeyFile string `toml:"key-file"` // TLS key path +} diff --git a/go.mod b/go.mod index 1a930d7..bbc03af 100644 --- a/go.mod +++ b/go.mod @@ -1,7 +1,8 @@ -module github.com/middlemost/litestream +module github.com/benbjohnson/litestream go 1.14 require ( bazil.org/fuse v0.0.0-20200524192727-fb710f7dfd05 + github.com/pelletier/go-toml v1.8.1 ) diff --git a/go.sum b/go.sum index a32e27b..b6fcfe4 100644 --- a/go.sum +++ b/go.sum @@ -2,8 +2,11 @@ bazil.org/fuse v0.0.0-20200524192727-fb710f7dfd05 h1:UrYe9YkT4Wpm6D+zByEyCJQzDqT bazil.org/fuse v0.0.0-20200524192727-fb710f7dfd05/go.mod h1:h0h5FBYpXThbvSfTqthw+0I4nmHnhTHkO5BoOHsBWqg= github.com/Julusian/godocdown v0.0.0-20170816220326-6d19f8ff2df8/go.mod h1:INZr5t32rG59/5xeltqoCJoNY7e5x/3xoY9WSWVWg74= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/dvyukov/go-fuzz v0.0.0-20200318091601-be3528f3a813/go.mod h1:11Gm+ccJnvAhCNLlf5+cS9KjtbaD5I5zaZpFMsTHWTw= github.com/elazarl/go-bindata-assetfs v1.0.0/go.mod h1:v+YaWX3bdea5J/mo8dSETolEo7R71Vk1u8bnjau5yw4= +github.com/pelletier/go-toml v1.8.1 h1:1Nf83orprkJyknT6h7zbuEGUEjcyVlCxSUGTENmNCRM= +github.com/pelletier/go-toml v1.8.1/go.mod h1:T2/BmBdy8dvIRq1a/8aqjN41wvWlN4lrapLU/GW4pbc= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/robertkrimen/godocdown v0.0.0-20130622164427-0bfa04905481/go.mod h1:C9WhFzY47SzYBIvzFqSvHIR6ROgDo4TtdTuRaOMjF/s= github.com/stephens2424/writerset v1.0.2/go.mod 
h1:aS2JhsMn6eA7e82oNmW4rfsgAOp9COBTTl8mzkwADnc=
diff --git a/handle.go b/handle.go
index 6cf684c..6718ba8 100644
--- a/handle.go
+++ b/handle.go
@@ -2,7 +2,9 @@ package litestream
 
 import (
 	"context"
+	"encoding/hex"
 	"io"
+	"log"
 	"os"
 	"sort"
 	"syscall"
@@ -51,6 +53,9 @@ func (h *Handle) Read(ctx context.Context, req *fuse.ReadRequest, resp *fuse.Rea
 
 // Write writes data at a given offset to the underlying file.
 func (h *Handle) Write(ctx context.Context, req *fuse.WriteRequest, resp *fuse.WriteResponse) (err error) {
+	log.Printf("write: name=%s offset=%d", h.f.Name(), req.Offset)
+	println(hex.Dump(req.Data))
+
 	resp.Size, err = h.f.WriteAt(req.Data, req.Offset)
 	return err
 }
diff --git a/litestream.go b/litestream.go
new file mode 100644
index 0000000..6daef29
--- /dev/null
+++ b/litestream.go
@@ -0,0 +1,19 @@
+package litestream
+
+import (
+	"io"
+)
+
+const (
+	WriteVersionOffset = 18
+	ReadVersionOffset  = 19
+)
+
+// ReadVersion returns the SQLite write & read version stored in b.
+// Returns 1 for legacy & 2 for WAL, or io.ErrUnexpectedEOF if b is too short.
+func ReadVersion(b []byte) (writeVersion, readVersion uint8, err error) {
+	if len(b) <= ReadVersionOffset { // b[ReadVersionOffset] must be addressable
+		return 0, 0, io.ErrUnexpectedEOF
+	}
+	return b[WriteVersionOffset], b[ReadVersionOffset], nil
+}
diff --git a/wal.go b/wal.go
new file mode 100644
index 0000000..5814a55
--- /dev/null
+++ b/wal.go
@@ -0,0 +1,121 @@
+package litestream
+
+import (
+	"encoding/binary"
+	"io"
+	"os"
+)
+
+// WALFile represents a write-ahead log file.
+type WALFile struct {
+	path string
+	hdr  WALHeader
+
+	f *os.File
+}
+
+// NewWALFile returns a new instance of WALFile.
+func NewWALFile(path string) *WALFile { + return &WALFile{path: path} +} + +func (s *WALFile) Open() error { + panic("TODO") +} + +func (s *WALFile) Close() error { + panic("TODO") +} + +func (s *WALFile) WriteHeader(hdr WALHeader) error { + s.hdr = hdr + panic("TODO") +} + +func (s *WALFile) WriteFrame(hdr WALFrameHeader, buf []byte) error { + panic("TODO") +} + +// WALHeaderSize is the size of the WAL header, in bytes. +const WALHeaderSize = 32 + +type WALHeader struct { + Magic uint32 + FileFormatVersion uint32 + PageSize uint32 + CheckpointSeqNo uint32 + Salt uint64 + Checksum uint64 +} + +// MarshalTo encodes the header to b. +// Returns io.ErrShortWrite if len(b) is less than WALHeaderSize. +func (hdr *WALHeader) MarshalTo(b []byte) error { + if len(b) < WALHeaderSize { + return io.ErrShortWrite + } + binary.BigEndian.PutUint32(b[0:], hdr.Magic) + binary.BigEndian.PutUint32(b[4:], hdr.FileFormatVersion) + binary.BigEndian.PutUint32(b[8:], hdr.PageSize) + binary.BigEndian.PutUint32(b[12:], hdr.CheckpointSeqNo) + binary.BigEndian.PutUint64(b[16:], hdr.Salt) + binary.BigEndian.PutUint64(b[24:], hdr.Checksum) + return nil +} + +// Unmarshal decodes the header from b. +// Returns io.ErrUnexpectedEOF if len(b) is less than WALHeaderSize. +func (hdr *WALHeader) Unmarshal(b []byte) error { + if len(b) < WALHeaderSize { + return io.ErrUnexpectedEOF + } + hdr.Magic = binary.BigEndian.Uint32(b[0:]) + hdr.FileFormatVersion = binary.BigEndian.Uint32(b[4:]) + hdr.PageSize = binary.BigEndian.Uint32(b[8:]) + hdr.CheckpointSeqNo = binary.BigEndian.Uint32(b[12:]) + hdr.Salt = binary.BigEndian.Uint64(b[16:]) + hdr.Checksum = binary.BigEndian.Uint64(b[24:]) + return nil +} + +// WALFrameHeaderSize is the size of the WAL frame header, in bytes. +const WALFrameHeaderSize = 24 + +// WALFrameHeader represents a SQLite WAL frame header. 
+type WALFrameHeader struct {
+	Pgno     uint32
+	PageN    uint32 // only set for commit records
+	Salt     uint64
+	Checksum uint64
+}
+
+// IsCommit returns true if the frame represents a commit record.
+func (hdr *WALFrameHeader) IsCommit() bool {
+	return hdr.PageN != 0
+}
+
+// MarshalTo encodes the frame header to b.
+// Returns io.ErrShortWrite if len(b) is less than WALFrameHeaderSize.
+func (hdr *WALFrameHeader) MarshalTo(b []byte) error {
+	if len(b) < WALFrameHeaderSize {
+		return io.ErrShortWrite
+	}
+	binary.BigEndian.PutUint32(b[0:], hdr.Pgno)
+	binary.BigEndian.PutUint32(b[4:], hdr.PageN)
+	binary.BigEndian.PutUint64(b[8:], hdr.Salt)
+	binary.BigEndian.PutUint64(b[16:], hdr.Checksum)
+	return nil
+}
+
+// Unmarshal decodes the frame header from b.
+// Returns io.ErrUnexpectedEOF if len(b) is less than WALFrameHeaderSize.
+func (hdr *WALFrameHeader) Unmarshal(b []byte) error {
+	if len(b) < WALFrameHeaderSize {
+		return io.ErrUnexpectedEOF
+	}
+	hdr.Pgno = binary.BigEndian.Uint32(b[0:])
+	hdr.PageN = binary.BigEndian.Uint32(b[4:])
+	hdr.Salt = binary.BigEndian.Uint64(b[8:])
+	hdr.Checksum = binary.BigEndian.Uint64(b[16:])
+	return nil
+}
diff --git a/wal_test.go b/wal_test.go
new file mode 100644
index 0000000..60a52e7
--- /dev/null
+++ b/wal_test.go
@@ -0,0 +1,92 @@
+package litestream_test
+
+import (
+	"io"
+	"testing"
+
+	"github.com/benbjohnson/litestream"
+)
+
+func TestWALHeader_MarshalTo(t *testing.T) {
+	// Ensure the WAL header can be marshaled and unmarshaled correctly.
+	t.Run("OK", func(t *testing.T) {
+		hdr := litestream.WALHeader{
+			Magic:             1000,
+			FileFormatVersion: 1001,
+			PageSize:          1002,
+			CheckpointSeqNo:   1003,
+			Salt:              1004,
+			Checksum:          1005,
+		}
+		b := make([]byte, litestream.WALHeaderSize)
+		if err := hdr.MarshalTo(b); err != nil {
+			t.Fatal(err)
+		}
+
+		var other litestream.WALHeader
+		if err := other.Unmarshal(b); err != nil {
+			t.Fatal(err)
+		} else if got, want := other, hdr; got != want {
+			t.Fatalf("mismatch: got %#v, want %#v", got, want)
+		}
+	})
+
+	// Ensure that marshaling to a small byte slice returns an error.
+	t.Run("ErrShortWrite", func(t *testing.T) {
+		var hdr litestream.WALHeader
+		if err := hdr.MarshalTo(make([]byte, litestream.WALHeaderSize-1)); err != io.ErrShortWrite {
+			t.Fatalf("unexpected error: %#v", err)
+		}
+	})
+}
+
+func TestWALHeader_Unmarshal(t *testing.T) {
+	// Ensure that unmarshaling from a small byte slice returns an error.
+	t.Run("ErrUnexpectedEOF", func(t *testing.T) {
+		var hdr litestream.WALHeader
+		if err := hdr.Unmarshal(make([]byte, litestream.WALHeaderSize-1)); err != io.ErrUnexpectedEOF {
+			t.Fatalf("unexpected error: %#v", err)
+		}
+	})
+}
+
+func TestWALFrameHeader_MarshalTo(t *testing.T) {
+	// Ensure the WAL frame header can be marshaled and unmarshaled correctly.
+	t.Run("OK", func(t *testing.T) {
+		hdr := litestream.WALFrameHeader{
+			Pgno:     1000,
+			PageN:    1001,
+			Salt:     1002,
+			Checksum: 1003,
+		}
+		b := make([]byte, litestream.WALFrameHeaderSize)
+		if err := hdr.MarshalTo(b); err != nil {
+			t.Fatal(err)
+		}
+
+		var other litestream.WALFrameHeader
+		if err := other.Unmarshal(b); err != nil {
+			t.Fatal(err)
+		} else if got, want := other, hdr; got != want {
+			t.Fatalf("mismatch: got %#v, want %#v", got, want)
+		}
+	})
+
+	// Ensure that marshaling to a small byte slice returns an error.
+ t.Run("ErrShortWrite", func(t *testing.T) { + var hdr litestream.WALFrameHeader + if err := hdr.MarshalTo(make([]byte, litestream.WALFrameHeaderSize-1)); err != io.ErrShortWrite { + t.Fatalf("unexpected error: %#v", err) + } + }) +} + +func TestWALFrameHeader_Unmarshal(t *testing.T) { + // Ensure that unmarshaling from a small byte slice returns an error. + t.Run("ErrUnexpectedEOF", func(t *testing.T) { + var hdr litestream.WALFrameHeader + if err := hdr.Unmarshal(make([]byte, litestream.WALFrameHeaderSize-1)); err != io.ErrUnexpectedEOF { + t.Fatalf("unexpected error: %#v", err) + } + }) +}