Compare commits
3 Commits
422-fix-li
...
main
| Author | SHA1 | Date | |
|---|---|---|---|
| 934e65a4e6 | |||
|
|
5be467a478 | ||
|
|
5e1c112468 |
52
db.go
52
db.go
@@ -145,6 +145,40 @@ func NewDB(path string) *DB {
|
|||||||
return db
|
return db
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// NewDB returns a new instance of DB for a given path.
|
||||||
|
func NewLSDB(path string, metapath string) *DB {
|
||||||
|
_, file := filepath.Split(path)
|
||||||
|
|
||||||
|
db := &DB{
|
||||||
|
path: path,
|
||||||
|
metaPath: filepath.Join(metapath, "."+file+MetaDirSuffix),
|
||||||
|
notify: make(chan struct{}),
|
||||||
|
|
||||||
|
MinCheckpointPageN: DefaultMinCheckpointPageN,
|
||||||
|
MaxCheckpointPageN: DefaultMaxCheckpointPageN,
|
||||||
|
TruncatePageN: DefaultTruncatePageN,
|
||||||
|
CheckpointInterval: DefaultCheckpointInterval,
|
||||||
|
MonitorInterval: DefaultMonitorInterval,
|
||||||
|
Logger: slog.With("db", path),
|
||||||
|
}
|
||||||
|
|
||||||
|
db.dbSizeGauge = dbSizeGaugeVec.WithLabelValues(db.path)
|
||||||
|
db.walSizeGauge = walSizeGaugeVec.WithLabelValues(db.path)
|
||||||
|
db.totalWALBytesCounter = totalWALBytesCounterVec.WithLabelValues(db.path)
|
||||||
|
db.shadowWALIndexGauge = shadowWALIndexGaugeVec.WithLabelValues(db.path)
|
||||||
|
db.shadowWALSizeGauge = shadowWALSizeGaugeVec.WithLabelValues(db.path)
|
||||||
|
db.syncNCounter = syncNCounterVec.WithLabelValues(db.path)
|
||||||
|
db.syncErrorNCounter = syncErrorNCounterVec.WithLabelValues(db.path)
|
||||||
|
db.syncSecondsCounter = syncSecondsCounterVec.WithLabelValues(db.path)
|
||||||
|
db.checkpointNCounterVec = checkpointNCounterVec.MustCurryWith(prometheus.Labels{"db": db.path})
|
||||||
|
db.checkpointErrorNCounterVec = checkpointErrorNCounterVec.MustCurryWith(prometheus.Labels{"db": db.path})
|
||||||
|
db.checkpointSecondsCounterVec = checkpointSecondsCounterVec.MustCurryWith(prometheus.Labels{"db": db.path})
|
||||||
|
|
||||||
|
db.ctx, db.cancel = context.WithCancel(context.Background())
|
||||||
|
|
||||||
|
return db
|
||||||
|
}
|
||||||
|
|
||||||
// SQLDB returns a reference to the underlying sql.DB connection.
|
// SQLDB returns a reference to the underlying sql.DB connection.
|
||||||
func (db *DB) SQLDB() *sql.DB {
|
func (db *DB) SQLDB() *sql.DB {
|
||||||
return db.db
|
return db.db
|
||||||
@@ -629,7 +663,7 @@ func (db *DB) acquireReadLock() error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Execute read query to obtain read lock.
|
// Execute read query to obtain read lock.
|
||||||
if _, err := tx.ExecContext(context.Background(), `SELECT COUNT(1) FROM _litestream_seq;`); err != nil {
|
if _, err := tx.ExecContext(db.ctx, `SELECT COUNT(1) FROM _litestream_seq;`); err != nil {
|
||||||
_ = tx.Rollback()
|
_ = tx.Rollback()
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@@ -649,10 +683,6 @@ func (db *DB) releaseReadLock() error {
|
|||||||
// Rollback & clear read transaction.
|
// Rollback & clear read transaction.
|
||||||
err := db.rtx.Rollback()
|
err := db.rtx.Rollback()
|
||||||
db.rtx = nil
|
db.rtx = nil
|
||||||
|
|
||||||
if errors.Is(err, context.Canceled) {
|
|
||||||
err = nil
|
|
||||||
}
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -697,7 +727,7 @@ func (db *DB) createGeneration() (string, error) {
|
|||||||
|
|
||||||
// Atomically write generation name as current generation.
|
// Atomically write generation name as current generation.
|
||||||
generationNamePath := db.GenerationNamePath()
|
generationNamePath := db.GenerationNamePath()
|
||||||
mode := os.FileMode(0o600)
|
mode := os.FileMode(0600)
|
||||||
if db.fileInfo != nil {
|
if db.fileInfo != nil {
|
||||||
mode = db.fileInfo.Mode()
|
mode = db.fileInfo.Mode()
|
||||||
}
|
}
|
||||||
@@ -990,7 +1020,7 @@ func (db *DB) initShadowWALFile(filename string) (int64, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Write header to new WAL shadow file.
|
// Write header to new WAL shadow file.
|
||||||
mode := os.FileMode(0o600)
|
mode := os.FileMode(0600)
|
||||||
if fi := db.fileInfo; fi != nil {
|
if fi := db.fileInfo; fi != nil {
|
||||||
mode = fi.Mode()
|
mode = fi.Mode()
|
||||||
}
|
}
|
||||||
@@ -1026,7 +1056,7 @@ func (db *DB) copyToShadowWAL(filename string) (origWalSize int64, newSize int64
|
|||||||
}
|
}
|
||||||
origWalSize = frameAlign(fi.Size(), db.pageSize)
|
origWalSize = frameAlign(fi.Size(), db.pageSize)
|
||||||
|
|
||||||
w, err := os.OpenFile(filename, os.O_RDWR, 0o666)
|
w, err := os.OpenFile(filename, os.O_RDWR, 0666)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, 0, err
|
return 0, 0, err
|
||||||
}
|
}
|
||||||
@@ -1309,7 +1339,7 @@ func (db *DB) Checkpoint(ctx context.Context, mode string) (err error) {
|
|||||||
return db.checkpoint(ctx, generation, mode)
|
return db.checkpoint(ctx, generation, mode)
|
||||||
}
|
}
|
||||||
|
|
||||||
// checkpointAndInit performs a checkpoint on the WAL file and initializes a
|
// checkpoint performs a checkpoint on the WAL file and initializes a
|
||||||
// new shadow WAL file.
|
// new shadow WAL file.
|
||||||
func (db *DB) checkpoint(ctx context.Context, generation, mode string) error {
|
func (db *DB) checkpoint(ctx context.Context, generation, mode string) error {
|
||||||
// Try getting a checkpoint lock, will fail during snapshots.
|
// Try getting a checkpoint lock, will fail during snapshots.
|
||||||
@@ -1338,7 +1368,7 @@ func (db *DB) checkpoint(ctx context.Context, generation, mode string) error {
|
|||||||
// a new page is written.
|
// a new page is written.
|
||||||
if err := db.execCheckpoint(mode); err != nil {
|
if err := db.execCheckpoint(mode); err != nil {
|
||||||
return err
|
return err
|
||||||
} else if err := db.ensureWALExists(); err != nil {
|
} else if _, err = db.db.Exec(`INSERT INTO _litestream_seq (id, seq) VALUES (1, 1) ON CONFLICT (id) DO UPDATE SET seq = seq + 1`); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -1428,7 +1458,7 @@ func (db *DB) execCheckpoint(mode string) (err error) {
|
|||||||
|
|
||||||
// Reacquire the read lock immediately after the checkpoint.
|
// Reacquire the read lock immediately after the checkpoint.
|
||||||
if err := db.acquireReadLock(); err != nil {
|
if err := db.acquireReadLock(); err != nil {
|
||||||
return fmt.Errorf("acquire read lock: %w", err)
|
return fmt.Errorf("release read lock: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
|
|||||||
16
db_test.go
16
db_test.go
@@ -375,7 +375,7 @@ func TestDB_Sync(t *testing.T) {
|
|||||||
shadowWALPath := db.ShadowWALPath(pos0.Generation, pos0.Index)
|
shadowWALPath := db.ShadowWALPath(pos0.Generation, pos0.Index)
|
||||||
if buf, err := os.ReadFile(shadowWALPath); err != nil {
|
if buf, err := os.ReadFile(shadowWALPath); err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
} else if err := os.WriteFile(shadowWALPath, append(buf[:litestream.WALHeaderSize-8], 0, 0, 0, 0, 0, 0, 0, 0), 0o600); err != nil {
|
} else if err := os.WriteFile(shadowWALPath, append(buf[:litestream.WALHeaderSize-8], 0, 0, 0, 0, 0, 0, 0, 0), 0600); err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -552,7 +552,12 @@ func TestDB_Sync(t *testing.T) {
|
|||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// NOTE: The minimum checkpoint may only do a PASSIVE checkpoint so we can't guarantee a rollover.
|
// Ensure position is now on the second index.
|
||||||
|
if pos, err := db.Pos(); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
} else if got, want := pos.Index, 1; got != want {
|
||||||
|
t.Fatalf("Index=%v, want %v", got, want)
|
||||||
|
}
|
||||||
})
|
})
|
||||||
|
|
||||||
// Ensure DB checkpoints after interval.
|
// Ensure DB checkpoints after interval.
|
||||||
@@ -576,6 +581,13 @@ func TestDB_Sync(t *testing.T) {
|
|||||||
} else if err := db.Sync(context.Background()); err != nil {
|
} else if err := db.Sync(context.Background()); err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Ensure position is now on the second index.
|
||||||
|
if pos, err := db.Pos(); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
} else if got, want := pos.Index, 1; got != want {
|
||||||
|
t.Fatalf("Index=%v, want %v", got, want)
|
||||||
|
}
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -50,10 +50,6 @@ func TestReplica_Sync(t *testing.T) {
|
|||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := db.Checkpoint(context.Background(), litestream.CheckpointModeTruncate); err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
c := file.NewReplicaClient(t.TempDir())
|
c := file.NewReplicaClient(t.TempDir())
|
||||||
r := litestream.NewReplica(db, "")
|
r := litestream.NewReplica(db, "")
|
||||||
c.Replica, r.Client = r, c
|
c.Replica, r.Client = r, c
|
||||||
@@ -146,7 +142,7 @@ func TestReplica_Snapshot(t *testing.T) {
|
|||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
} else if info, err := r.Snapshot(context.Background()); err != nil {
|
} else if info, err := r.Snapshot(context.Background()); err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
} else if got, want := info.Pos(), pos0.Truncate(); got != want {
|
} else if got, want := info.Pos(), nextIndex(pos0); got != want {
|
||||||
t.Fatalf("pos=%s, want %s", got, want)
|
t.Fatalf("pos=%s, want %s", got, want)
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -170,18 +166,20 @@ func TestReplica_Snapshot(t *testing.T) {
|
|||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
} else if info, err := r.Snapshot(context.Background()); err != nil {
|
} else if info, err := r.Snapshot(context.Background()); err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
} else if got, want := info.Pos(), pos1.Truncate(); got != want {
|
} else if got, want := info.Pos(), nextIndex(pos1); got != want {
|
||||||
t.Fatalf("pos=%v, want %v", got, want)
|
t.Fatalf("pos=%v, want %v", got, want)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Verify snapshots exist.
|
// Verify three snapshots exist.
|
||||||
if infos, err := r.Snapshots(context.Background()); err != nil {
|
if infos, err := r.Snapshots(context.Background()); err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
} else if got, want := len(infos), 2; got != want {
|
} else if got, want := len(infos), 3; got != want {
|
||||||
t.Fatalf("len=%v, want %v", got, want)
|
t.Fatalf("len=%v, want %v", got, want)
|
||||||
} else if got, want := infos[0].Pos(), pos0.Truncate(); got != want {
|
} else if got, want := infos[0].Pos(), pos0.Truncate(); got != want {
|
||||||
t.Fatalf("info[0]=%s, want %s", got, want)
|
t.Fatalf("info[0]=%s, want %s", got, want)
|
||||||
} else if got, want := infos[1].Pos(), pos1.Truncate(); got != want {
|
} else if got, want := infos[1].Pos(), nextIndex(pos0); got != want {
|
||||||
t.Fatalf("info[1]=%s, want %s", got, want)
|
t.Fatalf("info[1]=%s, want %s", got, want)
|
||||||
|
} else if got, want := infos[2].Pos(), nextIndex(pos1); got != want {
|
||||||
|
t.Fatalf("info[2]=%s, want %s", got, want)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -136,12 +136,12 @@ func (c *ReplicaClient) findBucketRegion(ctx context.Context, bucket string) (st
|
|||||||
|
|
||||||
// Fetch bucket location, if possible. Must be bucket owner.
|
// Fetch bucket location, if possible. Must be bucket owner.
|
||||||
// This call can return a nil location which means it's in us-east-1.
|
// This call can return a nil location which means it's in us-east-1.
|
||||||
if out, err := s3.New(sess).GetBucketLocation(&s3.GetBucketLocationInput{
|
if out, err := s3.New(sess).HeadBucketWithContext(ctx, &s3.HeadBucketInput{
|
||||||
Bucket: aws.String(bucket),
|
Bucket: aws.String(bucket),
|
||||||
}); err != nil {
|
}); err != nil {
|
||||||
return "", err
|
return "", err
|
||||||
} else if out.LocationConstraint != nil {
|
} else if out.BucketRegion != nil {
|
||||||
return *out.LocationConstraint, nil
|
return *out.BucketRegion, nil
|
||||||
}
|
}
|
||||||
return DefaultRegion, nil
|
return DefaultRegion, nil
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user