This commit changes restore to download multiple WAL files to local disk in parallel while another goroutine applies those files in order. Previously, WAL files were downloaded and applied serially, which limited total throughput since a restore typically involves many small WAL files.
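Roughly, the pattern is a bounded worker pool feeding an in-order applier. A minimal, self-contained sketch of that shape (`fetch` and `apply` are hypothetical stand-ins for the real downloadWAL/applyWAL):

```go
package main

import (
	"context"
	"fmt"

	"golang.org/x/sync/errgroup"
)

// fetch and apply are hypothetical stand-ins for downloading a WAL file
// and applying it to the restored database.
func fetch(ctx context.Context, index int) error { fmt.Println("fetch", index); return nil }
func apply(ctx context.Context, index int) error { fmt.Println("apply", index); return nil }

func restore(ctx context.Context, minIndex, maxIndex, parallelism int) error {
	// Queue every WAL index up front so workers can pull from the channel.
	indexes := make(chan int, maxIndex-minIndex+1)
	for i := minIndex; i <= maxIndex; i++ {
		indexes <- i
	}
	close(indexes)

	// done[i] is closed once WAL index minIndex+i has been downloaded.
	done := make([]chan struct{}, maxIndex-minIndex+1)
	for i := range done {
		done[i] = make(chan struct{})
	}

	g, ctx := errgroup.WithContext(ctx)

	// Download WAL files in parallel.
	for i := 0; i < parallelism; i++ {
		g.Go(func() error {
			for index := range indexes {
				if err := ctx.Err(); err != nil {
					return err
				}
				if err := fetch(ctx, index); err != nil {
					return err
				}
				close(done[index-minIndex])
			}
			return nil
		})
	}

	// Apply WAL files strictly in index order as they become available.
	g.Go(func() error {
		for index := minIndex; index <= maxIndex; index++ {
			select {
			case <-done[index-minIndex]:
			case <-ctx.Done():
				return ctx.Err()
			}
			if err := apply(ctx, index); err != nil {
				return err
			}
		}
		return nil
	})

	return g.Wait()
}

func main() {
	if err := restore(context.Background(), 0, 7, 4); err != nil {
		fmt.Println("restore failed:", err)
	}
}
```

The actual implementation below uses a sync.Cond and a ready slice instead of per-index channels, but the shape is the same: downloads complete out of order while the applier consumes them strictly in order.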
package litestream

import (
	"bytes"
	"context"
	"database/sql"
	"encoding/binary"
	"encoding/hex"
	"errors"
	"fmt"
	"hash/crc64"
	"io"
	"io/ioutil"
	"log"
	"math"
	"math/rand"
	"os"
	"path/filepath"
	"sort"
	"strings"
	"sync"
	"time"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
	"golang.org/x/sync/errgroup"
)

// Default DB settings.
const (
	DefaultMonitorInterval    = 1 * time.Second
	DefaultCheckpointInterval = 1 * time.Minute
	DefaultMinCheckpointPageN = 1000
	DefaultMaxCheckpointPageN = 10000
)

// MaxIndex is the maximum possible WAL index.
// If this index is reached then a new generation will be started.
const MaxIndex = 0x7FFFFFFF

// BusyTimeout is the timeout to wait for EBUSY from SQLite.
const BusyTimeout = 1 * time.Second

// DB represents a managed instance of a SQLite database in the file system.
type DB struct {
	mu       sync.RWMutex
	path     string        // path to database
	db       *sql.DB       // target database
	f        *os.File      // long-running db file descriptor
	rtx      *sql.Tx       // long running read transaction
	pageSize int           // page size, in bytes
	notify   chan struct{} // closes on WAL change

	uid, gid       int // db user/group obtained on init
	mode           os.FileMode
	diruid, dirgid int // db parent user/group obtained on init
	dirmode        os.FileMode

	ctx    context.Context
	cancel func()
	wg     sync.WaitGroup

	// Metrics
	dbSizeGauge                 prometheus.Gauge
	walSizeGauge                prometheus.Gauge
	totalWALBytesCounter        prometheus.Counter
	shadowWALIndexGauge         prometheus.Gauge
	shadowWALSizeGauge          prometheus.Gauge
	syncNCounter                prometheus.Counter
	syncErrorNCounter           prometheus.Counter
	syncSecondsCounter          prometheus.Counter
	checkpointNCounterVec       *prometheus.CounterVec
	checkpointErrorNCounterVec  *prometheus.CounterVec
	checkpointSecondsCounterVec *prometheus.CounterVec

	// Minimum threshold of WAL size, in pages, before a passive checkpoint.
	// A passive checkpoint will attempt a checkpoint but fail if there are
	// active transactions occurring at the same time.
	MinCheckpointPageN int

	// Maximum threshold of WAL size, in pages, before a forced checkpoint.
	// A forced checkpoint will block new transactions and wait for existing
	// transactions to finish before issuing a checkpoint and resetting the WAL.
	//
	// If zero, no checkpoints are forced. This can cause the WAL to grow
	// unbounded if there are always read transactions occurring.
	MaxCheckpointPageN int

	// Time between automatic checkpoints in the WAL. This is done to allow
	// more fine-grained WAL files so that restores can be performed with
	// better precision.
	CheckpointInterval time.Duration

	// Frequency at which to perform db sync.
	MonitorInterval time.Duration

	// List of replicas for the database.
	// Must be set before calling Open().
	Replicas []Replica
}

// NewDB returns a new instance of DB for a given path.
func NewDB(path string) *DB {
	db := &DB{
		path:   path,
		notify: make(chan struct{}),
		uid: -1, gid: -1, mode: 0600,
		diruid: -1, dirgid: -1, dirmode: 0700,

		MinCheckpointPageN: DefaultMinCheckpointPageN,
		MaxCheckpointPageN: DefaultMaxCheckpointPageN,
		CheckpointInterval: DefaultCheckpointInterval,
		MonitorInterval:    DefaultMonitorInterval,
	}

	db.dbSizeGauge = dbSizeGaugeVec.WithLabelValues(db.path)
	db.walSizeGauge = walSizeGaugeVec.WithLabelValues(db.path)
	db.totalWALBytesCounter = totalWALBytesCounterVec.WithLabelValues(db.path)
	db.shadowWALIndexGauge = shadowWALIndexGaugeVec.WithLabelValues(db.path)
	db.shadowWALSizeGauge = shadowWALSizeGaugeVec.WithLabelValues(db.path)
	db.syncNCounter = syncNCounterVec.WithLabelValues(db.path)
	db.syncErrorNCounter = syncErrorNCounterVec.WithLabelValues(db.path)
	db.syncSecondsCounter = syncSecondsCounterVec.WithLabelValues(db.path)
	db.checkpointNCounterVec = checkpointNCounterVec.MustCurryWith(prometheus.Labels{"db": db.path})
	db.checkpointErrorNCounterVec = checkpointErrorNCounterVec.MustCurryWith(prometheus.Labels{"db": db.path})
	db.checkpointSecondsCounterVec = checkpointSecondsCounterVec.MustCurryWith(prometheus.Labels{"db": db.path})

	db.ctx, db.cancel = context.WithCancel(context.Background())

	return db
}

// SQLDB returns a reference to the underlying sql.DB connection.
func (db *DB) SQLDB() *sql.DB {
	return db.db
}

// Path returns the path to the database.
func (db *DB) Path() string {
	return db.path
}

// WALPath returns the path to the database's WAL file.
func (db *DB) WALPath() string {
	return db.path + "-wal"
}

// MetaPath returns the path to the database metadata.
func (db *DB) MetaPath() string {
	dir, file := filepath.Split(db.path)
	return filepath.Join(dir, "."+file+MetaDirSuffix)
}

// GenerationNamePath returns the path of the name of the current generation.
func (db *DB) GenerationNamePath() string {
	return filepath.Join(db.MetaPath(), "generation")
}

// GenerationPath returns the path of a single generation.
// Panics if generation is blank.
func (db *DB) GenerationPath(generation string) string {
	assert(generation != "", "generation name required")
	return filepath.Join(db.MetaPath(), "generations", generation)
}

// ShadowWALDir returns the path of the shadow wal directory.
// Panics if generation is blank.
func (db *DB) ShadowWALDir(generation string) string {
	return filepath.Join(db.GenerationPath(generation), "wal")
}

// ShadowWALPath returns the path of a single shadow WAL file.
// Panics if generation is blank or index is negative.
func (db *DB) ShadowWALPath(generation string, index int) string {
	assert(index >= 0, "shadow wal index cannot be negative")
	return filepath.Join(db.ShadowWALDir(generation), FormatWALPath(index))
}

// CurrentShadowWALPath returns the path to the last shadow WAL in a generation.
func (db *DB) CurrentShadowWALPath(generation string) (string, error) {
	index, _, err := db.CurrentShadowWALIndex(generation)
	if err != nil {
		return "", err
	}
	return db.ShadowWALPath(generation, index), nil
}

// CurrentShadowWALIndex returns the current WAL index & total size.
func (db *DB) CurrentShadowWALIndex(generation string) (index int, size int64, err error) {
	fis, err := ioutil.ReadDir(filepath.Join(db.GenerationPath(generation), "wal"))
	if os.IsNotExist(err) {
		return 0, 0, nil // no wal files written for generation
	} else if err != nil {
		return 0, 0, err
	}

	// Find highest wal index.
	for _, fi := range fis {
		if !strings.HasSuffix(fi.Name(), WALExt) {
			continue
		}
		if v, _, _, err := ParseWALPath(fi.Name()); err != nil {
			continue // invalid wal filename
		} else if v > index {
			index = v
		}

		size += fi.Size()
	}
	return index, size, nil
}

// Replica returns a replica by name.
func (db *DB) Replica(name string) Replica {
	for _, r := range db.Replicas {
		if r.Name() == name {
			return r
		}
	}
	return nil
}

// Pos returns the current position of the database.
func (db *DB) Pos() (Pos, error) {
	generation, err := db.CurrentGeneration()
	if err != nil {
		return Pos{}, err
	} else if generation == "" {
		return Pos{}, nil
	}

	index, _, err := db.CurrentShadowWALIndex(generation)
	if err != nil {
		return Pos{}, err
	}

	fi, err := os.Stat(db.ShadowWALPath(generation, index))
	if os.IsNotExist(err) {
		return Pos{Generation: generation, Index: index}, nil
	} else if err != nil {
		return Pos{}, err
	}

	return Pos{Generation: generation, Index: index, Offset: frameAlign(fi.Size(), db.pageSize)}, nil
}

// Notify returns a channel that closes when the shadow WAL changes.
func (db *DB) Notify() <-chan struct{} {
	db.mu.RLock()
	defer db.mu.RUnlock()
	return db.notify
}

// PageSize returns the page size of the underlying database.
// Only valid after database exists & Init() has successfully run.
func (db *DB) PageSize() int {
	db.mu.RLock()
	defer db.mu.RUnlock()
	return db.pageSize
}

// Open initializes the background monitoring goroutine.
func (db *DB) Open() (err error) {
	// Validate fields on database.
	if db.MinCheckpointPageN <= 0 {
		return fmt.Errorf("minimum checkpoint page count required")
	}

	// Validate that all replica names are unique.
	m := make(map[string]struct{})
	for _, r := range db.Replicas {
		if _, ok := m[r.Name()]; ok {
			return fmt.Errorf("duplicate replica name: %q", r.Name())
		}
		m[r.Name()] = struct{}{}
	}

	// Clear old temporary files that may have been left from a crash.
	if err := removeTmpFiles(db.MetaPath()); err != nil {
		return fmt.Errorf("cannot remove tmp files: %w", err)
	}

	// Start monitoring SQLite database in a separate goroutine.
	if db.MonitorInterval > 0 {
		db.wg.Add(1)
		go func() { defer db.wg.Done(); db.monitor() }()
	}

	return nil
}

// Close releases the read lock & closes the database. This method should only
// be called by tests as it causes the underlying database to be checkpointed.
func (db *DB) Close() (err error) {
	return db.close(false)
}

// SoftClose closes everything but the underlying db connection. This method
// is available because the binary needs to avoid closing the database on exit
// to prevent autocheckpointing.
func (db *DB) SoftClose() (err error) {
	return db.close(true)
}

func (db *DB) close(soft bool) (err error) {
	db.cancel()
	db.wg.Wait()

	// Start a new context for shutdown since we canceled the DB context.
	ctx := context.Background()

	// Perform a final db sync, if initialized.
	if db.db != nil {
		if e := db.Sync(ctx); e != nil && err == nil {
			err = e
		}
	}

	// Ensure replicas perform a final sync and stop replicating.
	for _, r := range db.Replicas {
		if db.db != nil {
			if e := r.Sync(ctx); e != nil && err == nil {
				err = e
			}
		}
		r.Stop(!soft)
	}

	// Release the read lock to allow other applications to handle checkpointing.
	if db.rtx != nil {
		if e := db.releaseReadLock(); e != nil && err == nil {
			err = e
		}
	}

	// Only perform full close if this is not a soft close.
	// This closes the underlying database connection which can clean up the WAL.
	if !soft && db.db != nil {
		if e := db.db.Close(); e != nil && err == nil {
			err = e
		}
	}

	return err
}

// UpdatedAt returns the last modified time of the database or WAL file.
func (db *DB) UpdatedAt() (time.Time, error) {
	// Determine database modified time.
	fi, err := os.Stat(db.Path())
	if err != nil {
		return time.Time{}, err
	}
	t := fi.ModTime().UTC()

	// Use WAL modified time, if available & later.
	if fi, err := os.Stat(db.WALPath()); os.IsNotExist(err) {
		return t, nil
	} else if err != nil {
		return t, err
	} else if fi.ModTime().After(t) {
		t = fi.ModTime().UTC()
	}
	return t, nil
}

// Snapshots returns a list of all snapshots across all replicas.
func (db *DB) Snapshots(ctx context.Context) ([]*SnapshotInfo, error) {
	var infos []*SnapshotInfo
	for _, r := range db.Replicas {
		a, err := r.Snapshots(ctx)
		if err != nil {
			return nil, err
		}
		infos = append(infos, a...)
	}

	// Sort in order by time.
	sort.Slice(infos, func(i, j int) bool {
		return infos[i].CreatedAt.Before(infos[j].CreatedAt)
	})
	return infos, nil
}

// WALs returns a list of all WAL files across all replicas.
func (db *DB) WALs(ctx context.Context) ([]*WALInfo, error) {
	var infos []*WALInfo
	for _, r := range db.Replicas {
		a, err := r.WALs(ctx)
		if err != nil {
			return nil, err
		}
		infos = append(infos, a...)
	}

	// Sort in order by time.
	sort.Slice(infos, func(i, j int) bool {
		return infos[i].CreatedAt.Before(infos[j].CreatedAt)
	})
	return infos, nil
}

// init initializes the connection to the database.
// Skipped if already initialized or if the database file does not exist.
func (db *DB) init() (err error) {
	// Exit if already initialized.
	if db.db != nil {
		return nil
	}

	// Exit if no database file exists.
	fi, err := os.Stat(db.path)
	if os.IsNotExist(err) {
		return nil
	} else if err != nil {
		return err
	}
	db.uid, db.gid = fileinfo(fi)
	db.mode = fi.Mode()

	// Obtain permissions for parent directory.
	if fi, err = os.Stat(filepath.Dir(db.path)); err != nil {
		return err
	}
	db.diruid, db.dirgid = fileinfo(fi)
	db.dirmode = fi.Mode()

	dsn := db.path
	dsn += fmt.Sprintf("?_busy_timeout=%d", BusyTimeout.Milliseconds())

	// Connect to SQLite database.
	if db.db, err = sql.Open("sqlite3", dsn); err != nil {
		return err
	}

	// Open long-running database file descriptor. Required for non-OFD locks.
	if db.f, err = os.Open(db.path); err != nil {
		return fmt.Errorf("open db file descriptor: %w", err)
	}

	// Ensure database is closed if init fails.
	// Initialization can retry on next sync.
	defer func() {
		if err != nil {
			_ = db.releaseReadLock()
			db.db.Close()
			db.f.Close()
			db.db, db.f = nil, nil
		}
	}()

	// Enable WAL and ensure it is set. New mode should be returned on success:
	// https://www.sqlite.org/pragma.html#pragma_journal_mode
	var mode string
	if err := db.db.QueryRow(`PRAGMA journal_mode = wal;`).Scan(&mode); err != nil {
		return err
	} else if mode != "wal" {
		return fmt.Errorf("enable wal failed, mode=%q", mode)
	}

	// Disable autocheckpoint for litestream's connection.
	if _, err := db.db.ExecContext(db.ctx, `PRAGMA wal_autocheckpoint = 0;`); err != nil {
		return fmt.Errorf("disable autocheckpoint: %w", err)
	}

	// Create a table to force writes to the WAL when empty.
	// There should only ever be one row with id=1.
	if _, err := db.db.Exec(`CREATE TABLE IF NOT EXISTS _litestream_seq (id INTEGER PRIMARY KEY, seq INTEGER);`); err != nil {
		return fmt.Errorf("create _litestream_seq table: %w", err)
	}

	// Create a lock table to force write locks during sync.
	// The sync write transaction always rolls back so no data should be in this table.
	if _, err := db.db.Exec(`CREATE TABLE IF NOT EXISTS _litestream_lock (id INTEGER);`); err != nil {
		return fmt.Errorf("create _litestream_lock table: %w", err)
	}

	// Start a long-running read transaction to prevent other transactions
	// from checkpointing.
	if err := db.acquireReadLock(); err != nil {
		return fmt.Errorf("acquire read lock: %w", err)
	}

	// Read page size.
	if err := db.db.QueryRow(`PRAGMA page_size;`).Scan(&db.pageSize); err != nil {
		return fmt.Errorf("read page size: %w", err)
	} else if db.pageSize <= 0 {
		return fmt.Errorf("invalid db page size: %d", db.pageSize)
	}

	// Ensure meta directory structure exists.
	if err := mkdirAll(db.MetaPath(), db.dirmode, db.diruid, db.dirgid); err != nil {
		return err
	}

	// If we have an existing shadow WAL, ensure the headers match.
	if err := db.verifyHeadersMatch(); err != nil {
		log.Printf("%s: init: cannot determine last wal position, clearing generation; %s", db.path, err)
		if err := os.Remove(db.GenerationNamePath()); err != nil && !os.IsNotExist(err) {
			return fmt.Errorf("remove generation name: %w", err)
		}
	}

	// Clean up previous generations.
	if err := db.clean(); err != nil {
		return fmt.Errorf("clean: %w", err)
	}

	// Start replication.
	for _, r := range db.Replicas {
		r.Start(db.ctx)
	}

	return nil
}

// verifyHeadersMatch returns true if the primary WAL and last shadow WAL header match.
func (db *DB) verifyHeadersMatch() error {
	// Determine current generation.
	generation, err := db.CurrentGeneration()
	if err != nil {
		return err
	} else if generation == "" {
		return nil
	}

	// Find current generation & latest shadow WAL.
	shadowWALPath, err := db.CurrentShadowWALPath(generation)
	if err != nil {
		return fmt.Errorf("cannot determine current shadow wal path: %w", err)
	}

	hdr0, err := readWALHeader(db.WALPath())
	if os.IsNotExist(err) {
		return fmt.Errorf("no primary wal: %w", err)
	} else if err != nil {
		return fmt.Errorf("primary wal header: %w", err)
	}

	hdr1, err := readWALHeader(shadowWALPath)
	if os.IsNotExist(err) {
		return fmt.Errorf("no shadow wal")
	} else if err != nil {
		return fmt.Errorf("shadow wal header: %w", err)
	}

	if !bytes.Equal(hdr0, hdr1) {
		return fmt.Errorf("wal header mismatch %x <> %x on %s", hdr0, hdr1, shadowWALPath)
	}
	return nil
}

// clean removes old generations & WAL files.
func (db *DB) clean() error {
	if err := db.cleanGenerations(); err != nil {
		return err
	}
	return db.cleanWAL()
}

// cleanGenerations removes old generations.
func (db *DB) cleanGenerations() error {
	generation, err := db.CurrentGeneration()
	if err != nil {
		return err
	}

	dir := filepath.Join(db.MetaPath(), "generations")
	fis, err := ioutil.ReadDir(dir)
	if os.IsNotExist(err) {
		return nil
	} else if err != nil {
		return err
	}
	for _, fi := range fis {
		// Skip the current generation.
		if filepath.Base(fi.Name()) == generation {
			continue
		}

		// Delete all other generations.
		if err := os.RemoveAll(filepath.Join(dir, fi.Name())); err != nil {
			return err
		}
	}
	return nil
}

// cleanWAL removes WAL files that have been replicated.
func (db *DB) cleanWAL() error {
	generation, err := db.CurrentGeneration()
	if err != nil {
		return err
	}

	// Determine lowest index that's been replicated to all replicas.
	min := -1
	for _, r := range db.Replicas {
		pos := r.LastPos()
		if pos.Generation != generation {
			pos = Pos{} // different generation, reset index to zero
		}
		if min == -1 || pos.Index < min {
			min = pos.Index
		}
	}

	// Skip if our lowest index is too small.
	if min <= 0 {
		return nil
	}
	min-- // Keep an extra WAL file.

	// Remove all WAL files for the generation before the lowest index.
	dir := db.ShadowWALDir(generation)
	fis, err := ioutil.ReadDir(dir)
	if os.IsNotExist(err) {
		return nil
	} else if err != nil {
		return err
	}
	for _, fi := range fis {
		if idx, _, _, err := ParseWALPath(fi.Name()); err != nil || idx >= min {
			continue
		}
		if err := os.Remove(filepath.Join(dir, fi.Name())); err != nil {
			return err
		}
	}
	return nil
}

// acquireReadLock begins a read transaction on the database to prevent checkpointing.
func (db *DB) acquireReadLock() error {
	if db.rtx != nil {
		return nil
	}

	// Start long running read-transaction to prevent checkpoints.
	tx, err := db.db.Begin()
	if err != nil {
		return err
	}

	// Execute read query to obtain read lock.
	if _, err := tx.ExecContext(db.ctx, `SELECT COUNT(1) FROM _litestream_seq;`); err != nil {
		_ = tx.Rollback()
		return err
	}

	// Track transaction so we can release it later before checkpoint.
	db.rtx = tx
	return nil
}

// releaseReadLock rolls back the long-running read transaction.
func (db *DB) releaseReadLock() error {
	// Ignore if we do not have a read lock.
	if db.rtx == nil {
		return nil
	}

	// Rollback & clear read transaction.
	err := db.rtx.Rollback()
	db.rtx = nil
	return err
}

// CurrentGeneration returns the name of the generation saved to the "generation"
// file in the meta data directory. Returns empty string if none exists.
func (db *DB) CurrentGeneration() (string, error) {
	buf, err := ioutil.ReadFile(db.GenerationNamePath())
	if os.IsNotExist(err) {
		return "", nil
	} else if err != nil {
		return "", err
	}

	// TODO: Verify if generation directory exists. If not, delete name file.

	generation := strings.TrimSpace(string(buf))
	if len(generation) != GenerationNameLen {
		return "", nil
	}
	return generation, nil
}

// createGeneration starts a new generation by creating the generation
// directory, snapshotting to each replica, and updating the current
// generation name.
func (db *DB) createGeneration() (string, error) {
	// Generate random generation hex name.
	buf := make([]byte, GenerationNameLen/2)
	_, _ = rand.New(rand.NewSource(time.Now().UnixNano())).Read(buf)
	generation := hex.EncodeToString(buf)

	// Generate new directory.
	dir := filepath.Join(db.MetaPath(), "generations", generation)
	if err := mkdirAll(dir, db.dirmode, db.diruid, db.dirgid); err != nil {
		return "", err
	}

	// Initialize shadow WAL with copy of header.
	if _, err := db.initShadowWALFile(db.ShadowWALPath(generation, 0)); err != nil {
		return "", fmt.Errorf("initialize shadow wal: %w", err)
	}

	// Atomically write generation name as current generation.
	generationNamePath := db.GenerationNamePath()
	if err := ioutil.WriteFile(generationNamePath+".tmp", []byte(generation+"\n"), db.mode); err != nil {
		return "", fmt.Errorf("write generation temp file: %w", err)
	}
	_ = os.Chown(generationNamePath+".tmp", db.uid, db.gid)
	if err := os.Rename(generationNamePath+".tmp", generationNamePath); err != nil {
		return "", fmt.Errorf("rename generation file: %w", err)
	}

	// Remove old generations.
	if err := db.clean(); err != nil {
		return "", err
	}

	return generation, nil
}

// Sync copies pending data from the WAL to the shadow WAL.
func (db *DB) Sync(ctx context.Context) (err error) {
	db.mu.Lock()
	defer db.mu.Unlock()

	// Initialize database, if necessary. Exit if no DB exists.
	if err := db.init(); err != nil {
		return err
	} else if db.db == nil {
		Tracef("%s: sync: no database found", db.path)
		return nil
	}

	// Track total sync metrics.
	t := time.Now()
	defer func() {
		db.syncNCounter.Inc()
		if err != nil {
			db.syncErrorNCounter.Inc()
		}
		db.syncSecondsCounter.Add(float64(time.Since(t).Seconds()))
	}()

	// Ensure WAL has at least one frame in it.
	if err := db.ensureWALExists(); err != nil {
		return fmt.Errorf("ensure wal exists: %w", err)
	}

	// Start a transaction. This will be promoted immediately after.
	tx, err := db.db.Begin()
	if err != nil {
		return fmt.Errorf("begin: %w", err)
	}

	// Ensure write transaction rolls back before returning.
	defer func() {
		if e := rollback(tx); e != nil && err == nil {
			err = e
		}
	}()

	// Insert into the lock table to promote to a write tx. The lock table
	// insert will never actually occur because our tx will be rolled back,
	// however, it will ensure our tx grabs the write lock. Unfortunately,
	// we can't call "BEGIN IMMEDIATE" as we are already in a transaction.
	if _, err := tx.ExecContext(ctx, `INSERT INTO _litestream_lock (id) VALUES (1);`); err != nil {
		return fmt.Errorf("_litestream_lock: %w", err)
	}

	// Verify our last sync matches the current state of the WAL.
	// This ensures that we have an existing generation & that the last sync
	// position of the real WAL hasn't been overwritten by another process.
	info, err := db.verify()
	if err != nil {
		return fmt.Errorf("cannot verify wal state: %w", err)
	}
	Tracef("%s: sync: info=%#v", db.path, info)

	// Track if anything in the shadow WAL changes and then notify at the end.
	changed := info.walSize != info.shadowWALSize || info.restart || info.reason != ""

	// If we are unable to verify the WAL state then we start a new generation.
	if info.reason != "" {
		// Start new generation & notify user via log message.
		if info.generation, err = db.createGeneration(); err != nil {
			return fmt.Errorf("create generation: %w", err)
		}
		log.Printf("%s: sync: new generation %q, %s", db.path, info.generation, info.reason)

		// Clear shadow wal info.
		info.shadowWALPath = db.ShadowWALPath(info.generation, 0)
		info.shadowWALSize = WALHeaderSize
		info.restart = false
		info.reason = ""
	}

	// Synchronize real WAL with current shadow WAL.
	newWALSize, err := db.syncWAL(info)
	if err != nil {
		return fmt.Errorf("sync wal: %w", err)
	}

	// If WAL size is greater than max threshold, force checkpoint.
	// If WAL size is greater than min threshold, attempt checkpoint.
	var checkpoint bool
	checkpointMode := CheckpointModePassive
	if db.MaxCheckpointPageN > 0 && newWALSize >= calcWALSize(db.pageSize, db.MaxCheckpointPageN) {
		checkpoint, checkpointMode = true, CheckpointModeRestart
	} else if newWALSize >= calcWALSize(db.pageSize, db.MinCheckpointPageN) {
		checkpoint = true
	} else if db.CheckpointInterval > 0 && !info.dbModTime.IsZero() && time.Since(info.dbModTime) > db.CheckpointInterval && newWALSize > calcWALSize(db.pageSize, 1) {
		checkpoint = true
	}

	// Release write lock before checkpointing & exiting.
	if err := tx.Rollback(); err != nil {
		return fmt.Errorf("rollback write tx: %w", err)
	}

	// Issue the checkpoint.
	if checkpoint {
		changed = true

		if err := db.checkpointAndInit(ctx, info.generation, checkpointMode); err != nil {
			return fmt.Errorf("checkpoint: mode=%v err=%w", checkpointMode, err)
		}
	}

	// Clean up any old files.
	if err := db.clean(); err != nil {
		return fmt.Errorf("cannot clean: %w", err)
	}

	// Compute current index and total shadow WAL size.
	// This is only for metrics so we ignore any errors that occur.
	index, size, _ := db.CurrentShadowWALIndex(info.generation)
	db.shadowWALIndexGauge.Set(float64(index))
	db.shadowWALSizeGauge.Set(float64(size))

	// Notify replicas of WAL changes.
	if changed {
		close(db.notify)
		db.notify = make(chan struct{})
	}

	Tracef("%s: sync: ok", db.path)

	return nil
}

// ensureWALExists checks that the real WAL exists and has a header.
func (db *DB) ensureWALExists() (err error) {
	// Exit early if WAL header exists.
	if fi, err := os.Stat(db.WALPath()); err == nil && fi.Size() >= WALHeaderSize {
		return nil
	}

	// Otherwise create transaction that updates the internal litestream table.
	_, err = db.db.Exec(`INSERT INTO _litestream_seq (id, seq) VALUES (1, 1) ON CONFLICT (id) DO UPDATE SET seq = seq + 1`)
	return err
}

// verify ensures the current shadow WAL state matches where it left off from
// the real WAL. Returns generation & WAL sync information. If info.reason is
// not blank, verification failed and a new generation should be started.
func (db *DB) verify() (info syncInfo, err error) {
	// Look up existing generation.
	generation, err := db.CurrentGeneration()
	if err != nil {
		return info, fmt.Errorf("cannot find current generation: %w", err)
	} else if generation == "" {
		info.reason = "no generation exists"
		return info, nil
	}
	info.generation = generation

	// Determine total bytes of real DB for metrics.
	fi, err := os.Stat(db.Path())
	if err != nil {
		return info, err
	}
	info.dbModTime = fi.ModTime()
	db.dbSizeGauge.Set(float64(fi.Size()))

	// Determine total bytes of real WAL.
	fi, err = os.Stat(db.WALPath())
	if err != nil {
		return info, err
	}
	info.walSize = frameAlign(fi.Size(), db.pageSize)
	info.walModTime = fi.ModTime()
	db.walSizeGauge.Set(float64(fi.Size()))

	// Open shadow WAL to append to.
	index, _, err := db.CurrentShadowWALIndex(info.generation)
	if err != nil {
		return info, fmt.Errorf("cannot determine shadow WAL index: %w", err)
	} else if index >= MaxIndex {
		info.reason = "max index exceeded"
		return info, nil
	}
	info.shadowWALPath = db.ShadowWALPath(generation, index)

	// Determine shadow WAL current size.
	fi, err = os.Stat(info.shadowWALPath)
	if os.IsNotExist(err) {
		info.reason = "no shadow wal"
		return info, nil
	} else if err != nil {
		return info, err
	}
	info.shadowWALSize = frameAlign(fi.Size(), db.pageSize)

	// Exit if shadow WAL does not contain a full header.
	if info.shadowWALSize < WALHeaderSize {
		info.reason = "short shadow wal"
		return info, nil
	}

	// If shadow WAL is larger than real WAL then the WAL has been truncated
	// so we cannot determine our last state.
	if info.shadowWALSize > info.walSize {
		info.reason = "wal truncated by another process"
		return info, nil
	}

	// Compare WAL headers. Start a new shadow WAL if they are mismatched.
	if hdr0, err := readWALHeader(db.WALPath()); err != nil {
		return info, fmt.Errorf("cannot read wal header: %w", err)
	} else if hdr1, err := readWALHeader(info.shadowWALPath); err != nil {
		return info, fmt.Errorf("cannot read shadow wal header: %w", err)
	} else if !bytes.Equal(hdr0, hdr1) {
		info.restart = true
	}

	// If we only have a header then ensure header matches.
	// Otherwise we need to start a new generation.
	if info.shadowWALSize == WALHeaderSize && info.restart {
		info.reason = "wal header only, mismatched"
		return info, nil
	}

	// Verify last page synced still matches.
	if info.shadowWALSize > WALHeaderSize {
		offset := info.shadowWALSize - int64(db.pageSize+WALFrameHeaderSize)
		if buf0, err := readWALFileAt(db.WALPath(), offset, int64(db.pageSize+WALFrameHeaderSize)); err != nil {
			return info, fmt.Errorf("cannot read last synced wal page: %w", err)
		} else if buf1, err := readWALFileAt(info.shadowWALPath, offset, int64(db.pageSize+WALFrameHeaderSize)); err != nil {
			return info, fmt.Errorf("cannot read last synced shadow wal page: %w", err)
		} else if !bytes.Equal(buf0, buf1) {
			info.reason = "wal overwritten by another process"
			return info, nil
		}
	}

	return info, nil
}

type syncInfo struct {
	generation    string    // generation name
	dbModTime     time.Time // last modified date of real DB file
	walSize       int64     // size of real WAL file
	walModTime    time.Time // last modified date of real WAL file
	shadowWALPath string    // name of last shadow WAL file
	shadowWALSize int64     // size of last shadow WAL file
	restart       bool      // if true, real WAL header does not match shadow WAL
	reason        string    // if non-blank, reason for sync failure
}

// syncWAL copies pending bytes from the real WAL to the shadow WAL.
func (db *DB) syncWAL(info syncInfo) (newSize int64, err error) {
	// Copy WAL starting from end of shadow WAL. Exit if no new shadow WAL needed.
	newSize, err = db.copyToShadowWAL(info.shadowWALPath)
	if err != nil {
		return newSize, fmt.Errorf("cannot copy to shadow wal: %w", err)
	} else if !info.restart {
		return newSize, nil // If no restart required, exit.
	}

	// Parse index of current shadow WAL file.
	dir, base := filepath.Split(info.shadowWALPath)
	index, _, _, err := ParseWALPath(base)
	if err != nil {
		return 0, fmt.Errorf("cannot parse shadow wal filename: %s", base)
	}

	// Start a new shadow WAL file with next index.
	newShadowWALPath := filepath.Join(dir, FormatWALPath(index+1))
	newSize, err = db.initShadowWALFile(newShadowWALPath)
	if err != nil {
		return 0, fmt.Errorf("cannot init shadow wal file: name=%s err=%w", newShadowWALPath, err)
	}
	return newSize, nil
}

func (db *DB) initShadowWALFile(filename string) (int64, error) {
	hdr, err := readWALHeader(db.WALPath())
	if err != nil {
		return 0, fmt.Errorf("read header: %w", err)
	}

	// Determine byte order for checksumming from header magic.
	bo, err := headerByteOrder(hdr)
	if err != nil {
		return 0, err
	}

	// Verify checksum.
	s0 := binary.BigEndian.Uint32(hdr[24:])
	s1 := binary.BigEndian.Uint32(hdr[28:])
	if v0, v1 := Checksum(bo, 0, 0, hdr[:24]); v0 != s0 || v1 != s1 {
		return 0, fmt.Errorf("invalid header checksum: (%x,%x) != (%x,%x)", v0, v1, s0, s1)
	}

	// Write header to new WAL shadow file.
	if err := mkdirAll(filepath.Dir(filename), db.dirmode, db.diruid, db.dirgid); err != nil {
		return 0, err
	} else if err := ioutil.WriteFile(filename, hdr, db.mode); err != nil {
		return 0, err
	}
	_ = os.Chown(filename, db.uid, db.gid)

	// Copy as much of the real WAL as is available into the shadow WAL.
	newSize, err := db.copyToShadowWAL(filename)
	if err != nil {
		return 0, fmt.Errorf("cannot copy to new shadow wal: %w", err)
	}
	return newSize, nil
}

func (db *DB) copyToShadowWAL(filename string) (newSize int64, err error) {
	Tracef("%s: copy-shadow: %s", db.path, filename)

	r, err := os.Open(db.WALPath())
	if err != nil {
		return 0, err
	}
	defer r.Close()

	w, err := os.OpenFile(filename, os.O_RDWR, 0666)
	if err != nil {
		return 0, err
	}
	defer w.Close()

	fi, err := w.Stat()
	if err != nil {
		return 0, err
	}
	origSize := frameAlign(fi.Size(), db.pageSize)

	// Read shadow WAL header to determine byte order for checksum & salt.
	hdr := make([]byte, WALHeaderSize)
	if _, err := io.ReadFull(w, hdr); err != nil {
		return 0, fmt.Errorf("read header: %w", err)
	}
	hsalt0 := binary.BigEndian.Uint32(hdr[16:])
	hsalt1 := binary.BigEndian.Uint32(hdr[20:])

	bo, err := headerByteOrder(hdr)
	if err != nil {
		return 0, err
	}

	// Read previous checksum.
	chksum0, chksum1, err := readLastChecksumFrom(w, db.pageSize)
	if err != nil {
		return 0, fmt.Errorf("last checksum: %w", err)
	}

	// Seek to correct position on real wal.
	if _, err := r.Seek(origSize, io.SeekStart); err != nil {
		return 0, fmt.Errorf("real wal seek: %w", err)
	} else if _, err := w.Seek(origSize, io.SeekStart); err != nil {
		return 0, fmt.Errorf("shadow wal seek: %w", err)
	}

	// Read through WAL from last position to find the page of the last
	// committed transaction.
	frame := make([]byte, db.pageSize+WALFrameHeaderSize)
	var buf bytes.Buffer
	offset := origSize
	lastCommitSize := origSize
	for {
		// Read next page from WAL file.
		if _, err := io.ReadFull(r, frame); err == io.EOF || err == io.ErrUnexpectedEOF {
			Tracef("%s: copy-shadow: break %s @ %d; err=%s", db.path, filename, offset, err)
			break // end of file or partial page
		} else if err != nil {
			return 0, fmt.Errorf("read wal: %w", err)
		}

		// Read frame salt & compare to header salt. Stop reading on mismatch.
		salt0 := binary.BigEndian.Uint32(frame[8:])
		salt1 := binary.BigEndian.Uint32(frame[12:])
		if salt0 != hsalt0 || salt1 != hsalt1 {
			Tracef("%s: copy-shadow: break: salt mismatch", db.path)
			break
		}

		// Verify checksum of page is valid.
		fchksum0 := binary.BigEndian.Uint32(frame[16:])
		fchksum1 := binary.BigEndian.Uint32(frame[20:])
		chksum0, chksum1 = Checksum(bo, chksum0, chksum1, frame[:8])  // frame header
		chksum0, chksum1 = Checksum(bo, chksum0, chksum1, frame[24:]) // frame data
		if chksum0 != fchksum0 || chksum1 != fchksum1 {
			log.Printf("copy shadow: checksum mismatch, skipping: offset=%d (%x,%x) != (%x,%x)", offset, chksum0, chksum1, fchksum0, fchksum1)
			break
		}

		// Buffer the frame; it is flushed to the shadow WAL once a commit record is reached.
		buf.Write(frame)

		Tracef("%s: copy-shadow: ok %s offset=%d salt=%x %x", db.path, filename, offset, salt0, salt1)
		offset += int64(len(frame))

		// Flush to shadow WAL if commit record.
		newDBSize := binary.BigEndian.Uint32(frame[4:])
		if newDBSize != 0 {
			if _, err := buf.WriteTo(w); err != nil {
				return 0, fmt.Errorf("write shadow wal: %w", err)
			}
			buf.Reset()
			lastCommitSize = offset
		}
	}

	// Sync & close.
	if err := w.Sync(); err != nil {
		return 0, err
	} else if err := w.Close(); err != nil {
		return 0, err
	}

	// Track total number of bytes written to WAL.
	db.totalWALBytesCounter.Add(float64(lastCommitSize - origSize))

	return lastCommitSize, nil
}

// ShadowWALReader opens a reader for a shadow WAL file at a given position.
// If the reader is at the end of the file, it attempts to return the next file.
//
// The caller should check Pos() & Size() on the returned reader to check offset.
func (db *DB) ShadowWALReader(pos Pos) (r *ShadowWALReader, err error) {
	// Fetch reader for the requested position. Return if it has data.
	r, err = db.shadowWALReader(pos)
	if err != nil {
		return nil, err
	} else if r.N() > 0 {
		return r, nil
	} else if err := r.Close(); err != nil { // no data, close, try next
		return nil, err
	}

	// Otherwise attempt to read the start of the next WAL file.
	pos.Index, pos.Offset = pos.Index+1, 0

	r, err = db.shadowWALReader(pos)
	if os.IsNotExist(err) {
		return nil, io.EOF
	}
	return r, err
}

// shadowWALReader opens a file reader for a shadow WAL file at a given position.
func (db *DB) shadowWALReader(pos Pos) (r *ShadowWALReader, err error) {
	filename := db.ShadowWALPath(pos.Generation, pos.Index)

	f, err := os.Open(filename)
	if err != nil {
		return nil, err
	}

	// Ensure file is closed if any error occurs.
	defer func() {
		if err != nil {
			f.Close()
		}
	}()

	// Fetch frame-aligned file size and ensure requested offset is not past EOF.
	fi, err := f.Stat()
	if err != nil {
		return nil, err
	}

	fileSize := frameAlign(fi.Size(), db.pageSize)
	if pos.Offset > fileSize {
		return nil, fmt.Errorf("wal reader offset too high: %d > %d", pos.Offset, fileSize)
	}

	// Move file handle to offset position.
	if _, err := f.Seek(pos.Offset, io.SeekStart); err != nil {
		return nil, err
	}

	return &ShadowWALReader{
		f:   f,
		n:   fileSize - pos.Offset,
		pos: pos,
	}, nil
}

// frameAlign returns a frame-aligned offset.
// Returns zero if offset is less than the WAL header size.
func frameAlign(offset int64, pageSize int) int64 {
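	// For example, with a 4096-byte page each frame is 24+4096 = 4120 bytes,
	// so offsets 32 through 4151 align down to 32 (the end of the header)
	// and offset 4152 aligns to 4152 (the end of the first full frame).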
	assert(offset >= 0, "frameAlign(): offset must be non-negative")
	assert(pageSize >= 0, "frameAlign(): page size must be non-negative")

	if offset < WALHeaderSize {
		return 0
	}

	frameSize := WALFrameHeaderSize + int64(pageSize)
	frameN := (offset - WALHeaderSize) / frameSize
	return (frameN * frameSize) + WALHeaderSize
}

// ShadowWALReader represents a reader for a shadow WAL file that tracks WAL position.
type ShadowWALReader struct {
	f   *os.File
	n   int64
	pos Pos
}

// Name returns the filename of the underlying file.
func (r *ShadowWALReader) Name() string { return r.f.Name() }

// Close closes the underlying WAL file handle.
func (r *ShadowWALReader) Close() error { return r.f.Close() }

// N returns the remaining bytes in the reader.
func (r *ShadowWALReader) N() int64 { return r.n }

// Pos returns the current WAL position.
func (r *ShadowWALReader) Pos() Pos { return r.pos }

// Read reads bytes into p, updates the position, and returns the bytes read.
// Returns io.EOF at the end of the available section of the WAL.
func (r *ShadowWALReader) Read(p []byte) (n int, err error) {
	if r.n <= 0 {
		return 0, io.EOF
	}
	if int64(len(p)) > r.n {
		p = p[0:r.n]
	}
	n, err = r.f.Read(p)
	r.n -= int64(n)
	r.pos.Offset += int64(n)
	return n, err
}

// SQLite WAL constants
const (
	WALHeaderChecksumOffset      = 24
	WALFrameHeaderChecksumOffset = 16
)

func readLastChecksumFrom(f *os.File, pageSize int) (uint32, uint32, error) {
	// Determine the byte offset of the checksum for the header (if no pages
	// exist) or for the last page (if at least one page exists).
	offset := int64(WALHeaderChecksumOffset)
	if fi, err := f.Stat(); err != nil {
		return 0, 0, err
	} else if sz := frameAlign(fi.Size(), pageSize); fi.Size() > WALHeaderSize {
		offset = sz - int64(pageSize) - WALFrameHeaderSize + WALFrameHeaderChecksumOffset
	}

	// Read big endian checksum.
	b := make([]byte, 8)
	if n, err := f.ReadAt(b, offset); err != nil {
		return 0, 0, err
	} else if n != len(b) {
		return 0, 0, io.ErrUnexpectedEOF
	}
	return binary.BigEndian.Uint32(b[0:]), binary.BigEndian.Uint32(b[4:]), nil
}

// Checkpoint performs a checkpoint on the WAL file.
func (db *DB) Checkpoint(mode string) (err error) {
	db.mu.Lock()
	defer db.mu.Unlock()
	return db.checkpoint(mode)
}

func (db *DB) checkpoint(mode string) (err error) {
	// Ignore if there is no underlying database.
	if db.db == nil {
		return nil
	}

	// Track checkpoint metrics.
	t := time.Now()
	defer func() {
		labels := prometheus.Labels{"mode": mode}
		db.checkpointNCounterVec.With(labels).Inc()
		if err != nil {
			db.checkpointErrorNCounterVec.With(labels).Inc()
		}
		db.checkpointSecondsCounterVec.With(labels).Add(float64(time.Since(t).Seconds()))
	}()

	// Ensure the read lock has been removed before issuing a checkpoint.
	// We defer the re-acquire to ensure it occurs even on an early return.
	if err := db.releaseReadLock(); err != nil {
		return fmt.Errorf("release read lock: %w", err)
	}
	defer func() { _ = db.acquireReadLock() }()

	// A non-forced checkpoint is issued as "PASSIVE". This will only checkpoint
	// if there are not pending transactions. A forced checkpoint ("RESTART")
	// will wait for pending transactions to end & block new transactions before
	// forcing the checkpoint and restarting the WAL.
	//
	// See: https://www.sqlite.org/pragma.html#pragma_wal_checkpoint
	rawsql := `PRAGMA wal_checkpoint(` + mode + `);`

	var row [3]int
	if err := db.db.QueryRow(rawsql).Scan(&row[0], &row[1], &row[2]); err != nil {
		return err
	}
	Tracef("%s: checkpoint: mode=%v (%d,%d,%d)", db.path, mode, row[0], row[1], row[2])

	// Reacquire the read lock immediately after the checkpoint.
	if err := db.acquireReadLock(); err != nil {
		return fmt.Errorf("acquire read lock: %w", err)
	}

	return nil
}

// checkpointAndInit performs a checkpoint on the WAL file and initializes a
// new shadow WAL file.
func (db *DB) checkpointAndInit(ctx context.Context, generation, mode string) error {
	shadowWALPath, err := db.CurrentShadowWALPath(generation)
	if err != nil {
		return err
	}

	// Read WAL header before checkpoint to check if it has been restarted.
	hdr, err := readWALHeader(db.WALPath())
	if err != nil {
		return err
	}

	// Copy to the shadow WAL before checkpointing to capture as much of the WAL as possible.
	if _, err := db.copyToShadowWAL(shadowWALPath); err != nil {
		return fmt.Errorf("cannot copy to end of shadow wal before checkpoint: %w", err)
	}

	// Execute checkpoint and immediately issue a write to the WAL to ensure
	// a new page is written.
	if err := db.checkpoint(mode); err != nil {
		return err
	} else if _, err = db.db.Exec(`INSERT INTO _litestream_seq (id, seq) VALUES (1, 1) ON CONFLICT (id) DO UPDATE SET seq = seq + 1`); err != nil {
		return err
	}

	// If WAL hasn't been restarted, exit.
	if other, err := readWALHeader(db.WALPath()); err != nil {
		return err
	} else if bytes.Equal(hdr, other) {
		return nil
	}

	// Start a transaction. This will be promoted immediately after.
	tx, err := db.db.Begin()
	if err != nil {
		return fmt.Errorf("begin: %w", err)
	}
	defer func() { _ = rollback(tx) }()

	// Insert into the lock table to promote to a write tx. The lock table
	// insert will never actually occur because our tx will be rolled back,
	// however, it will ensure our tx grabs the write lock. Unfortunately,
	// we can't call "BEGIN IMMEDIATE" as we are already in a transaction.
	if _, err := tx.ExecContext(ctx, `INSERT INTO _litestream_lock (id) VALUES (1);`); err != nil {
		return fmt.Errorf("_litestream_lock: %w", err)
	}

	// Copy the end of the previous WAL before starting a new shadow WAL.
	if _, err := db.copyToShadowWAL(shadowWALPath); err != nil {
		return fmt.Errorf("cannot copy to end of shadow wal: %w", err)
	}

	// Parse index of current shadow WAL file.
	index, _, _, err := ParseWALPath(shadowWALPath)
	if err != nil {
		return fmt.Errorf("cannot parse shadow wal filename: %s", shadowWALPath)
	}

	// Start a new shadow WAL file with next index.
	newShadowWALPath := filepath.Join(filepath.Dir(shadowWALPath), FormatWALPath(index+1))
	if _, err := db.initShadowWALFile(newShadowWALPath); err != nil {
		return fmt.Errorf("cannot init shadow wal file: name=%s err=%w", newShadowWALPath, err)
	}

	// Release write lock before exiting.
	if err := tx.Rollback(); err != nil {
		return fmt.Errorf("rollback post-checkpoint tx: %w", err)
	}
	return nil
}

// monitor runs in a separate goroutine and monitors the database & WAL.
func (db *DB) monitor() {
	ticker := time.NewTicker(db.MonitorInterval)
	defer ticker.Stop()

	for {
		// Wait for ticker or context close.
		select {
		case <-db.ctx.Done():
			return
		case <-ticker.C:
		}

		// Sync the database to the shadow WAL.
		if err := db.Sync(db.ctx); err != nil && !errors.Is(err, context.Canceled) {
			log.Printf("%s: sync error: %s", db.path, err)
		}
	}
}

// RestoreReplica restores the database from a replica based on the options given.
// This method will restore into opt.OutputPath, if specified, or into the
// DB's original database path. It can optionally restore from a specific
// replica or generation or it will automatically choose the best one. Finally,
// a timestamp can be specified to restore the database to a specific
// point-in-time.
func RestoreReplica(ctx context.Context, r Replica, opt RestoreOptions) (err error) {
	// Validate options.
	if opt.OutputPath == "" {
		return fmt.Errorf("output path required")
	} else if opt.Generation == "" && opt.Index != math.MaxInt32 {
		return fmt.Errorf("must specify generation when restoring to index")
	} else if opt.Index != math.MaxInt32 && !opt.Timestamp.IsZero() {
		return fmt.Errorf("cannot specify index & timestamp to restore")
	}

	// Ensure logger exists.
	logger := opt.Logger
	if logger == nil {
		logger = log.New(ioutil.Discard, "", 0)
	}

	logPrefix := r.Name()
	if db := r.DB(); db != nil {
		logPrefix = fmt.Sprintf("%s(%s)", db.Path(), r.Name())
	}

	// Ensure output path does not already exist.
	if _, err := os.Stat(opt.OutputPath); err == nil {
		return fmt.Errorf("cannot restore, output path already exists: %s", opt.OutputPath)
	} else if err != nil && !os.IsNotExist(err) {
		return err
	}

	// Find latest snapshot that occurs before timestamp or index.
	var minWALIndex int
	if opt.Index < math.MaxInt32 {
		if minWALIndex, err = SnapshotIndexByIndex(ctx, r, opt.Generation, opt.Index); err != nil {
			return fmt.Errorf("cannot find snapshot index: %w", err)
		}
	} else {
		if minWALIndex, err = SnapshotIndexAt(ctx, r, opt.Generation, opt.Timestamp); err != nil {
			return fmt.Errorf("cannot find snapshot index by timestamp: %w", err)
		}
	}

	// Find the maximum WAL index that occurs before timestamp.
	maxWALIndex, err := WALIndexAt(ctx, r, opt.Generation, opt.Index, opt.Timestamp)
	if err != nil {
		return fmt.Errorf("cannot find max wal index for restore: %w", err)
	}
	logger.Printf("%s: starting restore: generation %s, index %08x-%08x", logPrefix, opt.Generation, minWALIndex, maxWALIndex)

	// Initialize starting position.
	pos := Pos{Generation: opt.Generation, Index: minWALIndex}
	tmpPath := opt.OutputPath + ".tmp"

	// Copy snapshot to output path.
	logger.Printf("%s: restoring snapshot %s/%08x to %s", logPrefix, opt.Generation, minWALIndex, tmpPath)
	if err := restoreSnapshot(ctx, r, pos.Generation, pos.Index, tmpPath); err != nil {
		return fmt.Errorf("cannot restore snapshot: %w", err)
	}

	// Fill input channel with all WAL indexes to be loaded in order.
	ch := make(chan int, maxWALIndex-minWALIndex+1)
	for index := minWALIndex; index <= maxWALIndex; index++ {
		ch <- index
	}
	close(ch)

	// Track load state for each WAL.
	var mu sync.Mutex
	cond := sync.NewCond(&mu)
	ready := make([]bool, maxWALIndex-minWALIndex+1)
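	// Workers mark an index as ready and broadcast on cond after each
	// download (and again on exit) so the in-order apply loop below always
	// wakes up, either to apply the next file or to observe cancellation.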

	parallelism := opt.Parallelism
	if parallelism < 1 {
		parallelism = 1
	}

	// Download WAL files to disk in parallel.
	g, ctx := errgroup.WithContext(ctx)
	for i := 0; i < parallelism; i++ {
		g.Go(func() error {
			for {
				select {
				case <-ctx.Done():
					cond.Broadcast()
					return ctx.Err()
				case index, ok := <-ch:
					if !ok {
						cond.Broadcast()
						return nil
					}

					startTime := time.Now()
					if err := downloadWAL(ctx, r, opt.Generation, index, tmpPath); os.IsNotExist(err) && index == minWALIndex && index == maxWALIndex {
						logger.Printf("%s: no wal available, snapshot only", logPrefix)
						continue // snapshot file only, ignore error
					} else if err != nil {
						cond.Broadcast()
						return fmt.Errorf("cannot download wal %s/%08x: %w", opt.Generation, index, err)
					}

					// Mark index as ready-to-apply and notify applying code.
					mu.Lock()
					ready[index-minWALIndex] = true
					mu.Unlock()
					cond.Broadcast()

					logger.Printf("%s: downloaded wal %s/%08x elapsed=%s",
						logPrefix, opt.Generation, index,
						time.Since(startTime).String(),
					)
				}
			}
		})
	}

	// Apply WAL files in order as they are ready.
	for index := minWALIndex; index <= maxWALIndex; index++ {
		// Wait until next WAL file is ready to apply.
		mu.Lock()
		for !ready[index-minWALIndex] {
			if err := ctx.Err(); err != nil {
				mu.Unlock()
				return err
			}
			cond.Wait()
		}
		mu.Unlock()

		// Apply WAL to database file.
		startTime := time.Now()
		if err = applyWAL(ctx, index, tmpPath); err != nil {
			return fmt.Errorf("cannot apply wal: %w", err)
		}
		logger.Printf("%s: applied wal %s/%08x elapsed=%s",
			logPrefix, opt.Generation, index,
			time.Since(startTime).String(),
		)
	}

	// Copy file to final location.
	logger.Printf("%s: renaming database from temporary location", logPrefix)
	if err := os.Rename(tmpPath, opt.OutputPath); err != nil {
		return err
	}

	return nil
}

// CalcRestoreTarget returns a replica & generation to restore from based on opt criteria.
func (db *DB) CalcRestoreTarget(ctx context.Context, opt RestoreOptions) (Replica, string, error) {
	var target struct {
		replica    Replica
		generation string
		stats      GenerationStats
	}

	for _, r := range db.Replicas {
		// Skip replica if it does not match filter.
		if opt.ReplicaName != "" && r.Name() != opt.ReplicaName {
			continue
		}

		generation, stats, err := CalcReplicaRestoreTarget(ctx, r, opt)
		if err != nil {
			return nil, "", err
		}

		// Use the latest replica if we have multiple candidates.
		if !stats.UpdatedAt.After(target.stats.UpdatedAt) {
			continue
		}

		target.replica, target.generation, target.stats = r, generation, stats
	}
	return target.replica, target.generation, nil
}

// CalcReplicaRestoreTarget returns a generation to restore from.
func CalcReplicaRestoreTarget(ctx context.Context, r Replica, opt RestoreOptions) (generation string, stats GenerationStats, err error) {
	var target struct {
		generation string
		stats      GenerationStats
	}

	generations, err := r.Generations(ctx)
	if err != nil {
		return "", stats, fmt.Errorf("cannot fetch generations: %w", err)
	}

	// Search generations for one that contains the requested timestamp.
	for _, generation := range generations {
		// Skip generation if it does not match filter.
		if opt.Generation != "" && generation != opt.Generation {
			continue
		}

		// Fetch stats for generation.
		stats, err := r.GenerationStats(ctx, generation)
		if err != nil {
			return "", stats, fmt.Errorf("cannot determine stats for generation (%s/%s): %s", r.Name(), generation, err)
		}

		// Skip if it does not contain timestamp.
		if !opt.Timestamp.IsZero() {
			if opt.Timestamp.Before(stats.CreatedAt) || opt.Timestamp.After(stats.UpdatedAt) {
				continue
			}
		}

		// Use the latest generation if we have multiple candidates.
		if !stats.UpdatedAt.After(target.stats.UpdatedAt) {
			continue
		}

		target.generation = generation
		target.stats = stats
	}

	return target.generation, target.stats, nil
}

// restoreSnapshot copies a snapshot from the replica to a file.
func restoreSnapshot(ctx context.Context, r Replica, generation string, index int, filename string) error {
	// Determine the user/group & mode based on the DB, if available.
	uid, gid, mode := -1, -1, os.FileMode(0600)
	diruid, dirgid, dirmode := -1, -1, os.FileMode(0700)
	if db := r.DB(); db != nil {
		uid, gid, mode = db.uid, db.gid, db.mode
		diruid, dirgid, dirmode = db.diruid, db.dirgid, db.dirmode
	}

	if err := mkdirAll(filepath.Dir(filename), dirmode, diruid, dirgid); err != nil {
		return err
	}

	f, err := createFile(filename, mode, uid, gid)
	if err != nil {
		return err
	}
	defer f.Close()

	rd, err := r.SnapshotReader(ctx, generation, index)
	if err != nil {
		return err
	}
	defer rd.Close()

	if _, err := io.Copy(f, rd); err != nil {
		return err
	}

	if err := f.Sync(); err != nil {
		return err
	}
	return f.Close()
}

// downloadWAL copies a WAL file from the replica to a local copy next to the DB.
// The WAL is later applied by applyWAL(). This function can be run in parallel
// to download multiple WAL files simultaneously.
func downloadWAL(ctx context.Context, r Replica, generation string, index int, dbPath string) error {
	// Determine the user/group & mode based on the DB, if available.
	uid, gid, mode := -1, -1, os.FileMode(0600)
	if db := r.DB(); db != nil {
		uid, gid, mode = db.uid, db.gid, db.mode
	}

	// Open WAL file from replica.
	rd, err := r.WALReader(ctx, generation, index)
	if err != nil {
		return err
	}
	defer rd.Close()

	// Open handle to destination WAL path.
	f, err := createFile(fmt.Sprintf("%s-%08x-wal", dbPath, index), mode, uid, gid)
	if err != nil {
		return err
	}
	defer f.Close()

	// Copy WAL to target path.
	if _, err := io.Copy(f, rd); err != nil {
		return err
	} else if err := f.Close(); err != nil {
		return err
	}
	return nil
}

// applyWAL performs a truncating checkpoint on the given database.
func applyWAL(ctx context.Context, index int, dbPath string) error {
	// Copy WAL file from its staging path to the correct "-wal" location.
	if err := os.Rename(fmt.Sprintf("%s-%08x-wal", dbPath, index), dbPath+"-wal"); err != nil {
		return err
	}

	// Open SQLite database and force a truncating checkpoint.
	d, err := sql.Open("sqlite3", dbPath)
	if err != nil {
		return err
	}
	defer d.Close()

	var row [3]int
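	// The pragma returns three columns: a busy flag (non-zero if the
	// checkpoint was blocked), the number of frames in the WAL, and the
	// number of frames checkpointed.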
	if err := d.QueryRow(`PRAGMA wal_checkpoint(TRUNCATE);`).Scan(&row[0], &row[1], &row[2]); err != nil {
		return err
	} else if row[0] != 0 {
		return fmt.Errorf("truncation checkpoint failed during restore (%d,%d,%d)", row[0], row[1], row[2])
	}
	return d.Close()
}

// CRC64 returns a CRC-64 ISO checksum of the database and its current position.
//
// This function obtains a read lock so it prevents syncs from occurring until
// the operation is complete. The database will still be usable but it will be
// unable to checkpoint during this time.
func (db *DB) CRC64(ctx context.Context) (uint64, Pos, error) {
	db.mu.Lock()
	defer db.mu.Unlock()

	if err := db.init(); err != nil {
		return 0, Pos{}, err
	} else if db.db == nil {
		return 0, Pos{}, os.ErrNotExist
	}

	generation, err := db.CurrentGeneration()
	if err != nil {
		return 0, Pos{}, fmt.Errorf("cannot find current generation: %w", err)
	} else if generation == "" {
		return 0, Pos{}, fmt.Errorf("no current generation")
	}

	// Force a RESTART checkpoint to ensure the database is at the start of the WAL.
	if err := db.checkpointAndInit(ctx, generation, CheckpointModeRestart); err != nil {
		return 0, Pos{}, err
	}

	// Obtain current position. Clear the offset since we are only reading the
	// DB and not applying the current WAL.
	pos, err := db.Pos()
	if err != nil {
		return 0, pos, err
	}
	pos.Offset = 0

	// Seek to the beginning of the db file descriptor and checksum whole file.
	h := crc64.New(crc64.MakeTable(crc64.ISO))
	if _, err := db.f.Seek(0, io.SeekStart); err != nil {
		return 0, pos, err
	} else if _, err := io.Copy(h, db.f); err != nil {
		return 0, pos, err
	}
	return h.Sum64(), pos, nil
}

// DefaultRestoreParallelism is the default parallelism when downloading WAL files.
const DefaultRestoreParallelism = 8

// RestoreOptions represents options for DB.Restore().
type RestoreOptions struct {
	// Target path to restore into.
	// If blank, the original DB path is used.
	OutputPath string

	// Specific replica to restore from.
	// If blank, all replicas are considered.
	ReplicaName string

	// Specific generation to restore from.
	// If blank, all generations are considered.
	Generation string

	// Specific index to restore from.
	// Set to math.MaxInt32 to ignore index.
	Index int

	// Point-in-time to restore database.
	// If zero, the database is restored to the most recent state available.
	Timestamp time.Time

	// Specifies how many WAL files are downloaded in parallel during restore.
	Parallelism int

	// Logging settings.
	Logger  *log.Logger
	Verbose bool
}

// NewRestoreOptions returns a new instance of RestoreOptions with defaults.
func NewRestoreOptions() RestoreOptions {
	return RestoreOptions{
		Index:       math.MaxInt32,
		Parallelism: DefaultRestoreParallelism,
	}
}

// Database metrics.
var (
	dbSizeGaugeVec = promauto.NewGaugeVec(prometheus.GaugeOpts{
		Namespace: "litestream",
		Subsystem: "db",
		Name:      "db_size",
		Help:      "The current size of the real DB",
	}, []string{"db"})

	walSizeGaugeVec = promauto.NewGaugeVec(prometheus.GaugeOpts{
		Namespace: "litestream",
		Subsystem: "db",
		Name:      "wal_size",
		Help:      "The current size of the real WAL",
	}, []string{"db"})

	totalWALBytesCounterVec = promauto.NewCounterVec(prometheus.CounterOpts{
		Namespace: "litestream",
		Subsystem: "db",
		Name:      "total_wal_bytes",
		Help:      "Total number of bytes written to shadow WAL",
	}, []string{"db"})

	shadowWALIndexGaugeVec = promauto.NewGaugeVec(prometheus.GaugeOpts{
		Namespace: "litestream",
		Subsystem: "db",
		Name:      "shadow_wal_index",
		Help:      "The current index of the shadow WAL",
	}, []string{"db"})

	shadowWALSizeGaugeVec = promauto.NewGaugeVec(prometheus.GaugeOpts{
		Namespace: "litestream",
		Subsystem: "db",
		Name:      "shadow_wal_size",
		Help:      "Current size of shadow WAL, in bytes",
	}, []string{"db"})

	syncNCounterVec = promauto.NewCounterVec(prometheus.CounterOpts{
		Namespace: "litestream",
		Subsystem: "db",
		Name:      "sync_count",
		Help:      "Number of sync operations performed",
	}, []string{"db"})

	syncErrorNCounterVec = promauto.NewCounterVec(prometheus.CounterOpts{
		Namespace: "litestream",
		Subsystem: "db",
		Name:      "sync_error_count",
		Help:      "Number of sync errors that have occurred",
	}, []string{"db"})

	syncSecondsCounterVec = promauto.NewCounterVec(prometheus.CounterOpts{
		Namespace: "litestream",
		Subsystem: "db",
		Name:      "sync_seconds",
		Help:      "Time spent syncing shadow WAL, in seconds",
	}, []string{"db"})

	checkpointNCounterVec = promauto.NewCounterVec(prometheus.CounterOpts{
		Namespace: "litestream",
		Subsystem: "db",
		Name:      "checkpoint_count",
		Help:      "Number of checkpoint operations performed",
	}, []string{"db", "mode"})

	checkpointErrorNCounterVec = promauto.NewCounterVec(prometheus.CounterOpts{
		Namespace: "litestream",
		Subsystem: "db",
		Name:      "checkpoint_error_count",
		Help:      "Number of checkpoint errors that have occurred",
	}, []string{"db", "mode"})

	checkpointSecondsCounterVec = promauto.NewCounterVec(prometheus.CounterOpts{
		Namespace: "litestream",
		Subsystem: "db",
		Name:      "checkpoint_seconds",
		Help:      "Time spent checkpointing WAL, in seconds",
	}, []string{"db", "mode"})
)
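
// headerByteOrder returns the byte order used for WAL checksums based on the
// header magic. Per the SQLite WAL format, magic 0x377f0682 indicates
// little-endian checksums and 0x377f0683 indicates big-endian checksums.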
func headerByteOrder(hdr []byte) (binary.ByteOrder, error) {
	magic := binary.BigEndian.Uint32(hdr[0:])
	switch magic {
	case 0x377f0682:
		return binary.LittleEndian, nil
	case 0x377f0683:
		return binary.BigEndian, nil
	default:
		return nil, fmt.Errorf("invalid wal header magic: %x", magic)
	}
}