From 743aeb83e119e42bbd52278647799256d30eb726 Mon Sep 17 00:00:00 2001 From: Ben Johnson Date: Fri, 15 Jan 2021 13:04:21 -0700 Subject: [PATCH] Revert gzip compression level, fix s3 wal upload --- replica.go | 2 +- s3/s3.go | 21 ++++++++++++--------- 2 files changed, 13 insertions(+), 10 deletions(-) diff --git a/replica.go b/replica.go index 58df78a..6eadf5d 100644 --- a/replica.go +++ b/replica.go @@ -915,7 +915,7 @@ func compressFile(src, dst string, uid, gid int) error { } defer w.Close() - gz, _ := gzip.NewWriterLevel(w, gzip.BestSpeed) + gz := gzip.NewWriter(w) defer gz.Close() // Copy & compress file contents to temporary file. diff --git a/s3/s3.go b/s3/s3.go index f7bf543..6379e28 100644 --- a/s3/s3.go +++ b/s3/s3.go @@ -552,7 +552,7 @@ func (r *Replica) snapshot(ctx context.Context, generation string, index int) er } pr, pw := io.Pipe() - gw, _ := gzip.NewWriterLevel(pw, gzip.BestSpeed) + gw := gzip.NewWriter(pw) go func() { if _, err := io.Copy(gw, f); err != nil { _ = pw.CloseWithError(err) @@ -723,9 +723,11 @@ func (r *Replica) syncWAL(ctx context.Context) (err error) { } var buf bytes.Buffer - gw, _ := gzip.NewWriterLevel(&buf, gzip.BestSpeed) + gw := gzip.NewWriter(&buf) if _, err := gw.Write(b); err != nil { return err + } else if err := gw.Close(); err != nil { + return err } // Build a WAL path with the index/offset as well as size so we can ensure @@ -823,7 +825,7 @@ func (r *Replica) WALReader(ctx context.Context, generation string, index int) ( } // Open each file and concatenate into a multi-reader. - var mrc multiReadCloser + var buf bytes.Buffer for _, key := range keys { // Pipe download to return an io.Reader. out, err := r.s3.GetObjectWithContext(ctx, &s3.GetObjectInput{ @@ -831,24 +833,25 @@ func (r *Replica) WALReader(ctx context.Context, generation string, index int) ( Key: aws.String(key), }) if err != nil { - mrc.Close() return nil, err } + defer out.Body.Close() + r.getOperationTotalCounter.Inc() r.getOperationTotalCounter.Add(float64(*out.ContentLength)) - // Decompress the snapshot file. gr, err := gzip.NewReader(out.Body) if err != nil { - out.Body.Close() - mrc.Close() return nil, err } + defer gr.Close() - mrc.readers = append(mrc.readers, internal.NewReadCloser(gr, out.Body)) + if _, err := io.Copy(&buf, gr); err != nil { + return nil, err + } } - return &mrc, nil + return ioutil.NopCloser(&buf), nil } // EnforceRetention forces a new snapshot once the retention interval has passed.