diff --git a/cmd/litestream/main.go b/cmd/litestream/main.go index 799088a..ef65220 100644 --- a/cmd/litestream/main.go +++ b/cmd/litestream/main.go @@ -78,7 +78,7 @@ func (m *Main) Run(args []string) (err error) { } // Mount FUSE filesystem. - conn, err := fuse.Mount(m.Path, fuse.FSName("litestream"), fuse.Subtype("litestreamfs")) + conn, err := fuse.Mount(m.Path, fuse.LockingPOSIX()) if err != nil { return err } diff --git a/db.go b/db.go index 7ac93cf..927f73f 100644 --- a/db.go +++ b/db.go @@ -2,6 +2,7 @@ package litestream import ( "context" + "os" "path/filepath" "strings" "sync" @@ -55,7 +56,11 @@ func (db *DB) LogPath() string { // Open loads the configuration file func (db *DB) Open() error { - // TODO: Ensure sidecar directory structure exists. + // Ensure meta directory exists. + if err := os.MkdirAll(db.MetaPath(), 0600); err != nil { + return err + } + return nil } diff --git a/handle.go b/handle.go index 791c5a2..8f96c26 100644 --- a/handle.go +++ b/handle.go @@ -2,7 +2,6 @@ package litestream import ( "context" - "encoding/hex" "io" "log" "os" @@ -54,7 +53,7 @@ func (h *Handle) Read(ctx context.Context, req *fuse.ReadRequest, resp *fuse.Rea // Write writes data at a given offset to the underlying file. func (h *Handle) Write(ctx context.Context, req *fuse.WriteRequest, resp *fuse.WriteResponse) (err error) { log.Printf("write: name=%s offset=%d n=%d", h.f.Name(), req.Offset, len(req.Data)) - println(hex.Dump(req.Data)) + println(HexDump(req.Data)) resp.Size, err = h.f.WriteAt(req.Data, req.Offset) return err @@ -86,24 +85,7 @@ func (h *Handle) ReadDirAll(ctx context.Context) (ents []fuse.Dirent, err error) // Lock tries to acquire a lock on a byte range of the node. If a // conflicting lock is already held, returns syscall.EAGAIN. func (h *Handle) Lock(ctx context.Context, req *fuse.LockRequest) error { - return h.lock(ctx, req) -} - -// LockWait acquires a lock on a byte range of the node, waiting -// until the lock can be obtained (or context is canceled). -func (h *Handle) LockWait(ctx context.Context, req *fuse.LockWaitRequest) error { - return h.lock(ctx, (*fuse.LockRequest)(req)) -} - -// Unlock releases the lock on a byte range of the node. Locks can -// be released also implicitly, see HandleFlockLocker and -// HandlePOSIXLocker. -func (h *Handle) Unlock(ctx context.Context, req *fuse.UnlockRequest) error { - return h.lock(ctx, (*fuse.LockRequest)(req)) - -} - -func (h *Handle) lock(ctx context.Context, req *fuse.LockRequest) error { + log.Printf("dbg/lock %s -- %#v", h.f.Name(), req.Lock) return syscall.FcntlFlock(h.f.Fd(), F_OFD_SETLK, &syscall.Flock_t{ Type: int16(req.Lock.Type), Whence: io.SeekStart, @@ -112,6 +94,32 @@ func (h *Handle) lock(ctx context.Context, req *fuse.LockRequest) error { }) } +// LockWait acquires a lock on a byte range of the node, waiting +// until the lock can be obtained (or context is canceled). +func (h *Handle) LockWait(ctx context.Context, req *fuse.LockWaitRequest) error { + log.Printf("dbg/lockwait %s -- %#v", h.f.Name(), req.Lock) + return syscall.FcntlFlock(h.f.Fd(), F_OFD_SETLKW, &syscall.Flock_t{ + Type: int16(req.Lock.Type), + Whence: io.SeekStart, + Start: int64(req.Lock.Start), + Len: int64(req.Lock.End) - int64(req.Lock.Start), + }) +} + +// Unlock releases the lock on a byte range of the node. Locks can +// be released also implicitly, see HandleFlockLocker and +// HandlePOSIXLocker. +func (h *Handle) Unlock(ctx context.Context, req *fuse.UnlockRequest) error { + log.Printf("dbg/unlock %s -- %#v", h.f.Name(), req.Lock) + return syscall.FcntlFlock(h.f.Fd(), F_OFD_SETLK, &syscall.Flock_t{ + Type: int16(req.Lock.Type), + Whence: io.SeekStart, + Start: int64(req.Lock.Start), + Len: int64(req.Lock.End) - int64(req.Lock.Start), + }) + +} + // QueryLock returns the current state of locks held for the byte // range of the node. // @@ -121,17 +129,21 @@ func (h *Handle) lock(ctx context.Context, req *fuse.LockRequest) error { // have Lock.Type F_UNLCK, and the whole struct should be // overwritten for in case of conflicting locks. func (h *Handle) QueryLock(ctx context.Context, req *fuse.QueryLockRequest, resp *fuse.QueryLockResponse) error { - // type QueryLockRequest struct { - // Header - // Handle HandleID - // LockOwner LockOwner - // Lock FileLock - // LockFlags LockFlags - // } + flock_t := syscall.Flock_t{ + Type: int16(req.Lock.Type), + Whence: io.SeekStart, + Start: int64(req.Lock.Start), + Len: int64(req.Lock.End) - int64(req.Lock.Start), + } + if err := syscall.FcntlFlock(h.f.Fd(), F_OFD_GETLK, &flock_t); err != nil { + return err + } - // type QueryLockResponse struct { - // Lock FileLock - // } - - panic("TODO") + resp.Lock = fuse.FileLock{ + Type: fuse.LockType(flock_t.Type), + Start: uint64(flock_t.Start), + End: uint64(flock_t.Start + flock_t.Len), + PID: flock_t.Pid, + } + return nil } diff --git a/litestream.go b/litestream.go index da44fc9..9cfb3ef 100644 --- a/litestream.go +++ b/litestream.go @@ -2,7 +2,9 @@ package litestream import ( "encoding/binary" + "encoding/hex" "io" + "strings" ) // Magic number specified at the beginning of WAL files. @@ -40,3 +42,38 @@ func Checksum(bo binary.ByteOrder, s uint64, b []byte) (_ uint64, err error) { } return uint64(s0)<<32 | uint64(s1), nil } + +// HexDump returns hexdump output but with duplicate lines removed. +func HexDump(b []byte) string { + const prefixN = len("00000000") + + var output []string + var prev string + var ellipsis bool + + lines := strings.Split(strings.TrimSpace(hex.Dump(b)), "\n") + for i, line := range lines { + // Add line to output if it is not repeating or the last line. + if i == 0 || i == len(lines)-1 || trimPrefixN(line, prefixN) != trimPrefixN(prev, prefixN) { + output = append(output, line) + prev, ellipsis = line, false + continue + } + + // Add an ellipsis for the first duplicate line. + if !ellipsis { + output = append(output, "...") + ellipsis = true + continue + } + } + + return strings.Join(output, "\n") +} + +func trimPrefixN(s string, n int) string { + if len(s) < n { + return "" + } + return s[n:] +}