From 90a1d959d45573a746427643288c0eb6feced8b7 Mon Sep 17 00:00:00 2001 From: Ben Johnson Date: Sun, 17 Jan 2021 10:02:06 -0700 Subject: [PATCH] Remove size from s3 filenames --- db.go | 8 ++++---- litestream.go | 16 +++++++--------- replica.go | 6 +++--- s3/s3.go | 36 +++++++++++++++++------------------- 4 files changed, 31 insertions(+), 35 deletions(-) diff --git a/db.go b/db.go index 321adb7..0274b7b 100644 --- a/db.go +++ b/db.go @@ -188,7 +188,7 @@ func (db *DB) CurrentShadowWALIndex(generation string) (index int, size int64, e if !strings.HasSuffix(fi.Name(), WALExt) { continue } - if v, _, _, _, err := ParseWALPath(fi.Name()); err != nil { + if v, _, _, err := ParseWALPath(fi.Name()); err != nil { continue // invalid wal filename } else if v > index { index = v @@ -488,7 +488,7 @@ func (db *DB) cleanWAL() error { return err } for _, fi := range fis { - if idx, _, _, _, err := ParseWALPath(fi.Name()); err != nil || idx >= min { + if idx, _, _, err := ParseWALPath(fi.Name()); err != nil || idx >= min { continue } if err := os.Remove(filepath.Join(dir, fi.Name())); err != nil { @@ -868,7 +868,7 @@ func (db *DB) syncWAL(info syncInfo) (newSize int64, err error) { // Parse index of current shadow WAL file. dir, base := filepath.Split(info.shadowWALPath) - index, _, _, _, err := ParseWALPath(base) + index, _, _, err := ParseWALPath(base) if err != nil { return 0, fmt.Errorf("cannot parse shadow wal filename: %s", base) } @@ -1227,7 +1227,7 @@ func (db *DB) checkpointAndInit(shadowWALPath string, mode string) error { } // Parse index of current shadow WAL file. 
- index, _, _, _, err := ParseWALPath(shadowWALPath) + index, _, _, err := ParseWALPath(shadowWALPath) if err != nil { return fmt.Errorf("cannot parse shadow wal filename: %s", shadowWALPath) } diff --git a/litestream.go b/litestream.go index 32ddc16..0321124 100644 --- a/litestream.go +++ b/litestream.go @@ -228,18 +228,17 @@ func IsWALPath(s string) bool { // ParseWALPath returns the index & offset for the WAL file. // Returns an error if the path is not a valid snapshot path. -func ParseWALPath(s string) (index int, offset, sz int64, ext string, err error) { +func ParseWALPath(s string) (index int, offset int64, ext string, err error) { s = filepath.Base(s) a := walPathRegex.FindStringSubmatch(s) if a == nil { - return 0, 0, 0, "", fmt.Errorf("invalid wal path: %s", s) + return 0, 0, "", fmt.Errorf("invalid wal path: %s", s) } i64, _ := strconv.ParseUint(a[1], 16, 64) off64, _ := strconv.ParseUint(a[2], 16, 64) - sz64, _ := strconv.ParseUint(a[3], 16, 64) - return int(i64), int64(off64), int64(sz64), a[4], nil + return int(i64), int64(off64), a[3], nil } // FormatWALPath formats a WAL filename with a given index. @@ -248,15 +247,14 @@ func FormatWALPath(index int) string { return fmt.Sprintf("%08x%s", index, WALExt) } -// FormatWALPathWithOffsetSize formats a WAL filename with a given index, offset & size. -func FormatWALPathWithOffsetSize(index int, offset, sz int64) string { +// FormatWALPathWithOffset formats a WAL filename with a given index & offset. 
+func FormatWALPathWithOffset(index int, offset int64) string { assert(index >= 0, "wal index must be non-negative") assert(offset >= 0, "wal offset must be non-negative") - assert(sz >= 0, "wal size must be non-negative") - return fmt.Sprintf("%08x_%08x_%08x%s", index, offset, sz, WALExt) + return fmt.Sprintf("%08x_%08x%s", index, offset, WALExt) } -var walPathRegex = regexp.MustCompile(`^([0-9a-f]{8})(?:_([0-9a-f]{8})_([0-9a-f]{8}))?(.wal(?:.gz)?)$`) +var walPathRegex = regexp.MustCompile(`^([0-9a-f]{8})(?:_([0-9a-f]{8}))?(\.wal(?:\.gz)?)$`) // Dots are escaped so only a literal extension matches. // isHexChar returns true if ch is a lowercase hex character. func isHexChar(ch rune) bool { diff --git a/replica.go b/replica.go index 6914ed9..af7135b 100644 --- a/replica.go +++ b/replica.go @@ -365,7 +365,7 @@ func (r *FileReplica) WALs(ctx context.Context) ([]*WALInfo, error) { // Iterate over each WAL file. for _, fi := range fis { - index, offset, _, _, err := ParseWALPath(fi.Name()) + index, offset, _, err := ParseWALPath(fi.Name()) if err != nil { continue } @@ -507,7 +507,7 @@ func (r *FileReplica) CalcPos(ctx context.Context, generation string) (pos Pos, index := -1 for _, fi := range fis { - if idx, _, _, _, err := ParseWALPath(fi.Name()); err != nil { + if idx, _, _, err := ParseWALPath(fi.Name()); err != nil { continue // invalid wal filename } else if index == -1 || idx > index { index = idx @@ -878,7 +878,7 @@ func (r *FileReplica) deleteGenerationWALBefore(ctx context.Context, generation } for _, fi := range fis { - idx, _, _, _, err := ParseWALPath(fi.Name()) + idx, _, _, err := ParseWALPath(fi.Name()) if err != nil { continue } else if idx >= index { diff --git a/s3/s3.go b/s3/s3.go index 6a11ecd..f2f3b16 100644 --- a/s3/s3.go +++ b/s3/s3.go @@ -370,7 +370,7 @@ func (r *Replica) WALs(ctx context.Context) ([]*litestream.WALInfo, error) { for _, obj := range page.Contents { key := path.Base(*obj.Key) - index, offset, _, _, err := litestream.ParseWALPath(key) + index, offset, _, err := 
litestream.ParseWALPath(key) if err != nil { continue } @@ -539,15 +539,15 @@ func (r *Replica) CalcPos(ctx context.Context, generation string) (pos litestrea for _, obj := range page.Contents { key := path.Base(*obj.Key) - idx, offset, sz, _, err := litestream.ParseWALPath(key) + idx, off, _, err := litestream.ParseWALPath(key) if err != nil { continue // invalid wal filename } if index == -1 || idx > index { index, offset = idx, 0 // start tracking new wal - } else if idx == index { - offset += sz // append additional size to wal + } else if idx == index && off > offset { + offset = off // update offset } } return true @@ -782,7 +782,7 @@ func (r *Replica) syncWAL(ctx context.Context) (err error) { // that files are contiguous without having to decompress. walPath := path.Join( r.WALDir(rd.Pos().Generation), - litestream.FormatWALPathWithOffsetSize(pos.Index, pos.Offset, int64(len(b)))+".gz", + litestream.FormatWALPathWithOffset(pos.Index, pos.Offset)+".gz", ) if _, err := r.uploader.UploadWithContext(ctx, &s3manager.UploadInput{ @@ -843,8 +843,6 @@ func (r *Replica) WALReader(ctx context.Context, generation string, index int) ( // Collect all files for the index. var keys []string - var offset int64 - var innerErr error if err := r.s3.ListObjectsPagesWithContext(ctx, &s3.ListObjectsInput{ Bucket: aws.String(r.Bucket), Prefix: aws.String(path.Join(r.WALDir(generation), fmt.Sprintf("%08x_", index))), @@ -852,29 +850,27 @@ func (r *Replica) WALReader(ctx context.Context, generation string, index int) ( r.listOperationTotalCounter.Inc() for _, obj := range page.Contents { - // Read the offset & size from the filename. We need to check this - // against a running offset to ensure there are no gaps. 
- _, off, sz, _, err := litestream.ParseWALPath(path.Base(*obj.Key)) + _, _, _, err := litestream.ParseWALPath(path.Base(*obj.Key)) if err != nil { continue - } else if off != offset { - innerErr = fmt.Errorf("out of sequence wal segments: %s/%08x", generation, index) - return false } - keys = append(keys, *obj.Key) - offset += sz } return true }); err != nil { return nil, err - } else if innerErr != nil { - return nil, innerErr } // Open each file and concatenate into a multi-reader. var buf bytes.Buffer + var offset int64 for _, key := range keys { + // Ensure offset is correct as we copy segments into buffer. + _, off, _, _ := litestream.ParseWALPath(path.Base(key)) + if off != offset { + return nil, fmt.Errorf("out of sequence wal segments: %s/%08x at remote offset %d, expected offset %d", generation, index, off, offset) + } + // Pipe download to return an io.Reader. out, err := r.s3.GetObjectWithContext(ctx, &s3.GetObjectInput{ Bucket: aws.String(r.Bucket), @@ -894,9 +890,11 @@ func (r *Replica) WALReader(ctx context.Context, generation string, index int) ( } defer gr.Close() - if _, err := io.Copy(&buf, gr); err != nil { + n, err := io.Copy(&buf, gr) + if err != nil { return nil, err } + offset += n // accumulate the running offset checked against the next segment's filename offset. } return ioutil.NopCloser(&buf), nil @@ -984,7 +982,7 @@ func (r *Replica) deleteGenerationBefore(ctx context.Context, generation string, if index != -1 { if idx, _, err := litestream.ParseSnapshotPath(path.Base(*obj.Key)); err == nil && idx >= index { continue - } else if idx, _, _, _, err := litestream.ParseWALPath(path.Base(*obj.Key)); err == nil && idx >= index { continue + } else if idx, _, _, err := litestream.ParseWALPath(path.Base(*obj.Key)); err == nil && idx >= index { continue } }