Update wal segment naming

This commit is contained in:
Ben Johnson
2021-01-14 15:26:29 -07:00
parent 1e4e9633cc
commit e1c9e09161
5 changed files with 27 additions and 25 deletions

View File

@@ -121,7 +121,7 @@ func (r *Replica) SnapshotDir(generation string) string {
// SnapshotPath returns the path to a snapshot file.
func (r *Replica) SnapshotPath(generation string, index int) string {
return path.Join(r.SnapshotDir(generation), fmt.Sprintf("%016x.snapshot.gz", index))
return path.Join(r.SnapshotDir(generation), fmt.Sprintf("%08x.snapshot.gz", index))
}
// MaxSnapshotIndex returns the highest index for the snapshots.
@@ -211,12 +211,11 @@ func (r *Replica) GenerationStats(ctx context.Context, generation string) (stats
func (r *Replica) snapshotStats(ctx context.Context, generation string) (n int, min, max time.Time, err error) {
if err := r.s3.ListObjectsPagesWithContext(ctx, &s3.ListObjectsInput{
Bucket: aws.String(r.Bucket),
Prefix: aws.String(r.SnapshotDir(generation) + "/"),
Delimiter: aws.String("/"),
Bucket: aws.String(r.Bucket),
Prefix: aws.String(r.SnapshotDir(generation) + "/"),
}, func(page *s3.ListObjectsOutput, lastPage bool) bool {
for _, obj := range page.Contents {
if !litestream.IsSnapshotPath(*obj.Key) {
if !litestream.IsSnapshotPath(path.Base(*obj.Key)) {
continue
}
modTime := obj.LastModified.UTC()
@@ -238,12 +237,11 @@ func (r *Replica) snapshotStats(ctx context.Context, generation string) (n int,
func (r *Replica) walStats(ctx context.Context, generation string) (n int, min, max time.Time, err error) {
if err := r.s3.ListObjectsPagesWithContext(ctx, &s3.ListObjectsInput{
Bucket: aws.String(r.Bucket),
Prefix: aws.String(r.WALDir(generation) + "/"),
Delimiter: aws.String("/"),
Bucket: aws.String(r.Bucket),
Prefix: aws.String(r.WALDir(generation) + "/"),
}, func(page *s3.ListObjectsOutput, lastPage bool) bool {
for _, obj := range page.Contents {
if !litestream.IsWALPath(*obj.Key) {
if !litestream.IsWALPath(path.Base(*obj.Key)) {
continue
}
modTime := obj.LastModified.UTC()
@@ -637,6 +635,7 @@ func (r *Replica) syncWAL(ctx context.Context) (err error) {
defer rd.Close()
// Read to intermediate buffer to determine size.
pos := rd.Pos()
b, err := ioutil.ReadAll(rd)
if err != nil {
return err
@@ -652,7 +651,7 @@ func (r *Replica) syncWAL(ctx context.Context) (err error) {
// that files are contiguous without having to decompress.
walPath := path.Join(
r.WALDir(rd.Pos().Generation),
litestream.FormatWALPathWithOffsetSize(rd.Pos().Index, rd.Pos().Offset, int64(len(b)))+".gz",
litestream.FormatWALPathWithOffsetSize(pos.Index, pos.Offset, int64(len(b)))+".gz",
)
if _, err := r.uploader.UploadWithContext(ctx, &s3manager.UploadInput{
@@ -708,7 +707,7 @@ func (r *Replica) WALReader(ctx context.Context, generation string, index int) (
var innerErr error
if err := r.s3.ListObjectsPagesWithContext(ctx, &s3.ListObjectsInput{
Bucket: aws.String(r.Bucket),
Prefix: aws.String(path.Join(r.WALDir(generation), fmt.Sprintf("%016x_", index))),
Prefix: aws.String(path.Join(r.WALDir(generation), fmt.Sprintf("%08x_", index))),
}, func(page *s3.ListObjectsOutput, lastPage bool) bool {
for _, obj := range page.Contents {
// Read the offset & size from the filename. We need to check this
@@ -717,7 +716,7 @@ func (r *Replica) WALReader(ctx context.Context, generation string, index int) (
if err != nil {
continue
} else if off != offset {
innerErr = fmt.Errorf("out of sequence wal segments: %s/%016x", generation, index)
innerErr = fmt.Errorf("out of sequence wal segments: %s/%08x", generation, index)
return false
}
@@ -858,7 +857,7 @@ func (r *Replica) deleteGenerationBefore(ctx context.Context, generation string,
}
for _, objID := range objIDs[i:j] {
log.Printf("%s(%s): generation %q file no longer retained, deleting %s", r.db.Path(), r.Name(), generation, path.Base(*objID.Key))
log.Printf("%s(%s): retention exceeded, deleting from generation %q: %s", r.db.Path(), r.Name(), generation, path.Base(*objID.Key))
}
if _, err := r.s3.DeleteObjectsWithContext(ctx, &s3.DeleteObjectsInput{