From 03831e2d06bb62a086687f905cc3368242bbb205 Mon Sep 17 00:00:00 2001
From: Ben Johnson
Date: Wed, 21 Apr 2021 16:07:29 -0600
Subject: [PATCH] Download WAL files in parallel during restore

This commit changes restore so that multiple WAL files are downloaded to
the local disk in parallel while another goroutine applies those files in
order. Downloading and applying WAL files serially limits total throughput
because a restore typically fetches many small WAL files, so per-file
latency dominates unless the downloads are overlapped.
---
 cmd/litestream/restore.go |   6 ++
 db.go                     | 113 ++++++++++++++++++++++++++++++--------
 go.mod                    |   1 +
 go.sum                    |   2 +
 4 files changed, 99 insertions(+), 23 deletions(-)

diff --git a/cmd/litestream/restore.go b/cmd/litestream/restore.go
index 79312cb..4fa78e7 100644
--- a/cmd/litestream/restore.go
+++ b/cmd/litestream/restore.go
@@ -7,6 +7,7 @@ import (
 	"fmt"
 	"log"
 	"os"
+	"strconv"
 	"time"

 	"github.com/benbjohnson/litestream"
@@ -26,6 +27,7 @@ func (c *RestoreCommand) Run(ctx context.Context, args []string) (err error) {
 	fs.StringVar(&opt.ReplicaName, "replica", "", "replica name")
 	fs.StringVar(&opt.Generation, "generation", "", "generation name")
 	fs.Var((*indexVar)(&opt.Index), "index", "wal index")
+	fs.IntVar(&opt.Parallelism, "parallelism", opt.Parallelism, "parallelism")
 	ifReplicaExists := fs.Bool("if-replica-exists", false, "")
 	timestampStr := fs.String("timestamp", "", "timestamp")
 	verbose := fs.Bool("v", false, "verbose output")
@@ -170,6 +172,10 @@ Arguments:
 	-if-replica-exists
 	    Returns exit code of 0 if no backups found.

+	-parallelism NUM
+	    Determines the number of WAL files downloaded in parallel.
+	    Defaults to `+strconv.Itoa(litestream.DefaultRestoreParallelism)+`.
+
 	-v
 	    Verbose output.

diff --git a/db.go b/db.go
index 2e2da6b..1fea632 100644
--- a/db.go
+++ b/db.go
@@ -23,6 +23,7 @@ import (

 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/prometheus/client_golang/prometheus/promauto"
+	"golang.org/x/sync/errgroup"
 )

 // Default DB settings.
@@ -1491,30 +1492,82 @@ func RestoreReplica(ctx context.Context, r Replica, opt RestoreOptions) (err err
 		return fmt.Errorf("cannot restore snapshot: %w", err)
 	}

-	// Restore each WAL file until we reach our maximum index.
+	// Fill input channel with all WAL indexes to be loaded in order.
+	ch := make(chan int, maxWALIndex-minWALIndex+1)
 	for index := minWALIndex; index <= maxWALIndex; index++ {
-		restoreStartTime := time.Now()
-		if err = restoreWAL(ctx, r, opt.Generation, index, tmpPath); os.IsNotExist(err) && index == minWALIndex && index == maxWALIndex {
-			logger.Printf("%s: no wal available, snapshot only", logPrefix)
-			break // snapshot file only, ignore error
-		} else if err != nil {
-			return fmt.Errorf("cannot restore wal: %w", err)
-		}
+		ch <- index
+	}
+	close(ch)

-		fi, err := os.Stat(tmpPath + "-wal")
-		if err != nil {
-			return fmt.Errorf("cannot stat restored wal: %w", err)
-		}
+	// Track load state for each WAL.
+	var mu sync.Mutex
+	cond := sync.NewCond(&mu)
+	ready := make([]bool, maxWALIndex-minWALIndex+1)

-		applyStartTime := time.Now()
-		if err = applyWAL(ctx, tmpPath); err != nil {
+	parallelism := opt.Parallelism
+	if parallelism < 1 {
+		parallelism = 1
+	}
+
+	// Download WAL files to disk in parallel.
+	g, ctx := errgroup.WithContext(ctx)
+	for i := 0; i < parallelism; i++ {
+		g.Go(func() error {
+			for {
+				select {
+				case <-ctx.Done():
+					cond.Broadcast()
+					return ctx.Err()
+				case index, ok := <-ch:
+					if !ok {
+						cond.Broadcast()
+						return nil
+					}
+
+					startTime := time.Now()
+					if err := downloadWAL(ctx, r, opt.Generation, index, tmpPath); os.IsNotExist(err) && index == minWALIndex && index == maxWALIndex {
+						logger.Printf("%s: no wal available, snapshot only", logPrefix)
+						continue // snapshot file only, ignore error
+					} else if err != nil {
+						cond.Broadcast()
+						return fmt.Errorf("cannot download wal %s/%08x: %w", opt.Generation, index, err)
+					}
+
+					// Mark index as ready-to-apply and notify applying code.
+					mu.Lock()
+					ready[index-minWALIndex] = true
+					mu.Unlock()
+					cond.Broadcast()
+
+					logger.Printf("%s: downloaded wal %s/%08x elapsed=%s",
+						logPrefix, opt.Generation, index,
+						time.Since(startTime).String(),
+					)
+				}
+			}
+		})
+	}
+
+	// Apply WAL files in order as they are ready.
+	for index := minWALIndex; index <= maxWALIndex; index++ {
+		// Wait until next WAL file is ready to apply.
+		mu.Lock()
+		for !ready[index-minWALIndex] {
+			if err := ctx.Err(); err != nil {
+				return err
+			}
+			cond.Wait()
+		}
+		mu.Unlock()
+
+		// Apply WAL to database file.
+		startTime := time.Now()
+		if err = applyWAL(ctx, index, tmpPath); err != nil {
 			return fmt.Errorf("cannot apply wal: %w", err)
 		}
-		logger.Printf("%s: restored wal %s/%08x (sz=%d restore=%s apply=%s)",
+		logger.Printf("%s: applied wal %s/%08x elapsed=%s",
 			logPrefix, opt.Generation, index,
-			fi.Size(),
-			applyStartTime.Sub(restoreStartTime).String(),
-			time.Since(applyStartTime).String(),
+			time.Since(startTime).String(),
 		)
 	}

@@ -1636,8 +1689,10 @@ func restoreSnapshot(ctx context.Context, r Replica, generation string, index in
 	return f.Close()
 }

-// restoreWAL copies a WAL file from the replica to the local WAL and forces checkpoint.
-func restoreWAL(ctx context.Context, r Replica, generation string, index int, dbPath string) error {
+// downloadWAL copies a WAL file from the replica to a local copy next to the DB.
+// The WAL is later applied by applyWAL(). This function can be run in parallel
+// to download multiple WAL files simultaneously.
+func downloadWAL(ctx context.Context, r Replica, generation string, index int, dbPath string) error {
 	// Determine the user/group & mode based on the DB, if available.
 	uid, gid, mode := -1, -1, os.FileMode(0600)
 	if db := r.DB(); db != nil {
@@ -1652,7 +1707,7 @@ func restoreWAL(ctx context.Context, r Replica, generation string, index int, db
 	defer rd.Close()

 	// Open handle to destination WAL path.
-	f, err := createFile(dbPath+"-wal", mode, uid, gid)
+	f, err := createFile(fmt.Sprintf("%s-%08x-wal", dbPath, index), mode, uid, gid)
 	if err != nil {
 		return err
 	}
@@ -1668,7 +1723,12 @@ func restoreWAL(ctx context.Context, r Replica, generation string, index int, db
 }

 // applyWAL performs a truncating checkpoint on the given database.
-func applyWAL(ctx context.Context, dbPath string) error {
+func applyWAL(ctx context.Context, index int, dbPath string) error {
+	// Copy WAL file from its staging path to the correct "-wal" location.
+	if err := os.Rename(fmt.Sprintf("%s-%08x-wal", dbPath, index), dbPath+"-wal"); err != nil {
+		return err
+	}
+
 	// Open SQLite database and force a truncating checkpoint.
 	d, err := sql.Open("sqlite3", dbPath)
 	if err != nil {
@@ -1732,6 +1792,9 @@ func (db *DB) CRC64(ctx context.Context) (uint64, Pos, error) {
 	return h.Sum64(), pos, nil
 }

+// DefaultRestoreParallelism is the default parallelism when downloading WAL files.
+const DefaultRestoreParallelism = 8
+
 // RestoreOptions represents options for DB.Restore().
 type RestoreOptions struct {
 	// Target path to restore into.
@@ -1754,6 +1817,9 @@ type RestoreOptions struct {
 	// If zero, database restore to most recent state available.
 	Timestamp time.Time

+	// Specifies how many WAL files are downloaded in parallel during restore.
+	Parallelism int
+
 	// Logging settings.
 	Logger *log.Logger
 	Verbose bool
@@ -1762,7 +1828,8 @@ type RestoreOptions struct {
 // NewRestoreOptions returns a new instance of RestoreOptions with defaults.
 func NewRestoreOptions() RestoreOptions {
 	return RestoreOptions{
-		Index: math.MaxInt32,
+		Index:       math.MaxInt32,
+		Parallelism: DefaultRestoreParallelism,
 	}
 }

diff --git a/go.mod b/go.mod
index 90c25b7..8c21364 100644
--- a/go.mod
+++ b/go.mod
@@ -8,6 +8,7 @@ require (
 	github.com/mattn/go-sqlite3 v1.14.5
 	github.com/pierrec/lz4/v4 v4.1.3
 	github.com/prometheus/client_golang v1.9.0
+	golang.org/x/sync v0.0.0-20210220032951-036812b2e83c // indirect
 	golang.org/x/sys v0.0.0-20201214210602-f9fddec55a1e
 	gopkg.in/yaml.v2 v2.4.0
 )
diff --git a/go.sum b/go.sum
index 3793bf7..9136fc2 100644
--- a/go.sum
+++ b/go.sum
@@ -316,6 +316,8 @@ golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJ
 golang.org/x/sync v0.0.0-20190227155943-e225da77a7e6/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
 golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
 golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+golang.org/x/sync v0.0.0-20210220032951-036812b2e83c h1:5KslGYwFpkhGh+Q16bwMP3cOontH8FOep7tGV86Y7SQ=
+golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
 golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
 golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
 golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
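The core of the db.go change is a small producer/consumer pipeline: a bounded pool of goroutines pulls WAL indexes off a channel and downloads them concurrently, while the main goroutine applies each WAL strictly in index order, coordinated through a ready slice and a sync.Cond. The standalone sketch below illustrates that coordination pattern in isolation. The names restoreWALs, fetchWAL, and applyWALFile are placeholders invented for this example (the patch's real functions are RestoreReplica, downloadWAL, and applyWAL), and it signals download failures through a mutex-guarded flag rather than the patch's ctx.Err() polling.

package main

import (
	"context"
	"fmt"
	"sync"

	"golang.org/x/sync/errgroup"
)

// fetchWAL and applyWALFile are illustrative stand-ins for the patch's
// downloadWAL and applyWAL; they only simulate the work.
func fetchWAL(ctx context.Context, index int) error { return nil }

func applyWALFile(ctx context.Context, index int) error {
	fmt.Printf("applied wal %08x\n", index)
	return nil
}

// restoreWALs downloads WAL indexes [minIndex, maxIndex] with `parallelism`
// workers and applies them strictly in ascending index order.
func restoreWALs(ctx context.Context, minIndex, maxIndex, parallelism int) error {
	// Queue every WAL index up front so the workers can pull from the channel.
	ch := make(chan int, maxIndex-minIndex+1)
	for index := minIndex; index <= maxIndex; index++ {
		ch <- index
	}
	close(ch)

	var (
		mu     sync.Mutex
		cond   = sync.NewCond(&mu)
		ready  = make([]bool, maxIndex-minIndex+1) // per-index "downloaded" flags
		failed error                               // set when a download fails; guarded by mu
	)

	// The errgroup cancels ctx on the first worker error, so a real
	// fetchWAL that honors ctx would abort its remaining downloads early.
	g, ctx := errgroup.WithContext(ctx)
	for i := 0; i < parallelism; i++ {
		g.Go(func() error {
			for index := range ch {
				if err := fetchWAL(ctx, index); err != nil {
					mu.Lock()
					failed = err
					cond.Broadcast() // wake the applier so it stops waiting
					mu.Unlock()
					return err
				}
				mu.Lock()
				ready[index-minIndex] = true
				cond.Broadcast() // the next index in line may now be applicable
				mu.Unlock()
			}
			return nil
		})
	}

	// Apply WAL files in order, waiting for each download to finish.
	for index := minIndex; index <= maxIndex; index++ {
		mu.Lock()
		for !ready[index-minIndex] && failed == nil {
			cond.Wait()
		}
		stop := failed != nil
		mu.Unlock()

		if stop {
			return g.Wait() // surfaces the first download error
		}
		if err := applyWALFile(ctx, index); err != nil {
			_ = g.Wait() // let in-flight downloads finish before returning
			return err
		}
	}
	return g.Wait()
}

func main() {
	if err := restoreWALs(context.Background(), 0, 15, 8); err != nil {
		fmt.Println("restore failed:", err)
	}
}

Either way, the invariant matches the patch: downloads may complete out of order, but WAL files are applied one at a time in ascending index order, so raising -parallelism only overlaps the network transfers, not the checkpointing.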