diff --git a/go.mod b/go.mod index 3f5b3d9..58f7c80 100644 --- a/go.mod +++ b/go.mod @@ -6,6 +6,7 @@ require ( cloud.google.com/go/storage v1.20.0 github.com/Azure/azure-storage-blob-go v0.14.0 github.com/aws/aws-sdk-go v1.42.53 + github.com/fsnotify/fsnotify v1.5.1 github.com/mattn/go-shellwords v1.0.12 github.com/mattn/go-sqlite3 v1.14.12 github.com/pierrec/lz4/v4 v4.1.14 diff --git a/go.sum b/go.sum index 895540f..a4d1389 100644 --- a/go.sum +++ b/go.sum @@ -111,6 +111,8 @@ github.com/envoyproxy/go-control-plane v0.9.9-0.20210512163311-63b5d3c536b0/go.m github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= github.com/form3tech-oss/jwt-go v3.2.2+incompatible h1:TcekIExNqud5crz4xD2pavyTgWiPvpYe4Xau31I0PRk= github.com/form3tech-oss/jwt-go v3.2.2+incompatible/go.mod h1:pbq4aXjuKjdthFRnoDwaVPLA+WlJuPGy+QneDUgJi2k= +github.com/fsnotify/fsnotify v1.5.1 h1:mZcQUHVQUQWoPXXtuf9yuEXKudkV2sx1E06UadKWpgI= +github.com/fsnotify/fsnotify v1.5.1/go.mod h1:T3375wBYaZdLLcVNkcVbzGHY7f1l/uK5T5Ai1i3InKU= github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04= github.com/go-gl/glfw v0.0.0-20190409004039-e6da0acd62b1/go.mod h1:vR7hzQXu2zJy9AVAgeJqvqgH9Q5CA+iKCZ2gyEVpxRU= github.com/go-gl/glfw/v3.3/glfw v0.0.0-20191125211704-12ad95a8df72/go.mod h1:tQ2UAYgL5IevRw8kRxooKSPJfGvJ9fJQFa0TUsXzTg8= @@ -236,8 +238,6 @@ github.com/mattn/go-ieproxy v0.0.1 h1:qiyop7gCflfhwCzGyeT0gro3sF9AIg9HU98JORTkqf github.com/mattn/go-ieproxy v0.0.1/go.mod h1:pYabZ6IHcRpFh7vIaLfK7rdcWgFEb3SFJ6/gNWuh88E= github.com/mattn/go-shellwords v1.0.12 h1:M2zGm7EW6UQJvDeQxo4T51eKPurbeFbe8WtebGE2xrk= github.com/mattn/go-shellwords v1.0.12/go.mod h1:EZzvwXDESEeg03EKmM+RmDnNOPKG4lLtQsUlTZDWQ8Y= -github.com/mattn/go-sqlite3 v1.14.11 h1:gt+cp9c0XGqe9S/wAHTL3n/7MqY+siPWgWJgqdsFrzQ= -github.com/mattn/go-sqlite3 v1.14.11/go.mod h1:NyWgC/yNuGj7Q9rpYnZvas74GogHl5/Z4A/KQRfk6bU= github.com/mattn/go-sqlite3 v1.14.12 h1:TJ1bhYJPV44phC+IMu1u2K/i5RriLTPe+yc68XDJ1Z0= github.com/mattn/go-sqlite3 v1.14.12/go.mod h1:NyWgC/yNuGj7Q9rpYnZvas74GogHl5/Z4A/KQRfk6bU= github.com/matttproud/golang_protobuf_extensions v1.0.1 h1:4hp9jkHxhMHkqkrB3Ix0jegS5sx/RkqARlsWZ6pIwiU= diff --git a/internal/file_watcher.go b/internal/file_watcher.go deleted file mode 100644 index 501703d..0000000 --- a/internal/file_watcher.go +++ /dev/null @@ -1,36 +0,0 @@ -package internal - -import ( - "errors" -) - -// File event mask constants. -const ( - FileEventCreated = 1 << iota - FileEventModified - FileEventDeleted -) - -// FileEvent represents an event on a watched file. -type FileEvent struct { - Name string - Mask int -} - -// ErrFileEventQueueOverflow is returned when the file event queue has overflowed. -var ErrFileEventQueueOverflow = errors.New("file event queue overflow") - -// FileWatcher represents a watcher of file events. -type FileWatcher interface { - Open() error - Close() error - - // Returns a channel of events for watched files. - Events() <-chan FileEvent - - // Adds a specific file to be watched. - Watch(filename string) error - - // Removes a specific file from being watched. - Unwatch(filename string) error -} diff --git a/internal/file_watcher_bsd.go b/internal/file_watcher_bsd.go deleted file mode 100644 index c4852e0..0000000 --- a/internal/file_watcher_bsd.go +++ /dev/null @@ -1,259 +0,0 @@ -//go:build freebsd || openbsd || netbsd || dragonfly || darwin - -package internal - -import ( - "context" - "log" - "os" - "path/filepath" - "sync" - "time" - - "golang.org/x/sync/errgroup" - "golang.org/x/sys/unix" -) - -var _ FileWatcher = (*KqueueFileWatcher)(nil) - -// KqueueFileWatcher watches files and is notified of events on them. -// -// Watcher code based on https://github.com/fsnotify/fsnotify -type KqueueFileWatcher struct { - fd int - events chan FileEvent - - mu sync.Mutex - watches map[string]int - paths map[int]string - notExists map[string]struct{} - - g errgroup.Group - ctx context.Context - cancel func() -} - -// NewKqueueFileWatcher returns a new instance of KqueueFileWatcher. -func NewKqueueFileWatcher() *KqueueFileWatcher { - return &KqueueFileWatcher{ - events: make(chan FileEvent), - - watches: make(map[string]int), - paths: make(map[int]string), - notExists: make(map[string]struct{}), - } -} - -// NewFileWatcher returns an instance of KqueueFileWatcher on BSD systems. -func NewFileWatcher() FileWatcher { - return NewKqueueFileWatcher() -} - -// Events returns a read-only channel of file events. -func (w *KqueueFileWatcher) Events() <-chan FileEvent { - return w.events -} - -// Open initializes the watcher and begins listening for file events. -func (w *KqueueFileWatcher) Open() (err error) { - if w.fd, err = unix.Kqueue(); err != nil { - return err - } - - w.ctx, w.cancel = context.WithCancel(context.Background()) - w.g.Go(func() error { - if err := w.monitor(w.ctx); err != nil && w.ctx.Err() == nil { - return err - } - return nil - }) - w.g.Go(func() error { - if err := w.monitorNotExists(w.ctx); err != nil && w.ctx.Err() == nil { - return err - } - return nil - }) - - return nil -} - -// Close stops watching for file events and cleans up resources. -func (w *KqueueFileWatcher) Close() (err error) { - w.cancel() - - if w.fd != 0 { - if e := unix.Close(w.fd); e != nil && err == nil { - err = e - } - } - - if e := w.g.Wait(); e != nil && err == nil { - err = e - } - return err -} - -// Watch begins watching the given file or directory. -func (w *KqueueFileWatcher) Watch(filename string) error { - w.mu.Lock() - defer w.mu.Unlock() - - filename = filepath.Clean(filename) - - // If file doesn't exist, monitor separately until it does exist as we - // can't watch non-existent files with kqueue. - if _, err := os.Stat(filename); os.IsNotExist(err) { - w.notExists[filename] = struct{}{} - return nil - } - - return w.addWatch(filename) -} - -func (w *KqueueFileWatcher) addWatch(filename string) error { - wd, err := unix.Open(filename, unix.O_NONBLOCK|unix.O_RDONLY|unix.O_CLOEXEC, 0700) - if err != nil { - return err - } - - // TODO: Handle return count different than 1. - kevent := unix.Kevent_t{Fflags: unix.NOTE_DELETE | unix.NOTE_WRITE} - unix.SetKevent(&kevent, wd, unix.EVFILT_VNODE, unix.EV_ADD|unix.EV_CLEAR|unix.EV_ENABLE) - if _, err := unix.Kevent(w.fd, []unix.Kevent_t{kevent}, nil, nil); err != nil { - return err - } - - w.watches[filename] = wd - w.paths[wd] = filename - - delete(w.notExists, filename) - - return err -} - -// Unwatch stops watching the given file or directory. -func (w *KqueueFileWatcher) Unwatch(filename string) error { - w.mu.Lock() - defer w.mu.Unlock() - - filename = filepath.Clean(filename) - - // Look up watch ID by filename. - wd, ok := w.watches[filename] - if !ok { - return nil - } - - // TODO: Handle return count different than 1. - var kevent unix.Kevent_t - unix.SetKevent(&kevent, wd, unix.EVFILT_VNODE, unix.EV_DELETE) - if _, err := unix.Kevent(w.fd, []unix.Kevent_t{kevent}, nil, nil); err != nil { - return err - } - unix.Close(wd) - - delete(w.paths, wd) - delete(w.watches, filename) - delete(w.notExists, filename) - - return nil -} - -// monitorNotExist runs in a separate goroutine and monitors for the creation of -// watched files that do not yet exist. -func (w *KqueueFileWatcher) monitorNotExists(ctx context.Context) error { - ticker := time.NewTicker(1 * time.Second) - defer ticker.Stop() - - for { - select { - case <-ctx.Done(): - return nil - case <-ticker.C: - w.checkNotExists(ctx) - } - } -} - -func (w *KqueueFileWatcher) checkNotExists(ctx context.Context) { - w.mu.Lock() - defer w.mu.Unlock() - - for filename := range w.notExists { - if _, err := os.Stat(filename); os.IsNotExist(err) { - continue - } - - if err := w.addWatch(filename); err != nil { - log.Printf("non-existent file monitor: cannot add watch: %s", err) - continue - } - - // Send event to channel. - select { - case w.events <- FileEvent{ - Name: filename, - Mask: FileEventCreated, - }: - default: - } - } -} - -// monitor runs in a separate goroutine and monitors the inotify event queue. -func (w *KqueueFileWatcher) monitor(ctx context.Context) error { - kevents := make([]unix.Kevent_t, 10) - timeout := unix.NsecToTimespec(int64(100 * time.Millisecond)) - - for { - n, err := unix.Kevent(w.fd, nil, kevents, &timeout) - if err != nil && err != unix.EINTR { - return err - } else if n < 0 { - continue - } - - for _, kevent := range kevents[:n] { - if err := w.recv(ctx, &kevent); err != nil { - return err - } - } - } -} - -// recv processes a single event from kqeueue. -func (w *KqueueFileWatcher) recv(ctx context.Context, kevent *unix.Kevent_t) error { - if err := ctx.Err(); err != nil { - return err - } - - // Look up filename & remove from watcher if this is a delete. - w.mu.Lock() - filename, ok := w.paths[int(kevent.Ident)] - if ok && kevent.Fflags&unix.NOTE_DELETE != 0 { - delete(w.paths, int(kevent.Ident)) - delete(w.watches, filename) - unix.Close(int(kevent.Ident)) - } - w.mu.Unlock() - - // Convert to generic file event mask. - var mask int - if kevent.Fflags&unix.NOTE_WRITE != 0 { - mask |= FileEventModified - } - if kevent.Fflags&unix.NOTE_DELETE != 0 { - mask |= FileEventDeleted - } - - // Send event to channel or wait for close. - select { - case <-ctx.Done(): - return ctx.Err() - case w.events <- FileEvent{ - Name: filename, - Mask: mask, - }: - return nil - } -} diff --git a/internal/file_watcher_linux.go b/internal/file_watcher_linux.go deleted file mode 100644 index 0735875..0000000 --- a/internal/file_watcher_linux.go +++ /dev/null @@ -1,369 +0,0 @@ -//go:build linux - -package internal - -import ( - "context" - "fmt" - "log" - "os" - "path/filepath" - "sync" - "time" - "unsafe" - - "golang.org/x/sync/errgroup" - "golang.org/x/sys/unix" -) - -var _ FileWatcher = (*InotifyFileWatcher)(nil) - -// InotifyFileWatcher watches files and is notified of events on them. -// -// Watcher code based on https://github.com/fsnotify/fsnotify -type InotifyFileWatcher struct { - inotify struct { - fd int - buf []byte - } - epoll struct { - fd int // epoll_create1() file descriptor - events []unix.EpollEvent - } - pipe struct { - r int // read pipe file descriptor - w int // write pipe file descriptor - } - - events chan FileEvent - - mu sync.Mutex - watches map[string]int - paths map[int]string - notExists map[string]struct{} - - g errgroup.Group - ctx context.Context - cancel func() -} - -// NewInotifyFileWatcher returns a new instance of InotifyFileWatcher. -func NewInotifyFileWatcher() *InotifyFileWatcher { - w := &InotifyFileWatcher{ - events: make(chan FileEvent), - - watches: make(map[string]int), - paths: make(map[int]string), - notExists: make(map[string]struct{}), - } - - w.inotify.buf = make([]byte, 4096*unix.SizeofInotifyEvent) - w.epoll.events = make([]unix.EpollEvent, 64) - - return w -} - -// NewFileWatcher returns an instance of InotifyFileWatcher on Linux systems. -func NewFileWatcher() FileWatcher { - return NewInotifyFileWatcher() -} - -// Events returns a read-only channel of file events. -func (w *InotifyFileWatcher) Events() <-chan FileEvent { - return w.events -} - -// Open initializes the watcher and begins listening for file events. -func (w *InotifyFileWatcher) Open() (err error) { - w.inotify.fd, err = unix.InotifyInit1(unix.IN_CLOEXEC) - if err != nil { - return fmt.Errorf("cannot init inotify: %w", err) - } - - // Initialize epoll and create a non-blocking pipe. - if w.epoll.fd, err = unix.EpollCreate1(unix.EPOLL_CLOEXEC); err != nil { - return fmt.Errorf("cannot create epoll: %w", err) - } - - pipe := []int{-1, -1} - if err := unix.Pipe2(pipe[:], unix.O_NONBLOCK|unix.O_CLOEXEC); err != nil { - return fmt.Errorf("cannot create epoll pipe: %w", err) - } - w.pipe.r, w.pipe.w = pipe[0], pipe[1] - - // Register inotify fd with epoll - if err := unix.EpollCtl(w.epoll.fd, unix.EPOLL_CTL_ADD, w.inotify.fd, &unix.EpollEvent{ - Fd: int32(w.inotify.fd), - Events: unix.EPOLLIN, - }); err != nil { - return fmt.Errorf("cannot add inotify to epoll: %w", err) - } - - // Register pipe fd with epoll - if err := unix.EpollCtl(w.epoll.fd, unix.EPOLL_CTL_ADD, w.pipe.r, &unix.EpollEvent{ - Fd: int32(w.pipe.r), - Events: unix.EPOLLIN, - }); err != nil { - return fmt.Errorf("cannot add pipe to epoll: %w", err) - } - - w.ctx, w.cancel = context.WithCancel(context.Background()) - w.g.Go(func() error { - if err := w.monitor(w.ctx); err != nil && w.ctx.Err() == nil { - return err - } - return nil - }) - w.g.Go(func() error { - if err := w.monitorNotExists(w.ctx); err != nil && w.ctx.Err() == nil { - return err - } - return nil - }) - - return nil -} - -// Close stops watching for file events and cleans up resources. -func (w *InotifyFileWatcher) Close() (err error) { - w.cancel() - - if e := w.wake(); e != nil && err == nil { - err = e - } - if e := w.g.Wait(); e != nil && err == nil { - err = e - } - return err -} - -// Watch begins watching the given file or directory. -func (w *InotifyFileWatcher) Watch(filename string) error { - w.mu.Lock() - defer w.mu.Unlock() - - filename = filepath.Clean(filename) - - // If file doesn't exist, monitor separately until it does exist as we - // can't watch non-existent files with inotify. - if _, err := os.Stat(filename); os.IsNotExist(err) { - w.notExists[filename] = struct{}{} - return nil - } - - return w.addWatch(filename) -} - -func (w *InotifyFileWatcher) addWatch(filename string) error { - wd, err := unix.InotifyAddWatch(w.inotify.fd, filename, unix.IN_MODIFY|unix.IN_DELETE_SELF) - if err != nil { - return err - } - - w.watches[filename] = wd - w.paths[wd] = filename - - delete(w.notExists, filename) - - return err -} - -// Unwatch stops watching the given file or directory. -func (w *InotifyFileWatcher) Unwatch(filename string) error { - w.mu.Lock() - defer w.mu.Unlock() - - filename = filepath.Clean(filename) - - // Look up watch ID by filename. - wd, ok := w.watches[filename] - if !ok { - return nil - } - - if _, err := unix.InotifyRmWatch(w.inotify.fd, uint32(wd)); err != nil { - return err - } - - delete(w.paths, wd) - delete(w.watches, filename) - delete(w.notExists, filename) - - return nil -} - -// monitorNotExist runs in a separate goroutine and monitors for the creation of -// watched files that do not yet exist. -func (w *InotifyFileWatcher) monitorNotExists(ctx context.Context) error { - ticker := time.NewTicker(1 * time.Second) - defer ticker.Stop() - - for { - select { - case <-ctx.Done(): - return nil - case <-ticker.C: - w.checkNotExists(ctx) - } - } -} - -func (w *InotifyFileWatcher) checkNotExists(ctx context.Context) { - w.mu.Lock() - defer w.mu.Unlock() - - for filename := range w.notExists { - if _, err := os.Stat(filename); os.IsNotExist(err) { - continue - } - - if err := w.addWatch(filename); err != nil { - log.Printf("non-existent file monitor: cannot add watch: %s", err) - continue - } - - // Send event to channel. - select { - case w.events <- FileEvent{ - Name: filename, - Mask: FileEventCreated, - }: - default: - } - } -} - -// monitor runs in a separate goroutine and monitors the inotify event queue. -func (w *InotifyFileWatcher) monitor(ctx context.Context) error { - // Close all file descriptors once monitor exits. - defer func() { - unix.Close(w.inotify.fd) - unix.Close(w.epoll.fd) - unix.Close(w.pipe.w) - unix.Close(w.pipe.r) - }() - - for { - if err := w.wait(ctx); err != nil { - return err - } else if err := w.read(ctx); err != nil { - return err - } - } -} - -// read reads from the inotify file descriptor. Automatically rety on EINTR. -func (w *InotifyFileWatcher) read(ctx context.Context) error { - for { - n, err := unix.Read(w.inotify.fd, w.inotify.buf) - if err != nil && err != unix.EINTR { - return err - } else if n < 0 { - continue - } - - return w.recv(ctx, w.inotify.buf[:n]) - } -} - -func (w *InotifyFileWatcher) recv(ctx context.Context, b []byte) error { - if err := ctx.Err(); err != nil { - return err - } - - for { - if len(b) == 0 { - return nil - } else if len(b) < unix.SizeofInotifyEvent { - return fmt.Errorf("InotifyFileWatcher.recv(): inotify short record: n=%d", len(b)) - } - - event := (*unix.InotifyEvent)(unsafe.Pointer(&b[0])) - if event.Mask&unix.IN_Q_OVERFLOW != 0 { - // TODO: Change to notify all watches. - return ErrFileEventQueueOverflow - } - - // Remove deleted files from the lookups. - w.mu.Lock() - name, ok := w.paths[int(event.Wd)] - if ok && event.Mask&unix.IN_DELETE_SELF != 0 { - delete(w.paths, int(event.Wd)) - delete(w.watches, name) - } - w.mu.Unlock() - - //if nameLen > 0 { - // // Point "bytes" at the first byte of the filename - // bytes := (*[unix.PathMax]byte)(unsafe.Pointer(&buf[offset+unix.SizeofInotifyEvent]))[:nameLen:nameLen] - // // The filename is padded with NULL bytes. TrimRight() gets rid of those. - // name += "/" + strings.TrimRight(string(bytes[0:nameLen]), "\000") - //} - - // Move to next event. - b = b[unix.SizeofInotifyEvent+event.Len:] - - // Skip event if ignored. - if event.Mask&unix.IN_IGNORED != 0 { - continue - } - - // Convert to generic file event mask. - var mask int - if event.Mask&unix.IN_MODIFY != 0 { - mask |= FileEventModified - } - if event.Mask&unix.IN_DELETE_SELF != 0 { - mask |= FileEventDeleted - } - - // Send event to channel or wait for close. - select { - case <-ctx.Done(): - return ctx.Err() - case w.events <- FileEvent{ - Name: name, - Mask: mask, - }: - } - } -} - -func (w *InotifyFileWatcher) wait(ctx context.Context) error { - for { - n, err := unix.EpollWait(w.epoll.fd, w.epoll.events, -1) - if n == 0 || err == unix.EINTR { - continue - } else if err != nil { - return err - } - - // Read events to see if we have data available on inotify or if we are awaken. - var hasData bool - for _, event := range w.epoll.events[:n] { - switch event.Fd { - case int32(w.inotify.fd): // inotify file descriptor - hasData = hasData || event.Events&(unix.EPOLLHUP|unix.EPOLLERR|unix.EPOLLIN) != 0 - - case int32(w.pipe.r): // epoll file descriptor - if _, err := unix.Read(w.pipe.r, make([]byte, 1024)); err != nil && err != unix.EAGAIN { - return fmt.Errorf("epoll pipe error: %w", err) - } - } - } - - // Check if context is closed and then exit if data is available. - if err := ctx.Err(); err != nil { - return err - } else if hasData { - return nil - } - } -} - -func (w *InotifyFileWatcher) wake() error { - if _, err := unix.Write(w.pipe.w, []byte{0}); err != nil && err != unix.EAGAIN { - return err - } - return nil -} diff --git a/internal/file_watcher_test.go b/internal/file_watcher_test.go deleted file mode 100644 index dd76715..0000000 --- a/internal/file_watcher_test.go +++ /dev/null @@ -1,211 +0,0 @@ -package internal_test - -import ( - "database/sql" - "path/filepath" - "strings" - "testing" - "time" - - "github.com/benbjohnson/litestream/internal" - _ "github.com/mattn/go-sqlite3" -) - -func TestFileWatcher(t *testing.T) { - t.Run("WriteAndRemove", func(t *testing.T) { - dbPath := filepath.Join(t.TempDir(), "db") - - w := internal.NewFileWatcher() - if err := w.Open(); err != nil { - t.Fatal(err) - } - defer w.Close() - - db, err := sql.Open("sqlite3", dbPath) - if err != nil { - t.Fatal(err) - } - defer db.Close() - - if _, err := db.Exec(`PRAGMA journal_mode = wal`); err != nil { - t.Fatal(err) - } else if _, err := db.Exec(`CREATE TABLE t (x)`); err != nil { - t.Fatal(err) - } - - if err := w.Watch(dbPath + "-wal"); err != nil { - t.Fatal(err) - } - - // Write to the WAL file & ensure a "modified" event occurs. - if _, err := db.Exec(`INSERT INTO t (x) VALUES (1)`); err != nil { - t.Fatal(err) - } - - select { - case <-time.After(10 * time.Second): - t.Fatal("timeout waiting for event") - case event := <-w.Events(): - if got, want := event.Name, dbPath+"-wal"; got != want { - t.Fatalf("name=%s, want %s", got, want) - } else if got, want := event.Mask, internal.FileEventModified; got != want { - t.Fatalf("mask=0x%02x, want 0x%02x", got, want) - } - } - - // Flush any duplicate events. - drainFileEventChannel(w.Events()) - - // Close database and ensure checkpointed WAL creates a "delete" event. - if err := db.Close(); err != nil { - t.Fatal(err) - } - - select { - case <-time.After(10 * time.Second): - t.Fatal("timeout waiting for event") - case event := <-w.Events(): - if got, want := event.Name, dbPath+"-wal"; got != want { - t.Fatalf("name=%s, want %s", got, want) - } else if got, want := event.Mask, internal.FileEventDeleted; got != want { - t.Fatalf("mask=0x%02x, want 0x%02x", got, want) - } - } - }) - - t.Run("LargeTx", func(t *testing.T) { - w := internal.NewFileWatcher() - if err := w.Open(); err != nil { - t.Fatal(err) - } - defer w.Close() - - dbPath := filepath.Join(t.TempDir(), "db") - db, err := sql.Open("sqlite3", dbPath) - if err != nil { - t.Fatal(err) - } else if _, err := db.Exec(`PRAGMA cache_size = 4`); err != nil { - t.Fatal(err) - } else if _, err := db.Exec(`PRAGMA journal_mode = wal`); err != nil { - t.Fatal(err) - } else if _, err := db.Exec(`CREATE TABLE t (x)`); err != nil { - t.Fatal(err) - } - defer db.Close() - - if err := w.Watch(dbPath + "-wal"); err != nil { - t.Fatal(err) - } - - // Start a transaction to ensure writing large data creates multiple write events. - tx, err := db.Begin() - if err != nil { - t.Fatal(err) - } - defer func() { _ = tx.Rollback() }() - - // Write enough data to require a spill. - for i := 0; i < 100; i++ { - if _, err := tx.Exec(`INSERT INTO t (x) VALUES (?)`, strings.Repeat("x", 512)); err != nil { - t.Fatal(err) - } - } - - // Ensure spill writes to disk. - select { - case <-time.After(10 * time.Second): - t.Fatal("timeout waiting for event") - case event := <-w.Events(): - if got, want := event.Name, dbPath+"-wal"; got != want { - t.Fatalf("name=%s, want %s", got, want) - } else if got, want := event.Mask, internal.FileEventModified; got != want { - t.Fatalf("mask=0x%02x, want 0x%02x", got, want) - } - } - - // Flush any duplicate events. - drainFileEventChannel(w.Events()) - - if err := tx.Commit(); err != nil { - t.Fatal(err) - } - - // Final commit should spill remaining pages and cause another write event. - select { - case <-time.After(10 * time.Second): - t.Fatal("timeout waiting for event") - case event := <-w.Events(): - if got, want := event.Name, dbPath+"-wal"; got != want { - t.Fatalf("name=%s, want %s", got, want) - } else if got, want := event.Mask, internal.FileEventModified; got != want { - t.Fatalf("mask=0x%02x, want 0x%02x", got, want) - } - } - }) - - t.Run("WatchBeforeCreate", func(t *testing.T) { - dbPath := filepath.Join(t.TempDir(), "db") - - w := internal.NewFileWatcher() - if err := w.Open(); err != nil { - t.Fatal(err) - } - defer w.Close() - - if err := w.Watch(dbPath); err != nil { - t.Fatal(err) - } else if err := w.Watch(dbPath + "-wal"); err != nil { - t.Fatal(err) - } - - db, err := sql.Open("sqlite3", dbPath) - if err != nil { - t.Fatal(err) - } - defer db.Close() - - if _, err := db.Exec(`CREATE TABLE t (x)`); err != nil { - t.Fatal(err) - } - - // Wait for main database creation event. - waitForFileEvent(t, w.Events(), internal.FileEvent{Name: dbPath, Mask: internal.FileEventCreated}) - - // Write to the WAL file & ensure a "modified" event occurs. - if _, err := db.Exec(`PRAGMA journal_mode = wal`); err != nil { - t.Fatal(err) - } else if _, err := db.Exec(`INSERT INTO t (x) VALUES (1)`); err != nil { - t.Fatal(err) - } - - // Wait for WAL creation event. - waitForFileEvent(t, w.Events(), internal.FileEvent{Name: dbPath + "-wal", Mask: internal.FileEventCreated}) - }) -} - -func drainFileEventChannel(ch <-chan internal.FileEvent) { - for { - select { - case <-time.After(100 * time.Millisecond): - return - case <-ch: - } - } -} - -func waitForFileEvent(tb testing.TB, ch <-chan internal.FileEvent, want internal.FileEvent) { - tb.Helper() - - timeout := time.After(10 * time.Second) - - for { - select { - case <-timeout: - tb.Fatalf("timeout waiting for event: %#v", want) - case got := <-ch: - if got == want { - return - } - } - } -} diff --git a/internal/internal.go b/internal/internal.go index 95d0f78..e4da7f8 100644 --- a/internal/internal.go +++ b/internal/internal.go @@ -176,15 +176,6 @@ func MkdirAll(path string, mode os.FileMode, uid, gid int) error { return nil } -// Fileinfo returns syscall fields from a FileInfo object. -func Fileinfo(fi os.FileInfo) (uid, gid int) { - if fi == nil { - return -1, -1 - } - stat := fi.Sys().(*syscall.Stat_t) - return int(stat.Uid), int(stat.Gid) -} - // ParseSnapshotPath parses the index from a snapshot filename. Used by path-based replicas. func ParseSnapshotPath(s string) (index int, err error) { a := snapshotPathRegex.FindStringSubmatch(s) diff --git a/server.go b/server.go index 501e81e..f1fe7d9 100644 --- a/server.go +++ b/server.go @@ -3,10 +3,11 @@ package litestream import ( "context" "fmt" + "path/filepath" "strings" "sync" - "github.com/benbjohnson/litestream/internal" + "github.com/fsnotify/fsnotify" "golang.org/x/sync/errgroup" ) @@ -15,7 +16,7 @@ import ( type Server struct { mu sync.Mutex dbs map[string]*DB // databases by path - watcher internal.FileWatcher + watcher *fsnotify.Watcher ctx context.Context cancel func() @@ -31,8 +32,9 @@ func NewServer() *Server { // Open initializes the server and begins watching for file system events. func (s *Server) Open() error { - s.watcher = internal.NewFileWatcher() - if err := s.watcher.Open(); err != nil { + var err error + s.watcher, err = fsnotify.NewWatcher() + if err != nil { return err } @@ -110,10 +112,8 @@ func (s *Server) Watch(path string, fn func(path string) (*DB, error)) error { s.dbs[path] = db // Watch for changes on the database file & WAL. - if err := s.watcher.Watch(path); err != nil { + if err := s.watcher.Add(filepath.Dir(path)); err != nil { return fmt.Errorf("watch db file: %w", err) - } else if err := s.watcher.Watch(path + "-wal"); err != nil { - return fmt.Errorf("watch wal file: %w", err) } // Kick off an initial sync. @@ -137,7 +137,7 @@ func (s *Server) Unwatch(path string) error { delete(s.dbs, path) // Stop watching for changes on the database WAL. - if err := s.watcher.Unwatch(path + "-wal"); err != nil { + if err := s.watcher.Remove(path + "-wal"); err != nil { return fmt.Errorf("unwatch file: %w", err) } @@ -149,13 +149,26 @@ func (s *Server) Unwatch(path string) error { return nil } +func (s *Server) isWatched(event fsnotify.Event) bool { + path := event.Name + path = strings.TrimSuffix(path, "-wal") + + if _, ok := s.dbs[path]; ok { + return true + } + return false +} + // monitor runs in a separate goroutine and dispatches notifications to managed DBs. func (s *Server) monitor(ctx context.Context) error { for { select { case <-ctx.Done(): return ctx.Err() - case event := <-s.watcher.Events(): + case event := <-s.watcher.Events: + if !s.isWatched(event) { + continue + } if err := s.dispatchFileEvent(ctx, event); err != nil { return err } @@ -164,7 +177,7 @@ func (s *Server) monitor(ctx context.Context) error { } // dispatchFileEvent dispatches a notification to the database which owns the file. -func (s *Server) dispatchFileEvent(ctx context.Context, event internal.FileEvent) error { +func (s *Server) dispatchFileEvent(ctx context.Context, event fsnotify.Event) error { path := event.Name path = strings.TrimSuffix(path, "-wal")