diff --git a/doc/DESIGN.md b/doc/DESIGN.md index 6661f10..67a40e2 100644 --- a/doc/DESIGN.md +++ b/doc/DESIGN.md @@ -68,9 +68,9 @@ bkt/ snapshots/ # snapshots w/ timestamp+offset 20000101T000000Z-000000000000023.snapshot wal/ # compressed WAL files - 000000000000001-0.wal.gz - 000000000000001-.wal.gz - 000000000000002-0.wal.gz + 000000000000001-0.wal.lz4 + 000000000000001-.wal.lz4 + 000000000000002-0.wal.lz4 00000002/ snapshot/ 000000000000000.snapshot @@ -82,7 +82,7 @@ bkt/ 20000101T000000Z-000000000000023.snapshot wal/ - 000000000000001.wal.gz + 000000000000001.wal.lz4 ``` diff --git a/go.mod b/go.mod index 4e8ab2c..127151a 100644 --- a/go.mod +++ b/go.mod @@ -6,6 +6,7 @@ require ( github.com/aws/aws-sdk-go v1.27.0 github.com/davecgh/go-spew v1.1.1 github.com/mattn/go-sqlite3 v1.14.5 + github.com/pierrec/lz4/v4 v4.1.3 github.com/prometheus/client_golang v1.9.0 gopkg.in/yaml.v2 v2.4.0 ) diff --git a/go.sum b/go.sum index 40988cd..3793bf7 100644 --- a/go.sum +++ b/go.sum @@ -197,7 +197,11 @@ github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c/go.mod h1:lzWF7FI github.com/pborman/uuid v1.2.0/go.mod h1:X/NO0urCmaxf9VXbdlT7C2Yzkj2IKimNn4k+gtPdI/k= github.com/performancecopilot/speed v3.0.0+incompatible/go.mod h1:/CLtqpZ5gBg1M9iaPbIdPPGyKcA8hKdoy6hAWba7Yac= github.com/pierrec/lz4 v1.0.2-0.20190131084431-473cd7ce01a1/go.mod h1:3/3N9NVKO0jef7pBehbT1qWhCMrIgbYNnFAZCqQ5LRc= +github.com/pierrec/lz4 v2.0.5+incompatible h1:2xWsjqPFWcplujydGg4WmhC/6fZqK42wMM8aXeqhl0I= github.com/pierrec/lz4 v2.0.5+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY= +github.com/pierrec/lz4 v2.6.0+incompatible h1:Ix9yFKn1nSPBLFl/yZknTp8TU5G4Ps0JDmguYK6iH1A= +github.com/pierrec/lz4/v4 v4.1.3 h1:/dvQpkb0o1pVlSgKNQqfkavlnXaIK+hJ0LXsKRUN9D4= +github.com/pierrec/lz4/v4 v4.1.3/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= diff --git a/litestream.go b/litestream.go index 0321124..98c89ed 100644 --- a/litestream.go +++ b/litestream.go @@ -219,7 +219,7 @@ func ParseSnapshotPath(s string) (index int, ext string, err error) { return int(i64), a[2], nil } -var snapshotPathRegex = regexp.MustCompile(`^([0-9a-f]{8})(.snapshot(?:.gz)?)$`) +var snapshotPathRegex = regexp.MustCompile(`^([0-9a-f]{8})(.snapshot(?:.lz4)?)$`) // IsWALPath returns true if s is a path to a WAL file. func IsWALPath(s string) bool { @@ -254,7 +254,7 @@ func FormatWALPathWithOffset(index int, offset int64) string { return fmt.Sprintf("%08x_%08x%s", index, offset, WALExt) } -var walPathRegex = regexp.MustCompile(`^([0-9a-f]{8})(?:_([0-9a-f]{8}))?(.wal(?:.gz)?)$`) +var walPathRegex = regexp.MustCompile(`^([0-9a-f]{8})(?:_([0-9a-f]{8}))?(.wal(?:.lz4)?)$`) // isHexChar returns true if ch is a lowercase hex character. func isHexChar(ch rune) bool { diff --git a/replica.go b/replica.go index 6253e4f..495ea1d 100644 --- a/replica.go +++ b/replica.go @@ -1,7 +1,6 @@ package litestream import ( - "compress/gzip" "context" "fmt" "io" @@ -15,6 +14,7 @@ import ( "time" "github.com/benbjohnson/litestream/internal" + "github.com/pierrec/lz4/v4" "github.com/prometheus/client_golang/prometheus" ) @@ -176,7 +176,7 @@ func (r *FileReplica) SnapshotDir(generation string) string { // SnapshotPath returns the path to a snapshot file. func (r *FileReplica) SnapshotPath(generation string, index int) string { - return filepath.Join(r.SnapshotDir(generation), fmt.Sprintf("%08x.snapshot.gz", index)) + return filepath.Join(r.SnapshotDir(generation), fmt.Sprintf("%08x.snapshot.lz4", index)) } // MaxSnapshotIndex returns the highest index for the snapshots. @@ -622,7 +622,7 @@ func (r *FileReplica) Sync(ctx context.Context) (err error) { } } - // Gzip any old WAL files. + // Compress any old WAL files. if generation != "" { if err := r.compress(ctx, generation); err != nil { return fmt.Errorf("cannot compress: %s", err) @@ -683,7 +683,7 @@ func (r *FileReplica) syncWAL(ctx context.Context) (err error) { return nil } -// compress gzips all WAL files before the current one. +// compress compresses all WAL files before the current one. func (r *FileReplica) compress(ctx context.Context, generation string) error { filenames, err := filepath.Glob(filepath.Join(r.WALDir(generation), "**/*.wal")) if err != nil { @@ -704,7 +704,7 @@ func (r *FileReplica) compress(ctx context.Context, generation string) error { default: } - dst := filename + ".gz" + dst := filename + ".lz4" if err := compressFile(filename, dst, r.db.uid, r.db.gid); err != nil { return err } else if err := os.Remove(filename); err != nil { @@ -738,16 +738,11 @@ func (r *FileReplica) SnapshotReader(ctx context.Context, generation string, ind } else if ext == ".snapshot" { return f, nil // not compressed, return as-is. } - assert(ext == ".snapshot.gz", "invalid snapshot extension") + assert(ext == ".snapshot.lz4", "invalid snapshot extension") - // If compressed, wrap in a gzip reader and return with wrapper to + // If compressed, wrap in an lz4 reader and return with wrapper to // ensure that the underlying file is closed. - r, err := gzip.NewReader(f) - if err != nil { - f.Close() - return nil, err - } - return internal.NewReadCloser(r, f), nil + return internal.NewReadCloser(lz4.NewReader(f), f), nil } return nil, os.ErrNotExist } @@ -766,19 +761,14 @@ func (r *FileReplica) WALReader(ctx context.Context, generation string, index in } // Otherwise read the compressed file. Return error if file doesn't exist. - f, err = os.Open(filename + ".gz") + f, err = os.Open(filename + ".lz4") if err != nil { return nil, err } - // If compressed, wrap in a gzip reader and return with wrapper to + // If compressed, wrap in an lz4 reader and return with wrapper to // ensure that the underlying file is closed. - rd, err := gzip.NewReader(f) - if err != nil { - f.Close() - return nil, err - } - return internal.NewReadCloser(rd, f), nil + return internal.NewReadCloser(lz4.NewReader(f), f), nil } // EnforceRetention forces a new snapshot once the retention interval has passed. @@ -950,7 +940,7 @@ func WALIndexAt(ctx context.Context, r Replica, generation string, maxIndex int, return index, nil } -// compressFile compresses a file and replaces it with a new file with a .gz extension. +// compressFile compresses a file and replaces it with a new file with a .lz4 extension. func compressFile(src, dst string, uid, gid int) error { r, err := os.Open(src) if err != nil { @@ -964,13 +954,13 @@ func compressFile(src, dst string, uid, gid int) error { } defer w.Close() - gz := gzip.NewWriter(w) - defer gz.Close() + zr := lz4.NewWriter(w) + defer zr.Close() // Copy & compress file contents to temporary file. - if _, err := io.Copy(gz, r); err != nil { + if _, err := io.Copy(zr, r); err != nil { return err - } else if err := gz.Close(); err != nil { + } else if err := zr.Close(); err != nil { return err } else if err := w.Sync(); err != nil { return err diff --git a/s3/s3.go b/s3/s3.go index 60b8634..becfb67 100644 --- a/s3/s3.go +++ b/s3/s3.go @@ -2,7 +2,6 @@ package s3 import ( "bytes" - "compress/gzip" "context" "fmt" "io" @@ -20,6 +19,7 @@ import ( "github.com/aws/aws-sdk-go/service/s3/s3manager" "github.com/benbjohnson/litestream" "github.com/benbjohnson/litestream/internal" + "github.com/pierrec/lz4/v4" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" ) @@ -155,7 +155,7 @@ func (r *Replica) SnapshotDir(generation string) string { // SnapshotPath returns the path to a snapshot file. func (r *Replica) SnapshotPath(generation string, index int) string { - return path.Join(r.SnapshotDir(generation), fmt.Sprintf("%08x.snapshot.gz", index)) + return path.Join(r.SnapshotDir(generation), fmt.Sprintf("%08x.snapshot.lz4", index)) } // MaxSnapshotIndex returns the highest index for the snapshots. @@ -588,13 +588,13 @@ func (r *Replica) snapshot(ctx context.Context, generation string, index int) er } pr, pw := io.Pipe() - gw := gzip.NewWriter(pw) + zw := lz4.NewWriter(pw) go func() { - if _, err := io.Copy(gw, f); err != nil { + if _, err := io.Copy(zw, f); err != nil { _ = pw.CloseWithError(err) return } - _ = pw.CloseWithError(gw.Close()) + _ = pw.CloseWithError(zw.Close()) }() snapshotPath := r.SnapshotPath(generation, index) @@ -770,11 +770,11 @@ func (r *Replica) syncWAL(ctx context.Context) (err error) { } var buf bytes.Buffer - gw := gzip.NewWriter(&buf) - n, err := gw.Write(b) + zw := lz4.NewWriter(&buf) + n, err := zw.Write(b) if err != nil { return err - } else if err := gw.Close(); err != nil { + } else if err := zw.Close(); err != nil { return err } @@ -782,7 +782,7 @@ func (r *Replica) syncWAL(ctx context.Context) (err error) { // that files are contiguous without having to decompress. walPath := path.Join( r.WALDir(rd.Pos().Generation), - litestream.FormatWALPathWithOffset(pos.Index, pos.Offset)+".gz", + litestream.FormatWALPathWithOffset(pos.Index, pos.Offset)+".lz4", ) if _, err := r.uploader.UploadWithContext(ctx, &s3manager.UploadInput{ @@ -826,12 +826,7 @@ func (r *Replica) SnapshotReader(ctx context.Context, generation string, index i r.getOperationBytesCounter.Add(float64(*out.ContentLength)) // Decompress the snapshot file. - gr, err := gzip.NewReader(out.Body) - if err != nil { - out.Body.Close() - return nil, err - } - return internal.NewReadCloser(gr, out.Body), nil + return internal.NewReadCloser(lz4.NewReader(out.Body), out.Body), nil } // WALReader returns a reader for WAL data at the given index. @@ -886,13 +881,9 @@ func (r *Replica) WALReader(ctx context.Context, generation string, index int) ( r.getOperationTotalCounter.Inc() r.getOperationTotalCounter.Add(float64(*out.ContentLength)) - gr, err := gzip.NewReader(out.Body) - if err != nil { - return nil, err - } - defer gr.Close() + zr := lz4.NewReader(out.Body) - n, err := io.Copy(&buf, gr) + n, err := io.Copy(&buf, zr) if err != nil { return nil, err }