Load dbs on startup

This commit is contained in:
Ben Johnson
2020-11-03 14:50:16 -07:00
parent bcc6963db6
commit b1ec5c721b
6 changed files with 77 additions and 154 deletions

View File

@@ -87,8 +87,14 @@ func (m *Main) Run(args []string) (err error) {
m.logger.Printf("mounted %s; target=%s", m.Path, m.TargetPath) m.logger.Printf("mounted %s; target=%s", m.Path, m.TargetPath)
fileSystem := litestream.NewFileSystem(m.TargetPath)
if err := fileSystem.Open(); err != nil {
return err
}
defer fileSystem.Close()
s := fs.New(conn, &config) s := fs.New(conn, &config)
return s.Serve(&litestream.FileSystem{TargetPath: m.TargetPath}) return s.Serve(fileSystem)
} }
func (m *Main) ensureTargetPath() error { func (m *Main) ensureTargetPath() error {

32
db.go
View File

@@ -3,33 +3,23 @@ package litestream
import ( import (
"context" "context"
"path/filepath" "path/filepath"
"strings"
"sync" "sync"
) )
const ( const (
MetaDirSuffix = "-litestream" MetaDirSuffix = "-litestream"
ConfigSuffix = ".litestream"
WALDirName = "wal" WALDirName = "wal"
LogFilename = "log" LogFilename = "log"
) )
// Mode represents the journaling mode of a DB.
type Mode int
const (
ModeEmpty = Mode(iota + 1)
ModeJournal
ModeWAL
)
// DB represents an instance of a managed SQLite database in the file system. // DB represents an instance of a managed SQLite database in the file system.
type DB struct { type DB struct {
path string path string
mode Mode // method of writing to DB
inTx bool // currently in transaction inTx bool // currently in transaction
walFile *WALFile // active wal segment
ctx context.Context ctx context.Context
cancel func() cancel func()
wg sync.WaitGroup wg sync.WaitGroup
@@ -66,7 +56,6 @@ func (db *DB) LogPath() string {
// Open loads the configuration file // Open loads the configuration file
func (db *DB) Open() error { func (db *DB) Open() error {
// TODO: Ensure sidecar directory structure exists. // TODO: Ensure sidecar directory structure exists.
// TODO: Read WAL segments.
return nil return nil
} }
@@ -74,11 +63,20 @@ func (db *DB) Open() error {
func (db *DB) Close() error { func (db *DB) Close() error {
db.cancel() db.cancel()
db.wg.Wait() db.wg.Wait()
// TODO: Close WAL segments.
return nil return nil
} }
// ActiveWALFile returns the active WAL file. // IsMetaDir returns true if base in path is hidden and ends in "-litestream".
func (db *DB) ActiveWALFile() *WALFile { func IsMetaDir(path string) bool {
return db.walFile base := filepath.Base(path)
return strings.HasPrefix(base, ".") && strings.HasSuffix(base, MetaDirSuffix)
}
func IsConfigPath(path string) bool {
return strings.HasSuffix(path, ConfigSuffix)
}
// ConfigPathToDBPath returns the path to the database based on a config path.
func ConfigPathToDBPath(path string) string {
return strings.TrimSuffix(path, ConfigSuffix)
} }

View File

@@ -13,8 +13,8 @@ to construct a persistent write-ahead log that can be replicated.
dir/ dir/
db # SQLite database db # SQLite database
db-wal # SQLite WAL db-wal # SQLite WAL
.db-lightstream.config # per-db configuration db.litestream # per-db configuration
.db-lightstream/ .db-litestream/
log # recent event log log # recent event log
stat # per-db Prometheus statistics stat # per-db Prometheus statistics
snapshot # stores snapshot number (e.g. 0000000000000001) snapshot # stores snapshot number (e.g. 0000000000000001)
@@ -46,7 +46,7 @@ bkt/
File system startup: File system startup:
1. Load litestream.config file. 1. Load litestream.config file.
2. Load all per-db "-lightstream.config" files. 2. Load all per-db ".litestream" files.
### DB startup: ### DB startup:

View File

@@ -1,17 +1,13 @@
package litestream package litestream
import ( import (
"fmt" "log"
"io/ioutil" "os"
"path/filepath" "path/filepath"
"sync" "sync"
"bazil.org/fuse/fs" "bazil.org/fuse/fs"
"github.com/pelletier/go-toml" // "github.com/pelletier/go-toml"
)
const (
ConfigName = "litestream.config"
) )
var _ fs.FS = (*FileSystem)(nil) var _ fs.FS = (*FileSystem)(nil)
@@ -21,61 +17,71 @@ var _ fs.FS = (*FileSystem)(nil)
type FileSystem struct { type FileSystem struct {
mu sync.RWMutex mu sync.RWMutex
dbs map[string]*DB // databases by path dbs map[string]*DB // databases by path
config Config // configuration file
// Filepath to the root of the source directory. // Filepath to the root of the source directory.
TargetPath string TargetPath string
} }
func NewFileSystem() *FileSystem { func NewFileSystem(target string) *FileSystem {
return &FileSystem{ return &FileSystem{
config: DefaultConfig(), dbs: make(map[string]*DB),
TargetPath: target,
} }
} }
// ConfigPath returns the path to the file system config file.
func (f *FileSystem) ConfigPath() string {
return filepath.Join(f.TargetPath, ConfigName)
}
// Open initializes the file system and finds all managed database files. // Open initializes the file system and finds all managed database files.
func (f *FileSystem) Open() error { func (f *FileSystem) Open() error {
f.mu.Lock() f.mu.Lock()
defer f.mu.Lock() defer f.mu.Unlock()
return f.load()
return filepath.Walk(f.TargetPath, func(path string, info os.FileInfo, err error) error {
// Log errors while traversing file system.
if err != nil {
log.Printf("walk error: %s", err)
return nil
}
// Ignore .<db>-litestream metadata directories.
if IsMetaDir(path) {
return filepath.SkipDir
} else if !IsConfigPath(path) {
return nil
}
// Determine the DB path relative to the target path.
rel, err := filepath.Rel(f.TargetPath, ConfigPathToDBPath(path))
if err != nil {
return err
}
// Initialize a DB object based on the config path.
// The database doesn't need to exist. It will be tracked when created.
db := NewDB(rel)
if err := db.Open(); err != nil {
log.Printf("cannot open db %q: %s", rel, err)
return nil
}
f.dbs[db.Path()] = db
log.Printf("[DB]: %s", rel)
return nil
})
} }
// load loads the configuration file. // OpenDB initializes a DB for a given path.
func (f *FileSystem) load() error { func (f *FileSystem) OpenDB(path string) error {
// Read configuration file. f.mu.Lock()
config := DefaultConfig() defer f.mu.Unlock()
if buf, err := ioutil.ReadFile(f.ConfigPath()); err != nil { return f.openDB(path)
return err }
} else if err := toml.Unmarshal(buf, &config); err != nil {
return fmt.Errorf("unmarshal(): cannot read config file: %w", err)
}
f.config = config
// Close dbs opened under previous configuration. func (f *FileSystem) openDB(path string) error {
if err := f.closeDBs(); err != nil { db := NewDB(path)
return fmt.Errorf("load(): cannot close db: %w", err)
}
// Search for matching DB files.
filenames, err := filepath.Glob(config.Pattern)
if err != nil {
return fmt.Errorf("load(): cannot glob: %w", err)
}
// Loop over matching files and create a DB for them.
for _, filename := range filenames {
db := NewDB(filename)
if err := db.Open(); err != nil { if err := db.Open(); err != nil {
return err return err
} }
f.dbs[db.Path()] = db f.dbs[db.Path()] = db
}
return nil return nil
} }

69
wal.go
View File

@@ -3,9 +3,7 @@ package litestream
import ( import (
"encoding/binary" "encoding/binary"
"errors" "errors"
"fmt"
"io" "io"
"os"
) )
// TODO: Pages can be written multiple times before 3.11.0 (https://sqlite.org/releaselog/3_11_0.html) // TODO: Pages can be written multiple times before 3.11.0 (https://sqlite.org/releaselog/3_11_0.html)
@@ -22,73 +20,6 @@ var (
ErrChecksumMisaligned = errors.New("checksum input misaligned") ErrChecksumMisaligned = errors.New("checksum input misaligned")
) )
// WALFile represents a write-ahead log file.
type WALFile struct {
path string
hdr WALHeader
f *os.File
}
// NewWALFile returns a new instance of WALFile.
func NewWALFile(path string) *WALFile {
return &WALFile{path: path}
}
// WALHeader returns the WAL header. The return is the zero value if unset.
func (s *WALFile) Header() WALHeader {
return s.hdr
}
// Open initializes the WAL file descriptor. Creates the file if it doesn't exist.
func (s *WALFile) Open() (err error) {
// TODO: Validate file contents if non-zero. Return ErrWALFileInvalidHeader if header invalid.
// TODO: Truncate transaction if commit record is invalid.
if s.f, err = os.OpenFile(s.path, os.O_RDWR|os.O_CREATE, 0666); err != nil {
return err
}
return nil
}
// Close syncs the WAL file and closes the file descriptor.
func (s *WALFile) Close() error {
if err := s.f.Sync(); err != nil {
return fmt.Errorf("wal sync: %w", err)
}
return s.f.Close()
}
// Sync calls Sync() on the underlying file descriptor.
func (s *WALFile) Sync() error {
return s.f.Sync()
}
// WriteHeader writes hdr to the WAL file.
// Returns an error if hdr is empty or if the file already has a header.
func (s *WALFile) WriteHeader(hdr WALHeader) error {
if hdr.IsZero() {
return ErrWALHeaderEmpty
} else if !s.hdr.IsZero() {
return ErrWALFileInitialized
}
s.hdr = hdr
// Marshal header & write to file.
b := make([]byte, WALHeaderSize)
if err := s.hdr.MarshalTo(b); err != nil {
return fmt.Errorf("marshal wal header: %w", err)
} else if _, err := s.f.Write(b); err != nil {
return err
}
return nil
}
func (s *WALFile) WriteFrame(hdr WALFrameHeader, buf []byte) error {
panic("TODO")
}
// WALHeaderSize is the size of the WAL header, in bytes. // WALHeaderSize is the size of the WAL header, in bytes.
const WALHeaderSize = 32 const WALHeaderSize = 32

View File

@@ -8,24 +8,6 @@ import (
"github.com/benbjohnson/litestream" "github.com/benbjohnson/litestream"
) )
func TestWALFile_WriteHeader(t *testing.T) {
t.Run("OK", func(t *testing.T) {
f := MustOpenWALFile(t, "0000")
defer MustCloseWALFile(t, f)
if err := f.WriteHeader(litestream.WALHeader{
Magic: litestream.MagicLittleEndian,
FileFormatVersion: 1001,
PageSize: 4096,
CheckpointSeqNo: 1003,
}); err != nil {
t.Fatal(err)
}
t.Fatal("TODO: Ensure header written correctly")
})
}
func TestWALHeader_MarshalTo(t *testing.T) { func TestWALHeader_MarshalTo(t *testing.T) {
// Ensure the WAL header can be marshaled and unmarshaled correctly. // Ensure the WAL header can be marshaled and unmarshaled correctly.
t.Run("OK", func(t *testing.T) { t.Run("OK", func(t *testing.T) {