Revert gzip compression level, fix s3 wal upload

This commit is contained in:
Ben Johnson
2021-01-15 13:04:21 -07:00
parent a7ec05ad7a
commit 743aeb83e1
2 changed files with 13 additions and 10 deletions

View File

@@ -915,7 +915,7 @@ func compressFile(src, dst string, uid, gid int) error {
} }
defer w.Close() defer w.Close()
gz, _ := gzip.NewWriterLevel(w, gzip.BestSpeed) gz := gzip.NewWriter(w)
defer gz.Close() defer gz.Close()
// Copy & compress file contents to temporary file. // Copy & compress file contents to temporary file.

View File

@@ -552,7 +552,7 @@ func (r *Replica) snapshot(ctx context.Context, generation string, index int) er
} }
pr, pw := io.Pipe() pr, pw := io.Pipe()
gw, _ := gzip.NewWriterLevel(pw, gzip.BestSpeed) gw := gzip.NewWriter(pw)
go func() { go func() {
if _, err := io.Copy(gw, f); err != nil { if _, err := io.Copy(gw, f); err != nil {
_ = pw.CloseWithError(err) _ = pw.CloseWithError(err)
@@ -723,9 +723,11 @@ func (r *Replica) syncWAL(ctx context.Context) (err error) {
} }
var buf bytes.Buffer var buf bytes.Buffer
gw, _ := gzip.NewWriterLevel(&buf, gzip.BestSpeed) gw := gzip.NewWriter(&buf)
if _, err := gw.Write(b); err != nil { if _, err := gw.Write(b); err != nil {
return err return err
} else if err := gw.Close(); err != nil {
return err
} }
// Build a WAL path with the index/offset as well as size so we can ensure // Build a WAL path with the index/offset as well as size so we can ensure
@@ -823,7 +825,7 @@ func (r *Replica) WALReader(ctx context.Context, generation string, index int) (
} }
// Open each file and concatenate into a multi-reader. // Open each file and concatenate into a multi-reader.
var mrc multiReadCloser var buf bytes.Buffer
for _, key := range keys { for _, key := range keys {
// Pipe download to return an io.Reader. // Pipe download to return an io.Reader.
out, err := r.s3.GetObjectWithContext(ctx, &s3.GetObjectInput{ out, err := r.s3.GetObjectWithContext(ctx, &s3.GetObjectInput{
@@ -831,24 +833,25 @@ func (r *Replica) WALReader(ctx context.Context, generation string, index int) (
Key: aws.String(key), Key: aws.String(key),
}) })
if err != nil { if err != nil {
mrc.Close()
return nil, err return nil, err
} }
defer out.Body.Close()
r.getOperationTotalCounter.Inc() r.getOperationTotalCounter.Inc()
r.getOperationTotalCounter.Add(float64(*out.ContentLength)) r.getOperationTotalCounter.Add(float64(*out.ContentLength))
// Decompress the snapshot file.
gr, err := gzip.NewReader(out.Body) gr, err := gzip.NewReader(out.Body)
if err != nil { if err != nil {
out.Body.Close()
mrc.Close()
return nil, err return nil, err
} }
defer gr.Close()
mrc.readers = append(mrc.readers, internal.NewReadCloser(gr, out.Body)) if _, err := io.Copy(&buf, gr); err != nil {
return nil, err
}
} }
return &mrc, nil return ioutil.NopCloser(&buf), nil
} }
// EnforceRetention forces a new snapshot once the retention interval has passed. // EnforceRetention forces a new snapshot once the retention interval has passed.