From 0cf8c29e168db8de71ac12163e51600bb5258611 Mon Sep 17 00:00:00 2001 From: Ben Johnson Date: Thu, 8 Oct 2020 15:01:24 -0600 Subject: [PATCH] beginning fuse filesystem impl --- README.md | 1 + fs.go | 87 ++++++++++++++++++++ go.mod | 7 ++ go.sum | 31 +++++++ main.go | 81 +++++++++++++++++++ server.go | 237 ++++++++++++++++++++++++++++++++++++++++++++++++++++++ 6 files changed, 444 insertions(+) create mode 100644 fs.go create mode 100644 go.mod create mode 100644 go.sum create mode 100644 main.go create mode 100644 server.go diff --git a/README.md b/README.md index 822b0c7..4e4fd8f 100644 --- a/README.md +++ b/README.md @@ -1,2 +1,3 @@ # litestream + Streaming replication for SQLite. diff --git a/fs.go b/fs.go new file mode 100644 index 0000000..54fe34b --- /dev/null +++ b/fs.go @@ -0,0 +1,87 @@ +package main + +import ( + "context" + "io/ioutil" + "os" + "path/filepath" + "syscall" + "time" + + "bazil.org/fuse" + "bazil.org/fuse/fs" +) + +var _ fs.FS = (*FS)(nil) + +type FS struct { + SourcePath string +} + +// Root returns the file system root. +func (f *FS) Root() (fs.Node, error) { + return &File{fs: f}, nil +} + +var _ fs.Node = (*File)(nil) + +type File struct { + fs *FS // base filesystem + path string // path within file system +} + +func (f *File) srcpath() string { + return filepath.Join(f.fs.SourcePath, f.path) +} + +func (f *File) Attr(ctx context.Context, a *fuse.Attr) error { + fi, err := os.Stat(f.srcpath()) + if err != nil { + return err + } + statt := fi.Sys().(*syscall.Stat_t) + + // TODO: Cache attr w/ a.Valid? + + if f.path == "" { + a.Inode = 1 + } else { + a.Inode = statt.Ino + } + a.Size = uint64(fi.Size()) + a.Blocks = uint64(statt.Blocks) + a.Atime = time.Unix(statt.Atim.Sec, statt.Atim.Nsec).UTC() + a.Mtime = time.Unix(statt.Mtim.Sec, statt.Mtim.Nsec).UTC() + a.Ctime = time.Unix(statt.Ctim.Sec, statt.Ctim.Nsec).UTC() + a.Mode = fi.Mode() + a.Nlink = uint32(statt.Nlink) + a.Uid = uint32(statt.Uid) + a.Gid = uint32(statt.Gid) + a.Rdev = uint32(statt.Rdev) + a.BlockSize = uint32(statt.Blksize) + + return nil +} + +func (f *File) Lookup(ctx context.Context, name string) (fs.Node, error) { + path := filepath.Join(f.path, name) + srcpath := filepath.Join(f.fs.SourcePath, path) + if _, err := os.Stat(srcpath); os.IsNotExist(err) { + return nil, syscall.ENOENT + } + return &File{fs: f.fs, path: path}, nil +} + +func (f *File) ReadDirAll(ctx context.Context) ([]fuse.Dirent, error) { + fis, err := ioutil.ReadDir(f.srcpath()) + if err != nil { + return nil, err + } + + ents := make([]fuse.Dirent, len(fis)) + for i, fi := range fis { + statt := fi.Sys().(*syscall.Stat_t) + ents[i] = fuse.Dirent{Inode: statt.Ino, Name: fi.Name()} + } + return ents, nil +} diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..1a930d7 --- /dev/null +++ b/go.mod @@ -0,0 +1,7 @@ +module github.com/middlemost/litestream + +go 1.14 + +require ( + bazil.org/fuse v0.0.0-20200524192727-fb710f7dfd05 +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..a32e27b --- /dev/null +++ b/go.sum @@ -0,0 +1,31 @@ +bazil.org/fuse v0.0.0-20200524192727-fb710f7dfd05 h1:UrYe9YkT4Wpm6D+zByEyCJQzDqTPXqTDUI7bZ41i9VE= +bazil.org/fuse v0.0.0-20200524192727-fb710f7dfd05/go.mod h1:h0h5FBYpXThbvSfTqthw+0I4nmHnhTHkO5BoOHsBWqg= +github.com/Julusian/godocdown v0.0.0-20170816220326-6d19f8ff2df8/go.mod h1:INZr5t32rG59/5xeltqoCJoNY7e5x/3xoY9WSWVWg74= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/dvyukov/go-fuzz v0.0.0-20200318091601-be3528f3a813/go.mod h1:11Gm+ccJnvAhCNLlf5+cS9KjtbaD5I5zaZpFMsTHWTw= +github.com/elazarl/go-bindata-assetfs v1.0.0/go.mod h1:v+YaWX3bdea5J/mo8dSETolEo7R71Vk1u8bnjau5yw4= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/robertkrimen/godocdown v0.0.0-20130622164427-0bfa04905481/go.mod h1:C9WhFzY47SzYBIvzFqSvHIR6ROgDo4TtdTuRaOMjF/s= +github.com/stephens2424/writerset v1.0.2/go.mod h1:aS2JhsMn6eA7e82oNmW4rfsgAOp9COBTTl8mzkwADnc= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/tv42/httpunix v0.0.0-20191220191345-2ba4b9c3382c/go.mod h1:hzIxponao9Kjc7aWznkXaL4U4TWaDSs8zcsY4Ka08nM= +github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= +golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= +golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20191210023423-ac6580df4449 h1:gSbV7h1NRL2G1xTg/owz62CST1oJBmxy4QpMMregXVQ= +golang.org/x/sys v0.0.0-20191210023423-ac6580df4449/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.0.0-20200423201157-2723c5de0d66/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE= +golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= diff --git a/main.go b/main.go new file mode 100644 index 0000000..3bb8411 --- /dev/null +++ b/main.go @@ -0,0 +1,81 @@ +package main + +import ( + "errors" + "flag" + "fmt" + "io/ioutil" + "log" + "os" + + "bazil.org/fuse" + "bazil.org/fuse/fs" +) + +func main() { + m := NewMain() + if err := m.Run(os.Args[1:]); err == flag.ErrHelp { + os.Exit(1) + } else if err != nil { + fmt.Fprintln(os.Stderr, err) + os.Exit(1) + } +} + +type Main struct { + logger *log.Logger + + SourcePath string + MountPath string +} + +func NewMain() *Main { + return &Main{ + logger: log.New(ioutil.Discard, "", log.LstdFlags), + } +} + +func (m *Main) Run(args []string) (err error) { + flagSet := flag.NewFlagSet("litestream", flag.ContinueOnError) + verbose := flagSet.Bool("v", false, "verbose") + flagSet.Usage = m.usage + if err := flagSet.Parse(args); err != nil { + return err + } + if m.SourcePath = flagSet.Arg(0); m.SourcePath == "" { + return errors.New("source path required") + } else if m.MountPath = flagSet.Arg(1); m.MountPath == "" { + return errors.New("mount path required") + } + + // Setup logging, if verbose specified. + if *verbose { + m.logger = log.New(os.Stderr, "", log.LstdFlags) + } + + // Mount FUSE filesystem. + conn, err := fuse.Mount(m.MountPath, fuse.FSName("litestream"), fuse.Subtype("litestreamfs")) + if err != nil { + return err + } + defer fuse.Unmount(m.MountPath) + defer conn.Close() + + return fs.Serve(conn, &FS{SourcePath: m.SourcePath}) +} + +func (m *Main) usage() { + fmt.Println(` +Litestream is a FUSE file system that automatically replicates SQLite databases. + +Usage: + + litestream [arguments] source_dir mount_dir + +Arguments: + + -v + Enable verbose logging. + +`[1:]) +} diff --git a/server.go b/server.go new file mode 100644 index 0000000..23bcd5e --- /dev/null +++ b/server.go @@ -0,0 +1,237 @@ +package main + +/* +import ( + "context" + "io" + "sync" + + "bazil.org/fuse" +) + +type Server struct { + wg sync.WaitGroup + + SourcePath string +} + +func NewServer() *Server { + return &Server{} +} + +func (s *Server) Close() error { + s.wg.Wait() + return nil +} + +func (s *Server) Serve(ctx context.Context, conn *fuse.Conn) error { + for { + r, err := conn.ReadRequest() + if err == io.EOF { + return nil + } else if err != nil { + return err + } + + s.wg.Add(1) + go func() { defer s.wg.Done(); s.handleRequest(ctx, r) }() + } +} + +func (s *Server) handleRequest(ctx context.Context, r fuse.Request) { + switch r := r.(type) { + case *fuse.AccessRequest: + s.handleAccessRequest(ctx, r) + case *fuse.BatchForgetRequest: + s.handleBatchForgetRequest(ctx, r) + case *fuse.CreateRequest: + s.handleCreateRequest(ctx, r) + case *fuse.DestroyRequest: + s.handleDestroyRequest(ctx, r) + case *fuse.ExchangeDataRequest: + s.handleExchangeDataRequest(ctx, r) + case *fuse.FlushRequest: + s.handleFlushRequest(ctx, r) + case *fuse.ForgetRequest: + s.handleForgetRequest(ctx, r) + case *fuse.FsyncRequest: + s.handleFsyncRequest(ctx, r) + case *fuse.GetattrRequest: + s.handleGetattrRequest(ctx, r) + case *fuse.GetxattrRequest: + s.handleGetxattrRequest(ctx, r) + case *fuse.InterruptRequest: + s.handleInterruptRequest(ctx, r) + case *fuse.LinkRequest: + s.handleLinkRequest(ctx, r) + case *fuse.ListxattrRequest: + s.handleListxattrRequest(ctx, r) + case *fuse.LockRequest: + s.handleLockRequest(ctx, r) + case *fuse.LookupRequest: + s.handleLookupRequest(ctx, r) + case *fuse.MkdirRequest: + s.handleMkdirRequest(ctx, r) + case *fuse.MknodRequest: + s.handleMknodRequest(ctx, r) + case *fuse.OpenRequest: + s.handleOpenRequest(ctx, r) + case *fuse.PollRequest: + s.handlePollRequest(ctx, r) + case *fuse.QueryLockRequest: + s.handleQueryLockRequest(ctx, r) + case *fuse.ReadRequest: + s.handleReadRequest(ctx, r) + case *fuse.ReadlinkRequest: + s.handleReadlinkRequest(ctx, r) + case *fuse.ReleaseRequest: + s.handleReleaseRequest(ctx, r) + case *fuse.RemoveRequest: + s.handleRemoveRequest(ctx, r) + case *fuse.RemovexattrRequest: + s.handleRemovexattrRequest(ctx, r) + case *fuse.RenameRequest: + s.handleRenameRequest(ctx, r) + case *fuse.SetattrRequest: + s.handleSetattrRequest(ctx, r) + case *fuse.SetxattrRequest: + s.handleSetxattrRequest(ctx, r) + case *fuse.StatfsRequest: + s.handleStatfsRequest(ctx, r) + case *fuse.SymlinkRequest: + s.handleSymlinkRequest(ctx, r) + case *fuse.UnrecognizedRequest: + s.handleUnrecognizedRequest(ctx, r) + case *fuse.WriteRequest: + s.handleWriteRequest(ctx, r) + } +} + +func (s *Server) handleAccessRequest(ctx context.Context, r *fuse.AccessRequest) { + panic("TODO") +} + +func (s *Server) handleBatchForgetRequest(ctx context.Context, r *fuse.BatchForgetRequest) { + panic("TODO") +} + +func (s *Server) handleCreateRequest(ctx context.Context, r *fuse.CreateRequest) { + panic("TODO") +} + +func (s *Server) handleDestroyRequest(ctx context.Context, r *fuse.DestroyRequest) { + panic("TODO") +} + +func (s *Server) handleExchangeDataRequest(ctx context.Context, r *fuse.ExchangeDataRequest) { + panic("TODO") +} + +func (s *Server) handleFlushRequest(ctx context.Context, r *fuse.FlushRequest) { + panic("TODO") +} + +func (s *Server) handleForgetRequest(ctx context.Context, r *fuse.ForgetRequest) { + panic("TODO") +} + +func (s *Server) handleFsyncRequest(ctx context.Context, r *fuse.FsyncRequest) { + panic("TODO") +} + +func (s *Server) handleGetattrRequest(ctx context.Context, r *fuse.GetattrRequest) { + panic("TODO") +} + +func (s *Server) handleGetxattrRequest(ctx context.Context, r *fuse.GetxattrRequest) { + panic("TODO") +} + +func (s *Server) handleInterruptRequest(ctx context.Context, r *fuse.InterruptRequest) { + panic("TODO") +} + +func (s *Server) handleLinkRequest(ctx context.Context, r *fuse.LinkRequest) { + panic("TODO") +} + +func (s *Server) handleListxattrRequest(ctx context.Context, r *fuse.ListxattrRequest) { + panic("TODO") +} + +func (s *Server) handleLockRequest(ctx context.Context, r *fuse.LockRequest) { + panic("TODO") +} + +func (s *Server) handleLookupRequest(ctx context.Context, r *fuse.LookupRequest) { + panic("TODO") +} + +func (s *Server) handleMkdirRequest(ctx context.Context, r *fuse.MkdirRequest) { + panic("TODO") +} + +func (s *Server) handleMknodRequest(ctx context.Context, r *fuse.MknodRequest) { + panic("TODO") +} + +func (s *Server) handleOpenRequest(ctx context.Context, r *fuse.OpenRequest) { + panic("TODO") +} + +func (s *Server) handlePollRequest(ctx context.Context, r *fuse.PollRequest) { + panic("TODO") +} + +func (s *Server) handleQueryLockRequest(ctx context.Context, r *fuse.QueryLockRequest) { + panic("TODO") +} + +func (s *Server) handleReadRequest(ctx context.Context, r *fuse.ReadRequest) { + panic("TODO") +} + +func (s *Server) handleReadlinkRequest(ctx context.Context, r *fuse.ReadlinkRequest) { + panic("TODO") +} + +func (s *Server) handleReleaseRequest(ctx context.Context, r *fuse.ReleaseRequest) { + panic("TODO") +} + +func (s *Server) handleRemoveRequest(ctx context.Context, r *fuse.RemoveRequest) { + panic("TODO") +} + +func (s *Server) handleRemovexattrRequest(ctx context.Context, r *fuse.RemovexattrRequest) { + panic("TODO") +} + +func (s *Server) handleRenameRequest(ctx context.Context, r *fuse.RenameRequest) { + panic("TODO") +} + +func (s *Server) handleSetattrRequest(ctx context.Context, r *fuse.SetattrRequest) { + panic("TODO") +} + +func (s *Server) handleSetxattrRequest(ctx context.Context, r *fuse.SetxattrRequest) { + panic("TODO") +} + +func (s *Server) handleStatfsRequest(ctx context.Context, r *fuse.StatfsRequest) { + panic("TODO") +} + +func (s *Server) handleSymlinkRequest(ctx context.Context, r *fuse.SymlinkRequest) { + panic("TODO") +} + +func (s *Server) handleUnrecognizedRequest(ctx context.Context, r *fuse.UnrecognizedRequest) { + panic("TODO") +} + +func (s *Server) handleWriteRequest(ctx context.Context, r *fuse.WriteRequest) { + panic("TODO") +} +*/