Download WAL files in parallel during restore

This commit changes the restore process to download multiple WAL
files to the local disk in parallel while another goroutine applies
those files in order. Downloading and applying the WAL files serially
reduces total throughput, as the WAL is typically made up of many
small files.
This commit is contained in:
Ben Johnson
2021-04-21 16:07:29 -06:00
parent 257b625749
commit 03831e2d06
4 changed files with 99 additions and 23 deletions

View File

@@ -7,6 +7,7 @@ import (
"fmt" "fmt"
"log" "log"
"os" "os"
"strconv"
"time" "time"
"github.com/benbjohnson/litestream" "github.com/benbjohnson/litestream"
@@ -26,6 +27,7 @@ func (c *RestoreCommand) Run(ctx context.Context, args []string) (err error) {
fs.StringVar(&opt.ReplicaName, "replica", "", "replica name") fs.StringVar(&opt.ReplicaName, "replica", "", "replica name")
fs.StringVar(&opt.Generation, "generation", "", "generation name") fs.StringVar(&opt.Generation, "generation", "", "generation name")
fs.Var((*indexVar)(&opt.Index), "index", "wal index") fs.Var((*indexVar)(&opt.Index), "index", "wal index")
fs.IntVar(&opt.Parallelism, "parallelism", opt.Parallelism, "parallelism")
ifReplicaExists := fs.Bool("if-replica-exists", false, "") ifReplicaExists := fs.Bool("if-replica-exists", false, "")
timestampStr := fs.String("timestamp", "", "timestamp") timestampStr := fs.String("timestamp", "", "timestamp")
verbose := fs.Bool("v", false, "verbose output") verbose := fs.Bool("v", false, "verbose output")
@@ -170,6 +172,10 @@ Arguments:
-if-replica-exists -if-replica-exists
Returns exit code of 0 if no backups found. Returns exit code of 0 if no backups found.
-parallelism NUM
Determines the number of WAL files downloaded in parallel.
Defaults to `+strconv.Itoa(litestream.DefaultRestoreParallelism)+`.
-v -v
Verbose output. Verbose output.

103
db.go
View File

@@ -23,6 +23,7 @@ import (
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto" "github.com/prometheus/client_golang/prometheus/promauto"
"golang.org/x/sync/errgroup"
) )
// Default DB settings. // Default DB settings.
@@ -1491,30 +1492,82 @@ func RestoreReplica(ctx context.Context, r Replica, opt RestoreOptions) (err err
return fmt.Errorf("cannot restore snapshot: %w", err) return fmt.Errorf("cannot restore snapshot: %w", err)
} }
// Restore each WAL file until we reach our maximum index. // Fill input channel with all WAL indexes to be loaded in order.
ch := make(chan int, maxWALIndex-minWALIndex+1)
for index := minWALIndex; index <= maxWALIndex; index++ { for index := minWALIndex; index <= maxWALIndex; index++ {
restoreStartTime := time.Now() ch <- index
if err = restoreWAL(ctx, r, opt.Generation, index, tmpPath); os.IsNotExist(err) && index == minWALIndex && index == maxWALIndex { }
close(ch)
// Track load state for each WAL.
var mu sync.Mutex
cond := sync.NewCond(&mu)
ready := make([]bool, maxWALIndex-minWALIndex+1)
parallelism := opt.Parallelism
if parallelism < 1 {
parallelism = 1
}
// Download WAL files to disk in parallel.
g, ctx := errgroup.WithContext(ctx)
for i := 0; i < parallelism; i++ {
g.Go(func() error {
for {
select {
case <-ctx.Done():
cond.Broadcast()
return err
case index, ok := <-ch:
if !ok {
cond.Broadcast()
return nil
}
startTime := time.Now()
if err = downloadWAL(ctx, r, opt.Generation, index, tmpPath); os.IsNotExist(err) && index == minWALIndex && index == maxWALIndex {
logger.Printf("%s: no wal available, snapshot only", logPrefix) logger.Printf("%s: no wal available, snapshot only", logPrefix)
break // snapshot file only, ignore error continue // snapshot file only, ignore error
} else if err != nil { } else if err != nil {
return fmt.Errorf("cannot restore wal: %w", err) cond.Broadcast()
return fmt.Errorf("cannot download wal %s/%08x: %w", opt.Generation, index, err)
} }
fi, err := os.Stat(tmpPath + "-wal") // Mark index as ready-to-apply and notify applying code.
if err != nil { mu.Lock()
return fmt.Errorf("cannot stat restored wal: %w", err) ready[index-minWALIndex] = true
mu.Unlock()
cond.Broadcast()
logger.Printf("%s: downloaded wal %s/%08x elapsed=%s",
logPrefix, opt.Generation, index,
time.Since(startTime).String(),
)
}
}
})
} }
applyStartTime := time.Now() // Apply WAL files in order as they are ready.
if err = applyWAL(ctx, tmpPath); err != nil { for index := minWALIndex; index <= maxWALIndex; index++ {
// Wait until next WAL file is ready to apply.
mu.Lock()
for !ready[index-minWALIndex] {
if err := ctx.Err(); err != nil {
return err
}
cond.Wait()
}
mu.Unlock()
// Apply WAL to database file.
startTime := time.Now()
if err = applyWAL(ctx, index, tmpPath); err != nil {
return fmt.Errorf("cannot apply wal: %w", err) return fmt.Errorf("cannot apply wal: %w", err)
} }
logger.Printf("%s: restored wal %s/%08x (sz=%d restore=%s apply=%s)", logger.Printf("%s: applied wal %s/%08x elapsed=%s",
logPrefix, opt.Generation, index, logPrefix, opt.Generation, index,
fi.Size(), time.Since(startTime).String(),
applyStartTime.Sub(restoreStartTime).String(),
time.Since(applyStartTime).String(),
) )
} }
@@ -1636,8 +1689,10 @@ func restoreSnapshot(ctx context.Context, r Replica, generation string, index in
return f.Close() return f.Close()
} }
// restoreWAL copies a WAL file from the replica to the local WAL and forces checkpoint. // downloadWAL copies a WAL file from the replica to a local copy next to the DB.
func restoreWAL(ctx context.Context, r Replica, generation string, index int, dbPath string) error { // The WAL is later applied by applyWAL(). This function can be run in parallel
// to download multiple WAL files simultaneously.
func downloadWAL(ctx context.Context, r Replica, generation string, index int, dbPath string) error {
// Determine the user/group & mode based on the DB, if available. // Determine the user/group & mode based on the DB, if available.
uid, gid, mode := -1, -1, os.FileMode(0600) uid, gid, mode := -1, -1, os.FileMode(0600)
if db := r.DB(); db != nil { if db := r.DB(); db != nil {
@@ -1652,7 +1707,7 @@ func restoreWAL(ctx context.Context, r Replica, generation string, index int, db
defer rd.Close() defer rd.Close()
// Open handle to destination WAL path. // Open handle to destination WAL path.
f, err := createFile(dbPath+"-wal", mode, uid, gid) f, err := createFile(fmt.Sprintf("%s-%08x-wal", dbPath, index), mode, uid, gid)
if err != nil { if err != nil {
return err return err
} }
@@ -1668,7 +1723,12 @@ func restoreWAL(ctx context.Context, r Replica, generation string, index int, db
} }
// applyWAL performs a truncating checkpoint on the given database. // applyWAL performs a truncating checkpoint on the given database.
func applyWAL(ctx context.Context, dbPath string) error { func applyWAL(ctx context.Context, index int, dbPath string) error {
// Copy WAL file from its staging path to the correct "-wal" location.
if err := os.Rename(fmt.Sprintf("%s-%08x-wal", dbPath, index), dbPath+"-wal"); err != nil {
return err
}
// Open SQLite database and force a truncating checkpoint. // Open SQLite database and force a truncating checkpoint.
d, err := sql.Open("sqlite3", dbPath) d, err := sql.Open("sqlite3", dbPath)
if err != nil { if err != nil {
@@ -1732,6 +1792,9 @@ func (db *DB) CRC64(ctx context.Context) (uint64, Pos, error) {
return h.Sum64(), pos, nil return h.Sum64(), pos, nil
} }
// DefaultRestoreParallelism is the default parallelism when downloading WAL files.
const DefaultRestoreParallelism = 8
// RestoreOptions represents options for DB.Restore(). // RestoreOptions represents options for DB.Restore().
type RestoreOptions struct { type RestoreOptions struct {
// Target path to restore into. // Target path to restore into.
@@ -1754,6 +1817,9 @@ type RestoreOptions struct {
// If zero, database restore to most recent state available. // If zero, database restore to most recent state available.
Timestamp time.Time Timestamp time.Time
// Specifies how many WAL files are downloaded in parallel during restore.
Parallelism int
// Logging settings. // Logging settings.
Logger *log.Logger Logger *log.Logger
Verbose bool Verbose bool
@@ -1763,6 +1829,7 @@ type RestoreOptions struct {
func NewRestoreOptions() RestoreOptions { func NewRestoreOptions() RestoreOptions {
return RestoreOptions{ return RestoreOptions{
Index: math.MaxInt32, Index: math.MaxInt32,
Parallelism: DefaultRestoreParallelism,
} }
} }

1
go.mod
View File

@@ -8,6 +8,7 @@ require (
github.com/mattn/go-sqlite3 v1.14.5 github.com/mattn/go-sqlite3 v1.14.5
github.com/pierrec/lz4/v4 v4.1.3 github.com/pierrec/lz4/v4 v4.1.3
github.com/prometheus/client_golang v1.9.0 github.com/prometheus/client_golang v1.9.0
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c // indirect
golang.org/x/sys v0.0.0-20201214210602-f9fddec55a1e golang.org/x/sys v0.0.0-20201214210602-f9fddec55a1e
gopkg.in/yaml.v2 v2.4.0 gopkg.in/yaml.v2 v2.4.0
) )

2
go.sum
View File

@@ -316,6 +316,8 @@ golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.0.0-20190227155943-e225da77a7e6/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190227155943-e225da77a7e6/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c h1:5KslGYwFpkhGh+Q16bwMP3cOontH8FOep7tGV86Y7SQ=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=