Remove size from s3 filenames
This commit is contained in:
8
db.go
8
db.go
@@ -188,7 +188,7 @@ func (db *DB) CurrentShadowWALIndex(generation string) (index int, size int64, e
|
|||||||
if !strings.HasSuffix(fi.Name(), WALExt) {
|
if !strings.HasSuffix(fi.Name(), WALExt) {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
if v, _, _, _, err := ParseWALPath(fi.Name()); err != nil {
|
if v, _, _, err := ParseWALPath(fi.Name()); err != nil {
|
||||||
continue // invalid wal filename
|
continue // invalid wal filename
|
||||||
} else if v > index {
|
} else if v > index {
|
||||||
index = v
|
index = v
|
||||||
@@ -488,7 +488,7 @@ func (db *DB) cleanWAL() error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
for _, fi := range fis {
|
for _, fi := range fis {
|
||||||
if idx, _, _, _, err := ParseWALPath(fi.Name()); err != nil || idx >= min {
|
if idx, _, _, err := ParseWALPath(fi.Name()); err != nil || idx >= min {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
if err := os.Remove(filepath.Join(dir, fi.Name())); err != nil {
|
if err := os.Remove(filepath.Join(dir, fi.Name())); err != nil {
|
||||||
@@ -868,7 +868,7 @@ func (db *DB) syncWAL(info syncInfo) (newSize int64, err error) {
|
|||||||
|
|
||||||
// Parse index of current shadow WAL file.
|
// Parse index of current shadow WAL file.
|
||||||
dir, base := filepath.Split(info.shadowWALPath)
|
dir, base := filepath.Split(info.shadowWALPath)
|
||||||
index, _, _, _, err := ParseWALPath(base)
|
index, _, _, err := ParseWALPath(base)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, fmt.Errorf("cannot parse shadow wal filename: %s", base)
|
return 0, fmt.Errorf("cannot parse shadow wal filename: %s", base)
|
||||||
}
|
}
|
||||||
@@ -1227,7 +1227,7 @@ func (db *DB) checkpointAndInit(shadowWALPath string, mode string) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Parse index of current shadow WAL file.
|
// Parse index of current shadow WAL file.
|
||||||
index, _, _, _, err := ParseWALPath(shadowWALPath)
|
index, _, _, err := ParseWALPath(shadowWALPath)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("cannot parse shadow wal filename: %s", shadowWALPath)
|
return fmt.Errorf("cannot parse shadow wal filename: %s", shadowWALPath)
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -228,18 +228,17 @@ func IsWALPath(s string) bool {
|
|||||||
|
|
||||||
// ParseWALPath returns the index & offset for the WAL file.
|
// ParseWALPath returns the index & offset for the WAL file.
|
||||||
// Returns an error if the path is not a valid snapshot path.
|
// Returns an error if the path is not a valid snapshot path.
|
||||||
func ParseWALPath(s string) (index int, offset, sz int64, ext string, err error) {
|
func ParseWALPath(s string) (index int, offset int64, ext string, err error) {
|
||||||
s = filepath.Base(s)
|
s = filepath.Base(s)
|
||||||
|
|
||||||
a := walPathRegex.FindStringSubmatch(s)
|
a := walPathRegex.FindStringSubmatch(s)
|
||||||
if a == nil {
|
if a == nil {
|
||||||
return 0, 0, 0, "", fmt.Errorf("invalid wal path: %s", s)
|
return 0, 0, "", fmt.Errorf("invalid wal path: %s", s)
|
||||||
}
|
}
|
||||||
|
|
||||||
i64, _ := strconv.ParseUint(a[1], 16, 64)
|
i64, _ := strconv.ParseUint(a[1], 16, 64)
|
||||||
off64, _ := strconv.ParseUint(a[2], 16, 64)
|
off64, _ := strconv.ParseUint(a[2], 16, 64)
|
||||||
sz64, _ := strconv.ParseUint(a[3], 16, 64)
|
return int(i64), int64(off64), a[3], nil
|
||||||
return int(i64), int64(off64), int64(sz64), a[4], nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// FormatWALPath formats a WAL filename with a given index.
|
// FormatWALPath formats a WAL filename with a given index.
|
||||||
@@ -248,15 +247,14 @@ func FormatWALPath(index int) string {
|
|||||||
return fmt.Sprintf("%08x%s", index, WALExt)
|
return fmt.Sprintf("%08x%s", index, WALExt)
|
||||||
}
|
}
|
||||||
|
|
||||||
// FormatWALPathWithOffsetSize formats a WAL filename with a given index, offset & size.
|
// FormatWALPathWithOffset formats a WAL filename with a given index & offset.
|
||||||
func FormatWALPathWithOffsetSize(index int, offset, sz int64) string {
|
func FormatWALPathWithOffset(index int, offset int64) string {
|
||||||
assert(index >= 0, "wal index must be non-negative")
|
assert(index >= 0, "wal index must be non-negative")
|
||||||
assert(offset >= 0, "wal offset must be non-negative")
|
assert(offset >= 0, "wal offset must be non-negative")
|
||||||
assert(sz >= 0, "wal size must be non-negative")
|
return fmt.Sprintf("%08x_%08x%s", index, offset, WALExt)
|
||||||
return fmt.Sprintf("%08x_%08x_%08x%s", index, offset, sz, WALExt)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
var walPathRegex = regexp.MustCompile(`^([0-9a-f]{8})(?:_([0-9a-f]{8})_([0-9a-f]{8}))?(.wal(?:.gz)?)$`)
|
var walPathRegex = regexp.MustCompile(`^([0-9a-f]{8})(?:_([0-9a-f]{8}))?(.wal(?:.gz)?)$`)
|
||||||
|
|
||||||
// isHexChar returns true if ch is a lowercase hex character.
|
// isHexChar returns true if ch is a lowercase hex character.
|
||||||
func isHexChar(ch rune) bool {
|
func isHexChar(ch rune) bool {
|
||||||
|
|||||||
@@ -365,7 +365,7 @@ func (r *FileReplica) WALs(ctx context.Context) ([]*WALInfo, error) {
|
|||||||
|
|
||||||
// Iterate over each WAL file.
|
// Iterate over each WAL file.
|
||||||
for _, fi := range fis {
|
for _, fi := range fis {
|
||||||
index, offset, _, _, err := ParseWALPath(fi.Name())
|
index, offset, _, err := ParseWALPath(fi.Name())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
@@ -507,7 +507,7 @@ func (r *FileReplica) CalcPos(ctx context.Context, generation string) (pos Pos,
|
|||||||
|
|
||||||
index := -1
|
index := -1
|
||||||
for _, fi := range fis {
|
for _, fi := range fis {
|
||||||
if idx, _, _, _, err := ParseWALPath(fi.Name()); err != nil {
|
if idx, _, _, err := ParseWALPath(fi.Name()); err != nil {
|
||||||
continue // invalid wal filename
|
continue // invalid wal filename
|
||||||
} else if index == -1 || idx > index {
|
} else if index == -1 || idx > index {
|
||||||
index = idx
|
index = idx
|
||||||
@@ -878,7 +878,7 @@ func (r *FileReplica) deleteGenerationWALBefore(ctx context.Context, generation
|
|||||||
}
|
}
|
||||||
|
|
||||||
for _, fi := range fis {
|
for _, fi := range fis {
|
||||||
idx, _, _, _, err := ParseWALPath(fi.Name())
|
idx, _, _, err := ParseWALPath(fi.Name())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
continue
|
continue
|
||||||
} else if idx >= index {
|
} else if idx >= index {
|
||||||
|
|||||||
36
s3/s3.go
36
s3/s3.go
@@ -370,7 +370,7 @@ func (r *Replica) WALs(ctx context.Context) ([]*litestream.WALInfo, error) {
|
|||||||
for _, obj := range page.Contents {
|
for _, obj := range page.Contents {
|
||||||
key := path.Base(*obj.Key)
|
key := path.Base(*obj.Key)
|
||||||
|
|
||||||
index, offset, _, _, err := litestream.ParseWALPath(key)
|
index, offset, _, err := litestream.ParseWALPath(key)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
@@ -539,15 +539,15 @@ func (r *Replica) CalcPos(ctx context.Context, generation string) (pos litestrea
|
|||||||
for _, obj := range page.Contents {
|
for _, obj := range page.Contents {
|
||||||
key := path.Base(*obj.Key)
|
key := path.Base(*obj.Key)
|
||||||
|
|
||||||
idx, offset, sz, _, err := litestream.ParseWALPath(key)
|
idx, off, _, err := litestream.ParseWALPath(key)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
continue // invalid wal filename
|
continue // invalid wal filename
|
||||||
}
|
}
|
||||||
|
|
||||||
if index == -1 || idx > index {
|
if index == -1 || idx > index {
|
||||||
index, offset = idx, 0 // start tracking new wal
|
index, offset = idx, 0 // start tracking new wal
|
||||||
} else if idx == index {
|
} else if idx == index && off > offset {
|
||||||
offset += sz // append additional size to wal
|
offset = off // update offset
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return true
|
return true
|
||||||
@@ -782,7 +782,7 @@ func (r *Replica) syncWAL(ctx context.Context) (err error) {
|
|||||||
// that files are contiguous without having to decompress.
|
// that files are contiguous without having to decompress.
|
||||||
walPath := path.Join(
|
walPath := path.Join(
|
||||||
r.WALDir(rd.Pos().Generation),
|
r.WALDir(rd.Pos().Generation),
|
||||||
litestream.FormatWALPathWithOffsetSize(pos.Index, pos.Offset, int64(len(b)))+".gz",
|
litestream.FormatWALPathWithOffset(pos.Index, pos.Offset)+".gz",
|
||||||
)
|
)
|
||||||
|
|
||||||
if _, err := r.uploader.UploadWithContext(ctx, &s3manager.UploadInput{
|
if _, err := r.uploader.UploadWithContext(ctx, &s3manager.UploadInput{
|
||||||
@@ -843,8 +843,6 @@ func (r *Replica) WALReader(ctx context.Context, generation string, index int) (
|
|||||||
|
|
||||||
// Collect all files for the index.
|
// Collect all files for the index.
|
||||||
var keys []string
|
var keys []string
|
||||||
var offset int64
|
|
||||||
var innerErr error
|
|
||||||
if err := r.s3.ListObjectsPagesWithContext(ctx, &s3.ListObjectsInput{
|
if err := r.s3.ListObjectsPagesWithContext(ctx, &s3.ListObjectsInput{
|
||||||
Bucket: aws.String(r.Bucket),
|
Bucket: aws.String(r.Bucket),
|
||||||
Prefix: aws.String(path.Join(r.WALDir(generation), fmt.Sprintf("%08x_", index))),
|
Prefix: aws.String(path.Join(r.WALDir(generation), fmt.Sprintf("%08x_", index))),
|
||||||
@@ -852,29 +850,27 @@ func (r *Replica) WALReader(ctx context.Context, generation string, index int) (
|
|||||||
r.listOperationTotalCounter.Inc()
|
r.listOperationTotalCounter.Inc()
|
||||||
|
|
||||||
for _, obj := range page.Contents {
|
for _, obj := range page.Contents {
|
||||||
// Read the offset & size from the filename. We need to check this
|
_, _, _, err := litestream.ParseWALPath(path.Base(*obj.Key))
|
||||||
// against a running offset to ensure there are no gaps.
|
|
||||||
_, off, sz, _, err := litestream.ParseWALPath(path.Base(*obj.Key))
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
continue
|
continue
|
||||||
} else if off != offset {
|
|
||||||
innerErr = fmt.Errorf("out of sequence wal segments: %s/%08x", generation, index)
|
|
||||||
return false
|
|
||||||
}
|
}
|
||||||
|
|
||||||
keys = append(keys, *obj.Key)
|
keys = append(keys, *obj.Key)
|
||||||
offset += sz
|
|
||||||
}
|
}
|
||||||
return true
|
return true
|
||||||
}); err != nil {
|
}); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
} else if innerErr != nil {
|
|
||||||
return nil, innerErr
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Open each file and concatenate into a multi-reader.
|
// Open each file and concatenate into a multi-reader.
|
||||||
var buf bytes.Buffer
|
var buf bytes.Buffer
|
||||||
|
var offset int64
|
||||||
for _, key := range keys {
|
for _, key := range keys {
|
||||||
|
// Ensure offset is correct as we copy segments into buffer.
|
||||||
|
_, off, _, _ := litestream.ParseWALPath(path.Base(key))
|
||||||
|
if off != offset {
|
||||||
|
return nil, fmt.Errorf("out of sequence wal segments: %s/%08x at remote offset %d, expected offset %d", generation, index, off, offset)
|
||||||
|
}
|
||||||
|
|
||||||
// Pipe download to return an io.Reader.
|
// Pipe download to return an io.Reader.
|
||||||
out, err := r.s3.GetObjectWithContext(ctx, &s3.GetObjectInput{
|
out, err := r.s3.GetObjectWithContext(ctx, &s3.GetObjectInput{
|
||||||
Bucket: aws.String(r.Bucket),
|
Bucket: aws.String(r.Bucket),
|
||||||
@@ -894,9 +890,11 @@ func (r *Replica) WALReader(ctx context.Context, generation string, index int) (
|
|||||||
}
|
}
|
||||||
defer gr.Close()
|
defer gr.Close()
|
||||||
|
|
||||||
if _, err := io.Copy(&buf, gr); err != nil {
|
n, err := io.Copy(&buf, gr)
|
||||||
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
offset = int64(n)
|
||||||
}
|
}
|
||||||
|
|
||||||
return ioutil.NopCloser(&buf), nil
|
return ioutil.NopCloser(&buf), nil
|
||||||
@@ -984,7 +982,7 @@ func (r *Replica) deleteGenerationBefore(ctx context.Context, generation string,
|
|||||||
if index != -1 {
|
if index != -1 {
|
||||||
if idx, _, err := litestream.ParseSnapshotPath(path.Base(*obj.Key)); err == nil && idx >= index {
|
if idx, _, err := litestream.ParseSnapshotPath(path.Base(*obj.Key)); err == nil && idx >= index {
|
||||||
continue
|
continue
|
||||||
} else if idx, _, _, _, err := litestream.ParseWALPath(path.Base(*obj.Key)); err == nil && idx >= index {
|
} else if idx, _, _, err := litestream.ParseWALPath(path.Base(*obj.Key)); err == nil && idx >= index {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user