Fix release of non-OFD locks
This commit removes short-lived `os.Open()` calls on the database file because this can cause locks to be released when `os.File.Close()` is later called if the operating system does not support OFD (Open File Descriptor) locks.
This commit is contained in:
103
replica.go
103
replica.go
@@ -4,6 +4,7 @@ import (
|
||||
"context"
|
||||
"encoding/binary"
|
||||
"fmt"
|
||||
"hash/crc64"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"log"
|
||||
@@ -31,10 +32,10 @@ type Replica interface {
|
||||
DB() *DB
|
||||
|
||||
// Starts replicating in a background goroutine.
|
||||
Start(ctx context.Context)
|
||||
Start(ctx context.Context) error
|
||||
|
||||
// Stops all replication processing. Blocks until processing stopped.
|
||||
Stop()
|
||||
Stop(hard bool) error
|
||||
|
||||
// Returns the last replication position.
|
||||
LastPos() Pos
|
||||
@@ -90,6 +91,9 @@ type FileReplica struct {
|
||||
mu sync.RWMutex
|
||||
pos Pos // last position
|
||||
|
||||
muf sync.Mutex
|
||||
f *os.File // long-running file descriptor to avoid non-OFD lock issues
|
||||
|
||||
wg sync.WaitGroup
|
||||
cancel func()
|
||||
|
||||
@@ -392,14 +396,14 @@ func (r *FileReplica) WALs(ctx context.Context) ([]*WALInfo, error) {
|
||||
}
|
||||
|
||||
// Start starts replication for a given generation.
|
||||
func (r *FileReplica) Start(ctx context.Context) {
|
||||
func (r *FileReplica) Start(ctx context.Context) (err error) {
|
||||
// Ignore if replica is being used sychronously.
|
||||
if !r.MonitorEnabled {
|
||||
return
|
||||
return nil
|
||||
}
|
||||
|
||||
// Stop previous replication.
|
||||
r.Stop()
|
||||
r.Stop(false)
|
||||
|
||||
// Wrap context with cancelation.
|
||||
ctx, r.cancel = context.WithCancel(ctx)
|
||||
@@ -410,12 +414,27 @@ func (r *FileReplica) Start(ctx context.Context) {
|
||||
go func() { defer r.wg.Done(); r.retainer(ctx) }()
|
||||
go func() { defer r.wg.Done(); r.snapshotter(ctx) }()
|
||||
go func() { defer r.wg.Done(); r.validator(ctx) }()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Stop cancels any outstanding replication and blocks until finished.
|
||||
func (r *FileReplica) Stop() {
|
||||
//
|
||||
// Performing a hard stop will close the DB file descriptor which could release
|
||||
// locks on per-process locks. Hard stops should only be performed when
|
||||
// stopping the entire process.
|
||||
func (r *FileReplica) Stop(hard bool) (err error) {
|
||||
r.cancel()
|
||||
r.wg.Wait()
|
||||
|
||||
r.muf.Lock()
|
||||
defer r.muf.Unlock()
|
||||
if hard && r.f != nil {
|
||||
if e := r.f.Close(); e != nil && err == nil {
|
||||
err = e
|
||||
}
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
// monitor runs in a separate goroutine and continuously replicates the DB.
|
||||
@@ -582,6 +601,9 @@ func (r *FileReplica) Snapshot(ctx context.Context) error {
|
||||
|
||||
// snapshot copies the entire database to the replica path.
|
||||
func (r *FileReplica) snapshot(ctx context.Context, generation string, index int) error {
|
||||
r.muf.Lock()
|
||||
defer r.muf.Unlock()
|
||||
|
||||
// Acquire a read lock on the database during snapshot to prevent checkpoints.
|
||||
tx, err := r.db.db.Begin()
|
||||
if err != nil {
|
||||
@@ -602,7 +624,46 @@ func (r *FileReplica) snapshot(ctx context.Context, generation string, index int
|
||||
|
||||
if err := mkdirAll(filepath.Dir(snapshotPath), r.db.dirmode, r.db.diruid, r.db.dirgid); err != nil {
|
||||
return err
|
||||
} else if err := compressFile(r.db.Path(), snapshotPath, r.db.uid, r.db.gid); err != nil {
|
||||
}
|
||||
|
||||
// Open db file descriptor, if not already open.
|
||||
if r.f == nil {
|
||||
if r.f, err = os.Open(r.db.Path()); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
if _, err := r.f.Seek(0, io.SeekStart); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
fi, err := r.f.Stat()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
w, err := createFile(snapshotPath+".tmp", fi.Mode(), r.db.uid, r.db.gid)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer w.Close()
|
||||
|
||||
zr := lz4.NewWriter(w)
|
||||
defer zr.Close()
|
||||
|
||||
// Copy & compress file contents to temporary file.
|
||||
if _, err := io.Copy(zr, r.f); err != nil {
|
||||
return err
|
||||
} else if err := zr.Close(); err != nil {
|
||||
return err
|
||||
} else if err := w.Sync(); err != nil {
|
||||
return err
|
||||
} else if err := w.Close(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Move compressed file to final location.
|
||||
if err := os.Rename(snapshotPath+".tmp", snapshotPath); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -805,7 +866,7 @@ func (r *FileReplica) compress(ctx context.Context, generation string) error {
|
||||
}
|
||||
|
||||
dst := filename + ".lz4"
|
||||
if err := compressFile(filename, dst, r.db.uid, r.db.gid); err != nil {
|
||||
if err := compressWALFile(filename, dst, r.db.uid, r.db.gid); err != nil {
|
||||
return err
|
||||
} else if err := os.Remove(filename); err != nil {
|
||||
return err
|
||||
@@ -1051,8 +1112,9 @@ func WALIndexAt(ctx context.Context, r Replica, generation string, maxIndex int,
|
||||
return index, nil
|
||||
}
|
||||
|
||||
// compressFile compresses a file and replaces it with a new file with a .lz4 extension.
|
||||
func compressFile(src, dst string, uid, gid int) error {
|
||||
// compressWALFile compresses a file and replaces it with a new file with a .lz4 extension.
|
||||
// Do not use this on database files because of issues with non-OFD locks.
|
||||
func compressWALFile(src, dst string, uid, gid int) error {
|
||||
r, err := os.Open(src)
|
||||
if err != nil {
|
||||
return err
|
||||
@@ -1102,7 +1164,6 @@ func ValidateReplica(ctx context.Context, r Replica) error {
|
||||
|
||||
// Compute checksum of primary database under lock. This prevents a
|
||||
// sync from occurring and the database will not be written.
|
||||
primaryPath := filepath.Join(tmpdir, "primary")
|
||||
chksum0, pos, err := db.CRC64()
|
||||
if err != nil {
|
||||
return fmt.Errorf("cannot compute checksum: %w", err)
|
||||
@@ -1125,10 +1186,19 @@ func ValidateReplica(ctx context.Context, r Replica) error {
|
||||
}
|
||||
|
||||
// Open file handle for restored database.
|
||||
chksum1, err := checksumFile(restorePath)
|
||||
// NOTE: This open is ok as the restored database is not managed by litestream.
|
||||
f, err := os.Open(restorePath)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer f.Close()
|
||||
|
||||
// Read entire file into checksum.
|
||||
h := crc64.New(crc64.MakeTable(crc64.ISO))
|
||||
if _, err := io.Copy(h, f); err != nil {
|
||||
return err
|
||||
}
|
||||
chksum1 := h.Sum64()
|
||||
|
||||
status := "ok"
|
||||
mismatch := chksum0 != chksum1
|
||||
@@ -1140,15 +1210,6 @@ func ValidateReplica(ctx context.Context, r Replica) error {
|
||||
// Validate checksums match.
|
||||
if mismatch {
|
||||
internal.ReplicaValidationTotalCounterVec.WithLabelValues(db.Path(), r.Name(), "error").Inc()
|
||||
|
||||
// Compress mismatched databases and report temporary path for investigation.
|
||||
if err := compressFile(primaryPath, primaryPath+".lz4", db.uid, db.gid); err != nil {
|
||||
return fmt.Errorf("cannot compress primary db: %w", err)
|
||||
} else if err := compressFile(restorePath, restorePath+".lz4", db.uid, db.gid); err != nil {
|
||||
return fmt.Errorf("cannot compress replica db: %w", err)
|
||||
}
|
||||
log.Printf("%s(%s): validator: mismatch files @ %s", db.Path(), r.Name(), tmpdir)
|
||||
|
||||
return ErrChecksumMismatch
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user