From e1c9e09161ce3326f2e6fc65cf0db190c3383fce Mon Sep 17 00:00:00 2001 From: Ben Johnson Date: Thu, 14 Jan 2021 15:26:29 -0700 Subject: [PATCH] Update wal segment naming --- cmd/litestream/generations.go | 1 + db.go | 4 ++-- litestream.go | 8 ++++---- replica.go | 14 ++++++++------ s3/s3.go | 25 ++++++++++++------------- 5 files changed, 27 insertions(+), 25 deletions(-) diff --git a/cmd/litestream/generations.go b/cmd/litestream/generations.go index d90f78f..6dfb1a7 100644 --- a/cmd/litestream/generations.go +++ b/cmd/litestream/generations.go @@ -88,6 +88,7 @@ func (c *GenerationsCommand) Run(ctx context.Context, args []string) (err error) stats.CreatedAt.Format(time.RFC3339), stats.UpdatedAt.Format(time.RFC3339), ) + w.Flush() } } w.Flush() diff --git a/db.go b/db.go index d84b899..38bb50c 100644 --- a/db.go +++ b/db.go @@ -1313,7 +1313,7 @@ func (db *DB) Restore(ctx context.Context, opt RestoreOptions) error { tmpPath := outputPath + ".tmp" // Copy snapshot to output path. - logger.Printf("restoring snapshot from %s://%s/%016x to %s", r.Name(), generation, minWALIndex, tmpPath) + logger.Printf("restoring snapshot from replica %q, generation %q, index %08x to %s", r.Name(), generation, minWALIndex, tmpPath) if !opt.DryRun { if err := db.restoreSnapshot(ctx, r, pos.Generation, pos.Index, tmpPath); err != nil { return fmt.Errorf("cannot restore snapshot: %w", err) @@ -1322,7 +1322,7 @@ func (db *DB) Restore(ctx context.Context, opt RestoreOptions) error { // Restore each WAL file until we reach our maximum index. for index := minWALIndex; index <= maxWALIndex; index++ { - logger.Printf("restoring wal from %s://%s/%016x to %s-wal", r.Name(), generation, index, tmpPath) + logger.Printf("restoring wal from replica %q, generation %q, index %08x to %s-wal", r.Name(), generation, index, tmpPath) if opt.DryRun { continue } diff --git a/litestream.go b/litestream.go index 4b76696..2c0770e 100644 --- a/litestream.go +++ b/litestream.go @@ -219,7 +219,7 @@ func ParseSnapshotPath(s string) (index int, ext string, err error) { return int(i64), a[2], nil } -var snapshotPathRegex = regexp.MustCompile(`^([0-9a-f]{16})(.snapshot(?:.gz)?)$`) +var snapshotPathRegex = regexp.MustCompile(`^([0-9a-f]{8})(.snapshot(?:.gz)?)$`) // IsWALPath returns true if s is a path to a WAL file. func IsWALPath(s string) bool { @@ -245,7 +245,7 @@ func ParseWALPath(s string) (index int, offset, sz int64, ext string, err error) // FormatWALPath formats a WAL filename with a given index. func FormatWALPath(index int) string { assert(index >= 0, "wal index must be non-negative") - return fmt.Sprintf("%016x%s", index, WALExt) + return fmt.Sprintf("%08x%s", index, WALExt) } // FormatWALPathWithOffsetSize formats a WAL filename with a given index, offset & size. @@ -253,10 +253,10 @@ func FormatWALPathWithOffsetSize(index int, offset, sz int64) string { assert(index >= 0, "wal index must be non-negative") assert(offset >= 0, "wal offset must be non-negative") assert(sz >= 0, "wal size must be non-negative") - return fmt.Sprintf("%016x_%016x_%016x%s", index, offset, sz, WALExt) + return fmt.Sprintf("%08x_%08x_%08x%s", index, offset, sz, WALExt) } -var walPathRegex = regexp.MustCompile(`^([0-9a-f]{16})(?:_([0-9a-f]{16})_([0-9a-f]{16}))?(.wal(?:.gz)?)$`) +var walPathRegex = regexp.MustCompile(`^([0-9a-f]{8})(?:_([0-9a-f]{8})_([0-9a-f]{8}))?(.wal(?:.gz)?)$`) // isHexChar returns true if ch is a lowercase hex character. func isHexChar(ch rune) bool { diff --git a/replica.go b/replica.go index 87f8758..9f9932d 100644 --- a/replica.go +++ b/replica.go @@ -70,7 +70,8 @@ type GenerationStats struct { // Default file replica settings. const ( - DefaultRetention = 24 * time.Hour + DefaultRetention = 24 * time.Hour + DefaultRetentionCheckInterval = 1 * time.Hour ) var _ Replica = (*FileReplica)(nil) @@ -108,8 +109,9 @@ func NewFileReplica(db *DB, name, dst string) *FileReplica { dst: dst, cancel: func() {}, - Retention: DefaultRetention, - MonitorEnabled: true, + Retention: DefaultRetention, + RetentionCheckInterval: DefaultRetentionCheckInterval, + MonitorEnabled: true, } } @@ -145,7 +147,7 @@ func (r *FileReplica) SnapshotDir(generation string) string { // SnapshotPath returns the path to a snapshot file. func (r *FileReplica) SnapshotPath(generation string, index int) string { - return filepath.Join(r.SnapshotDir(generation), fmt.Sprintf("%016x.snapshot.gz", index)) + return filepath.Join(r.SnapshotDir(generation), fmt.Sprintf("%08x.snapshot.gz", index)) } // MaxSnapshotIndex returns the highest index for the snapshots. @@ -176,7 +178,7 @@ func (r *FileReplica) WALDir(generation string) string { // WALPath returns the path to a WAL file. func (r *FileReplica) WALPath(generation string, index int) string { - return filepath.Join(r.WALDir(generation), fmt.Sprintf("%016x.wal", index)) + return filepath.Join(r.WALDir(generation), fmt.Sprintf("%08x.wal", index)) } // Generations returns a list of available generation names. @@ -776,7 +778,7 @@ func (r *FileReplica) deleteGenerationSnapshotsBefore(ctx context.Context, gener continue } - log.Printf("%s(%s): generation %q snapshot no longer retained, deleting %s", r.db.Path(), r.Name(), generation, fi.Name()) + log.Printf("%s(%s): retention exceeded, deleting from generation %q: %s", r.db.Path(), r.Name(), generation, fi.Name()) if err := os.Remove(filepath.Join(dir, fi.Name())); err != nil { return err } diff --git a/s3/s3.go b/s3/s3.go index f866eca..9614221 100644 --- a/s3/s3.go +++ b/s3/s3.go @@ -121,7 +121,7 @@ func (r *Replica) SnapshotDir(generation string) string { // SnapshotPath returns the path to a snapshot file. func (r *Replica) SnapshotPath(generation string, index int) string { - return path.Join(r.SnapshotDir(generation), fmt.Sprintf("%016x.snapshot.gz", index)) + return path.Join(r.SnapshotDir(generation), fmt.Sprintf("%08x.snapshot.gz", index)) } // MaxSnapshotIndex returns the highest index for the snapshots. @@ -211,12 +211,11 @@ func (r *Replica) GenerationStats(ctx context.Context, generation string) (stats func (r *Replica) snapshotStats(ctx context.Context, generation string) (n int, min, max time.Time, err error) { if err := r.s3.ListObjectsPagesWithContext(ctx, &s3.ListObjectsInput{ - Bucket: aws.String(r.Bucket), - Prefix: aws.String(r.SnapshotDir(generation) + "/"), - Delimiter: aws.String("/"), + Bucket: aws.String(r.Bucket), + Prefix: aws.String(r.SnapshotDir(generation) + "/"), }, func(page *s3.ListObjectsOutput, lastPage bool) bool { for _, obj := range page.Contents { - if !litestream.IsSnapshotPath(*obj.Key) { + if !litestream.IsSnapshotPath(path.Base(*obj.Key)) { continue } modTime := obj.LastModified.UTC() @@ -238,12 +237,11 @@ func (r *Replica) snapshotStats(ctx context.Context, generation string) (n int, func (r *Replica) walStats(ctx context.Context, generation string) (n int, min, max time.Time, err error) { if err := r.s3.ListObjectsPagesWithContext(ctx, &s3.ListObjectsInput{ - Bucket: aws.String(r.Bucket), - Prefix: aws.String(r.WALDir(generation) + "/"), - Delimiter: aws.String("/"), + Bucket: aws.String(r.Bucket), + Prefix: aws.String(r.WALDir(generation) + "/"), }, func(page *s3.ListObjectsOutput, lastPage bool) bool { for _, obj := range page.Contents { - if !litestream.IsWALPath(*obj.Key) { + if !litestream.IsWALPath(path.Base(*obj.Key)) { continue } modTime := obj.LastModified.UTC() @@ -637,6 +635,7 @@ func (r *Replica) syncWAL(ctx context.Context) (err error) { defer rd.Close() // Read to intermediate buffer to determine size. + pos := rd.Pos() b, err := ioutil.ReadAll(rd) if err != nil { return err @@ -652,7 +651,7 @@ func (r *Replica) syncWAL(ctx context.Context) (err error) { // that files are contiguous without having to decompress. walPath := path.Join( r.WALDir(rd.Pos().Generation), - litestream.FormatWALPathWithOffsetSize(rd.Pos().Index, rd.Pos().Offset, int64(len(b)))+".gz", + litestream.FormatWALPathWithOffsetSize(pos.Index, pos.Offset, int64(len(b)))+".gz", ) if _, err := r.uploader.UploadWithContext(ctx, &s3manager.UploadInput{ @@ -708,7 +707,7 @@ func (r *Replica) WALReader(ctx context.Context, generation string, index int) ( var innerErr error if err := r.s3.ListObjectsPagesWithContext(ctx, &s3.ListObjectsInput{ Bucket: aws.String(r.Bucket), - Prefix: aws.String(path.Join(r.WALDir(generation), fmt.Sprintf("%016x_", index))), + Prefix: aws.String(path.Join(r.WALDir(generation), fmt.Sprintf("%08x_", index))), }, func(page *s3.ListObjectsOutput, lastPage bool) bool { for _, obj := range page.Contents { // Read the offset & size from the filename. We need to check this @@ -717,7 +716,7 @@ func (r *Replica) WALReader(ctx context.Context, generation string, index int) ( if err != nil { continue } else if off != offset { - innerErr = fmt.Errorf("out of sequence wal segments: %s/%016x", generation, index) + innerErr = fmt.Errorf("out of sequence wal segments: %s/%08x", generation, index) return false } @@ -858,7 +857,7 @@ func (r *Replica) deleteGenerationBefore(ctx context.Context, generation string, } for _, objID := range objIDs[i:j] { - log.Printf("%s(%s): generation %q file no longer retained, deleting %s", r.db.Path(), r.Name(), generation, path.Base(*objID.Key)) + log.Printf("%s(%s): retention exceeded, deleting from generation %q: %s", r.db.Path(), r.Name(), generation, path.Base(*objID.Key)) } if _, err := r.s3.DeleteObjectsWithContext(ctx, &s3.DeleteObjectsInput{