From 358dcd4650a0516b3b8e62a4cacf1c1cef4a8e3a Mon Sep 17 00:00:00 2001 From: Ben Johnson Date: Mon, 18 Jan 2021 10:01:16 -0700 Subject: [PATCH] Copy shadow WAL immediately after init --- db.go | 118 +++++++++++++++++++++++++++++++++++++---------------- replica.go | 17 +++----- 2 files changed, 89 insertions(+), 46 deletions(-) diff --git a/db.go b/db.go index 4c155f7..35eba7c 100644 --- a/db.go +++ b/db.go @@ -589,7 +589,7 @@ func (db *DB) createGeneration() (string, error) { } // Initialize shadow WAL with copy of header. - if err := db.initShadowWALFile(db.ShadowWALPath(generation, 0)); err != nil { + if _, err := db.initShadowWALFile(db.ShadowWALPath(generation, 0)); err != nil { return "", fmt.Errorf("initialize shadow wal: %w", err) } @@ -713,7 +713,7 @@ func (db *DB) Sync() (err error) { if checkpoint { changed = true - if err := db.checkpointAndInit(info.shadowWALPath, checkpointMode); err != nil { + if err := db.checkpointAndInit(info.generation, checkpointMode); err != nil { return fmt.Errorf("checkpoint: mode=%v err=%w", checkpointMode, err) } } @@ -875,42 +875,45 @@ func (db *DB) syncWAL(info syncInfo) (newSize int64, err error) { // Start a new shadow WAL file with next index. newShadowWALPath := filepath.Join(dir, FormatWALPath(index+1)) - if err := db.initShadowWALFile(newShadowWALPath); err != nil { - return 0, fmt.Errorf("cannot init shadow wal file: name=%s err=%w", newShadowWALPath, err) - } - - // Copy rest of valid WAL to new shadow WAL. - newSize, err = db.copyToShadowWAL(newShadowWALPath) + newSize, err = db.initShadowWALFile(newShadowWALPath) if err != nil { - return 0, fmt.Errorf("cannot copy to new shadow wal: %w", err) + return 0, fmt.Errorf("cannot init shadow wal file: name=%s err=%w", newShadowWALPath, err) } return newSize, nil } -func (db *DB) initShadowWALFile(filename string) error { +func (db *DB) initShadowWALFile(filename string) (int64, error) { hdr, err := readWALHeader(db.WALPath()) if err != nil { - return fmt.Errorf("read header: %w", err) + return 0, fmt.Errorf("read header: %w", err) } // Determine byte order for checksumming from header magic. bo, err := headerByteOrder(hdr) if err != nil { - return err + return 0, err } // Verify checksum. s0 := binary.BigEndian.Uint32(hdr[24:]) s1 := binary.BigEndian.Uint32(hdr[28:]) if v0, v1 := Checksum(bo, 0, 0, hdr[:24]); v0 != s0 || v1 != s1 { - return fmt.Errorf("invalid header checksum: (%x,%x) != (%x,%x)", v0, v1, s0, s1) + return 0, fmt.Errorf("invalid header checksum: (%x,%x) != (%x,%x)", v0, v1, s0, s1) } // Write header to new WAL shadow file. if err := mkdirAll(filepath.Dir(filename), 0700, db.uid, db.gid); err != nil { - return err + return 0, err + } else if err := ioutil.WriteFile(filename, hdr, 0600); err != nil { + return 0, err } - return ioutil.WriteFile(filename, hdr, 0600) + + // Copy as much shadow WAL as available. + newSize, err := db.copyToShadowWAL(filename) + if err != nil { + return 0, fmt.Errorf("cannot copy to new shadow wal: %w", err) + } + return newSize, nil } func (db *DB) copyToShadowWAL(filename string) (newSize int64, err error) { @@ -1199,7 +1202,12 @@ func (db *DB) checkpoint(mode string) (err error) { // checkpointAndInit performs a checkpoint on the WAL file and initializes a // new shadow WAL file. -func (db *DB) checkpointAndInit(shadowWALPath string, mode string) error { +func (db *DB) checkpointAndInit(generation, mode string) error { + shadowWALPath, err := db.CurrentShadowWALPath(generation) + if err != nil { + return err + } + // Read WAL header before checkpoint to check if it has been restarted. hdr, err := readWALHeader(db.WALPath()) if err != nil { @@ -1234,7 +1242,7 @@ func (db *DB) checkpointAndInit(shadowWALPath string, mode string) error { // Start a new shadow WAL file with next index. newShadowWALPath := filepath.Join(filepath.Dir(shadowWALPath), FormatWALPath(index+1)) - if err := db.initShadowWALFile(newShadowWALPath); err != nil { + if _, err := db.initShadowWALFile(newShadowWALPath); err != nil { return fmt.Errorf("cannot init shadow wal file: name=%s err=%w", newShadowWALPath, err) } @@ -1319,7 +1327,7 @@ func (db *DB) Restore(ctx context.Context, opt RestoreOptions) error { tmpPath := outputPath + ".tmp" // Copy snapshot to output path. - var dbChksum uint64 + var dbChksum, dbChksumAlt uint64 if !opt.DryRun { if err := db.restoreSnapshot(ctx, r, pos.Generation, pos.Index, tmpPath); err != nil { return fmt.Errorf("cannot restore snapshot: %w", err) @@ -1327,8 +1335,11 @@ func (db *DB) Restore(ctx context.Context, opt RestoreOptions) error { if dbChksum, err = checksumFile(tmpPath); err != nil { return fmt.Errorf("cannot compute db checksum: %w", err) } + if dbChksumAlt, err = checksumFileAt(tmpPath, int64(db.pageSize)); err != nil { + return fmt.Errorf("cannot compute alternate db checksum: %w", err) + } } - log.Printf("%s(%s): restoring snapshot %s/%08x to %s, checksum=%016x", db.path, r.Name(), generation, minWALIndex, tmpPath, dbChksum) + log.Printf("%s(%s): restoring snapshot %s/%08x to %s, checksum=%016x alt=%016x", db.path, r.Name(), generation, minWALIndex, tmpPath, dbChksum, dbChksumAlt) // Restore each WAL file until we reach our maximum index. var walChksum uint64 @@ -1373,6 +1384,24 @@ func checksumFile(filename string) (uint64, error) { return h.Sum64(), nil } +func checksumFileAt(filename string, offset int64) (uint64, error) { + f, err := os.Open(filename) + if err != nil { + return 0, err + } + defer f.Close() + + if _, err := f.Seek(offset, io.SeekStart); err != nil { + return 0, err + } + + h := crc64.New(crc64.MakeTable(crc64.ISO)) + if _, err := io.Copy(h, f); err != nil { + return 0, err + } + return h.Sum64(), nil +} + func (db *DB) restoreTarget(ctx context.Context, opt RestoreOptions, logger *log.Logger) (Replica, string, error) { var target struct { replica Replica @@ -1507,42 +1536,61 @@ func (db *DB) restoreWAL(ctx context.Context, r Replica, generation string, inde // This function obtains a read lock so it prevents syncs from occuring until // the operation is complete. The database will still be usable but it will be // unable to checkpoint during this time. -func (db *DB) CRC64() (uint64, Pos, error) { +func (db *DB) CRC64() (uint64, uint64, Pos, error) { db.mu.Lock() defer db.mu.Unlock() if err := db.init(); err != nil { - return 0, Pos{}, err + return 0, 0, Pos{}, err } else if db.db == nil { - return 0, Pos{}, os.ErrNotExist + return 0, 0, Pos{}, os.ErrNotExist + } + + generation, err := db.CurrentGeneration() + if err != nil { + return 0, 0, Pos{}, fmt.Errorf("cannot find current generation: %w", err) + } else if generation == "" { + return 0, 0, Pos{}, fmt.Errorf("no current generation") } // Force a RESTART checkpoint to ensure the database is at the start of the WAL. - if err := db.checkpoint(CheckpointModeRestart); err != nil { - return 0, Pos{}, err + if err := db.checkpointAndInit(generation, CheckpointModeRestart); err != nil { + return 0, 0, Pos{}, err } // Obtain current position. Clear the offset since we are only reading the // DB and not applying the current WAL. pos, err := db.Pos() if err != nil { - return 0, pos, err + return 0, 0, pos, err } pos.Offset = 0 - // Open file handle for database. - f, err := os.Open(db.Path()) - if err != nil { - return 0, pos, err + var tmpfile string + if src, err := os.Open(db.Path()); err != nil { + return 0, 0, pos, err + } else if dst, err := ioutil.TempFile("", "*-litestream.db"); err != nil { + return 0, 0, pos, err + } else if _, err := io.Copy(dst, src); err != nil { + return 0, 0, pos, err + } else if err := src.Close(); err != nil { + return 0, 0, pos, err + } else if err := dst.Close(); err != nil { + return 0, 0, pos, err + } else { + tmpfile = dst.Name() } - defer f.Close() + log.Printf("db.crc64: copied database to temporary location: %s", tmpfile) - // Compute checksum. - h := crc64.New(crc64.MakeTable(crc64.ISO)) - if _, err := io.Copy(h, f); err != nil { - return 0, pos, err + chksum, err := checksumFile(tmpfile) + if err != nil { + return 0, 0, pos, err } - return h.Sum64(), pos, nil + chksumAlt, err := checksumFileAt(tmpfile, int64(db.pageSize)) + if err != nil { + return 0, 0, pos, err + } + return chksum, chksumAlt, pos, nil } // RestoreOptions represents options for DB.Restore(). diff --git a/replica.go b/replica.go index 98bd564..5431498 100644 --- a/replica.go +++ b/replica.go @@ -4,7 +4,6 @@ import ( "compress/gzip" "context" "fmt" - "hash/crc64" "io" "io/ioutil" "log" @@ -990,11 +989,11 @@ func ValidateReplica(ctx context.Context, r Replica) error { // Compute checksum of primary database under lock. This prevents a // sync from occurring and the database will not be written. - chksum0, pos, err := db.CRC64() + chksum0, chksum0alt, pos, err := db.CRC64() if err != nil { return fmt.Errorf("cannot compute checksum: %w", err) } - log.Printf("%s(%s): primary checksum computed: %016x @ %s", db.Path(), r.Name(), chksum0, pos) + log.Printf("%s(%s): primary checksum computed: %016x @ %s (alt=%016x)", db.Path(), r.Name(), chksum0, pos, chksum0alt) // Wait until replica catches up to position. log.Printf("%s(%s): waiting for replica", db.Path(), r.Name()) @@ -1021,20 +1020,16 @@ func ValidateReplica(ctx context.Context, r Replica) error { } // Open file handle for restored database. - f, err := os.Open(db.Path()) + chksum1, err := checksumFile(restorePath) if err != nil { return err } - defer f.Close() - - // Compute checksum. - h := crc64.New(crc64.MakeTable(crc64.ISO)) - if _, err := io.Copy(h, f); err != nil { + chksum1alt, err := checksumFileAt(restorePath, int64(db.pageSize)) + if err != nil { return err } - chksum1 := h.Sum64() - log.Printf("%s(%s): restore complete, replica checksum=%016x", db.Path(), r.Name(), chksum1) + log.Printf("%s(%s): restore complete, replica checksum=%016x (alt=%016x)", db.Path(), r.Name(), chksum1, chksum1alt) // Validate checksums match. if chksum0 != chksum1 {