From b1ec5c721b01bc13c16703302b2dc6563812db24 Mon Sep 17 00:00:00 2001 From: Ben Johnson Date: Tue, 3 Nov 2020 14:50:16 -0700 Subject: [PATCH] Load dbs on startup --- cmd/litestream/main.go | 8 +++- db.go | 32 +++++++------- doc/DESIGN.md | 6 +-- file_system.go | 98 ++++++++++++++++++++++-------------------- wal.go | 69 ----------------------------- wal_test.go | 18 -------- 6 files changed, 77 insertions(+), 154 deletions(-) diff --git a/cmd/litestream/main.go b/cmd/litestream/main.go index 3824c58..799088a 100644 --- a/cmd/litestream/main.go +++ b/cmd/litestream/main.go @@ -87,8 +87,14 @@ func (m *Main) Run(args []string) (err error) { m.logger.Printf("mounted %s; target=%s", m.Path, m.TargetPath) + fileSystem := litestream.NewFileSystem(m.TargetPath) + if err := fileSystem.Open(); err != nil { + return err + } + defer fileSystem.Close() + s := fs.New(conn, &config) - return s.Serve(&litestream.FileSystem{TargetPath: m.TargetPath}) + return s.Serve(fileSystem) } func (m *Main) ensureTargetPath() error { diff --git a/db.go b/db.go index 2325583..7ac93cf 100644 --- a/db.go +++ b/db.go @@ -3,33 +3,23 @@ package litestream import ( "context" "path/filepath" + "strings" "sync" ) const ( MetaDirSuffix = "-litestream" + ConfigSuffix = ".litestream" WALDirName = "wal" LogFilename = "log" ) -// Mode represents the journaling mode of a DB. -type Mode int - -const ( - ModeEmpty = Mode(iota + 1) - ModeJournal - ModeWAL -) - // DB represents an instance of a managed SQLite database in the file system. type DB struct { path string - mode Mode // method of writing to DB inTx bool // currently in transaction - walFile *WALFile // active wal segment - ctx context.Context cancel func() wg sync.WaitGroup @@ -66,7 +56,6 @@ func (db *DB) LogPath() string { // Open loads the configuration file func (db *DB) Open() error { // TODO: Ensure sidecar directory structure exists. - // TODO: Read WAL segments. return nil } @@ -74,11 +63,20 @@ func (db *DB) Open() error { func (db *DB) Close() error { db.cancel() db.wg.Wait() - // TODO: Close WAL segments. return nil } -// ActiveWALFile returns the active WAL file. -func (db *DB) ActiveWALFile() *WALFile { - return db.walFile +// IsMetaDir returns true if base in path is hidden and ends in "-litestream". +func IsMetaDir(path string) bool { + base := filepath.Base(path) + return strings.HasPrefix(base, ".") && strings.HasSuffix(base, MetaDirSuffix) +} + +func IsConfigPath(path string) bool { + return strings.HasSuffix(path, ConfigSuffix) +} + +// ConfigPathToDBPath returns the path to the database based on a config path. +func ConfigPathToDBPath(path string) string { + return strings.TrimSuffix(path, ConfigSuffix) } diff --git a/doc/DESIGN.md b/doc/DESIGN.md index a17bd6c..e8ab835 100644 --- a/doc/DESIGN.md +++ b/doc/DESIGN.md @@ -13,8 +13,8 @@ to construct a persistent write-ahead log that can be replicated. dir/ db # SQLite database db-wal # SQLite WAL - .db-lightstream.config # per-db configuration - .db-lightstream/ + db.litestream # per-db configuration + .db-litestream/ log # recent event log stat # per-db Prometheus statistics snapshot # stores snapshot number (e.g. 0000000000000001) @@ -46,7 +46,7 @@ bkt/ File system startup: 1. Load litestream.config file. -2. Load all per-db "-lightstream.config" files. +2. Load all per-db ".litestream" files. ### DB startup: diff --git a/file_system.go b/file_system.go index 8dee7d9..7882d1a 100644 --- a/file_system.go +++ b/file_system.go @@ -1,17 +1,13 @@ package litestream import ( - "fmt" - "io/ioutil" + "log" + "os" "path/filepath" "sync" "bazil.org/fuse/fs" - "github.com/pelletier/go-toml" -) - -const ( - ConfigName = "litestream.config" + // "github.com/pelletier/go-toml" ) var _ fs.FS = (*FileSystem)(nil) @@ -19,63 +15,73 @@ var _ fs.FS = (*FileSystem)(nil) // FileSystem represents the file system that is mounted. // It returns a root node that represents the root directory. type FileSystem struct { - mu sync.RWMutex - dbs map[string]*DB // databases by path - config Config // configuration file + mu sync.RWMutex + dbs map[string]*DB // databases by path // Filepath to the root of the source directory. TargetPath string } -func NewFileSystem() *FileSystem { +func NewFileSystem(target string) *FileSystem { return &FileSystem{ - config: DefaultConfig(), + dbs: make(map[string]*DB), + TargetPath: target, } } -// ConfigPath returns the path to the file system config file. -func (f *FileSystem) ConfigPath() string { - return filepath.Join(f.TargetPath, ConfigName) -} - // Open initializes the file system and finds all managed database files. func (f *FileSystem) Open() error { f.mu.Lock() - defer f.mu.Lock() - return f.load() -} + defer f.mu.Unlock() -// load loads the configuration file. -func (f *FileSystem) load() error { - // Read configuration file. - config := DefaultConfig() - if buf, err := ioutil.ReadFile(f.ConfigPath()); err != nil { - return err - } else if err := toml.Unmarshal(buf, &config); err != nil { - return fmt.Errorf("unmarshal(): cannot read config file: %w", err) - } - f.config = config + return filepath.Walk(f.TargetPath, func(path string, info os.FileInfo, err error) error { + // Log errors while traversing file system. + if err != nil { + log.Printf("walk error: %s", err) + return nil + } - // Close dbs opened under previous configuration. - if err := f.closeDBs(); err != nil { - return fmt.Errorf("load(): cannot close db: %w", err) - } + // Ignore .-litestream metadata directories. + if IsMetaDir(path) { + return filepath.SkipDir + } else if !IsConfigPath(path) { + return nil + } - // Search for matching DB files. - filenames, err := filepath.Glob(config.Pattern) - if err != nil { - return fmt.Errorf("load(): cannot glob: %w", err) - } - - // Loop over matching files and create a DB for them. - for _, filename := range filenames { - db := NewDB(filename) - if err := db.Open(); err != nil { + // Determine the DB path relative to the target path. + rel, err := filepath.Rel(f.TargetPath, ConfigPathToDBPath(path)) + if err != nil { return err } - f.dbs[db.Path()] = db - } + // Initialize a DB object based on the config path. + // The database doesn't need to exist. It will be tracked when created. + db := NewDB(rel) + if err := db.Open(); err != nil { + log.Printf("cannot open db %q: %s", rel, err) + return nil + } + f.dbs[db.Path()] = db + + log.Printf("[DB]: %s", rel) + + return nil + }) +} + +// OpenDB initializes a DB for a given path. +func (f *FileSystem) OpenDB(path string) error { + f.mu.Lock() + defer f.mu.Unlock() + return f.openDB(path) +} + +func (f *FileSystem) openDB(path string) error { + db := NewDB(path) + if err := db.Open(); err != nil { + return err + } + f.dbs[db.Path()] = db return nil } diff --git a/wal.go b/wal.go index f9cc10b..b24d367 100644 --- a/wal.go +++ b/wal.go @@ -3,9 +3,7 @@ package litestream import ( "encoding/binary" "errors" - "fmt" "io" - "os" ) // TODO: Pages can be written multiple times before 3.11.0 (https://sqlite.org/releaselog/3_11_0.html) @@ -22,73 +20,6 @@ var ( ErrChecksumMisaligned = errors.New("checksum input misaligned") ) -// WALFile represents a write-ahead log file. -type WALFile struct { - path string - hdr WALHeader - - f *os.File -} - -// NewWALFile returns a new instance of WALFile. -func NewWALFile(path string) *WALFile { - return &WALFile{path: path} -} - -// WALHeader returns the WAL header. The return is the zero value if unset. -func (s *WALFile) Header() WALHeader { - return s.hdr -} - -// Open initializes the WAL file descriptor. Creates the file if it doesn't exist. -func (s *WALFile) Open() (err error) { - // TODO: Validate file contents if non-zero. Return ErrWALFileInvalidHeader if header invalid. - // TODO: Truncate transaction if commit record is invalid. - - if s.f, err = os.OpenFile(s.path, os.O_RDWR|os.O_CREATE, 0666); err != nil { - return err - } - return nil -} - -// Close syncs the WAL file and closes the file descriptor. -func (s *WALFile) Close() error { - if err := s.f.Sync(); err != nil { - return fmt.Errorf("wal sync: %w", err) - } - return s.f.Close() -} - -// Sync calls Sync() on the underlying file descriptor. -func (s *WALFile) Sync() error { - return s.f.Sync() -} - -// WriteHeader writes hdr to the WAL file. -// Returns an error if hdr is empty or if the file already has a header. -func (s *WALFile) WriteHeader(hdr WALHeader) error { - if hdr.IsZero() { - return ErrWALHeaderEmpty - } else if !s.hdr.IsZero() { - return ErrWALFileInitialized - } - s.hdr = hdr - - // Marshal header & write to file. - b := make([]byte, WALHeaderSize) - if err := s.hdr.MarshalTo(b); err != nil { - return fmt.Errorf("marshal wal header: %w", err) - } else if _, err := s.f.Write(b); err != nil { - return err - } - - return nil -} - -func (s *WALFile) WriteFrame(hdr WALFrameHeader, buf []byte) error { - panic("TODO") -} - // WALHeaderSize is the size of the WAL header, in bytes. const WALHeaderSize = 32 diff --git a/wal_test.go b/wal_test.go index 0038cda..af491de 100644 --- a/wal_test.go +++ b/wal_test.go @@ -8,24 +8,6 @@ import ( "github.com/benbjohnson/litestream" ) -func TestWALFile_WriteHeader(t *testing.T) { - t.Run("OK", func(t *testing.T) { - f := MustOpenWALFile(t, "0000") - defer MustCloseWALFile(t, f) - - if err := f.WriteHeader(litestream.WALHeader{ - Magic: litestream.MagicLittleEndian, - FileFormatVersion: 1001, - PageSize: 4096, - CheckpointSeqNo: 1003, - }); err != nil { - t.Fatal(err) - } - - t.Fatal("TODO: Ensure header written correctly") - }) -} - func TestWALHeader_MarshalTo(t *testing.T) { // Ensure the WAL header can be marshaled and unmarshaled correctly. t.Run("OK", func(t *testing.T) {