diff --git a/.gitignore b/.gitignore deleted file mode 100644 index 66fd13c..0000000 --- a/.gitignore +++ /dev/null @@ -1,15 +0,0 @@ -# Binaries for programs and plugins -*.exe -*.exe~ -*.dll -*.so -*.dylib - -# Test binary, built with `go test -c` -*.test - -# Output of the go coverage tool, specifically when used with LiteIDE -*.out - -# Dependency directories (remove the comment below to include it) -# vendor/ diff --git a/README.md b/README.md index 4e4fd8f..56db82e 100644 --- a/README.md +++ b/README.md @@ -1,3 +1,4 @@ -# litestream +litestream +========== Streaming replication for SQLite. diff --git a/cmd/litestream/config.go b/cmd/litestream/config.go new file mode 100644 index 0000000..e21960c --- /dev/null +++ b/cmd/litestream/config.go @@ -0,0 +1,49 @@ +package main + +import ( + "io/ioutil" + "os" + "strings" + + "github.com/pelletier/go-toml" +) + +// Config represents a configuration file for the litestream CLI. +type Config struct { + DBs []DBConfig `toml:"db"` +} + +// DefaultConfig returns a new instance of Config with defaults set. +func DefaultConfig() Config { + return Config{} +} + +// ReadConfigFile unmarshals config from filename. Expands path if needed. +func ReadConfigFile(filename string) (Config, error) { + config := DefaultConfig() + + // Expand filename, if necessary. + if prefix := "~" + string(os.PathSeparator); strings.HasPrefix(filename, prefix) { + u, err := user.Current() + if err != nil { + return config, err + } else if u.HomeDir == "" { + return config, fmt.Errorf("home directory unset") + } + filename = filepath.Join(u.HomeDir, strings.TrimPrefix(filename, prefix)) + } + + // Read & deserialize configuration. + if buf, err := ioutil.ReadFile(filename); os.IsNotExist(err) { + return config, fmt.Errorf("config file not found: %s", filename) + } else if err != nil { + return config, err + } else if err := toml.Unmarshal(buf, &config); err != nil { + return config, err + } + return config, nil +} + +type DBConfig struct { + Path string `toml:"path"` +} diff --git a/cmd/litestream/main.go b/cmd/litestream/main.go index f2a7a98..5bace17 100644 --- a/cmd/litestream/main.go +++ b/cmd/litestream/main.go @@ -1,6 +1,7 @@ package main import ( + "context" "encoding/json" "errors" "flag" @@ -8,137 +9,116 @@ import ( "io/ioutil" "log" "os" + "os/signal" "path/filepath" - "bazil.org/fuse" - "bazil.org/fuse/fs" "github.com/benbjohnson/litestream" ) func main() { + // Setup signal handler. + ctx, cancel := context.WithCancel(context.Background()) + c := make(chan os.Signal, 1) + signal.Notify(c, os.Interrupt) + go func() { <-c; cancel() }() + + // Initialize program and read flags/config. m := NewMain() - if err := m.Run(os.Args[1:]); err == flag.ErrHelp { + if err := m.ParseFlags(os.Args[1:]); err == flag.ErrHelp { os.Exit(1) } else if err != nil { fmt.Fprintln(os.Stderr, err) os.Exit(1) } + + // Start monitoring databases. + if err := m.Run(); err != nil { + fmt.Fprintln(os.Stderr, err) + os.Exit(1) + } + + // Wait for signal to stop program. + <-ctx.Done() + signal.Reset() + + // Gracefully close + if err := m.Close(); err != nil { + fmt.Fprintln(os.Stderr, err) + os.Exit(1) + } } type Main struct { - logger *log.Logger - - TargetPath string - Path string + ConfigPath string + Config Config } func NewMain() *Main { - return &Main{ - logger: log.New(ioutil.Discard, "", log.LstdFlags), - } + return &Main{} } -func (m *Main) Run(args []string) (err error) { - flagSet := flag.NewFlagSet("litestream", flag.ContinueOnError) - flagSet.StringVar(&m.TargetPath, "target", "", "target directory") - verbose := flagSet.Bool("v", false, "verbose") - flagSet.Usage = m.usage - if err := flagSet.Parse(args); err != nil { +// ParseFlags parses the flag set from args & loads the configuration. +func (m *Main) ParseFlags(ctx context.Context, args []string) (err error) { + fs := flag.NewFlagSet("litestream", flag.ContinueOnError) + fs.StringVar(&m.ConfigPath, "config", "", "configuration path") + fs.Usage = m.usage + if err := fs.Parse(args); err != nil { return err } - // Ensure mount path is specified. - if flagSet.NArg() > 1 { - return errors.New("too many arguments, only specify mount path") - } else if m.Path = flagSet.Arg(0); m.Path == "" { - return errors.New("mount path required") - } - - // Ensure mount path exists & is a directory. - if fi, err := os.Stat(m.Path); err != nil { + // Load configuration. + if m.ConfigPath == "" { + return errors.New("-config required") + } else if m.Config, err = ReadConfigFile(m.ConfigPath); err != nil { return err - } else if !fi.IsDir() { - return fmt.Errorf("mount path must be a directory") } - // If no target is specified, default to a hidden directory based on the mount path. - if m.TargetPath == "" { - m.TargetPath = filepath.Join(filepath.Dir(m.Path), "."+filepath.Base(m.Path)) + return nil +} - if err := m.ensureTargetPath(); err != nil { +// Run loads all databases specified in the configuration. +func (m *Main) Run(ctx context.Context) (err error) { + if len(m.Config.DBs) == 0 { + return errors.New("configuration must specify at least one database") + } + + for _, dbc := range m.Config.DBs { + db := litestream.NewDB() + db.Path = dbc.Path + if err := db.Open(); err != nil { return err } + m.DBs = append(m.DBs, db) } - // Setup logging, if verbose specified. - var config fs.Config - if *verbose { - config.Debug = debug - m.logger = log.New(os.Stderr, "", log.LstdFlags) - } - - // Mount FUSE filesystem. - conn, err := fuse.Mount(m.Path) - if err != nil { - return err - } - defer fuse.Unmount(m.Path) - defer conn.Close() - - m.logger.Printf("mounted %s; target=%s", m.Path, m.TargetPath) - - fileSystem := litestream.NewFileSystem(m.TargetPath) - if err := fileSystem.Open(); err != nil { - return err - } - defer fileSystem.Close() - - s := fs.New(conn, &config) - return s.Serve(fileSystem) + return nil } -func (m *Main) ensureTargetPath() error { - // Check if target path exists, exit if it does. - if _, err := os.Stat(m.TargetPath); err == nil { - return nil - } else if err != nil && !os.IsNotExist(err) { - return err +// Close closes all open databases. +func (m *Main) Close() (err error) { + for _, db := range m.DBs { + if e := db.Close(); e != nil { + log.Printf("error closing db: path=%s err=%s", db.Path, e) + if err == nil { + err = e + } + } } - - // Create target path with the same permissions as the mount path. - fi, err := os.Stat(m.Path) - if err != nil { - return err - } - return os.Mkdir(m.TargetPath, fi.Mode()) + return err } -func (m *Main) usage() { +func (m *Main) Usage() { fmt.Println(` -Litestream is a FUSE file system that replicates SQLite databases. +Litestream is a daemon for replicating SQLite databases. Usage: - litestream [arguments] PATH + litestream [arguments] Arguments: - -target PATH - Specifies the directory to store data. - Defaults to a hidden directory next to PATH. - -v - Enable verbose logging. + -config PATH + Specifies the configuration file. Required. `[1:]) } - -// debug is a function that can be used for fs.Config.Debug. -// It marshals the msg to JSON and prints to the log. -func debug(msg interface{}) { - buf, err := json.Marshal(msg) - if err != nil { - println("debug: marshal error: %v", err) - return - } - log.Print("DEBUG ", string(buf)) -} diff --git a/db.go b/db.go index 4d951f0..aad1a61 100644 --- a/db.go +++ b/db.go @@ -27,7 +27,6 @@ const ( // DB represents an instance of a managed SQLite database in the file system. type DB struct { mu sync.Mutex - fs *FileSystem path string isHeaderValid bool // true if meta page contains SQLITE3 header @@ -49,8 +48,8 @@ type DB struct { } // NewDB returns a new instance of DB for a given path. -func NewDB(fs *FileSystem, path string) *DB { - db := &DB{fs: fs, path: path} +func NewDB() *DB { + db := &DB{} db.ctx, db.cancel = context.WithCancel(context.Background()) return db } diff --git a/doc/DESIGN.md b/doc/DESIGN.md index d7070c8..6661f10 100644 --- a/doc/DESIGN.md +++ b/doc/DESIGN.md @@ -1,38 +1,76 @@ -Litestream Design -================= +DESIGN +====== -Litestream provides a file system layer to intercept writes to a SQLite database -to construct a persistent write-ahead log that can be replicated. +Litestream is a sidecar process that replicates the write ahead log (WAL) for +a SQLite database. To ensure that it can replicate every page, litestream takes +control over the checkpointing process by issuing a long running read +transaction against the database to prevent checkpointing. It then releases +this transaction once it obtains a write lock and issues the checkpoint itself. + +The daemon polls the database on an interval to breifly obtain a write +transaction lock and copy over new WAL pages. Once the WAL has reached a +threshold size, litestream will issue a checkpoint and a single page write +to a table called `_litestream` to start the new WAL. + + +## Workflow + +When litestream first loads a database, it checks if there is an existing +sidecar directory which is named `.-litestream`. If not, it initializes +the directory and starts a new generation. + +A generation is a snapshot of the database followed by a continuous stream of +WAL files. A new generation is started on initialization & whenever litestream +cannot verify that it has a continuous record of WAL files. This could happen +if litestream is stopped and another process checkpoints the WAL. In this case, +a new generation ID is randomly created and a snapshot is replicated to the +appropriate destinations. + +Generations also prevent two servers from replicating to the same destination +and corrupting each other's data. In this case, each server would replicate +to a different generation directory. On recovery, there will be duplicate +databases and the end user can choose which generation to recover but each +database will be uncorrupted. ## File Layout +Litestream maintains a shadow WAL which is a historical record of all previous +WAL files. These files can be deleted after a time or size threshold but should +be replicated before being deleted. + ### Local +Given a database file named `db`, SQLite will create a WAL file called `db-wal`. +Litestream will then create a hidden directory called `.db-litestream` that +contains the historical record of all WAL files for the current generation. + ``` -dir/ - db # SQLite database - db-wal # SQLite WAL - db.litestream # per-db configuration - .db-litestream/ - log # recent event log - stat # per-db Prometheus statistics - generation # current generation number - wal/ # each WAL file contains pages in flush interval - active.wal # active WAL file exists until flush; renamed - 000000000000001.wal.gz # flushed, compressed WAL files - 000000000000002.wal.gz +db # SQLite database +db-wal # SQLite WAL +.db-litestream/ + generation # current generation number + generations/ + xxxxxxxx/ + wal/ # WAL files + 000000000000001.wal + 000000000000002.wal + 000000000000003.wal # active WAL ``` ### Remote (S3) ``` bkt/ - db/ # database path - 00000001/ # snapshot directory - snapshot # full db snapshot - 000000000000001.wal.gz # compressed WAL file - 000000000000002.wal.gz + db/ # database path + generations/ + xxxxxxxx/ + snapshots/ # snapshots w/ timestamp+offset + 20000101T000000Z-000000000000023.snapshot + wal/ # compressed WAL files + 000000000000001-0.wal.gz + 000000000000001-.wal.gz + 000000000000002-0.wal.gz 00000002/ snapshot/ 000000000000000.snapshot @@ -48,53 +86,3 @@ bkt/ ``` -## Process - -### File System Startup - -File system startup: - -1. Load litestream.config file. -2. Load all per-db ".litestream" files. - - -### DB startup: - -``` -IF "db" NOT EXISTS { - ensureWALRemovedIfDBNotExist() - restore() - setDBStatus("ok") - return -} - -IF "-wal" EXISTS { - syncToShadowWAL() - IF err { - setDBStatus("error") - } ELSE { - setDBStatus("ok") - } -} ELSE { - ensureShadowWALMatchesDB() // check last page written to DB - IF err { - setDBStatus("error") - } ELSE { - setDBStatus("ok") - } -} -``` - - -### DB Recovery - -TODO - - -### WAL Write - -1. Write to regular WAL -2. On fsync to regular WAL, copy WAL to shadow WAL. -2a. On copy error, mark errored & begin recovery - - diff --git a/doc/NOTES.md b/doc/NOTES.md deleted file mode 100644 index 692231e..0000000 --- a/doc/NOTES.md +++ /dev/null @@ -1,32 +0,0 @@ -NOTES -===== - -## RECOVERY - -### REAL WAL EXISTS, SHADOW EXISTS - -Scenario: Unclean close by application process. - -Action: Verify last page from both match. - - -### REAL WAL DOESN'T EXISTS, SHADOW EXISTS - -Scenario: Application closed cleanly & removed WAL. - -Action: Verify last page of shadow matches database page. - - -### REAL WAL EXISTS, SHADOW DOESN'T EXIST - -Scenario: Application wrote WAL; system crashed before shadow written/sync'd. - -Action: Start new generation. - - -### REAL WAL DOESN'T EXIST, SHADOW DOESN'T EXIST - -Scenario: No writes have occurred since the DB was switched to WAL mode. - -Action: Nothing to recover. Wait for first WAL write. - diff --git a/file_system.go b/file_system.go deleted file mode 100644 index be1ab37..0000000 --- a/file_system.go +++ /dev/null @@ -1,158 +0,0 @@ -package litestream - -import ( - "fmt" - "log" - "os" - "path/filepath" - "sort" - "sync" - - "bazil.org/fuse/fs" - // "github.com/pelletier/go-toml" -) - -var _ fs.FS = (*FileSystem)(nil) - -// FileSystem represents the file system that is mounted. -// It returns a root node that represents the root directory. -type FileSystem struct { - mu sync.RWMutex - dbs map[string]*DB // databases by path - - // Filepath to the root of the source directory. - TargetPath string -} - -func NewFileSystem(target string) *FileSystem { - return &FileSystem{ - dbs: make(map[string]*DB), - TargetPath: target, - } -} - -// Open initializes the file system and finds all managed database files. -func (f *FileSystem) Open() error { - f.mu.Lock() - defer f.mu.Unlock() - - return filepath.Walk(f.TargetPath, func(path string, info os.FileInfo, err error) error { - // Log errors while traversing file system. - if err != nil { - log.Printf("walk error: %s", err) - return nil - } - - // Ignore .-litestream metadata directories. - if IsMetaDir(path) { - return filepath.SkipDir - } else if !IsConfigPath(path) { - return nil - } - - // Determine the DB path relative to the target path. - rel, err := filepath.Rel(f.TargetPath, ConfigPathToDBPath(path)) - if err != nil { - return err - } - - // Initialize a DB object based on the config path. - // The database doesn't need to exist. It will be tracked when created. - db := NewDB(f, rel) - if err := db.Open(); err != nil { - log.Printf("cannot open db %q: %s", rel, err) - return nil - } - f.dbs[db.Path()] = db - - log.Printf("[DB]: %s", rel) - - return nil - }) -} - -// DB returns the DB object associated with path. -func (f *FileSystem) DB(path string) *DB { - fmt.Println("dbg/dbs", path, "--", f.DBPaths()) - - f.mu.RLock() - defer f.mu.RUnlock() - db := f.dbs[path] - return db -} - -// DBPaths returns a sorted list of all paths managed by the file system. -func (f *FileSystem) DBPaths() []string { - a := make([]string, 0, len(f.dbs)) - for k := range f.dbs { - a = append(a, k) - } - sort.Strings(a) - return a -} - -// OpenDB initializes a DB for a given path. -func (f *FileSystem) OpenDB(path string) error { - f.mu.Lock() - defer f.mu.Unlock() - return f.openDB(path) -} - -func (f *FileSystem) openDB(path string) error { - db := NewDB(f, path) - if err := db.Open(); err != nil { - return err - } - f.dbs[db.Path()] = db - return nil -} - -// Close closes the file system and flushes all managed database files. -func (f *FileSystem) Close() (err error) { - f.mu.Lock() - defer f.mu.Unlock() - return f.closeDBs() -} - -func (f *FileSystem) closeDBs() (err error) { - for key, db := range f.dbs { - if e := db.Close(); e != nil && err == nil { - err = e - } - delete(f.dbs, key) - } - return err -} - -// Root returns the file system root node. -func (f *FileSystem) Root() (fs.Node, error) { - return &Node{fs: f}, nil -} - -// Config represents the configuration file for the file system. -type Config struct { - Pattern string `toml:"pattern"` // glob pattern - ReadOnly bool `toml:"read-only"` // if true, expose only read access via FUSE - RecoverFrom string `toml:"recover-from"` // http URL, S3, etc. - - HTTP HTTPConfig `toml:"http"` - S3 S3Config `toml:"s3"` -} - -// DefaultConfig returns the default configuration. -func DefaultConfig() Config { - return Config{} -} - -// S3Config represents the configuration for replicating to/from an S3-compatible store. -type S3Config struct { - AccessKeyID string `toml:"access-key-id"` // AWS access key - SecretAccessKey string `toml:"secret-access-key"` // AWS secret access key -} - -// HTTPConfig represents the configuration for exposing data via HTTP. -type HTTPConfig struct { - Addr string `toml:"addr"` // bind address - CertFile string `toml:"cert-file"` // TLS certificate path - KeyFile string `toml:"key-file"` // TLS key path -} diff --git a/go.mod b/go.mod index 2823373..e4631a9 100644 --- a/go.mod +++ b/go.mod @@ -1,5 +1,7 @@ module github.com/benbjohnson/litestream -go 1.14 +go 1.15 -require bazil.org/fuse v0.0.0-20200524192727-fb710f7dfd05 +require ( + github.com/pelletier/go-toml v1.8.1 +) diff --git a/go.sum b/go.sum index 35d3875..fd4cef6 100644 --- a/go.sum +++ b/go.sum @@ -1,9 +1,10 @@ -bazil.org/fuse v0.0.0-20200524192727-fb710f7dfd05 h1:UrYe9YkT4Wpm6D+zByEyCJQzDqTPXqTDUI7bZ41i9VE= -bazil.org/fuse v0.0.0-20200524192727-fb710f7dfd05/go.mod h1:h0h5FBYpXThbvSfTqthw+0I4nmHnhTHkO5BoOHsBWqg= github.com/Julusian/godocdown v0.0.0-20170816220326-6d19f8ff2df8/go.mod h1:INZr5t32rG59/5xeltqoCJoNY7e5x/3xoY9WSWVWg74= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/dvyukov/go-fuzz v0.0.0-20200318091601-be3528f3a813/go.mod h1:11Gm+ccJnvAhCNLlf5+cS9KjtbaD5I5zaZpFMsTHWTw= github.com/elazarl/go-bindata-assetfs v1.0.0/go.mod h1:v+YaWX3bdea5J/mo8dSETolEo7R71Vk1u8bnjau5yw4= +github.com/pelletier/go-toml v1.8.1 h1:1Nf83orprkJyknT6h7zbuEGUEjcyVlCxSUGTENmNCRM= +github.com/pelletier/go-toml v1.8.1/go.mod h1:T2/BmBdy8dvIRq1a/8aqjN41wvWlN4lrapLU/GW4pbc= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/robertkrimen/godocdown v0.0.0-20130622164427-0bfa04905481/go.mod h1:C9WhFzY47SzYBIvzFqSvHIR6ROgDo4TtdTuRaOMjF/s= github.com/stephens2424/writerset v1.0.2/go.mod h1:aS2JhsMn6eA7e82oNmW4rfsgAOp9COBTTl8mzkwADnc= diff --git a/handle.go b/handle.go deleted file mode 100644 index a0009a5..0000000 --- a/handle.go +++ /dev/null @@ -1,123 +0,0 @@ -package litestream - -import ( - "context" - "io" - "log" - "os" - "sort" - "syscall" - - "bazil.org/fuse" - "bazil.org/fuse/fs" - "github.com/benbjohnson/litestream/sqlite" -) - -var _ fs.HandleFlusher = (*Handle)(nil) -var _ fs.HandleReadDirAller = (*Handle)(nil) -var _ fs.HandleReader = (*Handle)(nil) -var _ fs.HandleReleaser = (*Handle)(nil) -var _ fs.HandleWriter = (*Handle)(nil) - -// var _ fs.HandleReadAller = (*Handle)(nil) -// var _ fs.HandleFlockLocker = (*Handle)(nil) -//var _ fs.HandleLocker = (*Handle)(nil) -//var _ fs.HandlePOSIXLocker = (*Handle)(nil) - -// Handle represents a FUSE file handle. -type Handle struct { - node *Node - f *os.File -} - -// NewHandle returns a new instance of Handle. -func NewHandle(n *Node, f *os.File) *Handle { - return &Handle{node: n, f: f} -} - -// Release closes the underlying file descriptor. -func (h *Handle) Release(ctx context.Context, req *fuse.ReleaseRequest) (err error) { - if err := h.node.Sync(); err != nil { - return err - } - return h.f.Close() -} - -// Read reads data from a given offset in the underlying file. -func (h *Handle) Read(ctx context.Context, req *fuse.ReadRequest, resp *fuse.ReadResponse) (err error) { - buf := make([]byte, req.Size) - n, err := h.f.ReadAt(buf, req.Offset) - if err != nil && err != io.EOF { - return err - } - resp.Data = buf[:n] - return nil -} - -// Write writes data at a given offset to the underlying file. -func (h *Handle) Write(ctx context.Context, req *fuse.WriteRequest, resp *fuse.WriteResponse) (err error) { - log.Printf("write: name=%s offset=%d n=%d", h.f.Name(), req.Offset, len(req.Data)) - println(HexDump(req.Data)) - - if resp.Size, err = h.f.WriteAt(req.Data, req.Offset); err != nil { - // TODO: Invalidate node DB state. - return err - } - - // Check if handle reference a managed database. - db := h.node.DB() - if db == nil { - return nil - } - - // If this is the DB file, update the DB state based on the header. - if !sqlite.IsWALPath(h.node.Path()) { - // TODO: Header write could theoretically occur anywhere in first 100 bytes. - // If updating the header page, first validate it. - if req.Offset == 0 { - db.SetHeader(req.Data) - } - return nil - } - - // Ignore if the DB is not in a valid state (header + wal enabled). - if !db.Valid() { - return nil - } - - // Otherwise this is the WAL file so we should append the WAL data. - db.AddPendingWALByteN(int64(len(req.Data))) - - return nil -} - -// Flush is called when a file handle is synced to disk. Implements fs.HandleFlusher. -func (h *Handle) Flush(ctx context.Context, req *fuse.FlushRequest) (err error) { - if err := h.node.Sync(); err != nil { - return err - } - return h.f.Sync() -} - -// ReadDirAll returns a list of all entries in a directory. Implements fs.HandleReadDirAller. -func (h *Handle) ReadDirAll(ctx context.Context) (ents []fuse.Dirent, err error) { - fis, err := h.f.Readdir(-1) - if err != nil { - return nil, err - } - - // Convert FileInfo objects to FUSE directory entries. - ents = make([]fuse.Dirent, 0, len(fis)) - for _, fi := range fis { - // Skip any meta directories. - if IsMetaDir(fi.Name()) { - continue - } - - statt := fi.Sys().(*syscall.Stat_t) - ents = append(ents, fuse.Dirent{Inode: statt.Ino, Name: fi.Name()}) - } - - sort.Slice(ents, func(i, j int) bool { return ents[i].Name < ents[j].Name }) - return ents, nil -} diff --git a/node.go b/node.go deleted file mode 100644 index 5dabfe8..0000000 --- a/node.go +++ /dev/null @@ -1,384 +0,0 @@ -package litestream - -import ( - "context" - "io/ioutil" - "os" - "path/filepath" - "strings" - "sync" - "syscall" - "time" - - "bazil.org/fuse" - "bazil.org/fuse/fs" - "github.com/benbjohnson/litestream/sqlite" -) - -var _ fs.Node = (*Node)(nil) -var _ fs.NodeAccesser = (*Node)(nil) -var _ fs.NodeCreater = (*Node)(nil) -var _ fs.NodeFsyncer = (*Node)(nil) -var _ fs.NodeGetxattrer = (*Node)(nil) -var _ fs.NodeLinker = (*Node)(nil) -var _ fs.NodeListxattrer = (*Node)(nil) -var _ fs.NodeMkdirer = (*Node)(nil) -var _ fs.NodeMknoder = (*Node)(nil) -var _ fs.NodeOpener = (*Node)(nil) -var _ fs.NodeReadlinker = (*Node)(nil) -var _ fs.NodeRemover = (*Node)(nil) -var _ fs.NodeRemovexattrer = (*Node)(nil) -var _ fs.NodeRenamer = (*Node)(nil) -var _ fs.NodeSetattrer = (*Node)(nil) -var _ fs.NodeSetxattrer = (*Node)(nil) -var _ fs.NodeStringLookuper = (*Node)(nil) -var _ fs.NodeSymlinker = (*Node)(nil) - -// Node represents a file or directory in the file system. -type Node struct { - mu sync.RWMutex - fs *FileSystem // base filesystem - path string // path within file system -} - -func NewNode(fs *FileSystem, path string) *Node { - assert(fs != nil, "node file system required") - assert(path != "", "node path required") - return &Node{fs: fs, path: path} -} - -// Path returns the path the node was initialized with. -func (n *Node) Path() string { - return n.path -} - -func (n *Node) srcpath() string { - return filepath.Join(n.fs.TargetPath, n.path) -} - -// IsWAL returns true if node path has a "-wal" suffix. -func (n *Node) IsWAL() bool { - return strings.HasSuffix(n.path, sqlite.WALSuffix) -} - -// DB returns the DB object associated with the node, if any. -// If node points to a "-wal" file then the associated DB is returned. -func (n *Node) DB() *DB { - println("dbg/node.db", n.path, strings.HasPrefix(n.path, sqlite.WALSuffix)) - if strings.HasSuffix(n.path, sqlite.WALSuffix) { - println("dbg/node.db.trim", n.path, strings.TrimSuffix(n.path, sqlite.WALSuffix)) - return n.fs.DB(strings.TrimSuffix(n.path, sqlite.WALSuffix)) - } - println("dbg/node.db.other", n.path) - return n.fs.DB(n.path) -} - -// Sync synchronizes the data to the shadow WAL if this node is the WAL. -func (n *Node) Sync() (err error) { - println("dbg/node.sync") - // Ignore if this is not the WAL. - if !n.IsWAL() { - println("dbg/node.sync.notwal", n.path) - return nil - } - - // Ignore if the node is not a managed db. - db := n.DB() - if db == nil { - println("dbg/node.sync.notmanaged", n.path) - return nil - } - - return db.Sync() -} - -func (n *Node) Attr(ctx context.Context, a *fuse.Attr) (err error) { - fi, err := os.Stat(n.srcpath()) - if err != nil { - return err - } - statt := fi.Sys().(*syscall.Stat_t) - - // TODO: Cache attr w/ a.Valid? - - if n.path == "" { - a.Inode = 1 - } else { - a.Inode = statt.Ino - } - a.Size = uint64(fi.Size()) - a.Blocks = uint64(statt.Blocks) - a.Atime = time.Unix(statt.Atim.Sec, statt.Atim.Nsec).UTC() - a.Mtime = time.Unix(statt.Mtim.Sec, statt.Mtim.Nsec).UTC() - a.Ctime = time.Unix(statt.Ctim.Sec, statt.Ctim.Nsec).UTC() - a.Mode = fi.Mode() - a.Nlink = uint32(statt.Nlink) - a.Uid = uint32(statt.Uid) - a.Gid = uint32(statt.Gid) - a.Rdev = uint32(statt.Rdev) - a.BlockSize = uint32(statt.Blksize) - - return nil -} - -// Lookup looks up a specific entry in the receiver, -// which must be a directory. Lookup should return a Node -// corresponding to the entry. If the name does not exist in -// the directory, Lookup should return ENOENT. -// -// Lookup need not to handle the names "." and "..". -func (n *Node) Lookup(ctx context.Context, name string) (_ fs.Node, err error) { - path := filepath.Join(n.path, name) - srcpath := filepath.Join(n.fs.TargetPath, path) - if _, err := os.Stat(srcpath); os.IsNotExist(err) { - return nil, syscall.ENOENT - } - return NewNode(n.fs, path), nil -} - -func (n *Node) ReadDirAll(ctx context.Context) (ents []fuse.Dirent, err error) { - fis, err := ioutil.ReadDir(n.srcpath()) - if err != nil { - return nil, err - } - - ents = make([]fuse.Dirent, 0, len(fis)) - for _, fi := range fis { - // Skip any meta directories. - if IsMetaDir(fi.Name()) { - continue - } - - statt := fi.Sys().(*syscall.Stat_t) - ents = append(ents, fuse.Dirent{Inode: statt.Ino, Name: fi.Name()}) - } - - return ents, nil -} - -// Setattr sets the standard metadata for the receiver. -// -// Note, this is also used to communicate changes in the size of -// the file, outside of Writes. -// -// req.Valid is a bitmask of what fields are actually being set. -// For example, the method should not change the mode of the file -// unless req.Valid.Mode() is true. -func (n *Node) Setattr(ctx context.Context, req *fuse.SetattrRequest, resp *fuse.SetattrResponse) (err error) { - // Obtain current file stat. - srcpath := n.srcpath() - fi, err := os.Stat(srcpath) - if err != nil { - return err - } - statt := fi.Sys().(*syscall.Stat_t) - - // Update access time, if flagged. - var atime time.Time - if req.Valid.AtimeNow() { - atime = time.Now() - } else if req.Valid.Atime() { - atime = req.Atime - } - - // Update mod time, if flagged. - var mtime time.Time - if req.Valid.MtimeNow() { - mtime = time.Now() - } else if req.Valid.Mtime() { - mtime = req.Mtime - } - - // Update timestamps, if specified. - if !atime.IsZero() || !mtime.IsZero() { - if atime.IsZero() { - atime = time.Unix(statt.Atim.Sec, statt.Atim.Nsec).UTC() - } - if mtime.IsZero() { - mtime = time.Unix(statt.Mtim.Sec, statt.Mtim.Nsec).UTC() - } - if err := os.Chtimes(srcpath, atime, mtime); err != nil { - return err - } - } - - // Update group id. - if req.Valid.Gid() { - if err := syscall.Setgid(int(req.Gid)); err != nil { - return err - } - } - - // Update user id. - if req.Valid.Uid() { - if err := syscall.Setuid(int(req.Uid)); err != nil { - return err - } - } - - // Update file permissions. - if req.Valid.Mode() { - if err := os.Chmod(srcpath, req.Mode); err != nil { - return err - } - } - - // Update file size. - if req.Valid.Size() { - if err := os.Truncate(srcpath, int64(req.Size)); err != nil { - return err - } - } - - // TODO: Not sure what these are for. - if req.Valid.Handle() { - println("TODO: setattr.handle") - } - if req.Valid.LockOwner() { - println("TODO: setattr.lockowner") - } - - // Update response attributes. - return n.Attr(ctx, &resp.Attr) -} - -// Symlink creates a new symbolic link in the receiver, which must be a directory. -func (n *Node) Symlink(ctx context.Context, req *fuse.SymlinkRequest) (_ fs.Node, err error) { - if err := os.Symlink(req.Target, req.NewName); err != nil { - return nil, err - } - return NewNode(n.fs, req.NewName), nil -} - -// Readlink reads a symbolic link. -func (n *Node) Readlink(ctx context.Context, req *fuse.ReadlinkRequest) (_ string, err error) { - return os.Readlink(n.srcpath()) -} - -// Link creates a new directory entry in the receiver based on an -// existing Node. Receiver must be a directory. -func (n *Node) Link(ctx context.Context, req *fuse.LinkRequest, _old fs.Node) (_ fs.Node, err error) { - old := _old.(*Node) - - // assert(n.IsDir()) - - if err := os.Link(old.srcpath(), req.NewName); err != nil { - return nil, err - } - return NewNode(n.fs, req.NewName), nil -} - -// Remove removes the entry with the given name from -// the receiver, which must be a directory. The entry to be removed -// may correspond to a file (unlink) or to a directory (rmdir). -func (n *Node) Remove(ctx context.Context, req *fuse.RemoveRequest) (err error) { - path := filepath.Join(n.srcpath(), req.Name) - if IsMetaDir(path) { - return syscall.ENOENT - } - - if req.Dir { - return syscall.Rmdir(path) - } - - // TODO: Clear db header. - - return syscall.Unlink(path) -} - -// Access checks whether the calling context has permission for -// the given operations on the receiver. If so, Access should -// return nil. If not, Access should return EPERM. -// -// Note that this call affects the result of the access(2) system -// call but not the open(2) system call. If Access is not -// implemented, the Node behaves as if it always returns nil -// (permission granted), relying on checks in Open instead. -func (n *Node) Access(ctx context.Context, req *fuse.AccessRequest) (err error) { - return syscall.Access(n.srcpath(), req.Mask) -} - -func (n *Node) Mkdir(ctx context.Context, req *fuse.MkdirRequest) (_ fs.Node, err error) { - if err := syscall.Mkdir(filepath.Join(n.srcpath(), req.Name), uint32(req.Mode^req.Umask)); err != nil { - return nil, err - } - return NewNode(n.fs, filepath.Join(n.path, req.Name)), nil -} - -// Open opens the receiver. After a successful open, a client -// process has a file descriptor referring to this Handle. -func (n *Node) Open(ctx context.Context, req *fuse.OpenRequest, resp *fuse.OpenResponse) (_ fs.Handle, err error) { - // TODO(bbj): Where does mode come from? - f, err := os.OpenFile(n.srcpath(), int(req.Flags), 0777) - if err != nil { - return nil, err - } - return NewHandle(n, f), nil -} - -// Create creates a new directory entry in the receiver, which must be a directory. -func (n *Node) Create(ctx context.Context, req *fuse.CreateRequest, resp *fuse.CreateResponse) (_ fs.Node, _ fs.Handle, err error) { - f, err := os.OpenFile(filepath.Join(n.srcpath(), req.Name), int(req.Flags), req.Mode^req.Umask) - if err != nil { - return nil, nil, err - } - nn := NewNode(n.fs, filepath.Join(n.path, req.Name)) - return nn, NewHandle(nn, f), nil -} - -func (n *Node) Rename(ctx context.Context, req *fuse.RenameRequest, _newDir fs.Node) (err error) { - newDir := _newDir.(*Node) - return os.Rename(filepath.Join(n.srcpath(), req.OldName), filepath.Join(newDir.srcpath(), req.NewName)) -} - -func (n *Node) Mknod(ctx context.Context, req *fuse.MknodRequest) (_ fs.Node, err error) { - if err := syscall.Mknod(filepath.Join(n.srcpath(), req.Name), uint32(req.Mode^req.Umask), int(req.Rdev)); err != nil { - return nil, err - } - return NewNode(n.fs, filepath.Join(n.path, req.Name)), nil -} - -func (n *Node) Fsync(ctx context.Context, req *fuse.FsyncRequest) (err error) { - // Synchronize to shadow WAL. - if err := n.Sync(); err != nil { - return err - } - - f, err := os.Open(n.srcpath()) - if err != nil { - return err - } - defer f.Close() - - // TODO(bbj): Handle fdatasync() - return f.Sync() -} - -// Getxattr gets an extended attribute by the given name from the node. -func (n *Node) Getxattr(ctx context.Context, req *fuse.GetxattrRequest, resp *fuse.GetxattrResponse) (err error) { - // TODO(bbj): Handle req.Size & returned syscall.Getxattr() size. - if _, err = syscall.Getxattr(n.srcpath(), req.Name, resp.Xattr); err == syscall.ENODATA { - return fuse.ErrNoXattr - } - return err -} - -// Listxattr lists the extended attributes recorded for the node. -func (n *Node) Listxattr(ctx context.Context, req *fuse.ListxattrRequest, resp *fuse.ListxattrResponse) (err error) { - // TODO(bbj): Handle req.Size & returned syscall.Getxattr() size. - _, err = syscall.Listxattr(n.srcpath(), resp.Xattr) - return err -} - -// Setxattr sets an extended attribute with the given name and -// value for the node. -func (n *Node) Setxattr(ctx context.Context, req *fuse.SetxattrRequest) (err error) { - return syscall.Setxattr(n.srcpath(), req.Name, req.Xattr, int(req.Flags)) -} - -// Removexattr removes an extended attribute for the name. -// -// If there is no xattr by that name, returns fuse.ErrNoXattr. -func (n *Node) Removexattr(ctx context.Context, req *fuse.RemovexattrRequest) (err error) { - return syscall.Removexattr(n.srcpath(), req.Name) -} diff --git a/server.go b/server.go deleted file mode 100644 index 4046b45..0000000 --- a/server.go +++ /dev/null @@ -1,237 +0,0 @@ -package litestream - -/* -import ( - "context" - "io" - "sync" - - "bazil.org/fuse" -) - -type Server struct { - wg sync.WaitGroup - - SourcePath string -} - -func NewServer() *Server { - return &Server{} -} - -func (s *Server) Close() error { - s.wg.Wait() - return nil -} - -func (s *Server) Serve(ctx context.Context, conn *fuse.Conn) error { - for { - r, err := conn.ReadRequest() - if err == io.EOF { - return nil - } else if err != nil { - return err - } - - s.wg.Add(1) - go func() { defer s.wg.Done(); s.handleRequest(ctx, r) }() - } -} - -func (s *Server) handleRequest(ctx context.Context, r fuse.Request) { - switch r := r.(type) { - case *fuse.AccessRequest: - s.handleAccessRequest(ctx, r) - case *fuse.BatchForgetRequest: - s.handleBatchForgetRequest(ctx, r) - case *fuse.CreateRequest: - s.handleCreateRequest(ctx, r) - case *fuse.DestroyRequest: - s.handleDestroyRequest(ctx, r) - case *fuse.ExchangeDataRequest: - s.handleExchangeDataRequest(ctx, r) - case *fuse.FlushRequest: - s.handleFlushRequest(ctx, r) - case *fuse.ForgetRequest: - s.handleForgetRequest(ctx, r) - case *fuse.FsyncRequest: - s.handleFsyncRequest(ctx, r) - case *fuse.GetattrRequest: - s.handleGetattrRequest(ctx, r) - case *fuse.GetxattrRequest: - s.handleGetxattrRequest(ctx, r) - case *fuse.InterruptRequest: - s.handleInterruptRequest(ctx, r) - case *fuse.LinkRequest: - s.handleLinkRequest(ctx, r) - case *fuse.ListxattrRequest: - s.handleListxattrRequest(ctx, r) - case *fuse.LockRequest: - s.handleLockRequest(ctx, r) - case *fuse.LookupRequest: - s.handleLookupRequest(ctx, r) - case *fuse.MkdirRequest: - s.handleMkdirRequest(ctx, r) - case *fuse.MknodRequest: - s.handleMknodRequest(ctx, r) - case *fuse.OpenRequest: - s.handleOpenRequest(ctx, r) - case *fuse.PollRequest: - s.handlePollRequest(ctx, r) - case *fuse.QueryLockRequest: - s.handleQueryLockRequest(ctx, r) - case *fuse.ReadRequest: - s.handleReadRequest(ctx, r) - case *fuse.ReadlinkRequest: - s.handleReadlinkRequest(ctx, r) - case *fuse.ReleaseRequest: - s.handleReleaseRequest(ctx, r) - case *fuse.RemoveRequest: - s.handleRemoveRequest(ctx, r) - case *fuse.RemovexattrRequest: - s.handleRemovexattrRequest(ctx, r) - case *fuse.RenameRequest: - s.handleRenameRequest(ctx, r) - case *fuse.SetattrRequest: - s.handleSetattrRequest(ctx, r) - case *fuse.SetxattrRequest: - s.handleSetxattrRequest(ctx, r) - case *fuse.StatfsRequest: - s.handleStatfsRequest(ctx, r) - case *fuse.SymlinkRequest: - s.handleSymlinkRequest(ctx, r) - case *fuse.UnrecognizedRequest: - s.handleUnrecognizedRequest(ctx, r) - case *fuse.WriteRequest: - s.handleWriteRequest(ctx, r) - } -} - -func (s *Server) handleAccessRequest(ctx context.Context, r *fuse.AccessRequest) { - panic("TODO") -} - -func (s *Server) handleBatchForgetRequest(ctx context.Context, r *fuse.BatchForgetRequest) { - panic("TODO") -} - -func (s *Server) handleCreateRequest(ctx context.Context, r *fuse.CreateRequest) { - panic("TODO") -} - -func (s *Server) handleDestroyRequest(ctx context.Context, r *fuse.DestroyRequest) { - panic("TODO") -} - -func (s *Server) handleExchangeDataRequest(ctx context.Context, r *fuse.ExchangeDataRequest) { - panic("TODO") -} - -func (s *Server) handleFlushRequest(ctx context.Context, r *fuse.FlushRequest) { - panic("TODO") -} - -func (s *Server) handleForgetRequest(ctx context.Context, r *fuse.ForgetRequest) { - panic("TODO") -} - -func (s *Server) handleFsyncRequest(ctx context.Context, r *fuse.FsyncRequest) { - panic("TODO") -} - -func (s *Server) handleGetattrRequest(ctx context.Context, r *fuse.GetattrRequest) { - panic("TODO") -} - -func (s *Server) handleGetxattrRequest(ctx context.Context, r *fuse.GetxattrRequest) { - panic("TODO") -} - -func (s *Server) handleInterruptRequest(ctx context.Context, r *fuse.InterruptRequest) { - panic("TODO") -} - -func (s *Server) handleLinkRequest(ctx context.Context, r *fuse.LinkRequest) { - panic("TODO") -} - -func (s *Server) handleListxattrRequest(ctx context.Context, r *fuse.ListxattrRequest) { - panic("TODO") -} - -func (s *Server) handleLockRequest(ctx context.Context, r *fuse.LockRequest) { - panic("TODO") -} - -func (s *Server) handleLookupRequest(ctx context.Context, r *fuse.LookupRequest) { - panic("TODO") -} - -func (s *Server) handleMkdirRequest(ctx context.Context, r *fuse.MkdirRequest) { - panic("TODO") -} - -func (s *Server) handleMknodRequest(ctx context.Context, r *fuse.MknodRequest) { - panic("TODO") -} - -func (s *Server) handleOpenRequest(ctx context.Context, r *fuse.OpenRequest) { - panic("TODO") -} - -func (s *Server) handlePollRequest(ctx context.Context, r *fuse.PollRequest) { - panic("TODO") -} - -func (s *Server) handleQueryLockRequest(ctx context.Context, r *fuse.QueryLockRequest) { - panic("TODO") -} - -func (s *Server) handleReadRequest(ctx context.Context, r *fuse.ReadRequest) { - panic("TODO") -} - -func (s *Server) handleReadlinkRequest(ctx context.Context, r *fuse.ReadlinkRequest) { - panic("TODO") -} - -func (s *Server) handleReleaseRequest(ctx context.Context, r *fuse.ReleaseRequest) { - panic("TODO") -} - -func (s *Server) handleRemoveRequest(ctx context.Context, r *fuse.RemoveRequest) { - panic("TODO") -} - -func (s *Server) handleRemovexattrRequest(ctx context.Context, r *fuse.RemovexattrRequest) { - panic("TODO") -} - -func (s *Server) handleRenameRequest(ctx context.Context, r *fuse.RenameRequest) { - panic("TODO") -} - -func (s *Server) handleSetattrRequest(ctx context.Context, r *fuse.SetattrRequest) { - panic("TODO") -} - -func (s *Server) handleSetxattrRequest(ctx context.Context, r *fuse.SetxattrRequest) { - panic("TODO") -} - -func (s *Server) handleStatfsRequest(ctx context.Context, r *fuse.StatfsRequest) { - panic("TODO") -} - -func (s *Server) handleSymlinkRequest(ctx context.Context, r *fuse.SymlinkRequest) { - panic("TODO") -} - -func (s *Server) handleUnrecognizedRequest(ctx context.Context, r *fuse.UnrecognizedRequest) { - panic("TODO") -} - -func (s *Server) handleWriteRequest(ctx context.Context, r *fuse.WriteRequest) { - panic("TODO") -} -*/