diff --git a/cmd/litestream/main.go b/cmd/litestream/main.go index 0c58d3a..ba00d69 100644 --- a/cmd/litestream/main.go +++ b/cmd/litestream/main.go @@ -11,7 +11,7 @@ import ( "bazil.org/fuse" "bazil.org/fuse/fs" - "github.com/middlemost/litestream" + "github.com/benbjohnson/litestream" ) func main() { diff --git a/handle.go b/handle.go index 6718ba8..791c5a2 100644 --- a/handle.go +++ b/handle.go @@ -53,7 +53,7 @@ func (h *Handle) Read(ctx context.Context, req *fuse.ReadRequest, resp *fuse.Rea // Write writes data at a given offset to the underlying file. func (h *Handle) Write(ctx context.Context, req *fuse.WriteRequest, resp *fuse.WriteResponse) (err error) { - log.Printf("write: name=%s offset=%d", h.f.Name(), req.Offset) + log.Printf("write: name=%s offset=%d n=%d", h.f.Name(), req.Offset, len(req.Data)) println(hex.Dump(req.Data)) resp.Size, err = h.f.WriteAt(req.Data, req.Offset) diff --git a/litestream.go b/litestream.go index 6daef29..da44fc9 100644 --- a/litestream.go +++ b/litestream.go @@ -1,9 +1,16 @@ package litestream import ( + "encoding/binary" "io" ) +// Magic number specified at the beginning of WAL files. +const ( + MagicLittleEndian = 0x377f0682 + MagicBigEndian = 0x377f0683 +) + const ( WriteVersionOffset = 18 ReadVersionOffset = 19 @@ -17,3 +24,19 @@ func ReadVersion(b []byte) (writeVersion, readVersion uint8, err error) { } return b[WriteVersionOffset], b[ReadVersionOffset], nil } + +// Checksum computes a running checksum over a byte slice. +func Checksum(bo binary.ByteOrder, s uint64, b []byte) (_ uint64, err error) { + // Ensure byte slice length is divisible by 8. + if len(b)%8 != 0 { + return 0, ErrChecksumMisaligned + } + + // Iterate over 8-byte units and compute checksum. + s0, s1 := uint32(s>>32), uint32(s&0xFFFFFFFF) + for i := 0; i < len(b); i += 8 { + s0 += bo.Uint32(b[i:]) + s1 + s1 += bo.Uint32(b[i+4:]) + s0 + } + return uint64(s0)<<32 | uint64(s1), nil +} diff --git a/litestream_test.go b/litestream_test.go new file mode 100644 index 0000000..30d8cc6 --- /dev/null +++ b/litestream_test.go @@ -0,0 +1,49 @@ +package litestream_test + +import ( + "encoding/binary" + "encoding/hex" + "testing" + + "github.com/benbjohnson/litestream" +) + +func TestChecksum(t *testing.T) { + // Ensure a WAL header, frame header, & frame data can be checksummed in one pass. + t.Run("OnePass", func(t *testing.T) { + if input, err := hex.DecodeString("err != nil { + t.Fatal(err) + } else if chksum, err := litestream.Checksum(binary.LittleEndian, 0, input); err != nil { + t.Fatal(err) + } else if got, want := chksum, uint64(0xdc2f3e84540488d3); got != want { + t.Fatalf("Checksum()=%x, want %x", got, want) + } + }) + + // Ensure we get the same result as OnePass even if we split up into multiple calls. + t.Run("Incremental", func(t *testing.T) { + // Compute checksum for beginning of WAL header. + if chksum, err := litestream.Checksum(binary.LittleEndian, 0, MustDecodeHexString("377f0682002de218000010000000000052382eac857b1a4e")); err != nil { + t.Fatal(err) + } else if got, want := chksum, uint64(0x81153b6587178e8f); got != want { + t.Fatalf("Checksum()=%x, want %x", got, want) + } + + // Continue checksum with WAL frame header & frame contents. + if chksum1, err := litestream.Checksum(binary.LittleEndian, 0x81153b6587178e8f, MustDecodeHexString("0000000200000002")); err != nil { + t.Fatal(err) + } else if chksum2, err := litestream.Checksum(binary.LittleEndian, chksum1, MustDecodeHexString(``)); err != nil { + t.Fatal(err) + } else if got, want := chksum2, uint64(0xdc2f3e84540488d3); got != want { + t.Fatalf("Checksum()=%x, want %x", got, want) + } + }) +} + +func MustDecodeHexString(s string) []byte { + b, err := hex.DecodeString(s) + if err != nil { + panic(err) + } + return b +} diff --git a/wal.go b/wal.go index 5814a55..c6ecc35 100644 --- a/wal.go +++ b/wal.go @@ -2,10 +2,24 @@ package litestream import ( "encoding/binary" + "errors" + "fmt" "io" "os" ) +var ( + // ErrWALHeaderEmpty is returned when writing an empty header. + ErrWALHeaderEmpty = errors.New("wal header empty") + + // ErrWALFileInitialized is returned when writing a header to a + // WAL file that has already has its header written. + ErrWALFileInitialized = errors.New("wal file already initialized") + + // ErrChecksumMisaligned is returned when input byte length is not divisible by 8. + ErrChecksumMisaligned = errors.New("checksum input misaligned") +) + // WALFile represents a write-ahead log file. type WALFile struct { path string @@ -19,17 +33,54 @@ func NewWALFile(path string) *WALFile { return &WALFile{path: path} } -func (s *WALFile) Open() error { - panic("TODO") +// WALHeader returns the WAL header. The return is the zero value if unset. +func (s *WALFile) Header() WALHeader { + return s.hdr } +// Open initializes the WAL file descriptor. Creates the file if it doesn't exist. +func (s *WALFile) Open() (err error) { + // TODO: Validate file contents if non-zero. Return ErrWALFileInvalidHeader if header invalid. + // TODO: Truncate transaction if commit record is invalid. + + if s.f, err = os.OpenFile(s.path, os.O_RDWR|os.O_CREATE, 0666); err != nil { + return err + } + return nil +} + +// Close syncs the WAL file and closes the file descriptor. func (s *WALFile) Close() error { - panic("TODO") + if err := s.f.Sync(); err != nil { + return fmt.Errorf("wal sync: %w", err) + } + return s.f.Close() } +// Sync calls Sync() on the underlying file descriptor. +func (s *WALFile) Sync() error { + return s.f.Sync() +} + +// WriteHeader writes hdr to the WAL file. +// Returns an error if hdr is empty or if the file already has a header. func (s *WALFile) WriteHeader(hdr WALHeader) error { + if hdr.IsZero() { + return ErrWALHeaderEmpty + } else if !s.hdr.IsZero() { + return ErrWALFileInitialized + } s.hdr = hdr - panic("TODO") + + // Marshal header & write to file. + b := make([]byte, WALHeaderSize) + if err := s.hdr.MarshalTo(b); err != nil { + return fmt.Errorf("marshal wal header: %w", err) + } else if _, err := s.f.Write(b); err != nil { + return err + } + + return nil } func (s *WALFile) WriteFrame(hdr WALFrameHeader, buf []byte) error { @@ -48,6 +99,23 @@ type WALHeader struct { Checksum uint64 } +// IsZero returns true if hdr is the zero value. +func (hdr WALHeader) IsZero() bool { + return hdr == (WALHeader{}) +} + +// ByteOrder returns the byte order based on the hdr magic. +func (hdr WALHeader) ByteOrder() binary.ByteOrder { + switch hdr.Magic { + case MagicLittleEndian: + return binary.LittleEndian + case MagicBigEndian: + return binary.BigEndian + default: + return nil + } +} + // MarshalTo encodes the header to b. // Returns io.ErrShortWrite if len(b) is less than WALHeaderSize. func (hdr *WALHeader) MarshalTo(b []byte) error { @@ -89,6 +157,11 @@ type WALFrameHeader struct { Checksum uint64 } +// IsZero returns true if hdr is the zero value. +func (hdr WALFrameHeader) IsZero() bool { + return hdr == (WALFrameHeader{}) +} + // IsCommit returns true if the frame represents a commit header. func (hdr *WALFrameHeader) IsCommit() bool { return hdr.PageN != 0 diff --git a/wal_test.go b/wal_test.go index 60a52e7..0038cda 100644 --- a/wal_test.go +++ b/wal_test.go @@ -2,11 +2,30 @@ package litestream_test import ( "io" + "path/filepath" "testing" "github.com/benbjohnson/litestream" ) +func TestWALFile_WriteHeader(t *testing.T) { + t.Run("OK", func(t *testing.T) { + f := MustOpenWALFile(t, "0000") + defer MustCloseWALFile(t, f) + + if err := f.WriteHeader(litestream.WALHeader{ + Magic: litestream.MagicLittleEndian, + FileFormatVersion: 1001, + PageSize: 4096, + CheckpointSeqNo: 1003, + }); err != nil { + t.Fatal(err) + } + + t.Fatal("TODO: Ensure header written correctly") + }) +} + func TestWALHeader_MarshalTo(t *testing.T) { // Ensure the WAL header can be marshaled and unmarshaled correctly. t.Run("OK", func(t *testing.T) { @@ -90,3 +109,22 @@ func TestWALFrameHeader_Unmarshal(t *testing.T) { } }) } + +// MustOpenWALFile returns a new, open instance of WALFile written to a temp dir. +func MustOpenWALFile(tb testing.TB, name string) *litestream.WALFile { + tb.Helper() + + f := litestream.NewWALFile(filepath.Join(tb.TempDir(), name)) + if err := f.Open(); err != nil { + tb.Fatal(err) + } + return f +} + +// MustCloseWALFile closes an instance of WALFile. +func MustCloseWALFile(tb testing.TB, f *litestream.WALFile) { + tb.Helper() + if err := f.Close(); err != nil { + tb.Fatal(err) + } +}