Fixing locking; unlock still not working

This commit is contained in:
Ben Johnson
2020-11-03 16:35:58 -07:00
parent b1ec5c721b
commit e52d3be78d
4 changed files with 88 additions and 34 deletions

View File

@@ -78,7 +78,7 @@ func (m *Main) Run(args []string) (err error) {
} }
// Mount FUSE filesystem. // Mount FUSE filesystem.
conn, err := fuse.Mount(m.Path, fuse.FSName("litestream"), fuse.Subtype("litestreamfs")) conn, err := fuse.Mount(m.Path, fuse.LockingPOSIX())
if err != nil { if err != nil {
return err return err
} }

7
db.go
View File

@@ -2,6 +2,7 @@ package litestream
import ( import (
"context" "context"
"os"
"path/filepath" "path/filepath"
"strings" "strings"
"sync" "sync"
@@ -55,7 +56,11 @@ func (db *DB) LogPath() string {
// Open loads the configuration file // Open loads the configuration file
func (db *DB) Open() error { func (db *DB) Open() error {
// TODO: Ensure sidecar directory structure exists. // Ensure meta directory exists.
if err := os.MkdirAll(db.MetaPath(), 0600); err != nil {
return err
}
return nil return nil
} }

View File

@@ -2,7 +2,6 @@ package litestream
import ( import (
"context" "context"
"encoding/hex"
"io" "io"
"log" "log"
"os" "os"
@@ -54,7 +53,7 @@ func (h *Handle) Read(ctx context.Context, req *fuse.ReadRequest, resp *fuse.Rea
// Write writes data at a given offset to the underlying file. // Write writes data at a given offset to the underlying file.
func (h *Handle) Write(ctx context.Context, req *fuse.WriteRequest, resp *fuse.WriteResponse) (err error) { func (h *Handle) Write(ctx context.Context, req *fuse.WriteRequest, resp *fuse.WriteResponse) (err error) {
log.Printf("write: name=%s offset=%d n=%d", h.f.Name(), req.Offset, len(req.Data)) log.Printf("write: name=%s offset=%d n=%d", h.f.Name(), req.Offset, len(req.Data))
println(hex.Dump(req.Data)) println(HexDump(req.Data))
resp.Size, err = h.f.WriteAt(req.Data, req.Offset) resp.Size, err = h.f.WriteAt(req.Data, req.Offset)
return err return err
@@ -86,24 +85,7 @@ func (h *Handle) ReadDirAll(ctx context.Context) (ents []fuse.Dirent, err error)
// Lock tries to acquire a lock on a byte range of the node. If a // Lock tries to acquire a lock on a byte range of the node. If a
// conflicting lock is already held, returns syscall.EAGAIN. // conflicting lock is already held, returns syscall.EAGAIN.
func (h *Handle) Lock(ctx context.Context, req *fuse.LockRequest) error { func (h *Handle) Lock(ctx context.Context, req *fuse.LockRequest) error {
return h.lock(ctx, req) log.Printf("dbg/lock %s -- %#v", h.f.Name(), req.Lock)
}
// LockWait acquires a lock on a byte range of the node, waiting
// until the lock can be obtained (or context is canceled).
func (h *Handle) LockWait(ctx context.Context, req *fuse.LockWaitRequest) error {
return h.lock(ctx, (*fuse.LockRequest)(req))
}
// Unlock releases the lock on a byte range of the node. Locks can
// be released also implicitly, see HandleFlockLocker and
// HandlePOSIXLocker.
func (h *Handle) Unlock(ctx context.Context, req *fuse.UnlockRequest) error {
return h.lock(ctx, (*fuse.LockRequest)(req))
}
func (h *Handle) lock(ctx context.Context, req *fuse.LockRequest) error {
return syscall.FcntlFlock(h.f.Fd(), F_OFD_SETLK, &syscall.Flock_t{ return syscall.FcntlFlock(h.f.Fd(), F_OFD_SETLK, &syscall.Flock_t{
Type: int16(req.Lock.Type), Type: int16(req.Lock.Type),
Whence: io.SeekStart, Whence: io.SeekStart,
@@ -112,6 +94,32 @@ func (h *Handle) lock(ctx context.Context, req *fuse.LockRequest) error {
}) })
} }
// LockWait acquires a lock on a byte range of the node, waiting
// until the lock can be obtained (or context is canceled).
func (h *Handle) LockWait(ctx context.Context, req *fuse.LockWaitRequest) error {
log.Printf("dbg/lockwait %s -- %#v", h.f.Name(), req.Lock)
return syscall.FcntlFlock(h.f.Fd(), F_OFD_SETLKW, &syscall.Flock_t{
Type: int16(req.Lock.Type),
Whence: io.SeekStart,
Start: int64(req.Lock.Start),
Len: int64(req.Lock.End) - int64(req.Lock.Start),
})
}
// Unlock releases the lock on a byte range of the node. Locks can
// be released also implicitly, see HandleFlockLocker and
// HandlePOSIXLocker.
func (h *Handle) Unlock(ctx context.Context, req *fuse.UnlockRequest) error {
log.Printf("dbg/unlock %s -- %#v", h.f.Name(), req.Lock)
return syscall.FcntlFlock(h.f.Fd(), F_OFD_SETLK, &syscall.Flock_t{
Type: int16(req.Lock.Type),
Whence: io.SeekStart,
Start: int64(req.Lock.Start),
Len: int64(req.Lock.End) - int64(req.Lock.Start),
})
}
// QueryLock returns the current state of locks held for the byte // QueryLock returns the current state of locks held for the byte
// range of the node. // range of the node.
// //
@@ -121,17 +129,21 @@ func (h *Handle) lock(ctx context.Context, req *fuse.LockRequest) error {
// have Lock.Type F_UNLCK, and the whole struct should be // have Lock.Type F_UNLCK, and the whole struct should be
// overwritten for in case of conflicting locks. // overwritten for in case of conflicting locks.
func (h *Handle) QueryLock(ctx context.Context, req *fuse.QueryLockRequest, resp *fuse.QueryLockResponse) error { func (h *Handle) QueryLock(ctx context.Context, req *fuse.QueryLockRequest, resp *fuse.QueryLockResponse) error {
// type QueryLockRequest struct { flock_t := syscall.Flock_t{
// Header Type: int16(req.Lock.Type),
// Handle HandleID Whence: io.SeekStart,
// LockOwner LockOwner Start: int64(req.Lock.Start),
// Lock FileLock Len: int64(req.Lock.End) - int64(req.Lock.Start),
// LockFlags LockFlags }
// } if err := syscall.FcntlFlock(h.f.Fd(), F_OFD_GETLK, &flock_t); err != nil {
return err
}
// type QueryLockResponse struct { resp.Lock = fuse.FileLock{
// Lock FileLock Type: fuse.LockType(flock_t.Type),
// } Start: uint64(flock_t.Start),
End: uint64(flock_t.Start + flock_t.Len),
panic("TODO") PID: flock_t.Pid,
}
return nil
} }

View File

@@ -2,7 +2,9 @@ package litestream
import ( import (
"encoding/binary" "encoding/binary"
"encoding/hex"
"io" "io"
"strings"
) )
// Magic number specified at the beginning of WAL files. // Magic number specified at the beginning of WAL files.
@@ -40,3 +42,38 @@ func Checksum(bo binary.ByteOrder, s uint64, b []byte) (_ uint64, err error) {
} }
return uint64(s0)<<32 | uint64(s1), nil return uint64(s0)<<32 | uint64(s1), nil
} }
// HexDump returns hexdump output but with duplicate lines removed.
func HexDump(b []byte) string {
const prefixN = len("00000000")
var output []string
var prev string
var ellipsis bool
lines := strings.Split(strings.TrimSpace(hex.Dump(b)), "\n")
for i, line := range lines {
// Add line to output if it is not repeating or the last line.
if i == 0 || i == len(lines)-1 || trimPrefixN(line, prefixN) != trimPrefixN(prev, prefixN) {
output = append(output, line)
prev, ellipsis = line, false
continue
}
// Add an ellipsis for the first duplicate line.
if !ellipsis {
output = append(output, "...")
ellipsis = true
continue
}
}
return strings.Join(output, "\n")
}
func trimPrefixN(s string, n int) string {
if len(s) < n {
return ""
}
return s[n:]
}