From a14a74d678bb3d096cbd12dbb9b3edc620333eda Mon Sep 17 00:00:00 2001
From: Ben Johnson
Date: Sat, 27 Feb 2021 09:39:55 -0700
Subject: [PATCH] Fix release of non-OFD locks

This commit removes short-lived `os.Open()` calls on the database file
because the subsequent `os.File.Close()` calls can release the database
locks if the operating system does not support OFD (Open File
Descriptor) locks.
---
 db.go         |  73 ++++++++++++++---------------------
 litestream.go |   5 ++-
 replica.go    | 103 ++++++++++++++++++++++++++++++++++----------
 s3/s3.go      |  51 +++++++++++++++++++------
 4 files changed, 153 insertions(+), 79 deletions(-)

diff --git a/db.go b/db.go
index 7e6d36b..1cf4a16 100644
--- a/db.go
+++ b/db.go
@@ -45,6 +45,7 @@ type DB struct {
 	mu       sync.RWMutex
 	path     string        // part to database
 	db       *sql.DB       // target database
+	f        *os.File      // long-running db file descriptor
 	rtx      *sql.Tx       // long running read transaction
 	pageSize int           // page size, in bytes
 	notify   chan struct{} // closes on WAL change
@@ -285,8 +286,15 @@ func (db *DB) Open() (err error) {
 // Close releases the read lock & closes the database. This method should only
 // be called by tests as it causes the underlying database to be checkpointed.
 func (db *DB) Close() (err error) {
-	if e := db.SoftClose(); e != nil && err == nil {
-		err = e
+	// Ensure replicas all stop replicating.
+	for _, r := range db.Replicas {
+		r.Stop(true)
+	}
+
+	if db.rtx != nil {
+		if e := db.releaseReadLock(); e != nil && err == nil {
+			err = e
+		}
 	}
 
 	if db.db != nil {
@@ -294,6 +302,7 @@ func (db *DB) Close() (err error) {
 			err = e
 		}
 	}
+
 	return err
 }
 
@@ -386,13 +395,19 @@ func (db *DB) init() (err error) {
 		return err
 	}
 
+	// Open long-running database file descriptor. Required for non-OFD locks.
+	if db.f, err = os.Open(db.path); err != nil {
+		return fmt.Errorf("open db file descriptor: %w", err)
+	}
+
 	// Ensure database is closed if init fails.
 	// Initialization can retry on next sync.
 	defer func() {
 		if err != nil {
-			db.releaseReadLock()
+			_ = db.releaseReadLock()
 			db.db.Close()
-			db.db = nil
+			db.f.Close()
+			db.db, db.f = nil, nil
 		}
 	}()
 
@@ -586,7 +601,7 @@ func (db *DB) SoftClose() (err error) {
 
 	// Ensure replicas all stop replicating.
 	for _, r := range db.Replicas {
-		r.Stop()
+		r.Stop(false)
 	}
 
 	if db.rtx != nil {
@@ -917,9 +932,9 @@ func (db *DB) verify() (info syncInfo, err error) {
 	// Verify last page synced still matches.
 	if info.shadowWALSize > WALHeaderSize {
 		offset := info.shadowWALSize - int64(db.pageSize+WALFrameHeaderSize)
-		if buf0, err := readFileAt(db.WALPath(), offset, int64(db.pageSize+WALFrameHeaderSize)); err != nil {
+		if buf0, err := readWALFileAt(db.WALPath(), offset, int64(db.pageSize+WALFrameHeaderSize)); err != nil {
 			return info, fmt.Errorf("cannot read last synced wal page: %w", err)
-		} else if buf1, err := readFileAt(info.shadowWALPath, offset, int64(db.pageSize+WALFrameHeaderSize)); err != nil {
+		} else if buf1, err := readWALFileAt(info.shadowWALPath, offset, int64(db.pageSize+WALFrameHeaderSize)); err != nil {
 			return info, fmt.Errorf("cannot read last synced shadow wal page: %w", err)
 		} else if !bytes.Equal(buf0, buf1) {
 			info.reason = "wal overwritten by another process"
@@ -1466,20 +1481,6 @@ func RestoreReplica(ctx context.Context, r Replica, opt RestoreOptions) error {
 	return nil
 }
 
-func checksumFile(filename string) (uint64, error) {
-	f, err := os.Open(filename)
-	if err != nil {
-		return 0, err
-	}
-	defer f.Close()
-
-	h := crc64.New(crc64.MakeTable(crc64.ISO))
-	if _, err := io.Copy(h, f); err != nil {
-		return 0, err
-	}
-	return h.Sum64(), nil
-}
-
 // CalcRestoreTarget returns a replica & generation to restore from based on opt criteria.
 func (db *DB) CalcRestoreTarget(ctx context.Context, opt RestoreOptions) (Replica, string, error) {
 	var target struct {
@@ -1672,11 +1673,14 @@ func (db *DB) CRC64() (uint64, Pos, error) {
 	}
 	pos.Offset = 0
 
-	chksum, err := checksumFile(db.Path())
-	if err != nil {
+	// Seek to the beginning of the db file descriptor and checksum whole file.
+	h := crc64.New(crc64.MakeTable(crc64.ISO))
+	if _, err := db.f.Seek(0, io.SeekStart); err != nil {
+		return 0, pos, err
+	} else if _, err := io.Copy(h, db.f); err != nil {
 		return 0, pos, err
 	}
-	return chksum, pos, nil
+	return h.Sum64(), pos, nil
 }
 
 // RestoreOptions represents options for DB.Restore().
@@ -1808,24 +1812,3 @@ func headerByteOrder(hdr []byte) (binary.ByteOrder, error) {
 		return nil, fmt.Errorf("invalid wal header magic: %x", magic)
 	}
 }
-
-func copyFile(dst, src string) error {
-	r, err := os.Open(src)
-	if err != nil {
-		return err
-	}
-	defer r.Close()
-
-	w, err := os.Create(dst)
-	if err != nil {
-		return err
-	}
-	defer w.Close()
-
-	if _, err := io.Copy(w, r); err != nil {
-		return err
-	} else if err := w.Sync(); err != nil {
-		return err
-	}
-	return nil
-}
diff --git a/litestream.go b/litestream.go
index 22d4cd1..3b3952b 100644
--- a/litestream.go
+++ b/litestream.go
@@ -152,8 +152,9 @@ func readWALHeader(filename string) ([]byte, error) {
 	return buf[:n], err
 }
 
-// readFileAt reads a slice from a file.
-func readFileAt(filename string, offset, n int64) ([]byte, error) {
+// readWALFileAt reads a slice from a file. Do not use this with database files
+// as it causes problems with non-OFD locks.
+func readWALFileAt(filename string, offset, n int64) ([]byte, error) {
 	f, err := os.Open(filename)
 	if err != nil {
 		return nil, err
diff --git a/replica.go b/replica.go
index 1023eec..d7369c5 100644
--- a/replica.go
+++ b/replica.go
@@ -4,6 +4,7 @@ import (
 	"context"
 	"encoding/binary"
 	"fmt"
+	"hash/crc64"
 	"io"
 	"io/ioutil"
 	"log"
@@ -31,10 +32,10 @@ type Replica interface {
 	DB() *DB
 
 	// Starts replicating in a background goroutine.
-	Start(ctx context.Context)
+	Start(ctx context.Context) error
 
 	// Stops all replication processing. Blocks until processing stopped.
-	Stop()
+	Stop(hard bool) error
 
 	// Returns the last replication position.
 	LastPos() Pos
@@ -90,6 +91,9 @@ type FileReplica struct {
 	mu  sync.RWMutex
 	pos Pos // last position
 
+	muf sync.Mutex
+	f   *os.File // long-running file descriptor to avoid non-OFD lock issues
+
 	wg     sync.WaitGroup
 	cancel func()
 
@@ -392,14 +396,14 @@ func (r *FileReplica) WALs(ctx context.Context) ([]*WALInfo, error) {
 }
 
 // Start starts replication for a given generation.
-func (r *FileReplica) Start(ctx context.Context) {
+func (r *FileReplica) Start(ctx context.Context) (err error) {
 	// Ignore if replica is being used sychronously.
 	if !r.MonitorEnabled {
-		return
+		return nil
 	}
 
 	// Stop previous replication.
-	r.Stop()
+	r.Stop(false)
 
 	// Wrap context with cancelation.
 	ctx, r.cancel = context.WithCancel(ctx)
@@ -410,12 +414,27 @@ func (r *FileReplica) Start(ctx context.Context) {
 	go func() { defer r.wg.Done(); r.retainer(ctx) }()
 	go func() { defer r.wg.Done(); r.snapshotter(ctx) }()
 	go func() { defer r.wg.Done(); r.validator(ctx) }()
+
+	return nil
 }
 
 // Stop cancels any outstanding replication and blocks until finished.
-func (r *FileReplica) Stop() {
+//
+// Performing a hard stop will close the DB file descriptor, which can release
+// locks on systems that only support per-process (non-OFD) locks. Hard stops
+// should only be performed when stopping the entire process.
+func (r *FileReplica) Stop(hard bool) (err error) {
 	r.cancel()
 	r.wg.Wait()
+
+	r.muf.Lock()
+	defer r.muf.Unlock()
+	if hard && r.f != nil {
+		if e := r.f.Close(); e != nil && err == nil {
+			err = e
+		}
+	}
+	return err
 }
 
 // monitor runs in a separate goroutine and continuously replicates the DB.
@@ -582,6 +601,9 @@ func (r *FileReplica) Snapshot(ctx context.Context) error {
 
 // snapshot copies the entire database to the replica path.
 func (r *FileReplica) snapshot(ctx context.Context, generation string, index int) error {
+	r.muf.Lock()
+	defer r.muf.Unlock()
+
 	// Acquire a read lock on the database during snapshot to prevent checkpoints.
 	tx, err := r.db.db.Begin()
 	if err != nil {
@@ -602,7 +624,46 @@ func (r *FileReplica) snapshot(ctx context.Context, generation string, index int
 
 	if err := mkdirAll(filepath.Dir(snapshotPath), r.db.dirmode, r.db.diruid, r.db.dirgid); err != nil {
 		return err
-	} else if err := compressFile(r.db.Path(), snapshotPath, r.db.uid, r.db.gid); err != nil {
+	}
+
+	// Open db file descriptor, if not already open.
+	if r.f == nil {
+		if r.f, err = os.Open(r.db.Path()); err != nil {
+			return err
+		}
+	}
+
+	if _, err := r.f.Seek(0, io.SeekStart); err != nil {
+		return err
+	}
+
+	fi, err := r.f.Stat()
+	if err != nil {
+		return err
+	}
+
+	w, err := createFile(snapshotPath+".tmp", fi.Mode(), r.db.uid, r.db.gid)
+	if err != nil {
+		return err
+	}
+	defer w.Close()
+
+	zr := lz4.NewWriter(w)
+	defer zr.Close()
+
+	// Copy & compress file contents to temporary file.
+	if _, err := io.Copy(zr, r.f); err != nil {
+		return err
+	} else if err := zr.Close(); err != nil {
+		return err
+	} else if err := w.Sync(); err != nil {
+		return err
+	} else if err := w.Close(); err != nil {
+		return err
+	}
+
+	// Move compressed file to final location.
+	if err := os.Rename(snapshotPath+".tmp", snapshotPath); err != nil {
 		return err
 	}
 
@@ -805,7 +866,7 @@ func (r *FileReplica) compress(ctx context.Context, generation string) error {
 		}
 
 		dst := filename + ".lz4"
-		if err := compressFile(filename, dst, r.db.uid, r.db.gid); err != nil {
+		if err := compressWALFile(filename, dst, r.db.uid, r.db.gid); err != nil {
 			return err
 		} else if err := os.Remove(filename); err != nil {
 			return err
@@ -1051,8 +1112,9 @@ func WALIndexAt(ctx context.Context, r Replica, generation string, maxIndex int,
 	return index, nil
 }
 
-// compressFile compresses a file and replaces it with a new file with a .lz4 extension.
-func compressFile(src, dst string, uid, gid int) error {
+// compressWALFile compresses a file and replaces it with a new file with a .lz4 extension.
+// Do not use this on database files because of issues with non-OFD locks.
+func compressWALFile(src, dst string, uid, gid int) error {
 	r, err := os.Open(src)
 	if err != nil {
 		return err
@@ -1102,7 +1164,6 @@ func ValidateReplica(ctx context.Context, r Replica) error {
 	// Compute checksum of primary database under lock. This prevents a
 	// sync from occurring and the database will not be written.
-	primaryPath := filepath.Join(tmpdir, "primary")
 	chksum0, pos, err := db.CRC64()
 	if err != nil {
 		return fmt.Errorf("cannot compute checksum: %w", err)
 	}
@@ -1125,10 +1186,19 @@ func ValidateReplica(ctx context.Context, r Replica) error {
 	}
 
 	// Open file handle for restored database.
-	chksum1, err := checksumFile(restorePath)
+	// NOTE: This open is ok as the restored database is not managed by litestream.
+	f, err := os.Open(restorePath)
 	if err != nil {
 		return err
 	}
+	defer f.Close()
+
+	// Read entire file into checksum.
+	h := crc64.New(crc64.MakeTable(crc64.ISO))
+	if _, err := io.Copy(h, f); err != nil {
+		return err
+	}
+	chksum1 := h.Sum64()
 
 	status := "ok"
 	mismatch := chksum0 != chksum1
@@ -1140,15 +1210,6 @@ func ValidateReplica(ctx context.Context, r Replica) error {
 	// Validate checksums match.
 	if mismatch {
 		internal.ReplicaValidationTotalCounterVec.WithLabelValues(db.Path(), r.Name(), "error").Inc()
-
-		// Compress mismatched databases and report temporary path for investigation.
-		if err := compressFile(primaryPath, primaryPath+".lz4", db.uid, db.gid); err != nil {
-			return fmt.Errorf("cannot compress primary db: %w", err)
-		} else if err := compressFile(restorePath, restorePath+".lz4", db.uid, db.gid); err != nil {
-			return fmt.Errorf("cannot compress replica db: %w", err)
-		}
-		log.Printf("%s(%s): validator: mismatch files @ %s", db.Path(), r.Name(), tmpdir)
-
 		return ErrChecksumMismatch
 	}
 
diff --git a/s3/s3.go b/s3/s3.go
index d1e3b4d..b4424dd 100644
--- a/s3/s3.go
+++ b/s3/s3.go
@@ -52,6 +52,9 @@ type Replica struct {
 	snapshotMu sync.Mutex
 	pos        litestream.Pos // last position
 
+	muf sync.Mutex
+	f   *os.File // long-lived read-only db file descriptor
+
 	wg     sync.WaitGroup
 	cancel func()
 
@@ -417,14 +420,14 @@ func (r *Replica) WALs(ctx context.Context) ([]*litestream.WALInfo, error) {
 }
 
 // Start starts replication for a given generation.
-func (r *Replica) Start(ctx context.Context) {
+func (r *Replica) Start(ctx context.Context) (err error) {
 	// Ignore if replica is being used sychronously.
 	if !r.MonitorEnabled {
-		return
+		return nil
 	}
 
 	// Stop previous replication.
-	r.Stop()
+	r.Stop(false)
 
 	// Wrap context with cancelation.
 	ctx, r.cancel = context.WithCancel(ctx)
@@ -435,12 +438,29 @@ func (r *Replica) Start(ctx context.Context) {
 	go func() { defer r.wg.Done(); r.retainer(ctx) }()
 	go func() { defer r.wg.Done(); r.snapshotter(ctx) }()
 	go func() { defer r.wg.Done(); r.validator(ctx) }()
+
+	return nil
 }
 
 // Stop cancels any outstanding replication and blocks until finished.
-func (r *Replica) Stop() {
+//
+// Performing a hard stop will close the DB file descriptor, which can release
+// locks on systems that only support per-process (non-OFD) locks. Hard stops
+// should only be performed when stopping the entire process.
+func (r *Replica) Stop(hard bool) (err error) {
 	r.cancel()
 	r.wg.Wait()
+
+	r.muf.Lock()
+	defer r.muf.Unlock()
+
+	if hard && r.f != nil {
+		if e := r.f.Close(); e != nil && err == nil {
+			err = e
+		}
+	}
+
+	return err
 }
 
 // monitor runs in a separate goroutine and continuously replicates the DB.
@@ -623,6 +643,9 @@ func (r *Replica) Snapshot(ctx context.Context) error {
 
 // snapshot copies the entire database to the replica path.
 func (r *Replica) snapshot(ctx context.Context, generation string, index int) error {
+	r.muf.Lock()
+	defer r.muf.Unlock()
+
 	// Acquire a read lock on the database during snapshot to prevent checkpoints.
 	tx, err := r.db.SQLDB().Begin()
 	if err != nil {
@@ -633,14 +656,21 @@ func (r *Replica) snapshot(ctx context.Context, generation string, index int) er
 	}
 	defer func() { _ = tx.Rollback() }()
 
-	// Open database file handle.
-	f, err := os.Open(r.db.Path())
-	if err != nil {
+	// Open long-lived file descriptor on database.
+	if r.f == nil {
+		if r.f, err = os.Open(r.db.Path()); err != nil {
+			return err
+		}
+	}
+
+	// Move the file descriptor to the beginning. We only use one long-lived
+	// file descriptor because some operating systems will remove the database
+	// lock when closing a separate file descriptor on the DB.
+	if _, err := r.f.Seek(0, io.SeekStart); err != nil {
 		return err
 	}
-	defer f.Close()
 
-	fi, err := f.Stat()
+	fi, err := r.f.Stat()
 	if err != nil {
 		return err
 	}
@@ -648,7 +678,7 @@ func (r *Replica) snapshot(ctx context.Context, generation string, index int) er
 	pr, pw := io.Pipe()
 	zw := lz4.NewWriter(pw)
 	go func() {
-		if _, err := io.Copy(zw, f); err != nil {
+		if _, err := io.Copy(zw, r.f); err != nil {
 			_ = pw.CloseWithError(err)
 			return
 		}
@@ -670,7 +700,6 @@ func (r *Replica) snapshot(ctx context.Context, generation string, index int) er
 
 	r.putOperationBytesCounter.Add(float64(fi.Size()))
 
 	log.Printf("%s(%s): snapshot: creating %s/%08x t=%s", r.db.Path(), r.Name(), generation, index, time.Since(startTime).Truncate(time.Millisecond))
-
 	return nil
 }
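
Background on the failure mode this patch guards against: POSIX fcntl locks belong to the
process, not to an individual file descriptor, so when a process closes any descriptor it
holds on a file, every lock that process holds on that file is dropped. The sketch below is
illustrative only and is not part of the patch; it assumes a Unix-like system, the
golang.org/x/sys/unix package, and a throwaway ./test.db path, and it reproduces the
lock-then-open-then-close sequence that the removed or renamed helpers (checksumFile,
copyFile, compressFile, readFileAt) could trigger against a database that SQLite has locked.

    package main

    import (
    	"log"
    	"os"

    	"golang.org/x/sys/unix"
    )

    func main() {
    	// Illustrative path; any file the process also holds fcntl locks on will do.
    	const path = "./test.db"

    	f1, err := os.OpenFile(path, os.O_CREATE|os.O_RDWR, 0644)
    	if err != nil {
    		log.Fatal(err)
    	}
    	defer f1.Close()

    	// Take a classic POSIX (non-OFD) read lock on the whole file through f1.
    	// This stands in for the lock SQLite holds on the database file.
    	lock := unix.Flock_t{Type: unix.F_RDLCK, Whence: 0, Start: 0, Len: 0}
    	if err := unix.FcntlFlock(f1.Fd(), unix.F_SETLK, &lock); err != nil {
    		log.Fatal(err)
    	}

    	// Second, short-lived descriptor on the same file -- the pattern the
    	// removed helpers used for checksums, compression, and reads.
    	f2, err := os.Open(path)
    	if err != nil {
    		log.Fatal(err)
    	}

    	// On systems without OFD locks, closing f2 releases every POSIX lock this
    	// process holds on the file, including the one acquired through f1.
    	// Another process could now lock the database even though f1 is still open.
    	if err := f2.Close(); err != nil {
    		log.Fatal(err)
    	}
    }

OFD locks avoid this because they are owned by the open file description rather than the
process, which is why the problem only appears on systems without OFD support. The patch's
mitigation is to keep a single long-running descriptor (DB.f, FileReplica.f, s3 Replica.f)
and Seek it back to offset 0 before each read instead of reopening the database file.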