|
|
|
@@ -23,6 +23,7 @@ import (
|
|
|
|
|
|
|
|
|
|
"github.com/prometheus/client_golang/prometheus"
|
|
|
|
|
"github.com/prometheus/client_golang/prometheus/promauto"
|
|
|
|
|
"golang.org/x/sync/errgroup"
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
// Default DB settings.
|
|
|
|
@@ -1491,30 +1492,82 @@ func RestoreReplica(ctx context.Context, r Replica, opt RestoreOptions) (err err
|
|
|
|
|
return fmt.Errorf("cannot restore snapshot: %w", err)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Restore each WAL file until we reach our maximum index.
|
|
|
|
|
// Fill input channel with all WAL indexes to be loaded in order.
|
|
|
|
|
ch := make(chan int, maxWALIndex-minWALIndex+1)
|
|
|
|
|
for index := minWALIndex; index <= maxWALIndex; index++ {
|
|
|
|
|
restoreStartTime := time.Now()
|
|
|
|
|
if err = restoreWAL(ctx, r, opt.Generation, index, tmpPath); os.IsNotExist(err) && index == minWALIndex && index == maxWALIndex {
|
|
|
|
|
ch <- index
|
|
|
|
|
}
|
|
|
|
|
close(ch)
|
|
|
|
|
|
|
|
|
|
// Track load state for each WAL.
|
|
|
|
|
var mu sync.Mutex
|
|
|
|
|
cond := sync.NewCond(&mu)
|
|
|
|
|
ready := make([]bool, maxWALIndex-minWALIndex+1)
|
|
|
|
|
|
|
|
|
|
parallelism := opt.Parallelism
|
|
|
|
|
if parallelism < 1 {
|
|
|
|
|
parallelism = 1
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Download WAL files to disk in parallel.
|
|
|
|
|
g, ctx := errgroup.WithContext(ctx)
|
|
|
|
|
for i := 0; i < parallelism; i++ {
|
|
|
|
|
g.Go(func() error {
|
|
|
|
|
for {
|
|
|
|
|
select {
|
|
|
|
|
case <-ctx.Done():
|
|
|
|
|
cond.Broadcast()
|
|
|
|
|
return err
|
|
|
|
|
case index, ok := <-ch:
|
|
|
|
|
if !ok {
|
|
|
|
|
cond.Broadcast()
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
startTime := time.Now()
|
|
|
|
|
if err = downloadWAL(ctx, r, opt.Generation, index, tmpPath); os.IsNotExist(err) && index == minWALIndex && index == maxWALIndex {
|
|
|
|
|
logger.Printf("%s: no wal available, snapshot only", logPrefix)
|
|
|
|
|
break // snapshot file only, ignore error
|
|
|
|
|
continue // snapshot file only, ignore error
|
|
|
|
|
} else if err != nil {
|
|
|
|
|
return fmt.Errorf("cannot restore wal: %w", err)
|
|
|
|
|
cond.Broadcast()
|
|
|
|
|
return fmt.Errorf("cannot download wal %s/%08x: %w", opt.Generation, index, err)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
fi, err := os.Stat(tmpPath + "-wal")
|
|
|
|
|
if err != nil {
|
|
|
|
|
return fmt.Errorf("cannot stat restored wal: %w", err)
|
|
|
|
|
// Mark index as ready-to-apply and notify applying code.
|
|
|
|
|
mu.Lock()
|
|
|
|
|
ready[index-minWALIndex] = true
|
|
|
|
|
mu.Unlock()
|
|
|
|
|
cond.Broadcast()
|
|
|
|
|
|
|
|
|
|
logger.Printf("%s: downloaded wal %s/%08x elapsed=%s",
|
|
|
|
|
logPrefix, opt.Generation, index,
|
|
|
|
|
time.Since(startTime).String(),
|
|
|
|
|
)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
})
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
applyStartTime := time.Now()
|
|
|
|
|
if err = applyWAL(ctx, tmpPath); err != nil {
|
|
|
|
|
// Apply WAL files in order as they are ready.
|
|
|
|
|
for index := minWALIndex; index <= maxWALIndex; index++ {
|
|
|
|
|
// Wait until next WAL file is ready to apply.
|
|
|
|
|
mu.Lock()
|
|
|
|
|
for !ready[index-minWALIndex] {
|
|
|
|
|
if err := ctx.Err(); err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
cond.Wait()
|
|
|
|
|
}
|
|
|
|
|
mu.Unlock()
|
|
|
|
|
|
|
|
|
|
// Apply WAL to database file.
|
|
|
|
|
startTime := time.Now()
|
|
|
|
|
if err = applyWAL(ctx, index, tmpPath); err != nil {
|
|
|
|
|
return fmt.Errorf("cannot apply wal: %w", err)
|
|
|
|
|
}
|
|
|
|
|
logger.Printf("%s: restored wal %s/%08x (sz=%d restore=%s apply=%s)",
|
|
|
|
|
logger.Printf("%s: applied wal %s/%08x elapsed=%s",
|
|
|
|
|
logPrefix, opt.Generation, index,
|
|
|
|
|
fi.Size(),
|
|
|
|
|
applyStartTime.Sub(restoreStartTime).String(),
|
|
|
|
|
time.Since(applyStartTime).String(),
|
|
|
|
|
time.Since(startTime).String(),
|
|
|
|
|
)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@@ -1636,8 +1689,10 @@ func restoreSnapshot(ctx context.Context, r Replica, generation string, index in
|
|
|
|
|
return f.Close()
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// restoreWAL copies a WAL file from the replica to the local WAL and forces checkpoint.
|
|
|
|
|
func restoreWAL(ctx context.Context, r Replica, generation string, index int, dbPath string) error {
|
|
|
|
|
// downloadWAL copies a WAL file from the replica to a local copy next to the DB.
|
|
|
|
|
// The WAL is later applied by applyWAL(). This function can be run in parallel
|
|
|
|
|
// to download multiple WAL files simultaneously.
|
|
|
|
|
func downloadWAL(ctx context.Context, r Replica, generation string, index int, dbPath string) error {
|
|
|
|
|
// Determine the user/group & mode based on the DB, if available.
|
|
|
|
|
uid, gid, mode := -1, -1, os.FileMode(0600)
|
|
|
|
|
if db := r.DB(); db != nil {
|
|
|
|
@@ -1652,7 +1707,7 @@ func restoreWAL(ctx context.Context, r Replica, generation string, index int, db
|
|
|
|
|
defer rd.Close()
|
|
|
|
|
|
|
|
|
|
// Open handle to destination WAL path.
|
|
|
|
|
f, err := createFile(dbPath+"-wal", mode, uid, gid)
|
|
|
|
|
f, err := createFile(fmt.Sprintf("%s-%08x-wal", dbPath, index), mode, uid, gid)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
@@ -1668,7 +1723,12 @@ func restoreWAL(ctx context.Context, r Replica, generation string, index int, db
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// applyWAL performs a truncating checkpoint on the given database.
|
|
|
|
|
func applyWAL(ctx context.Context, dbPath string) error {
|
|
|
|
|
func applyWAL(ctx context.Context, index int, dbPath string) error {
|
|
|
|
|
// Move the WAL file from its staging path to the correct "-wal" location.
|
|
|
|
|
if err := os.Rename(fmt.Sprintf("%s-%08x-wal", dbPath, index), dbPath+"-wal"); err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Open SQLite database and force a truncating checkpoint.
|
|
|
|
|
d, err := sql.Open("sqlite3", dbPath)
|
|
|
|
|
if err != nil {
|
|
|
|
@@ -1732,6 +1792,9 @@ func (db *DB) CRC64(ctx context.Context) (uint64, Pos, error) {
|
|
|
|
|
return h.Sum64(), pos, nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// DefaultRestoreParallelism is the default parallelism when downloading WAL files.
|
|
|
|
|
const DefaultRestoreParallelism = 8
|
|
|
|
|
|
|
|
|
|
// RestoreOptions represents options for DB.Restore().
|
|
|
|
|
type RestoreOptions struct {
|
|
|
|
|
// Target path to restore into.
|
|
|
|
@@ -1754,6 +1817,9 @@ type RestoreOptions struct {
|
|
|
|
|
// If zero, database restore to most recent state available.
|
|
|
|
|
Timestamp time.Time
|
|
|
|
|
|
|
|
|
|
// Specifies how many WAL files are downloaded in parallel during restore.
|
|
|
|
|
Parallelism int
|
|
|
|
|
|
|
|
|
|
// Logging settings.
|
|
|
|
|
Logger *log.Logger
|
|
|
|
|
Verbose bool
|
|
|
|
@@ -1763,6 +1829,7 @@ type RestoreOptions struct {
|
|
|
|
|
func NewRestoreOptions() RestoreOptions {
|
|
|
|
|
return RestoreOptions{
|
|
|
|
|
Index: math.MaxInt32,
|
|
|
|
|
Parallelism: DefaultRestoreParallelism,
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|