From 9fa526f2c35e1f78be775898ae1aa401d28327da Mon Sep 17 00:00:00 2001 From: Ben Johnson Date: Thu, 24 Dec 2020 15:41:45 -0700 Subject: [PATCH] File replicator snapshot --- db.go | 34 ++++--- litestream.go | 5 +- replicator.go | 77 ++++++++++++--- sqlite/sqlite.go | 213 ------------------------------------------ sqlite/sqlite_test.go | 112 ---------------------- 5 files changed, 87 insertions(+), 354 deletions(-) delete mode 100644 sqlite/sqlite.go delete mode 100644 sqlite/sqlite_test.go diff --git a/db.go b/db.go index adfd03e..f829e8a 100644 --- a/db.go +++ b/db.go @@ -102,34 +102,40 @@ func (db *DB) GenerationPath(generation string) string { // ShadowWALPath returns the path of a single shadow WAL file. func (db *DB) ShadowWALPath(generation string, index int) string { assert(index >= 0, "shadow wal index cannot be negative") - return filepath.Join(db.GenerationPath(generation), "wal", fmt.Sprintf("%016x", index)+".wal") + return filepath.Join(db.GenerationPath(generation), "wal", fmt.Sprintf("%016x", index)+WALExt) } // CurrentShadowWALPath returns the path to the last shadow WAL in a generation. func (db *DB) CurrentShadowWALPath(generation string) (string, error) { - // TODO: Cache current shadow WAL path. - dir := filepath.Join(db.GenerationPath(generation), "wal") - fis, err := ioutil.ReadDir(dir) + index, err := db.CurrentShadowWALIndex(generation) if err != nil { return "", err } + return db.ShadowWALPath(generation, index), nil +} - // Find highest wal file. - var max string +// CurrentShadowWALIndex returns the current WAL index for a given generation. +func (db *DB) CurrentShadowWALIndex(generation string) (int, error) { + fis, err := ioutil.ReadDir(filepath.Join(db.GenerationPath(generation), "wal")) + if os.IsNotExist(err) { + return 0, nil // no wal files written for generation + } else if err != nil { + return 0, err + } + + // Find highest wal index. + var index int for _, fi := range fis { if !strings.HasSuffix(fi.Name(), WALExt) { continue } - if max == "" || fi.Name() > max { - max = fi.Name() + if v, err := ParseWALFilename(filepath.Base(fi.Name())); err != nil { + continue // invalid wal filename + } else if v > index { + index = v } } - - // Return error if we found no WAL files. - if max == "" { - return "", fmt.Errorf("no wal files found in %q", dir) - } - return filepath.Join(dir, max), nil + return index, nil } // Notify returns a channel that closes when the shadow WAL changes. diff --git a/litestream.go b/litestream.go index 0350c75..3e53d29 100644 --- a/litestream.go +++ b/litestream.go @@ -16,8 +16,9 @@ import ( const ( MetaDirSuffix = "-litestream" - WALDirName = "wal" - WALExt = ".wal" + WALDirName = "wal" + WALExt = ".wal" + SnapshotExt = ".snapshot" GenerationNameLen = 16 ) diff --git a/replicator.go b/replicator.go index e3f4baa..3713978 100644 --- a/replicator.go +++ b/replicator.go @@ -67,8 +67,8 @@ func (r *FileReplicator) SnapshotPath(generation string, index int) string { } // WALPath returns the path to a WAL file. -func (r *FileReplicator) WALPath(pos Pos) string { - return filepath.Join(r.dst, "generations", pos.Generation, "wal", fmt.Sprintf("%016x.wal", pos.Index)) +func (r *FileReplicator) WALPath(generation string, index int) string { + return filepath.Join(r.dst, "generations", generation, "wal", fmt.Sprintf("%016x.wal", index)) } // Snapshotting returns true if replicator is current snapshotting. @@ -133,9 +133,17 @@ func (r *FileReplicator) monitor(ctx context.Context) { } } + // If we have no replicated WALs, start from last index in shadow WAL. + if pos.Index == 0 && pos.Offset == 0 { + if pos.Index, err = r.db.CurrentShadowWALIndex(pos.Generation); err != nil { + log.Printf("%s(%s): cannot determine latest shadow wal index: %s", r.db.Path(), r.Name(), err) + continue + } + } + // Synchronize the shadow wal into the replication directory. if pos, err = r.sync(ctx, pos); err != nil { - log.Printf("%s(%s): sync error: %w", r.db.Path(), r.Name(), err) + log.Printf("%s(%s): sync error: %s", r.db.Path(), r.Name(), err) continue } } @@ -164,7 +172,7 @@ func (r *FileReplicator) pos() (pos Pos, err error) { index := -1 for _, fi := range fis { - if !strings.HasSuffix(fi.Name(), ".wal") { + if !strings.HasSuffix(fi.Name(), WALExt) { continue } @@ -190,19 +198,36 @@ func (r *FileReplicator) pos() (pos Pos, err error) { } // snapshot copies the entire database to the replica path. -func (r *FileReplicator) snapshot(ctx context.Context, pos Pos) error { +func (r *FileReplicator) snapshot(ctx context.Context, generation string, index int) error { + // Mark replicator as snapshotting to prevent checkpoints by the DB. + r.mu.Lock() + r.snapshotting = true + r.mu.Unlock() + + // Ensure we release the snapshot flag when we leave the function. + defer func() { + r.mu.Lock() + r.snapshotting = false + r.mu.Unlock() + }() + + // Ignore if we already have a snapshot for the given WAL index. + snapshotPath := r.SnapshotPath(generation, index) + if _, err := os.Stat(snapshotPath); err == nil { + return nil + } + rd, err := os.Open(r.db.Path()) if err != nil { return err } defer rd.Close() - snapshotPath := r.SnapshotPath(pos.Generation, pos.Index) - if err := os.MkdirAll(snapshotPath, 0700); err != nil { + if err := os.MkdirAll(filepath.Dir(snapshotPath), 0700); err != nil { return err } - w, err := os.Create(snapshotPath) + w, err := os.Create(snapshotPath + ".tmp") if err != nil { return err } @@ -214,16 +239,33 @@ func (r *FileReplicator) snapshot(ctx context.Context, pos Pos) error { return err } else if err := w.Close(); err != nil { return err + } else if err := os.Rename(snapshotPath+".tmp", snapshotPath); err != nil { + return err } - r.mu.Lock() - r.snapshotting = false - r.mu.Unlock() - return nil } +// snapshotN returns the number of snapshots for a generation. +func (r *FileReplicator) snapshotN(generation string) (int, error) { + fis, err := ioutil.ReadDir(filepath.Join(r.dst, "generations", generation, "snapshots")) + if os.IsNotExist(err) { + return 0, nil + } else if err != nil { + return 0, err + } + + var n int + for _, fi := range fis { + if strings.HasSuffix(fi.Name(), SnapshotExt) { + n++ + } + } + return n, nil +} + func (r *FileReplicator) sync(ctx context.Context, pos Pos) (_ Pos, err error) { + // Read all WAL files since the last position. for { if pos, err = r.syncNext(ctx, pos); err == io.EOF { return pos, nil @@ -242,8 +284,17 @@ func (r *FileReplicator) syncNext(ctx context.Context, pos Pos) (_ Pos, err erro } defer rd.Close() + // Create snapshot if no snapshots exist. + if n, err := r.snapshotN(rd.Pos().Generation); err != nil { + return pos, err + } else if n == 0 { + if err := r.snapshot(ctx, rd.Pos().Generation, rd.Pos().Index); err != nil { + return pos, err + } + } + // Ensure parent directory exists for WAL file. - filename := r.WALPath(rd.Pos()) + filename := r.WALPath(rd.Pos().Generation, rd.Pos().Index) if err := os.MkdirAll(filepath.Dir(filename), 0700); err != nil { return pos, err } diff --git a/sqlite/sqlite.go b/sqlite/sqlite.go deleted file mode 100644 index 9d265d6..0000000 --- a/sqlite/sqlite.go +++ /dev/null @@ -1,213 +0,0 @@ -package sqlite - -import ( - "bytes" - "encoding/binary" - "errors" - "io" - "strings" -) - -// TODO: Pages can be written multiple times before 3.11.0 (https://sqlite.org/releaselog/3_11_0.html) - -var ( - // ErrWALHeaderEmpty is returned when writing an empty header. - ErrWALHeaderEmpty = errors.New("wal header empty") - - // ErrWALFileInitialized is returned when writing a header to a - // WAL file that has already has its header written. - ErrWALFileInitialized = errors.New("wal file already initialized") - - // ErrChecksumMisaligned is returned when input byte length is not divisible by 8. - ErrChecksumMisaligned = errors.New("checksum input misaligned") -) - -// HeaderSize is the size of a SQLite 3 database header, in bytes. -const HeaderSize = 100 - -// WALSuffix is the suffix appended to the end of SQLite WAL path names. -const WALSuffix = "-wal" - -// Magic number specified at the beginning of WAL files. -const ( - MagicLittleEndian = 0x377f0682 - MagicBigEndian = 0x377f0683 -) - -// IsWALPath returns true if path ends with WALSuffix. -func IsWALPath(path string) bool { - return strings.HasSuffix(path, WALSuffix) -} - -// IsValidHeader returns true if page contains the standard SQLITE3 header. -func IsValidHeader(page []byte) bool { - return bytes.HasPrefix(page, []byte("SQLite format 3\x00")) -} - -// IsWALEnabled returns true if header page has the file format read & write -// version set to 2 (which indicates WAL). -func IsWALEnabled(page []byte) bool { - return len(page) >= 19 && page[18] == 2 && page[19] == 2 -} - -// Checksum computes a running checksum over a byte slice. -func Checksum(bo binary.ByteOrder, s uint64, b []byte) (_ uint64, err error) { - // Ensure byte slice length is divisible by 8. - if len(b)%8 != 0 { - return 0, ErrChecksumMisaligned - } - - // Iterate over 8-byte units and compute checksum. - s0, s1 := uint32(s>>32), uint32(s&0xFFFFFFFF) - for i := 0; i < len(b); i += 8 { - s0 += bo.Uint32(b[i:]) + s1 - s1 += bo.Uint32(b[i+4:]) + s0 - } - return uint64(s0)<<32 | uint64(s1), nil -} - -// WALHeaderSize is the size of the WAL header, in bytes. -const WALHeaderSize = 32 - -type WALHeader struct { - Magic uint32 - FileFormatVersion uint32 - PageSize uint32 - CheckpointSeqNo uint32 - Salt uint64 - Checksum uint64 -} - -// IsZero returns true if hdr is the zero value. -func (hdr WALHeader) IsZero() bool { - return hdr == (WALHeader{}) -} - -// ByteOrder returns the byte order based on the hdr magic. -func (hdr WALHeader) ByteOrder() binary.ByteOrder { - switch hdr.Magic { - case MagicLittleEndian: - return binary.LittleEndian - case MagicBigEndian: - return binary.BigEndian - default: - return nil - } -} - -// ReadFrom reads hdr from r. -func (hdr *WALHeader) ReadFrom(r io.Reader) (n int64, err error) { - b := make([]byte, WALHeaderSize) - nn, err := io.ReadFull(r, b) - if n = int64(nn); err != nil { - return n, err - } - return n, hdr.Unmarshal(b) -} - -// WriteTo writes hdr to r. -func (hdr *WALHeader) WriteTo(w io.Writer) (n int64, err error) { - b := make([]byte, WALHeaderSize) - if err := hdr.MarshalTo(b); err != nil { - return 0, err - } - nn, err := w.Write(b) - return int64(nn), err -} - -// MarshalTo encodes the header to b. -// Returns io.ErrShortWrite if len(b) is less than WALHeaderSize. -func (hdr *WALHeader) MarshalTo(b []byte) error { - if len(b) < WALHeaderSize { - return io.ErrShortWrite - } - binary.BigEndian.PutUint32(b[0:], hdr.Magic) - binary.BigEndian.PutUint32(b[4:], hdr.FileFormatVersion) - binary.BigEndian.PutUint32(b[8:], hdr.PageSize) - binary.BigEndian.PutUint32(b[12:], hdr.CheckpointSeqNo) - binary.BigEndian.PutUint64(b[16:], hdr.Salt) - binary.BigEndian.PutUint64(b[24:], hdr.Checksum) - return nil -} - -// Unmarshal decodes the header from b. -// Returns io.ErrUnexpectedEOF if len(b) is less than WALHeaderSize. -func (hdr *WALHeader) Unmarshal(b []byte) error { - if len(b) < WALHeaderSize { - return io.ErrUnexpectedEOF - } - hdr.Magic = binary.BigEndian.Uint32(b[0:]) - hdr.FileFormatVersion = binary.BigEndian.Uint32(b[4:]) - hdr.PageSize = binary.BigEndian.Uint32(b[8:]) - hdr.CheckpointSeqNo = binary.BigEndian.Uint32(b[12:]) - hdr.Salt = binary.BigEndian.Uint64(b[16:]) - hdr.Checksum = binary.BigEndian.Uint64(b[24:]) - return nil -} - -// WALFrameHeaderSize is the size of the WAL frame header, in bytes. -const WALFrameHeaderSize = 24 - -// WALFrameHeader represents a SQLite WAL frame header. -type WALFrameHeader struct { - Pgno uint32 - PageN uint32 // only set for commit records - Salt uint64 - Checksum uint64 -} - -// IsZero returns true if hdr is the zero value. -func (hdr WALFrameHeader) IsZero() bool { - return hdr == (WALFrameHeader{}) -} - -// IsCommit returns true if the frame represents a commit header. -func (hdr *WALFrameHeader) IsCommit() bool { - return hdr.PageN != 0 -} - -// ReadFrom reads hdr from r. -func (hdr *WALFrameHeader) ReadFrom(r io.Reader) (n int64, err error) { - b := make([]byte, WALFrameHeaderSize) - nn, err := io.ReadFull(r, b) - if n = int64(nn); err != nil { - return n, err - } - return n, hdr.Unmarshal(b) -} - -// WriteTo writes hdr to r. -func (hdr *WALFrameHeader) WriteTo(w io.Writer) (n int64, err error) { - b := make([]byte, WALFrameHeaderSize) - if err := hdr.MarshalTo(b); err != nil { - return 0, err - } - nn, err := w.Write(b) - return int64(nn), err -} - -// MarshalTo encodes the frame header to b. -// Returns io.ErrShortWrite if len(b) is less than WALHeaderSize. -func (hdr *WALFrameHeader) MarshalTo(b []byte) error { - if len(b) < WALFrameHeaderSize { - return io.ErrShortWrite - } - binary.BigEndian.PutUint32(b[0:], hdr.Pgno) - binary.BigEndian.PutUint32(b[4:], hdr.PageN) - binary.BigEndian.PutUint64(b[8:], hdr.Salt) - binary.BigEndian.PutUint64(b[16:], hdr.Checksum) - return nil -} - -// Unmarshal decodes the frame header from b. -// Returns io.ErrUnexpectedEOF if len(b) is less than WALHeaderSize. -func (hdr *WALFrameHeader) Unmarshal(b []byte) error { - if len(b) < WALFrameHeaderSize { - return io.ErrUnexpectedEOF - } - hdr.Pgno = binary.BigEndian.Uint32(b[0:]) - hdr.PageN = binary.BigEndian.Uint32(b[4:]) - hdr.Salt = binary.BigEndian.Uint64(b[8:]) - hdr.Checksum = binary.BigEndian.Uint64(b[16:]) - return nil -} diff --git a/sqlite/sqlite_test.go b/sqlite/sqlite_test.go deleted file mode 100644 index e875114..0000000 --- a/sqlite/sqlite_test.go +++ /dev/null @@ -1,112 +0,0 @@ -package sqlite_test - -import ( - "io" - "path/filepath" - "testing" - - "github.com/benbjohnson/litestream/sqlite" -) - -func TestWALHeader_MarshalTo(t *testing.T) { - // Ensure the WAL header can be marshaled and unmarshaled correctly. - t.Run("OK", func(t *testing.T) { - hdr := litestream.WALHeader{ - Magic: 1000, - FileFormatVersion: 1001, - PageSize: 1002, - CheckpointSeqNo: 1003, - Salt: 1004, - Checksum: 1005, - } - b := make([]byte, litestream.WALHeaderSize) - if err := hdr.MarshalTo(b); err != nil { - t.Fatal(err) - } - - var other litestream.WALHeader - if err := other.Unmarshal(b); err != nil { - t.Fatal(err) - } else if got, want := hdr, other; got != want { - t.Fatalf("mismatch: got %#v, want %#v", got, want) - } - }) - - // Ensure that marshaling to a small byte slice returns an error. - t.Run("ErrShortWrite", func(t *testing.T) { - var hdr litestream.WALHeader - if err := hdr.MarshalTo(make([]byte, litestream.WALHeaderSize-1)); err != io.ErrShortWrite { - t.Fatalf("unexpected error: %#v", err) - } - }) -} - -func TestWALHeader_Unmarshal(t *testing.T) { - // Ensure that unmarshaling from a small byte slice returns an error. - t.Run("ErrUnexpectedEOF", func(t *testing.T) { - var hdr litestream.WALHeader - if err := hdr.Unmarshal(make([]byte, litestream.WALHeaderSize-1)); err != io.ErrUnexpectedEOF { - t.Fatalf("unexpected error: %#v", err) - } - }) -} - -func TestWALFrameHeader_MarshalTo(t *testing.T) { - // Ensure the WAL header can be marshaled and unmarshaled correctly. - t.Run("OK", func(t *testing.T) { - hdr := litestream.WALFrameHeader{ - Pgno: 1000, - PageN: 1001, - Salt: 1002, - Checksum: 1003, - } - b := make([]byte, litestream.WALFrameHeaderSize) - if err := hdr.MarshalTo(b); err != nil { - t.Fatal(err) - } - - var other litestream.WALFrameHeader - if err := other.Unmarshal(b); err != nil { - t.Fatal(err) - } else if got, want := hdr, other; got != want { - t.Fatalf("mismatch: got %#v, want %#v", got, want) - } - }) - - // Ensure that marshaling to a small byte slice returns an error. - t.Run("ErrShortWrite", func(t *testing.T) { - var hdr litestream.WALFrameHeader - if err := hdr.MarshalTo(make([]byte, litestream.WALFrameHeaderSize-1)); err != io.ErrShortWrite { - t.Fatalf("unexpected error: %#v", err) - } - }) -} - -func TestWALFrameHeader_Unmarshal(t *testing.T) { - // Ensure that unmarshaling from a small byte slice returns an error. - t.Run("ErrUnexpectedEOF", func(t *testing.T) { - var hdr litestream.WALFrameHeader - if err := hdr.Unmarshal(make([]byte, litestream.WALFrameHeaderSize-1)); err != io.ErrUnexpectedEOF { - t.Fatalf("unexpected error: %#v", err) - } - }) -} - -// MustOpenWALFile returns a new, open instance of WALFile written to a temp dir. -func MustOpenWALFile(tb testing.TB, name string) *litestream.WALFile { - tb.Helper() - - f := litestream.NewWALFile(filepath.Join(tb.TempDir(), name)) - if err := f.Open(); err != nil { - tb.Fatal(err) - } - return f -} - -// MustCloseWALFile closes an instance of WALFile. -func MustCloseWALFile(tb testing.TB, f *litestream.WALFile) { - tb.Helper() - if err := f.Close(); err != nil { - tb.Fatal(err) - } -}