diff --git a/cmd/litestream/restore.go b/cmd/litestream/restore.go index 79312cb..4fa78e7 100644 --- a/cmd/litestream/restore.go +++ b/cmd/litestream/restore.go @@ -7,6 +7,7 @@ import ( "fmt" "log" "os" + "strconv" "time" "github.com/benbjohnson/litestream" @@ -26,6 +27,7 @@ func (c *RestoreCommand) Run(ctx context.Context, args []string) (err error) { fs.StringVar(&opt.ReplicaName, "replica", "", "replica name") fs.StringVar(&opt.Generation, "generation", "", "generation name") fs.Var((*indexVar)(&opt.Index), "index", "wal index") + fs.IntVar(&opt.Parallelism, "parallelism", opt.Parallelism, "parallelism") ifReplicaExists := fs.Bool("if-replica-exists", false, "") timestampStr := fs.String("timestamp", "", "timestamp") verbose := fs.Bool("v", false, "verbose output") @@ -170,6 +172,10 @@ Arguments: -if-replica-exists Returns exit code of 0 if no backups found. + -parallelism NUM + Determines the number of WAL files downloaded in parallel. + Defaults to `+strconv.Itoa(litestream.DefaultRestoreParallelism)+`. + -v Verbose output. diff --git a/db.go b/db.go index 2e2da6b..1fea632 100644 --- a/db.go +++ b/db.go @@ -23,6 +23,7 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" + "golang.org/x/sync/errgroup" ) // Default DB settings. @@ -1491,30 +1492,82 @@ func RestoreReplica(ctx context.Context, r Replica, opt RestoreOptions) (err err return fmt.Errorf("cannot restore snapshot: %w", err) } - // Restore each WAL file until we reach our maximum index. + // Fill input channel with all WAL indexes to be loaded in order. + ch := make(chan int, maxWALIndex-minWALIndex+1) for index := minWALIndex; index <= maxWALIndex; index++ { - restoreStartTime := time.Now() - if err = restoreWAL(ctx, r, opt.Generation, index, tmpPath); os.IsNotExist(err) && index == minWALIndex && index == maxWALIndex { - logger.Printf("%s: no wal available, snapshot only", logPrefix) - break // snapshot file only, ignore error - } else if err != nil { - return fmt.Errorf("cannot restore wal: %w", err) - } + ch <- index + } + close(ch) - fi, err := os.Stat(tmpPath + "-wal") - if err != nil { - return fmt.Errorf("cannot stat restored wal: %w", err) - } + // Track load state for each WAL. + var mu sync.Mutex + cond := sync.NewCond(&mu) + ready := make([]bool, maxWALIndex-minWALIndex+1) - applyStartTime := time.Now() - if err = applyWAL(ctx, tmpPath); err != nil { + parallelism := opt.Parallelism + if parallelism < 1 { + parallelism = 1 + } + + // Download WAL files to disk in parallel. + g, ctx := errgroup.WithContext(ctx) + for i := 0; i < parallelism; i++ { + g.Go(func() error { + for { + select { + case <-ctx.Done(): + cond.Broadcast() + return err + case index, ok := <-ch: + if !ok { + cond.Broadcast() + return nil + } + + startTime := time.Now() + if err = downloadWAL(ctx, r, opt.Generation, index, tmpPath); os.IsNotExist(err) && index == minWALIndex && index == maxWALIndex { + logger.Printf("%s: no wal available, snapshot only", logPrefix) + continue // snapshot file only, ignore error + } else if err != nil { + cond.Broadcast() + return fmt.Errorf("cannot download wal %s/%08x: %w", opt.Generation, index, err) + } + + // Mark index as ready-to-apply and notify applying code. + mu.Lock() + ready[index-minWALIndex] = true + mu.Unlock() + cond.Broadcast() + + logger.Printf("%s: downloaded wal %s/%08x elapsed=%s", + logPrefix, opt.Generation, index, + time.Since(startTime).String(), + ) + } + } + }) + } + + // Apply WAL files in order as they are ready. + for index := minWALIndex; index <= maxWALIndex; index++ { + // Wait until next WAL file is ready to apply. + mu.Lock() + for !ready[index-minWALIndex] { + if err := ctx.Err(); err != nil { + return err + } + cond.Wait() + } + mu.Unlock() + + // Apply WAL to database file. + startTime := time.Now() + if err = applyWAL(ctx, index, tmpPath); err != nil { return fmt.Errorf("cannot apply wal: %w", err) } - logger.Printf("%s: restored wal %s/%08x (sz=%d restore=%s apply=%s)", + logger.Printf("%s: applied wal %s/%08x elapsed=%s", logPrefix, opt.Generation, index, - fi.Size(), - applyStartTime.Sub(restoreStartTime).String(), - time.Since(applyStartTime).String(), + time.Since(startTime).String(), ) } @@ -1636,8 +1689,10 @@ func restoreSnapshot(ctx context.Context, r Replica, generation string, index in return f.Close() } -// restoreWAL copies a WAL file from the replica to the local WAL and forces checkpoint. -func restoreWAL(ctx context.Context, r Replica, generation string, index int, dbPath string) error { +// downloadWAL copies a WAL file from the replica to a local copy next to the DB. +// The WAL is later applied by applyWAL(). This function can be run in parallel +// to download multiple WAL files simultaneously. +func downloadWAL(ctx context.Context, r Replica, generation string, index int, dbPath string) error { // Determine the user/group & mode based on the DB, if available. uid, gid, mode := -1, -1, os.FileMode(0600) if db := r.DB(); db != nil { @@ -1652,7 +1707,7 @@ func restoreWAL(ctx context.Context, r Replica, generation string, index int, db defer rd.Close() // Open handle to destination WAL path. - f, err := createFile(dbPath+"-wal", mode, uid, gid) + f, err := createFile(fmt.Sprintf("%s-%08x-wal", dbPath, index), mode, uid, gid) if err != nil { return err } @@ -1668,7 +1723,12 @@ func restoreWAL(ctx context.Context, r Replica, generation string, index int, db } // applyWAL performs a truncating checkpoint on the given database. -func applyWAL(ctx context.Context, dbPath string) error { +func applyWAL(ctx context.Context, index int, dbPath string) error { + // Copy WAL file from it's staging path to the correct "-wal" location. + if err := os.Rename(fmt.Sprintf("%s-%08x-wal", dbPath, index), dbPath+"-wal"); err != nil { + return err + } + // Open SQLite database and force a truncating checkpoint. d, err := sql.Open("sqlite3", dbPath) if err != nil { @@ -1732,6 +1792,9 @@ func (db *DB) CRC64(ctx context.Context) (uint64, Pos, error) { return h.Sum64(), pos, nil } +// DefaultRestoreParallelism is the default parallelism when downloading WAL files. +const DefaultRestoreParallelism = 8 + // RestoreOptions represents options for DB.Restore(). type RestoreOptions struct { // Target path to restore into. @@ -1754,6 +1817,9 @@ type RestoreOptions struct { // If zero, database restore to most recent state available. Timestamp time.Time + // Specifies how many WAL files are downloaded in parallel during restore. + Parallelism int + // Logging settings. Logger *log.Logger Verbose bool @@ -1762,7 +1828,8 @@ type RestoreOptions struct { // NewRestoreOptions returns a new instance of RestoreOptions with defaults. func NewRestoreOptions() RestoreOptions { return RestoreOptions{ - Index: math.MaxInt32, + Index: math.MaxInt32, + Parallelism: DefaultRestoreParallelism, } } diff --git a/go.mod b/go.mod index 90c25b7..8c21364 100644 --- a/go.mod +++ b/go.mod @@ -8,6 +8,7 @@ require ( github.com/mattn/go-sqlite3 v1.14.5 github.com/pierrec/lz4/v4 v4.1.3 github.com/prometheus/client_golang v1.9.0 + golang.org/x/sync v0.0.0-20210220032951-036812b2e83c // indirect golang.org/x/sys v0.0.0-20201214210602-f9fddec55a1e gopkg.in/yaml.v2 v2.4.0 ) diff --git a/go.sum b/go.sum index 3793bf7..9136fc2 100644 --- a/go.sum +++ b/go.sum @@ -316,6 +316,8 @@ golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20190227155943-e225da77a7e6/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20210220032951-036812b2e83c h1:5KslGYwFpkhGh+Q16bwMP3cOontH8FOep7tGV86Y7SQ= +golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=