Switch from gzip to lz4

This commit is contained in:
Ben Johnson
2021-01-18 14:45:12 -07:00
parent 35d755e7f2
commit 14dad1fd5a
6 changed files with 39 additions and 53 deletions

View File

@@ -68,9 +68,9 @@ bkt/
snapshots/ # snapshots w/ timestamp+offset snapshots/ # snapshots w/ timestamp+offset
20000101T000000Z-000000000000023.snapshot 20000101T000000Z-000000000000023.snapshot
wal/ # compressed WAL files wal/ # compressed WAL files
000000000000001-0.wal.gz 000000000000001-0.wal.lz4
000000000000001-<offset>.wal.gz 000000000000001-<offset>.wal.lz4
000000000000002-0.wal.gz 000000000000002-0.wal.lz4
00000002/ 00000002/
snapshot/ snapshot/
000000000000000.snapshot 000000000000000.snapshot
@@ -82,7 +82,7 @@ bkt/
20000101T000000Z-000000000000023.snapshot 20000101T000000Z-000000000000023.snapshot
wal/ wal/
000000000000001.wal.gz 000000000000001.wal.lz4
``` ```

1
go.mod
View File

@@ -6,6 +6,7 @@ require (
github.com/aws/aws-sdk-go v1.27.0 github.com/aws/aws-sdk-go v1.27.0
github.com/davecgh/go-spew v1.1.1 github.com/davecgh/go-spew v1.1.1
github.com/mattn/go-sqlite3 v1.14.5 github.com/mattn/go-sqlite3 v1.14.5
github.com/pierrec/lz4/v4 v4.1.3
github.com/prometheus/client_golang v1.9.0 github.com/prometheus/client_golang v1.9.0
gopkg.in/yaml.v2 v2.4.0 gopkg.in/yaml.v2 v2.4.0
) )

4
go.sum
View File

@@ -197,7 +197,11 @@ github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c/go.mod h1:lzWF7FI
github.com/pborman/uuid v1.2.0/go.mod h1:X/NO0urCmaxf9VXbdlT7C2Yzkj2IKimNn4k+gtPdI/k= github.com/pborman/uuid v1.2.0/go.mod h1:X/NO0urCmaxf9VXbdlT7C2Yzkj2IKimNn4k+gtPdI/k=
github.com/performancecopilot/speed v3.0.0+incompatible/go.mod h1:/CLtqpZ5gBg1M9iaPbIdPPGyKcA8hKdoy6hAWba7Yac= github.com/performancecopilot/speed v3.0.0+incompatible/go.mod h1:/CLtqpZ5gBg1M9iaPbIdPPGyKcA8hKdoy6hAWba7Yac=
github.com/pierrec/lz4 v1.0.2-0.20190131084431-473cd7ce01a1/go.mod h1:3/3N9NVKO0jef7pBehbT1qWhCMrIgbYNnFAZCqQ5LRc= github.com/pierrec/lz4 v1.0.2-0.20190131084431-473cd7ce01a1/go.mod h1:3/3N9NVKO0jef7pBehbT1qWhCMrIgbYNnFAZCqQ5LRc=
github.com/pierrec/lz4 v2.0.5+incompatible h1:2xWsjqPFWcplujydGg4WmhC/6fZqK42wMM8aXeqhl0I=
github.com/pierrec/lz4 v2.0.5+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY= github.com/pierrec/lz4 v2.0.5+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY=
github.com/pierrec/lz4 v2.6.0+incompatible h1:Ix9yFKn1nSPBLFl/yZknTp8TU5G4Ps0JDmguYK6iH1A=
github.com/pierrec/lz4/v4 v4.1.3 h1:/dvQpkb0o1pVlSgKNQqfkavlnXaIK+hJ0LXsKRUN9D4=
github.com/pierrec/lz4/v4 v4.1.3/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=

View File

@@ -219,7 +219,7 @@ func ParseSnapshotPath(s string) (index int, ext string, err error) {
return int(i64), a[2], nil return int(i64), a[2], nil
} }
var snapshotPathRegex = regexp.MustCompile(`^([0-9a-f]{8})(.snapshot(?:.gz)?)$`) var snapshotPathRegex = regexp.MustCompile(`^([0-9a-f]{8})(.snapshot(?:.lz4)?)$`)
// IsWALPath returns true if s is a path to a WAL file. // IsWALPath returns true if s is a path to a WAL file.
func IsWALPath(s string) bool { func IsWALPath(s string) bool {
@@ -254,7 +254,7 @@ func FormatWALPathWithOffset(index int, offset int64) string {
return fmt.Sprintf("%08x_%08x%s", index, offset, WALExt) return fmt.Sprintf("%08x_%08x%s", index, offset, WALExt)
} }
var walPathRegex = regexp.MustCompile(`^([0-9a-f]{8})(?:_([0-9a-f]{8}))?(.wal(?:.gz)?)$`) var walPathRegex = regexp.MustCompile(`^([0-9a-f]{8})(?:_([0-9a-f]{8}))?(.wal(?:.lz4)?)$`)
// isHexChar returns true if ch is a lowercase hex character. // isHexChar returns true if ch is a lowercase hex character.
func isHexChar(ch rune) bool { func isHexChar(ch rune) bool {

View File

@@ -1,7 +1,6 @@
package litestream package litestream
import ( import (
"compress/gzip"
"context" "context"
"fmt" "fmt"
"io" "io"
@@ -15,6 +14,7 @@ import (
"time" "time"
"github.com/benbjohnson/litestream/internal" "github.com/benbjohnson/litestream/internal"
"github.com/pierrec/lz4/v4"
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
) )
@@ -176,7 +176,7 @@ func (r *FileReplica) SnapshotDir(generation string) string {
// SnapshotPath returns the path to a snapshot file. // SnapshotPath returns the path to a snapshot file.
func (r *FileReplica) SnapshotPath(generation string, index int) string { func (r *FileReplica) SnapshotPath(generation string, index int) string {
return filepath.Join(r.SnapshotDir(generation), fmt.Sprintf("%08x.snapshot.gz", index)) return filepath.Join(r.SnapshotDir(generation), fmt.Sprintf("%08x.snapshot.lz4", index))
} }
// MaxSnapshotIndex returns the highest index for the snapshots. // MaxSnapshotIndex returns the highest index for the snapshots.
@@ -622,7 +622,7 @@ func (r *FileReplica) Sync(ctx context.Context) (err error) {
} }
} }
// Gzip any old WAL files. // Compress any old WAL files.
if generation != "" { if generation != "" {
if err := r.compress(ctx, generation); err != nil { if err := r.compress(ctx, generation); err != nil {
return fmt.Errorf("cannot compress: %s", err) return fmt.Errorf("cannot compress: %s", err)
@@ -683,7 +683,7 @@ func (r *FileReplica) syncWAL(ctx context.Context) (err error) {
return nil return nil
} }
// compress gzips all WAL files before the current one. // compress compresses all WAL files before the current one.
func (r *FileReplica) compress(ctx context.Context, generation string) error { func (r *FileReplica) compress(ctx context.Context, generation string) error {
filenames, err := filepath.Glob(filepath.Join(r.WALDir(generation), "**/*.wal")) filenames, err := filepath.Glob(filepath.Join(r.WALDir(generation), "**/*.wal"))
if err != nil { if err != nil {
@@ -704,7 +704,7 @@ func (r *FileReplica) compress(ctx context.Context, generation string) error {
default: default:
} }
dst := filename + ".gz" dst := filename + ".lz4"
if err := compressFile(filename, dst, r.db.uid, r.db.gid); err != nil { if err := compressFile(filename, dst, r.db.uid, r.db.gid); err != nil {
return err return err
} else if err := os.Remove(filename); err != nil { } else if err := os.Remove(filename); err != nil {
@@ -738,16 +738,11 @@ func (r *FileReplica) SnapshotReader(ctx context.Context, generation string, ind
} else if ext == ".snapshot" { } else if ext == ".snapshot" {
return f, nil // not compressed, return as-is. return f, nil // not compressed, return as-is.
} }
assert(ext == ".snapshot.gz", "invalid snapshot extension") assert(ext == ".snapshot.lz4", "invalid snapshot extension")
// If compressed, wrap in a gzip reader and return with wrapper to // If compressed, wrap in an lz4 reader and return with wrapper to
// ensure that the underlying file is closed. // ensure that the underlying file is closed.
r, err := gzip.NewReader(f) return internal.NewReadCloser(lz4.NewReader(f), f), nil
if err != nil {
f.Close()
return nil, err
}
return internal.NewReadCloser(r, f), nil
} }
return nil, os.ErrNotExist return nil, os.ErrNotExist
} }
@@ -766,19 +761,14 @@ func (r *FileReplica) WALReader(ctx context.Context, generation string, index in
} }
// Otherwise read the compressed file. Return error if file doesn't exist. // Otherwise read the compressed file. Return error if file doesn't exist.
f, err = os.Open(filename + ".gz") f, err = os.Open(filename + ".lz4")
if err != nil { if err != nil {
return nil, err return nil, err
} }
// If compressed, wrap in a gzip reader and return with wrapper to // If compressed, wrap in an lz4 reader and return with wrapper to
// ensure that the underlying file is closed. // ensure that the underlying file is closed.
rd, err := gzip.NewReader(f) return internal.NewReadCloser(lz4.NewReader(f), f), nil
if err != nil {
f.Close()
return nil, err
}
return internal.NewReadCloser(rd, f), nil
} }
// EnforceRetention forces a new snapshot once the retention interval has passed. // EnforceRetention forces a new snapshot once the retention interval has passed.
@@ -950,7 +940,7 @@ func WALIndexAt(ctx context.Context, r Replica, generation string, maxIndex int,
return index, nil return index, nil
} }
// compressFile compresses a file and replaces it with a new file with a .gz extension. // compressFile compresses a file and replaces it with a new file with a .lz4 extension.
func compressFile(src, dst string, uid, gid int) error { func compressFile(src, dst string, uid, gid int) error {
r, err := os.Open(src) r, err := os.Open(src)
if err != nil { if err != nil {
@@ -964,13 +954,13 @@ func compressFile(src, dst string, uid, gid int) error {
} }
defer w.Close() defer w.Close()
gz := gzip.NewWriter(w) zr := lz4.NewWriter(w)
defer gz.Close() defer zr.Close()
// Copy & compress file contents to temporary file. // Copy & compress file contents to temporary file.
if _, err := io.Copy(gz, r); err != nil { if _, err := io.Copy(zr, r); err != nil {
return err return err
} else if err := gz.Close(); err != nil { } else if err := zr.Close(); err != nil {
return err return err
} else if err := w.Sync(); err != nil { } else if err := w.Sync(); err != nil {
return err return err

View File

@@ -2,7 +2,6 @@ package s3
import ( import (
"bytes" "bytes"
"compress/gzip"
"context" "context"
"fmt" "fmt"
"io" "io"
@@ -20,6 +19,7 @@ import (
"github.com/aws/aws-sdk-go/service/s3/s3manager" "github.com/aws/aws-sdk-go/service/s3/s3manager"
"github.com/benbjohnson/litestream" "github.com/benbjohnson/litestream"
"github.com/benbjohnson/litestream/internal" "github.com/benbjohnson/litestream/internal"
"github.com/pierrec/lz4/v4"
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto" "github.com/prometheus/client_golang/prometheus/promauto"
) )
@@ -155,7 +155,7 @@ func (r *Replica) SnapshotDir(generation string) string {
// SnapshotPath returns the path to a snapshot file. // SnapshotPath returns the path to a snapshot file.
func (r *Replica) SnapshotPath(generation string, index int) string { func (r *Replica) SnapshotPath(generation string, index int) string {
return path.Join(r.SnapshotDir(generation), fmt.Sprintf("%08x.snapshot.gz", index)) return path.Join(r.SnapshotDir(generation), fmt.Sprintf("%08x.snapshot.lz4", index))
} }
// MaxSnapshotIndex returns the highest index for the snapshots. // MaxSnapshotIndex returns the highest index for the snapshots.
@@ -588,13 +588,13 @@ func (r *Replica) snapshot(ctx context.Context, generation string, index int) er
} }
pr, pw := io.Pipe() pr, pw := io.Pipe()
gw := gzip.NewWriter(pw) zw := lz4.NewWriter(pw)
go func() { go func() {
if _, err := io.Copy(gw, f); err != nil { if _, err := io.Copy(zw, f); err != nil {
_ = pw.CloseWithError(err) _ = pw.CloseWithError(err)
return return
} }
_ = pw.CloseWithError(gw.Close()) _ = pw.CloseWithError(zw.Close())
}() }()
snapshotPath := r.SnapshotPath(generation, index) snapshotPath := r.SnapshotPath(generation, index)
@@ -770,11 +770,11 @@ func (r *Replica) syncWAL(ctx context.Context) (err error) {
} }
var buf bytes.Buffer var buf bytes.Buffer
gw := gzip.NewWriter(&buf) zw := lz4.NewWriter(&buf)
n, err := gw.Write(b) n, err := zw.Write(b)
if err != nil { if err != nil {
return err return err
} else if err := gw.Close(); err != nil { } else if err := zw.Close(); err != nil {
return err return err
} }
@@ -782,7 +782,7 @@ func (r *Replica) syncWAL(ctx context.Context) (err error) {
// that files are contiguous without having to decompress. // that files are contiguous without having to decompress.
walPath := path.Join( walPath := path.Join(
r.WALDir(rd.Pos().Generation), r.WALDir(rd.Pos().Generation),
litestream.FormatWALPathWithOffset(pos.Index, pos.Offset)+".gz", litestream.FormatWALPathWithOffset(pos.Index, pos.Offset)+".lz4",
) )
if _, err := r.uploader.UploadWithContext(ctx, &s3manager.UploadInput{ if _, err := r.uploader.UploadWithContext(ctx, &s3manager.UploadInput{
@@ -826,12 +826,7 @@ func (r *Replica) SnapshotReader(ctx context.Context, generation string, index i
r.getOperationBytesCounter.Add(float64(*out.ContentLength)) r.getOperationBytesCounter.Add(float64(*out.ContentLength))
// Decompress the snapshot file. // Decompress the snapshot file.
gr, err := gzip.NewReader(out.Body) return internal.NewReadCloser(lz4.NewReader(out.Body), out.Body), nil
if err != nil {
out.Body.Close()
return nil, err
}
return internal.NewReadCloser(gr, out.Body), nil
} }
// WALReader returns a reader for WAL data at the given index. // WALReader returns a reader for WAL data at the given index.
@@ -886,13 +881,9 @@ func (r *Replica) WALReader(ctx context.Context, generation string, index int) (
r.getOperationTotalCounter.Inc() r.getOperationTotalCounter.Inc()
r.getOperationTotalCounter.Add(float64(*out.ContentLength)) r.getOperationTotalCounter.Add(float64(*out.ContentLength))
gr, err := gzip.NewReader(out.Body) zr := lz4.NewReader(out.Body)
if err != nil {
return nil, err
}
defer gr.Close()
n, err := io.Copy(&buf, gr) n, err := io.Copy(&buf, zr)
if err != nil { if err != nil {
return nil, err return nil, err
} }