Compare commits

..

1 Commits

Author SHA1 Message Date
Ben Johnson
80280fce54 Only update litestream_seq if size is below WAL header size 2024-03-05 15:09:07 -07:00
4 changed files with 25 additions and 65 deletions

52
db.go
View File

@@ -145,40 +145,6 @@ func NewDB(path string) *DB {
return db return db
} }
// NewDB returns a new instance of DB for a given path.
func NewLSDB(path string, metapath string) *DB {
_, file := filepath.Split(path)
db := &DB{
path: path,
metaPath: filepath.Join(metapath, "."+file+MetaDirSuffix),
notify: make(chan struct{}),
MinCheckpointPageN: DefaultMinCheckpointPageN,
MaxCheckpointPageN: DefaultMaxCheckpointPageN,
TruncatePageN: DefaultTruncatePageN,
CheckpointInterval: DefaultCheckpointInterval,
MonitorInterval: DefaultMonitorInterval,
Logger: slog.With("db", path),
}
db.dbSizeGauge = dbSizeGaugeVec.WithLabelValues(db.path)
db.walSizeGauge = walSizeGaugeVec.WithLabelValues(db.path)
db.totalWALBytesCounter = totalWALBytesCounterVec.WithLabelValues(db.path)
db.shadowWALIndexGauge = shadowWALIndexGaugeVec.WithLabelValues(db.path)
db.shadowWALSizeGauge = shadowWALSizeGaugeVec.WithLabelValues(db.path)
db.syncNCounter = syncNCounterVec.WithLabelValues(db.path)
db.syncErrorNCounter = syncErrorNCounterVec.WithLabelValues(db.path)
db.syncSecondsCounter = syncSecondsCounterVec.WithLabelValues(db.path)
db.checkpointNCounterVec = checkpointNCounterVec.MustCurryWith(prometheus.Labels{"db": db.path})
db.checkpointErrorNCounterVec = checkpointErrorNCounterVec.MustCurryWith(prometheus.Labels{"db": db.path})
db.checkpointSecondsCounterVec = checkpointSecondsCounterVec.MustCurryWith(prometheus.Labels{"db": db.path})
db.ctx, db.cancel = context.WithCancel(context.Background())
return db
}
// SQLDB returns a reference to the underlying sql.DB connection. // SQLDB returns a reference to the underlying sql.DB connection.
func (db *DB) SQLDB() *sql.DB { func (db *DB) SQLDB() *sql.DB {
return db.db return db.db
@@ -663,7 +629,7 @@ func (db *DB) acquireReadLock() error {
} }
// Execute read query to obtain read lock. // Execute read query to obtain read lock.
if _, err := tx.ExecContext(db.ctx, `SELECT COUNT(1) FROM _litestream_seq;`); err != nil { if _, err := tx.ExecContext(context.Background(), `SELECT COUNT(1) FROM _litestream_seq;`); err != nil {
_ = tx.Rollback() _ = tx.Rollback()
return err return err
} }
@@ -683,6 +649,10 @@ func (db *DB) releaseReadLock() error {
// Rollback & clear read transaction. // Rollback & clear read transaction.
err := db.rtx.Rollback() err := db.rtx.Rollback()
db.rtx = nil db.rtx = nil
if errors.Is(err, context.Canceled) {
err = nil
}
return err return err
} }
@@ -727,7 +697,7 @@ func (db *DB) createGeneration() (string, error) {
// Atomically write generation name as current generation. // Atomically write generation name as current generation.
generationNamePath := db.GenerationNamePath() generationNamePath := db.GenerationNamePath()
mode := os.FileMode(0600) mode := os.FileMode(0o600)
if db.fileInfo != nil { if db.fileInfo != nil {
mode = db.fileInfo.Mode() mode = db.fileInfo.Mode()
} }
@@ -1020,7 +990,7 @@ func (db *DB) initShadowWALFile(filename string) (int64, error) {
} }
// Write header to new WAL shadow file. // Write header to new WAL shadow file.
mode := os.FileMode(0600) mode := os.FileMode(0o600)
if fi := db.fileInfo; fi != nil { if fi := db.fileInfo; fi != nil {
mode = fi.Mode() mode = fi.Mode()
} }
@@ -1056,7 +1026,7 @@ func (db *DB) copyToShadowWAL(filename string) (origWalSize int64, newSize int64
} }
origWalSize = frameAlign(fi.Size(), db.pageSize) origWalSize = frameAlign(fi.Size(), db.pageSize)
w, err := os.OpenFile(filename, os.O_RDWR, 0666) w, err := os.OpenFile(filename, os.O_RDWR, 0o666)
if err != nil { if err != nil {
return 0, 0, err return 0, 0, err
} }
@@ -1339,7 +1309,7 @@ func (db *DB) Checkpoint(ctx context.Context, mode string) (err error) {
return db.checkpoint(ctx, generation, mode) return db.checkpoint(ctx, generation, mode)
} }
// checkpoint performs a checkpoint on the WAL file and initializes a // checkpointAndInit performs a checkpoint on the WAL file and initializes a
// new shadow WAL file. // new shadow WAL file.
func (db *DB) checkpoint(ctx context.Context, generation, mode string) error { func (db *DB) checkpoint(ctx context.Context, generation, mode string) error {
// Try getting a checkpoint lock, will fail during snapshots. // Try getting a checkpoint lock, will fail during snapshots.
@@ -1368,7 +1338,7 @@ func (db *DB) checkpoint(ctx context.Context, generation, mode string) error {
// a new page is written. // a new page is written.
if err := db.execCheckpoint(mode); err != nil { if err := db.execCheckpoint(mode); err != nil {
return err return err
} else if _, err = db.db.Exec(`INSERT INTO _litestream_seq (id, seq) VALUES (1, 1) ON CONFLICT (id) DO UPDATE SET seq = seq + 1`); err != nil { } else if err := db.ensureWALExists(); err != nil {
return err return err
} }
@@ -1458,7 +1428,7 @@ func (db *DB) execCheckpoint(mode string) (err error) {
// Reacquire the read lock immediately after the checkpoint. // Reacquire the read lock immediately after the checkpoint.
if err := db.acquireReadLock(); err != nil { if err := db.acquireReadLock(); err != nil {
return fmt.Errorf("release read lock: %w", err) return fmt.Errorf("acquire read lock: %w", err)
} }
return nil return nil

View File

@@ -375,7 +375,7 @@ func TestDB_Sync(t *testing.T) {
shadowWALPath := db.ShadowWALPath(pos0.Generation, pos0.Index) shadowWALPath := db.ShadowWALPath(pos0.Generation, pos0.Index)
if buf, err := os.ReadFile(shadowWALPath); err != nil { if buf, err := os.ReadFile(shadowWALPath); err != nil {
t.Fatal(err) t.Fatal(err)
} else if err := os.WriteFile(shadowWALPath, append(buf[:litestream.WALHeaderSize-8], 0, 0, 0, 0, 0, 0, 0, 0), 0600); err != nil { } else if err := os.WriteFile(shadowWALPath, append(buf[:litestream.WALHeaderSize-8], 0, 0, 0, 0, 0, 0, 0, 0), 0o600); err != nil {
t.Fatal(err) t.Fatal(err)
} }
@@ -552,12 +552,7 @@ func TestDB_Sync(t *testing.T) {
t.Fatal(err) t.Fatal(err)
} }
// Ensure position is now on the second index. // NOTE: The minimum checkpoint may only do a PASSIVE checkpoint so we can't guarantee a rollover.
if pos, err := db.Pos(); err != nil {
t.Fatal(err)
} else if got, want := pos.Index, 1; got != want {
t.Fatalf("Index=%v, want %v", got, want)
}
}) })
// Ensure DB checkpoints after interval. // Ensure DB checkpoints after interval.
@@ -581,13 +576,6 @@ func TestDB_Sync(t *testing.T) {
} else if err := db.Sync(context.Background()); err != nil { } else if err := db.Sync(context.Background()); err != nil {
t.Fatal(err) t.Fatal(err)
} }
// Ensure position is now on the second index.
if pos, err := db.Pos(); err != nil {
t.Fatal(err)
} else if got, want := pos.Index, 1; got != want {
t.Fatalf("Index=%v, want %v", got, want)
}
}) })
} }

View File

@@ -50,6 +50,10 @@ func TestReplica_Sync(t *testing.T) {
t.Fatal(err) t.Fatal(err)
} }
if err := db.Checkpoint(context.Background(), litestream.CheckpointModeTruncate); err != nil {
t.Fatal(err)
}
c := file.NewReplicaClient(t.TempDir()) c := file.NewReplicaClient(t.TempDir())
r := litestream.NewReplica(db, "") r := litestream.NewReplica(db, "")
c.Replica, r.Client = r, c c.Replica, r.Client = r, c
@@ -142,7 +146,7 @@ func TestReplica_Snapshot(t *testing.T) {
t.Fatal(err) t.Fatal(err)
} else if info, err := r.Snapshot(context.Background()); err != nil { } else if info, err := r.Snapshot(context.Background()); err != nil {
t.Fatal(err) t.Fatal(err)
} else if got, want := info.Pos(), nextIndex(pos0); got != want { } else if got, want := info.Pos(), pos0.Truncate(); got != want {
t.Fatalf("pos=%s, want %s", got, want) t.Fatalf("pos=%s, want %s", got, want)
} }
@@ -166,20 +170,18 @@ func TestReplica_Snapshot(t *testing.T) {
t.Fatal(err) t.Fatal(err)
} else if info, err := r.Snapshot(context.Background()); err != nil { } else if info, err := r.Snapshot(context.Background()); err != nil {
t.Fatal(err) t.Fatal(err)
} else if got, want := info.Pos(), nextIndex(pos1); got != want { } else if got, want := info.Pos(), pos1.Truncate(); got != want {
t.Fatalf("pos=%v, want %v", got, want) t.Fatalf("pos=%v, want %v", got, want)
} }
// Verify three snapshots exist. // Verify snapshots exist.
if infos, err := r.Snapshots(context.Background()); err != nil { if infos, err := r.Snapshots(context.Background()); err != nil {
t.Fatal(err) t.Fatal(err)
} else if got, want := len(infos), 3; got != want { } else if got, want := len(infos), 2; got != want {
t.Fatalf("len=%v, want %v", got, want) t.Fatalf("len=%v, want %v", got, want)
} else if got, want := infos[0].Pos(), pos0.Truncate(); got != want { } else if got, want := infos[0].Pos(), pos0.Truncate(); got != want {
t.Fatalf("info[0]=%s, want %s", got, want) t.Fatalf("info[0]=%s, want %s", got, want)
} else if got, want := infos[1].Pos(), nextIndex(pos0); got != want { } else if got, want := infos[1].Pos(), pos1.Truncate(); got != want {
t.Fatalf("info[1]=%s, want %s", got, want) t.Fatalf("info[1]=%s, want %s", got, want)
} else if got, want := infos[2].Pos(), nextIndex(pos1); got != want {
t.Fatalf("info[2]=%s, want %s", got, want)
} }
} }

View File

@@ -136,12 +136,12 @@ func (c *ReplicaClient) findBucketRegion(ctx context.Context, bucket string) (st
// Fetch bucket location, if possible. Must be bucket owner. // Fetch bucket location, if possible. Must be bucket owner.
// This call can return a nil location which means it's in us-east-1. // This call can return a nil location which means it's in us-east-1.
if out, err := s3.New(sess).HeadBucketWithContext(ctx, &s3.HeadBucketInput{ if out, err := s3.New(sess).GetBucketLocation(&s3.GetBucketLocationInput{
Bucket: aws.String(bucket), Bucket: aws.String(bucket),
}); err != nil { }); err != nil {
return "", err return "", err
} else if out.BucketRegion != nil { } else if out.LocationConstraint != nil {
return *out.BucketRegion, nil return *out.LocationConstraint, nil
} }
return DefaultRegion, nil return DefaultRegion, nil
} }