Copy shadow WAL immediately after init

This commit is contained in:
Ben Johnson
2021-01-18 10:01:16 -07:00
parent 2ce4052300
commit 358dcd4650
2 changed files with 89 additions and 46 deletions

118
db.go
View File

@@ -589,7 +589,7 @@ func (db *DB) createGeneration() (string, error) {
} }
// Initialize shadow WAL with copy of header. // Initialize shadow WAL with copy of header.
if err := db.initShadowWALFile(db.ShadowWALPath(generation, 0)); err != nil { if _, err := db.initShadowWALFile(db.ShadowWALPath(generation, 0)); err != nil {
return "", fmt.Errorf("initialize shadow wal: %w", err) return "", fmt.Errorf("initialize shadow wal: %w", err)
} }
@@ -713,7 +713,7 @@ func (db *DB) Sync() (err error) {
if checkpoint { if checkpoint {
changed = true changed = true
if err := db.checkpointAndInit(info.shadowWALPath, checkpointMode); err != nil { if err := db.checkpointAndInit(info.generation, checkpointMode); err != nil {
return fmt.Errorf("checkpoint: mode=%v err=%w", checkpointMode, err) return fmt.Errorf("checkpoint: mode=%v err=%w", checkpointMode, err)
} }
} }
@@ -875,42 +875,45 @@ func (db *DB) syncWAL(info syncInfo) (newSize int64, err error) {
// Start a new shadow WAL file with next index. // Start a new shadow WAL file with next index.
newShadowWALPath := filepath.Join(dir, FormatWALPath(index+1)) newShadowWALPath := filepath.Join(dir, FormatWALPath(index+1))
if err := db.initShadowWALFile(newShadowWALPath); err != nil { newSize, err = db.initShadowWALFile(newShadowWALPath)
return 0, fmt.Errorf("cannot init shadow wal file: name=%s err=%w", newShadowWALPath, err)
}
// Copy rest of valid WAL to new shadow WAL.
newSize, err = db.copyToShadowWAL(newShadowWALPath)
if err != nil { if err != nil {
return 0, fmt.Errorf("cannot copy to new shadow wal: %w", err) return 0, fmt.Errorf("cannot init shadow wal file: name=%s err=%w", newShadowWALPath, err)
} }
return newSize, nil return newSize, nil
} }
func (db *DB) initShadowWALFile(filename string) error { func (db *DB) initShadowWALFile(filename string) (int64, error) {
hdr, err := readWALHeader(db.WALPath()) hdr, err := readWALHeader(db.WALPath())
if err != nil { if err != nil {
return fmt.Errorf("read header: %w", err) return 0, fmt.Errorf("read header: %w", err)
} }
// Determine byte order for checksumming from header magic. // Determine byte order for checksumming from header magic.
bo, err := headerByteOrder(hdr) bo, err := headerByteOrder(hdr)
if err != nil { if err != nil {
return err return 0, err
} }
// Verify checksum. // Verify checksum.
s0 := binary.BigEndian.Uint32(hdr[24:]) s0 := binary.BigEndian.Uint32(hdr[24:])
s1 := binary.BigEndian.Uint32(hdr[28:]) s1 := binary.BigEndian.Uint32(hdr[28:])
if v0, v1 := Checksum(bo, 0, 0, hdr[:24]); v0 != s0 || v1 != s1 { if v0, v1 := Checksum(bo, 0, 0, hdr[:24]); v0 != s0 || v1 != s1 {
return fmt.Errorf("invalid header checksum: (%x,%x) != (%x,%x)", v0, v1, s0, s1) return 0, fmt.Errorf("invalid header checksum: (%x,%x) != (%x,%x)", v0, v1, s0, s1)
} }
// Write header to new WAL shadow file. // Write header to new WAL shadow file.
if err := mkdirAll(filepath.Dir(filename), 0700, db.uid, db.gid); err != nil { if err := mkdirAll(filepath.Dir(filename), 0700, db.uid, db.gid); err != nil {
return err return 0, err
} else if err := ioutil.WriteFile(filename, hdr, 0600); err != nil {
return 0, err
} }
return ioutil.WriteFile(filename, hdr, 0600)
// Copy as much shadow WAL as available.
newSize, err := db.copyToShadowWAL(filename)
if err != nil {
return 0, fmt.Errorf("cannot copy to new shadow wal: %w", err)
}
return newSize, nil
} }
func (db *DB) copyToShadowWAL(filename string) (newSize int64, err error) { func (db *DB) copyToShadowWAL(filename string) (newSize int64, err error) {
@@ -1199,7 +1202,12 @@ func (db *DB) checkpoint(mode string) (err error) {
// checkpointAndInit performs a checkpoint on the WAL file and initializes a // checkpointAndInit performs a checkpoint on the WAL file and initializes a
// new shadow WAL file. // new shadow WAL file.
func (db *DB) checkpointAndInit(shadowWALPath string, mode string) error { func (db *DB) checkpointAndInit(generation, mode string) error {
shadowWALPath, err := db.CurrentShadowWALPath(generation)
if err != nil {
return err
}
// Read WAL header before checkpoint to check if it has been restarted. // Read WAL header before checkpoint to check if it has been restarted.
hdr, err := readWALHeader(db.WALPath()) hdr, err := readWALHeader(db.WALPath())
if err != nil { if err != nil {
@@ -1234,7 +1242,7 @@ func (db *DB) checkpointAndInit(shadowWALPath string, mode string) error {
// Start a new shadow WAL file with next index. // Start a new shadow WAL file with next index.
newShadowWALPath := filepath.Join(filepath.Dir(shadowWALPath), FormatWALPath(index+1)) newShadowWALPath := filepath.Join(filepath.Dir(shadowWALPath), FormatWALPath(index+1))
if err := db.initShadowWALFile(newShadowWALPath); err != nil { if _, err := db.initShadowWALFile(newShadowWALPath); err != nil {
return fmt.Errorf("cannot init shadow wal file: name=%s err=%w", newShadowWALPath, err) return fmt.Errorf("cannot init shadow wal file: name=%s err=%w", newShadowWALPath, err)
} }
@@ -1319,7 +1327,7 @@ func (db *DB) Restore(ctx context.Context, opt RestoreOptions) error {
tmpPath := outputPath + ".tmp" tmpPath := outputPath + ".tmp"
// Copy snapshot to output path. // Copy snapshot to output path.
var dbChksum uint64 var dbChksum, dbChksumAlt uint64
if !opt.DryRun { if !opt.DryRun {
if err := db.restoreSnapshot(ctx, r, pos.Generation, pos.Index, tmpPath); err != nil { if err := db.restoreSnapshot(ctx, r, pos.Generation, pos.Index, tmpPath); err != nil {
return fmt.Errorf("cannot restore snapshot: %w", err) return fmt.Errorf("cannot restore snapshot: %w", err)
@@ -1327,8 +1335,11 @@ func (db *DB) Restore(ctx context.Context, opt RestoreOptions) error {
if dbChksum, err = checksumFile(tmpPath); err != nil { if dbChksum, err = checksumFile(tmpPath); err != nil {
return fmt.Errorf("cannot compute db checksum: %w", err) return fmt.Errorf("cannot compute db checksum: %w", err)
} }
if dbChksumAlt, err = checksumFileAt(tmpPath, int64(db.pageSize)); err != nil {
return fmt.Errorf("cannot compute alternate db checksum: %w", err)
} }
log.Printf("%s(%s): restoring snapshot %s/%08x to %s, checksum=%016x", db.path, r.Name(), generation, minWALIndex, tmpPath, dbChksum) }
log.Printf("%s(%s): restoring snapshot %s/%08x to %s, checksum=%016x alt=%016x", db.path, r.Name(), generation, minWALIndex, tmpPath, dbChksum, dbChksumAlt)
// Restore each WAL file until we reach our maximum index. // Restore each WAL file until we reach our maximum index.
var walChksum uint64 var walChksum uint64
@@ -1373,6 +1384,24 @@ func checksumFile(filename string) (uint64, error) {
return h.Sum64(), nil return h.Sum64(), nil
} }
func checksumFileAt(filename string, offset int64) (uint64, error) {
f, err := os.Open(filename)
if err != nil {
return 0, err
}
defer f.Close()
if _, err := f.Seek(offset, io.SeekStart); err != nil {
return 0, err
}
h := crc64.New(crc64.MakeTable(crc64.ISO))
if _, err := io.Copy(h, f); err != nil {
return 0, err
}
return h.Sum64(), nil
}
func (db *DB) restoreTarget(ctx context.Context, opt RestoreOptions, logger *log.Logger) (Replica, string, error) { func (db *DB) restoreTarget(ctx context.Context, opt RestoreOptions, logger *log.Logger) (Replica, string, error) {
var target struct { var target struct {
replica Replica replica Replica
@@ -1507,42 +1536,61 @@ func (db *DB) restoreWAL(ctx context.Context, r Replica, generation string, inde
// This function obtains a read lock so it prevents syncs from occuring until // This function obtains a read lock so it prevents syncs from occuring until
// the operation is complete. The database will still be usable but it will be // the operation is complete. The database will still be usable but it will be
// unable to checkpoint during this time. // unable to checkpoint during this time.
func (db *DB) CRC64() (uint64, Pos, error) { func (db *DB) CRC64() (uint64, uint64, Pos, error) {
db.mu.Lock() db.mu.Lock()
defer db.mu.Unlock() defer db.mu.Unlock()
if err := db.init(); err != nil { if err := db.init(); err != nil {
return 0, Pos{}, err return 0, 0, Pos{}, err
} else if db.db == nil { } else if db.db == nil {
return 0, Pos{}, os.ErrNotExist return 0, 0, Pos{}, os.ErrNotExist
}
generation, err := db.CurrentGeneration()
if err != nil {
return 0, 0, Pos{}, fmt.Errorf("cannot find current generation: %w", err)
} else if generation == "" {
return 0, 0, Pos{}, fmt.Errorf("no current generation")
} }
// Force a RESTART checkpoint to ensure the database is at the start of the WAL. // Force a RESTART checkpoint to ensure the database is at the start of the WAL.
if err := db.checkpoint(CheckpointModeRestart); err != nil { if err := db.checkpointAndInit(generation, CheckpointModeRestart); err != nil {
return 0, Pos{}, err return 0, 0, Pos{}, err
} }
// Obtain current position. Clear the offset since we are only reading the // Obtain current position. Clear the offset since we are only reading the
// DB and not applying the current WAL. // DB and not applying the current WAL.
pos, err := db.Pos() pos, err := db.Pos()
if err != nil { if err != nil {
return 0, pos, err return 0, 0, pos, err
} }
pos.Offset = 0 pos.Offset = 0
// Open file handle for database. var tmpfile string
f, err := os.Open(db.Path()) if src, err := os.Open(db.Path()); err != nil {
if err != nil { return 0, 0, pos, err
return 0, pos, err } else if dst, err := ioutil.TempFile("", "*-litestream.db"); err != nil {
return 0, 0, pos, err
} else if _, err := io.Copy(dst, src); err != nil {
return 0, 0, pos, err
} else if err := src.Close(); err != nil {
return 0, 0, pos, err
} else if err := dst.Close(); err != nil {
return 0, 0, pos, err
} else {
tmpfile = dst.Name()
} }
defer f.Close() log.Printf("db.crc64: copied database to temporary location: %s", tmpfile)
// Compute checksum. chksum, err := checksumFile(tmpfile)
h := crc64.New(crc64.MakeTable(crc64.ISO)) if err != nil {
if _, err := io.Copy(h, f); err != nil { return 0, 0, pos, err
return 0, pos, err
} }
return h.Sum64(), pos, nil chksumAlt, err := checksumFileAt(tmpfile, int64(db.pageSize))
if err != nil {
return 0, 0, pos, err
}
return chksum, chksumAlt, pos, nil
} }
// RestoreOptions represents options for DB.Restore(). // RestoreOptions represents options for DB.Restore().

View File

@@ -4,7 +4,6 @@ import (
"compress/gzip" "compress/gzip"
"context" "context"
"fmt" "fmt"
"hash/crc64"
"io" "io"
"io/ioutil" "io/ioutil"
"log" "log"
@@ -990,11 +989,11 @@ func ValidateReplica(ctx context.Context, r Replica) error {
// Compute checksum of primary database under lock. This prevents a // Compute checksum of primary database under lock. This prevents a
// sync from occurring and the database will not be written. // sync from occurring and the database will not be written.
chksum0, pos, err := db.CRC64() chksum0, chksum0alt, pos, err := db.CRC64()
if err != nil { if err != nil {
return fmt.Errorf("cannot compute checksum: %w", err) return fmt.Errorf("cannot compute checksum: %w", err)
} }
log.Printf("%s(%s): primary checksum computed: %016x @ %s", db.Path(), r.Name(), chksum0, pos) log.Printf("%s(%s): primary checksum computed: %016x @ %s (alt=%016x)", db.Path(), r.Name(), chksum0, pos, chksum0alt)
// Wait until replica catches up to position. // Wait until replica catches up to position.
log.Printf("%s(%s): waiting for replica", db.Path(), r.Name()) log.Printf("%s(%s): waiting for replica", db.Path(), r.Name())
@@ -1021,20 +1020,16 @@ func ValidateReplica(ctx context.Context, r Replica) error {
} }
// Open file handle for restored database. // Open file handle for restored database.
f, err := os.Open(db.Path()) chksum1, err := checksumFile(restorePath)
if err != nil { if err != nil {
return err return err
} }
defer f.Close() chksum1alt, err := checksumFileAt(restorePath, int64(db.pageSize))
if err != nil {
// Compute checksum.
h := crc64.New(crc64.MakeTable(crc64.ISO))
if _, err := io.Copy(h, f); err != nil {
return err return err
} }
chksum1 := h.Sum64()
log.Printf("%s(%s): restore complete, replica checksum=%016x", db.Path(), r.Name(), chksum1) log.Printf("%s(%s): restore complete, replica checksum=%016x (alt=%016x)", db.Path(), r.Name(), chksum1, chksum1alt)
// Validate checksums match. // Validate checksums match.
if chksum0 != chksum1 { if chksum0 != chksum1 {