Download WAL files in parallel during restore

This commit changes the restore to download multiple WAL files to
the local disk in parallel while another goroutine applies those
files in order. Downloading & applying the WAL files in serial
reduces the total throughput as WAL files are typically made up of
multiple small files.
This commit is contained in:
Ben Johnson
2021-04-21 16:07:29 -06:00
parent 257b625749
commit 03831e2d06
4 changed files with 99 additions and 23 deletions

View File

@@ -7,6 +7,7 @@ import (
"fmt"
"log"
"os"
"strconv"
"time"
"github.com/benbjohnson/litestream"
@@ -26,6 +27,7 @@ func (c *RestoreCommand) Run(ctx context.Context, args []string) (err error) {
fs.StringVar(&opt.ReplicaName, "replica", "", "replica name")
fs.StringVar(&opt.Generation, "generation", "", "generation name")
fs.Var((*indexVar)(&opt.Index), "index", "wal index")
fs.IntVar(&opt.Parallelism, "parallelism", opt.Parallelism, "parallelism")
ifReplicaExists := fs.Bool("if-replica-exists", false, "")
timestampStr := fs.String("timestamp", "", "timestamp")
verbose := fs.Bool("v", false, "verbose output")
@@ -170,6 +172,10 @@ Arguments:
-if-replica-exists
Returns exit code of 0 if no backups found.
-parallelism NUM
Determines the number of WAL files downloaded in parallel.
Defaults to `+strconv.Itoa(litestream.DefaultRestoreParallelism)+`.
-v
Verbose output.

113
db.go
View File

@@ -23,6 +23,7 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"golang.org/x/sync/errgroup"
)
// Default DB settings.
@@ -1491,30 +1492,82 @@ func RestoreReplica(ctx context.Context, r Replica, opt RestoreOptions) (err err
return fmt.Errorf("cannot restore snapshot: %w", err)
}
// Restore each WAL file until we reach our maximum index.
// Fill input channel with all WAL indexes to be loaded in order.
ch := make(chan int, maxWALIndex-minWALIndex+1)
for index := minWALIndex; index <= maxWALIndex; index++ {
restoreStartTime := time.Now()
if err = restoreWAL(ctx, r, opt.Generation, index, tmpPath); os.IsNotExist(err) && index == minWALIndex && index == maxWALIndex {
logger.Printf("%s: no wal available, snapshot only", logPrefix)
break // snapshot file only, ignore error
} else if err != nil {
return fmt.Errorf("cannot restore wal: %w", err)
}
ch <- index
}
close(ch)
fi, err := os.Stat(tmpPath + "-wal")
if err != nil {
return fmt.Errorf("cannot stat restored wal: %w", err)
}
// Track load state for each WAL.
var mu sync.Mutex
cond := sync.NewCond(&mu)
ready := make([]bool, maxWALIndex-minWALIndex+1)
applyStartTime := time.Now()
if err = applyWAL(ctx, tmpPath); err != nil {
parallelism := opt.Parallelism
if parallelism < 1 {
parallelism = 1
}
// Download WAL files to disk in parallel.
g, ctx := errgroup.WithContext(ctx)
for i := 0; i < parallelism; i++ {
g.Go(func() error {
for {
select {
case <-ctx.Done():
cond.Broadcast()
return err
case index, ok := <-ch:
if !ok {
cond.Broadcast()
return nil
}
startTime := time.Now()
if err = downloadWAL(ctx, r, opt.Generation, index, tmpPath); os.IsNotExist(err) && index == minWALIndex && index == maxWALIndex {
logger.Printf("%s: no wal available, snapshot only", logPrefix)
continue // snapshot file only, ignore error
} else if err != nil {
cond.Broadcast()
return fmt.Errorf("cannot download wal %s/%08x: %w", opt.Generation, index, err)
}
// Mark index as ready-to-apply and notify applying code.
mu.Lock()
ready[index-minWALIndex] = true
mu.Unlock()
cond.Broadcast()
logger.Printf("%s: downloaded wal %s/%08x elapsed=%s",
logPrefix, opt.Generation, index,
time.Since(startTime).String(),
)
}
}
})
}
// Apply WAL files in order as they are ready.
for index := minWALIndex; index <= maxWALIndex; index++ {
// Wait until next WAL file is ready to apply.
mu.Lock()
for !ready[index-minWALIndex] {
if err := ctx.Err(); err != nil {
return err
}
cond.Wait()
}
mu.Unlock()
// Apply WAL to database file.
startTime := time.Now()
if err = applyWAL(ctx, index, tmpPath); err != nil {
return fmt.Errorf("cannot apply wal: %w", err)
}
logger.Printf("%s: restored wal %s/%08x (sz=%d restore=%s apply=%s)",
logger.Printf("%s: applied wal %s/%08x elapsed=%s",
logPrefix, opt.Generation, index,
fi.Size(),
applyStartTime.Sub(restoreStartTime).String(),
time.Since(applyStartTime).String(),
time.Since(startTime).String(),
)
}
@@ -1636,8 +1689,10 @@ func restoreSnapshot(ctx context.Context, r Replica, generation string, index in
return f.Close()
}
// restoreWAL copies a WAL file from the replica to the local WAL and forces checkpoint.
func restoreWAL(ctx context.Context, r Replica, generation string, index int, dbPath string) error {
// downloadWAL copies a WAL file from the replica to a local copy next to the DB.
// The WAL is later applied by applyWAL(). This function can be run in parallel
// to download multiple WAL files simultaneously.
func downloadWAL(ctx context.Context, r Replica, generation string, index int, dbPath string) error {
// Determine the user/group & mode based on the DB, if available.
uid, gid, mode := -1, -1, os.FileMode(0600)
if db := r.DB(); db != nil {
@@ -1652,7 +1707,7 @@ func restoreWAL(ctx context.Context, r Replica, generation string, index int, db
defer rd.Close()
// Open handle to destination WAL path.
f, err := createFile(dbPath+"-wal", mode, uid, gid)
f, err := createFile(fmt.Sprintf("%s-%08x-wal", dbPath, index), mode, uid, gid)
if err != nil {
return err
}
@@ -1668,7 +1723,12 @@ func restoreWAL(ctx context.Context, r Replica, generation string, index int, db
}
// applyWAL performs a truncating checkpoint on the given database.
func applyWAL(ctx context.Context, dbPath string) error {
func applyWAL(ctx context.Context, index int, dbPath string) error {
// Copy WAL file from it's staging path to the correct "-wal" location.
if err := os.Rename(fmt.Sprintf("%s-%08x-wal", dbPath, index), dbPath+"-wal"); err != nil {
return err
}
// Open SQLite database and force a truncating checkpoint.
d, err := sql.Open("sqlite3", dbPath)
if err != nil {
@@ -1732,6 +1792,9 @@ func (db *DB) CRC64(ctx context.Context) (uint64, Pos, error) {
return h.Sum64(), pos, nil
}
// DefaultRestoreParallelism is the default parallelism when downloading WAL files.
const DefaultRestoreParallelism = 8
// RestoreOptions represents options for DB.Restore().
type RestoreOptions struct {
// Target path to restore into.
@@ -1754,6 +1817,9 @@ type RestoreOptions struct {
// If zero, database restore to most recent state available.
Timestamp time.Time
// Specifies how many WAL files are downloaded in parallel during restore.
Parallelism int
// Logging settings.
Logger *log.Logger
Verbose bool
@@ -1762,7 +1828,8 @@ type RestoreOptions struct {
// NewRestoreOptions returns a new instance of RestoreOptions with defaults.
func NewRestoreOptions() RestoreOptions {
return RestoreOptions{
Index: math.MaxInt32,
Index: math.MaxInt32,
Parallelism: DefaultRestoreParallelism,
}
}

1
go.mod
View File

@@ -8,6 +8,7 @@ require (
github.com/mattn/go-sqlite3 v1.14.5
github.com/pierrec/lz4/v4 v4.1.3
github.com/prometheus/client_golang v1.9.0
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c // indirect
golang.org/x/sys v0.0.0-20201214210602-f9fddec55a1e
gopkg.in/yaml.v2 v2.4.0
)

2
go.sum
View File

@@ -316,6 +316,8 @@ golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.0.0-20190227155943-e225da77a7e6/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c h1:5KslGYwFpkhGh+Q16bwMP3cOontH8FOep7tGV86Y7SQ=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=