Merge pull request #167 from benbjohnson/parallel-restore

Download WAL files in parallel during restore
This commit is contained in:
Ben Johnson
2021-04-21 16:27:09 -06:00
committed by GitHub
4 changed files with 99 additions and 23 deletions

View File

@@ -7,6 +7,7 @@ import (
"fmt" "fmt"
"log" "log"
"os" "os"
"strconv"
"time" "time"
"github.com/benbjohnson/litestream" "github.com/benbjohnson/litestream"
@@ -26,6 +27,7 @@ func (c *RestoreCommand) Run(ctx context.Context, args []string) (err error) {
fs.StringVar(&opt.ReplicaName, "replica", "", "replica name") fs.StringVar(&opt.ReplicaName, "replica", "", "replica name")
fs.StringVar(&opt.Generation, "generation", "", "generation name") fs.StringVar(&opt.Generation, "generation", "", "generation name")
fs.Var((*indexVar)(&opt.Index), "index", "wal index") fs.Var((*indexVar)(&opt.Index), "index", "wal index")
fs.IntVar(&opt.Parallelism, "parallelism", opt.Parallelism, "parallelism")
ifReplicaExists := fs.Bool("if-replica-exists", false, "") ifReplicaExists := fs.Bool("if-replica-exists", false, "")
timestampStr := fs.String("timestamp", "", "timestamp") timestampStr := fs.String("timestamp", "", "timestamp")
verbose := fs.Bool("v", false, "verbose output") verbose := fs.Bool("v", false, "verbose output")
@@ -170,6 +172,10 @@ Arguments:
-if-replica-exists -if-replica-exists
Returns exit code of 0 if no backups found. Returns exit code of 0 if no backups found.
-parallelism NUM
Determines the number of WAL files downloaded in parallel.
Defaults to `+strconv.Itoa(litestream.DefaultRestoreParallelism)+`.
-v -v
Verbose output. Verbose output.

113
db.go
View File

@@ -23,6 +23,7 @@ import (
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto" "github.com/prometheus/client_golang/prometheus/promauto"
"golang.org/x/sync/errgroup"
) )
// Default DB settings. // Default DB settings.
@@ -1491,30 +1492,82 @@ func RestoreReplica(ctx context.Context, r Replica, opt RestoreOptions) (err err
return fmt.Errorf("cannot restore snapshot: %w", err) return fmt.Errorf("cannot restore snapshot: %w", err)
} }
// Restore each WAL file until we reach our maximum index. // Fill input channel with all WAL indexes to be loaded in order.
ch := make(chan int, maxWALIndex-minWALIndex+1)
for index := minWALIndex; index <= maxWALIndex; index++ { for index := minWALIndex; index <= maxWALIndex; index++ {
restoreStartTime := time.Now() ch <- index
if err = restoreWAL(ctx, r, opt.Generation, index, tmpPath); os.IsNotExist(err) && index == minWALIndex && index == maxWALIndex { }
logger.Printf("%s: no wal available, snapshot only", logPrefix) close(ch)
break // snapshot file only, ignore error
} else if err != nil {
return fmt.Errorf("cannot restore wal: %w", err)
}
fi, err := os.Stat(tmpPath + "-wal") // Track load state for each WAL.
if err != nil { var mu sync.Mutex
return fmt.Errorf("cannot stat restored wal: %w", err) cond := sync.NewCond(&mu)
} ready := make([]bool, maxWALIndex-minWALIndex+1)
applyStartTime := time.Now() parallelism := opt.Parallelism
if err = applyWAL(ctx, tmpPath); err != nil { if parallelism < 1 {
parallelism = 1
}
// Download WAL files to disk in parallel.
g, ctx := errgroup.WithContext(ctx)
for i := 0; i < parallelism; i++ {
g.Go(func() error {
for {
select {
case <-ctx.Done():
cond.Broadcast()
return err
case index, ok := <-ch:
if !ok {
cond.Broadcast()
return nil
}
startTime := time.Now()
if err = downloadWAL(ctx, r, opt.Generation, index, tmpPath); os.IsNotExist(err) && index == minWALIndex && index == maxWALIndex {
logger.Printf("%s: no wal available, snapshot only", logPrefix)
continue // snapshot file only, ignore error
} else if err != nil {
cond.Broadcast()
return fmt.Errorf("cannot download wal %s/%08x: %w", opt.Generation, index, err)
}
// Mark index as ready-to-apply and notify applying code.
mu.Lock()
ready[index-minWALIndex] = true
mu.Unlock()
cond.Broadcast()
logger.Printf("%s: downloaded wal %s/%08x elapsed=%s",
logPrefix, opt.Generation, index,
time.Since(startTime).String(),
)
}
}
})
}
// Apply WAL files in order as they are ready.
for index := minWALIndex; index <= maxWALIndex; index++ {
// Wait until next WAL file is ready to apply.
mu.Lock()
for !ready[index-minWALIndex] {
if err := ctx.Err(); err != nil {
return err
}
cond.Wait()
}
mu.Unlock()
// Apply WAL to database file.
startTime := time.Now()
if err = applyWAL(ctx, index, tmpPath); err != nil {
return fmt.Errorf("cannot apply wal: %w", err) return fmt.Errorf("cannot apply wal: %w", err)
} }
logger.Printf("%s: restored wal %s/%08x (sz=%d restore=%s apply=%s)", logger.Printf("%s: applied wal %s/%08x elapsed=%s",
logPrefix, opt.Generation, index, logPrefix, opt.Generation, index,
fi.Size(), time.Since(startTime).String(),
applyStartTime.Sub(restoreStartTime).String(),
time.Since(applyStartTime).String(),
) )
} }
@@ -1636,8 +1689,10 @@ func restoreSnapshot(ctx context.Context, r Replica, generation string, index in
return f.Close() return f.Close()
} }
// restoreWAL copies a WAL file from the replica to the local WAL and forces checkpoint. // downloadWAL copies a WAL file from the replica to a local copy next to the DB.
func restoreWAL(ctx context.Context, r Replica, generation string, index int, dbPath string) error { // The WAL is later applied by applyWAL(). This function can be run in parallel
// to download multiple WAL files simultaneously.
func downloadWAL(ctx context.Context, r Replica, generation string, index int, dbPath string) error {
// Determine the user/group & mode based on the DB, if available. // Determine the user/group & mode based on the DB, if available.
uid, gid, mode := -1, -1, os.FileMode(0600) uid, gid, mode := -1, -1, os.FileMode(0600)
if db := r.DB(); db != nil { if db := r.DB(); db != nil {
@@ -1652,7 +1707,7 @@ func restoreWAL(ctx context.Context, r Replica, generation string, index int, db
defer rd.Close() defer rd.Close()
// Open handle to destination WAL path. // Open handle to destination WAL path.
f, err := createFile(dbPath+"-wal", mode, uid, gid) f, err := createFile(fmt.Sprintf("%s-%08x-wal", dbPath, index), mode, uid, gid)
if err != nil { if err != nil {
return err return err
} }
@@ -1668,7 +1723,12 @@ func restoreWAL(ctx context.Context, r Replica, generation string, index int, db
} }
// applyWAL performs a truncating checkpoint on the given database. // applyWAL performs a truncating checkpoint on the given database.
func applyWAL(ctx context.Context, dbPath string) error { func applyWAL(ctx context.Context, index int, dbPath string) error {
// Copy WAL file from it's staging path to the correct "-wal" location.
if err := os.Rename(fmt.Sprintf("%s-%08x-wal", dbPath, index), dbPath+"-wal"); err != nil {
return err
}
// Open SQLite database and force a truncating checkpoint. // Open SQLite database and force a truncating checkpoint.
d, err := sql.Open("sqlite3", dbPath) d, err := sql.Open("sqlite3", dbPath)
if err != nil { if err != nil {
@@ -1732,6 +1792,9 @@ func (db *DB) CRC64(ctx context.Context) (uint64, Pos, error) {
return h.Sum64(), pos, nil return h.Sum64(), pos, nil
} }
// DefaultRestoreParallelism is the default parallelism when downloading WAL files.
const DefaultRestoreParallelism = 8
// RestoreOptions represents options for DB.Restore(). // RestoreOptions represents options for DB.Restore().
type RestoreOptions struct { type RestoreOptions struct {
// Target path to restore into. // Target path to restore into.
@@ -1754,6 +1817,9 @@ type RestoreOptions struct {
// If zero, database restore to most recent state available. // If zero, database restore to most recent state available.
Timestamp time.Time Timestamp time.Time
// Specifies how many WAL files are downloaded in parallel during restore.
Parallelism int
// Logging settings. // Logging settings.
Logger *log.Logger Logger *log.Logger
Verbose bool Verbose bool
@@ -1762,7 +1828,8 @@ type RestoreOptions struct {
// NewRestoreOptions returns a new instance of RestoreOptions with defaults. // NewRestoreOptions returns a new instance of RestoreOptions with defaults.
func NewRestoreOptions() RestoreOptions { func NewRestoreOptions() RestoreOptions {
return RestoreOptions{ return RestoreOptions{
Index: math.MaxInt32, Index: math.MaxInt32,
Parallelism: DefaultRestoreParallelism,
} }
} }

1
go.mod
View File

@@ -8,6 +8,7 @@ require (
github.com/mattn/go-sqlite3 v1.14.5 github.com/mattn/go-sqlite3 v1.14.5
github.com/pierrec/lz4/v4 v4.1.3 github.com/pierrec/lz4/v4 v4.1.3
github.com/prometheus/client_golang v1.9.0 github.com/prometheus/client_golang v1.9.0
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c // indirect
golang.org/x/sys v0.0.0-20201214210602-f9fddec55a1e golang.org/x/sys v0.0.0-20201214210602-f9fddec55a1e
gopkg.in/yaml.v2 v2.4.0 gopkg.in/yaml.v2 v2.4.0
) )

2
go.sum
View File

@@ -316,6 +316,8 @@ golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.0.0-20190227155943-e225da77a7e6/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190227155943-e225da77a7e6/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c h1:5KslGYwFpkhGh+Q16bwMP3cOontH8FOep7tGV86Y7SQ=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=