Compare commits


3 Commits

Author   SHA1         Message                                             Date
Ananth   934e65a4e6   moved metapath                                      2024-10-12 11:18:04 +01:00
rustfix  5be467a478   Switch to S3 HEAD to find bucket location (#580)    2024-04-20 09:08:50 -06:00
         5e1c112468   chore: fix function name in comment (#579)          2024-04-15 08:35:56 -04:00
                      Signed-off-by: rustfix <771054535@qq.com>
4 changed files with 65 additions and 25 deletions

db.go (52 changed lines)

@@ -145,6 +145,40 @@ func NewDB(path string) *DB {
	return db
}

+// NewDB returns a new instance of DB for a given path.
+func NewLSDB(path string, metapath string) *DB {
+	_, file := filepath.Split(path)
+	db := &DB{
+		path:     path,
+		metaPath: filepath.Join(metapath, "."+file+MetaDirSuffix),
+		notify:   make(chan struct{}),
+		MinCheckpointPageN: DefaultMinCheckpointPageN,
+		MaxCheckpointPageN: DefaultMaxCheckpointPageN,
+		TruncatePageN:      DefaultTruncatePageN,
+		CheckpointInterval: DefaultCheckpointInterval,
+		MonitorInterval:    DefaultMonitorInterval,
+		Logger:             slog.With("db", path),
+	}
+	db.dbSizeGauge = dbSizeGaugeVec.WithLabelValues(db.path)
+	db.walSizeGauge = walSizeGaugeVec.WithLabelValues(db.path)
+	db.totalWALBytesCounter = totalWALBytesCounterVec.WithLabelValues(db.path)
+	db.shadowWALIndexGauge = shadowWALIndexGaugeVec.WithLabelValues(db.path)
+	db.shadowWALSizeGauge = shadowWALSizeGaugeVec.WithLabelValues(db.path)
+	db.syncNCounter = syncNCounterVec.WithLabelValues(db.path)
+	db.syncErrorNCounter = syncErrorNCounterVec.WithLabelValues(db.path)
+	db.syncSecondsCounter = syncSecondsCounterVec.WithLabelValues(db.path)
+	db.checkpointNCounterVec = checkpointNCounterVec.MustCurryWith(prometheus.Labels{"db": db.path})
+	db.checkpointErrorNCounterVec = checkpointErrorNCounterVec.MustCurryWith(prometheus.Labels{"db": db.path})
+	db.checkpointSecondsCounterVec = checkpointSecondsCounterVec.MustCurryWith(prometheus.Labels{"db": db.path})
+	db.ctx, db.cancel = context.WithCancel(context.Background())
+	return db
+}

// SQLDB returns a reference to the underlying sql.DB connection.
func (db *DB) SQLDB() *sql.DB {
	return db.db
@@ -629,7 +663,7 @@ func (db *DB) acquireReadLock() error {
	}

	// Execute read query to obtain read lock.
-	if _, err := tx.ExecContext(context.Background(), `SELECT COUNT(1) FROM _litestream_seq;`); err != nil {
+	if _, err := tx.ExecContext(db.ctx, `SELECT COUNT(1) FROM _litestream_seq;`); err != nil {
		_ = tx.Rollback()
		return err
	}
@@ -649,10 +683,6 @@ func (db *DB) releaseReadLock() error {
	// Rollback & clear read transaction.
	err := db.rtx.Rollback()
	db.rtx = nil
-	if errors.Is(err, context.Canceled) {
-		err = nil
-	}
	return err
}
@@ -697,7 +727,7 @@ func (db *DB) createGeneration() (string, error) {
	// Atomically write generation name as current generation.
	generationNamePath := db.GenerationNamePath()
-	mode := os.FileMode(0o600)
+	mode := os.FileMode(0600)
	if db.fileInfo != nil {
		mode = db.fileInfo.Mode()
	}
@@ -990,7 +1020,7 @@ func (db *DB) initShadowWALFile(filename string) (int64, error) {
	}

	// Write header to new WAL shadow file.
-	mode := os.FileMode(0o600)
+	mode := os.FileMode(0600)
	if fi := db.fileInfo; fi != nil {
		mode = fi.Mode()
	}
@@ -1026,7 +1056,7 @@ func (db *DB) copyToShadowWAL(filename string) (origWalSize int64, newSize int64
	}
	origWalSize = frameAlign(fi.Size(), db.pageSize)

-	w, err := os.OpenFile(filename, os.O_RDWR, 0o666)
+	w, err := os.OpenFile(filename, os.O_RDWR, 0666)
	if err != nil {
		return 0, 0, err
	}
@@ -1309,7 +1339,7 @@ func (db *DB) Checkpoint(ctx context.Context, mode string) (err error) {
	return db.checkpoint(ctx, generation, mode)
}

-// checkpointAndInit performs a checkpoint on the WAL file and initializes a
+// checkpoint performs a checkpoint on the WAL file and initializes a
// new shadow WAL file.
func (db *DB) checkpoint(ctx context.Context, generation, mode string) error {
	// Try getting a checkpoint lock, will fail during snapshots.
@@ -1338,7 +1368,7 @@ func (db *DB) checkpoint(ctx context.Context, generation, mode string) error {
	// a new page is written.
	if err := db.execCheckpoint(mode); err != nil {
		return err
-	} else if err := db.ensureWALExists(); err != nil {
+	} else if _, err = db.db.Exec(`INSERT INTO _litestream_seq (id, seq) VALUES (1, 1) ON CONFLICT (id) DO UPDATE SET seq = seq + 1`); err != nil {
		return err
	}
@@ -1428,7 +1458,7 @@ func (db *DB) execCheckpoint(mode string) (err error) {
	// Reacquire the read lock immediately after the checkpoint.
	if err := db.acquireReadLock(); err != nil {
-		return fmt.Errorf("acquire read lock: %w", err)
+		return fmt.Errorf("release read lock: %w", err)
	}

	return nil
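
The new NewLSDB constructor above lets the Litestream metadata directory (the hidden "."+file+MetaDirSuffix directory) live on a different volume than the database file itself. A minimal usage sketch, not part of the change above; the module path, the file paths, and the Open/Close calls are assumptions based on upstream litestream:

package main

import (
	"log"

	"github.com/benbjohnson/litestream"
)

func main() {
	// Hypothetical paths: keep the SQLite database on local disk but
	// write Litestream's shadow WAL and metadata to a separate volume.
	db := litestream.NewLSDB("/data/app.db", "/mnt/litestream-meta")

	if err := db.Open(); err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// Replicas would be attached to db here as usual.
}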


@@ -375,7 +375,7 @@ func TestDB_Sync(t *testing.T) {
		shadowWALPath := db.ShadowWALPath(pos0.Generation, pos0.Index)
		if buf, err := os.ReadFile(shadowWALPath); err != nil {
			t.Fatal(err)
-		} else if err := os.WriteFile(shadowWALPath, append(buf[:litestream.WALHeaderSize-8], 0, 0, 0, 0, 0, 0, 0, 0), 0o600); err != nil {
+		} else if err := os.WriteFile(shadowWALPath, append(buf[:litestream.WALHeaderSize-8], 0, 0, 0, 0, 0, 0, 0, 0), 0600); err != nil {
			t.Fatal(err)
		}
@@ -552,7 +552,12 @@ func TestDB_Sync(t *testing.T) {
			t.Fatal(err)
		}

-		// NOTE: The minimum checkpoint may only do a PASSIVE checkpoint so we can't guarantee a rollover.
+		// Ensure position is now on the second index.
+		if pos, err := db.Pos(); err != nil {
+			t.Fatal(err)
+		} else if got, want := pos.Index, 1; got != want {
+			t.Fatalf("Index=%v, want %v", got, want)
+		}
	})
	// Ensure DB checkpoints after interval.
@@ -576,6 +581,13 @@ func TestDB_Sync(t *testing.T) {
		} else if err := db.Sync(context.Background()); err != nil {
			t.Fatal(err)
		}

+		// Ensure position is now on the second index.
+		if pos, err := db.Pos(); err != nil {
+			t.Fatal(err)
+		} else if got, want := pos.Index, 1; got != want {
+			t.Fatalf("Index=%v, want %v", got, want)
+		}
	})
}


@@ -50,10 +50,6 @@ func TestReplica_Sync(t *testing.T) {
		t.Fatal(err)
	}

-	if err := db.Checkpoint(context.Background(), litestream.CheckpointModeTruncate); err != nil {
-		t.Fatal(err)
-	}
-
	c := file.NewReplicaClient(t.TempDir())
	r := litestream.NewReplica(db, "")
	c.Replica, r.Client = r, c
@@ -146,7 +142,7 @@ func TestReplica_Snapshot(t *testing.T) {
		t.Fatal(err)
	} else if info, err := r.Snapshot(context.Background()); err != nil {
		t.Fatal(err)
-	} else if got, want := info.Pos(), pos0.Truncate(); got != want {
+	} else if got, want := info.Pos(), nextIndex(pos0); got != want {
		t.Fatalf("pos=%s, want %s", got, want)
	}
@@ -170,18 +166,20 @@ func TestReplica_Snapshot(t *testing.T) {
		t.Fatal(err)
	} else if info, err := r.Snapshot(context.Background()); err != nil {
		t.Fatal(err)
-	} else if got, want := info.Pos(), pos1.Truncate(); got != want {
+	} else if got, want := info.Pos(), nextIndex(pos1); got != want {
		t.Fatalf("pos=%v, want %v", got, want)
	}

-	// Verify snapshots exist.
+	// Verify three snapshots exist.
	if infos, err := r.Snapshots(context.Background()); err != nil {
		t.Fatal(err)
-	} else if got, want := len(infos), 2; got != want {
+	} else if got, want := len(infos), 3; got != want {
		t.Fatalf("len=%v, want %v", got, want)
	} else if got, want := infos[0].Pos(), pos0.Truncate(); got != want {
		t.Fatalf("info[0]=%s, want %s", got, want)
-	} else if got, want := infos[1].Pos(), pos1.Truncate(); got != want {
+	} else if got, want := infos[1].Pos(), nextIndex(pos0); got != want {
		t.Fatalf("info[1]=%s, want %s", got, want)
+	} else if got, want := infos[2].Pos(), nextIndex(pos1); got != want {
+		t.Fatalf("info[2]=%s, want %s", got, want)
	}
}
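
The updated assertions compare snapshot positions against nextIndex(...), a helper that is referenced but not shown in this diff. A plausible sketch of such a helper, assuming litestream.Pos carries Generation, Index, and Offset fields as in upstream litestream; the helper's name and behavior are inferred here, not taken from the commit:

// nextIndex returns the position at the start of the WAL index following pos:
// same generation, index incremented by one, offset reset to zero.
func nextIndex(pos litestream.Pos) litestream.Pos {
	return litestream.Pos{
		Generation: pos.Generation,
		Index:      pos.Index + 1,
	}
}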


@@ -136,12 +136,12 @@ func (c *ReplicaClient) findBucketRegion(ctx context.Context, bucket string) (st
	// Fetch bucket location, if possible. Must be bucket owner.
	// This call can return a nil location which means it's in us-east-1.
-	if out, err := s3.New(sess).GetBucketLocation(&s3.GetBucketLocationInput{
+	if out, err := s3.New(sess).HeadBucketWithContext(ctx, &s3.HeadBucketInput{
		Bucket: aws.String(bucket),
	}); err != nil {
		return "", err
-	} else if out.LocationConstraint != nil {
-		return *out.LocationConstraint, nil
+	} else if out.BucketRegion != nil {
+		return *out.BucketRegion, nil
	}

	return DefaultRegion, nil
}