From fc897b481fae32be0b3a6e61e64b221a0df89cc5 Mon Sep 17 00:00:00 2001 From: Ben Johnson Date: Mon, 14 Jun 2021 15:19:27 -0600 Subject: [PATCH] Group replica wal segments by index This commit changes the replica path format to group segments within a single index in the same directory. This is to eventually add the ability to seek to a record on file-based systems without having to iterate over the records. The DB shadow WAL will also be changed to this same format to support live replicas. --- abs/replica_client.go | 77 +++++++-------- db.go | 40 ++++++-- file/replica_client.go | 135 +++++++++++++++++++++----- file/replica_client_test.go | 2 +- gcs/replica_client.go | 78 +++++++-------- internal/internal.go | 30 ++++++ internal/internal_test.go | 61 ++++++++++++ litestream.go | 138 ++++---------------------- litestream_test.go | 88 ----------------- replica_client_test.go | 16 +-- s3/replica_client.go | 78 +++++++-------- sftp/replica_client.go | 187 ++++++++++++++++++++++++++---------- 12 files changed, 504 insertions(+), 426 deletions(-) create mode 100644 internal/internal_test.go diff --git a/abs/replica_client.go b/abs/replica_client.go index 4d5e00e..551f638 100644 --- a/abs/replica_client.go +++ b/abs/replica_client.go @@ -102,7 +102,7 @@ func (c *ReplicaClient) Generations(ctx context.Context) ([]string, error) { internal.OperationTotalCounterVec.WithLabelValues(ReplicaClientType, "LIST").Inc() resp, err := c.containerURL.ListBlobsHierarchySegment(ctx, marker, "/", azblob.ListBlobsSegmentOptions{ - Prefix: litestream.GenerationsPath(c.Path) + "/", + Prefix: path.Join(c.Path, "generations") + "/", }) if err != nil { return nil, err @@ -125,18 +125,17 @@ func (c *ReplicaClient) Generations(ctx context.Context) ([]string, error) { func (c *ReplicaClient) DeleteGeneration(ctx context.Context, generation string) error { if err := c.Init(ctx); err != nil { return err + } else if generation == "" { + return fmt.Errorf("generation required") } - dir, err := 
litestream.GenerationPath(c.Path, generation) - if err != nil { - return fmt.Errorf("cannot determine generation path: %w", err) - } + prefix := path.Join(c.Path, "generations", generation) + "/" var marker azblob.Marker for marker.NotDone() { internal.OperationTotalCounterVec.WithLabelValues(ReplicaClientType, "LIST").Inc() - resp, err := c.containerURL.ListBlobsFlatSegment(ctx, marker, azblob.ListBlobsSegmentOptions{Prefix: dir + "/"}) + resp, err := c.containerURL.ListBlobsFlatSegment(ctx, marker, azblob.ListBlobsSegmentOptions{Prefix: prefix}) if err != nil { return err } @@ -171,12 +170,11 @@ func (c *ReplicaClient) Snapshots(ctx context.Context, generation string) (lites func (c *ReplicaClient) WriteSnapshot(ctx context.Context, generation string, index int, rd io.Reader) (info litestream.SnapshotInfo, err error) { if err := c.Init(ctx); err != nil { return info, err + } else if generation == "" { + return info, fmt.Errorf("generation required") } - key, err := litestream.SnapshotPath(c.Path, generation, index) - if err != nil { - return info, fmt.Errorf("cannot determine snapshot path: %w", err) - } + key := path.Join(c.Path, "generations", generation, "snapshots", litestream.FormatIndex(index)+".snapshot.lz4") startTime := time.Now() rc := internal.NewReadCounter(rd) @@ -206,12 +204,11 @@ func (c *ReplicaClient) WriteSnapshot(ctx context.Context, generation string, in func (c *ReplicaClient) SnapshotReader(ctx context.Context, generation string, index int) (io.ReadCloser, error) { if err := c.Init(ctx); err != nil { return nil, err + } else if generation == "" { + return nil, fmt.Errorf("generation required") } - key, err := litestream.SnapshotPath(c.Path, generation, index) - if err != nil { - return nil, fmt.Errorf("cannot determine snapshot path: %w", err) - } + key := path.Join(c.Path, "generations", generation, "snapshots", litestream.FormatIndex(index)+".snapshot.lz4") blobURL := c.containerURL.NewBlobURL(key) resp, err := blobURL.Download(ctx, 0, 0, 
azblob.BlobAccessConditions{}, false, azblob.ClientProvidedKeyOptions{}) @@ -231,12 +228,11 @@ func (c *ReplicaClient) SnapshotReader(ctx context.Context, generation string, i func (c *ReplicaClient) DeleteSnapshot(ctx context.Context, generation string, index int) error { if err := c.Init(ctx); err != nil { return err + } else if generation == "" { + return fmt.Errorf("generation required") } - key, err := litestream.SnapshotPath(c.Path, generation, index) - if err != nil { - return fmt.Errorf("cannot determine snapshot path: %w", err) - } + key := path.Join(c.Path, "generations", generation, "snapshots", litestream.FormatIndex(index)+".snapshot.lz4") internal.OperationTotalCounterVec.WithLabelValues(ReplicaClientType, "DELETE").Inc() @@ -261,12 +257,11 @@ func (c *ReplicaClient) WALSegments(ctx context.Context, generation string) (lit func (c *ReplicaClient) WriteWALSegment(ctx context.Context, pos litestream.Pos, rd io.Reader) (info litestream.WALSegmentInfo, err error) { if err := c.Init(ctx); err != nil { return info, err + } else if pos.Generation == "" { + return info, fmt.Errorf("generation required") } - key, err := litestream.WALSegmentPath(c.Path, pos.Generation, pos.Index, pos.Offset) - if err != nil { - return info, fmt.Errorf("cannot determine wal segment path: %w", err) - } + key := path.Join(c.Path, "generations", pos.Generation, "wal", litestream.FormatIndex(pos.Index), litestream.FormatOffset(pos.Offset)+".wal.lz4") startTime := time.Now() rc := internal.NewReadCounter(rd) @@ -296,12 +291,11 @@ func (c *ReplicaClient) WriteWALSegment(ctx context.Context, pos litestream.Pos, func (c *ReplicaClient) WALSegmentReader(ctx context.Context, pos litestream.Pos) (io.ReadCloser, error) { if err := c.Init(ctx); err != nil { return nil, err + } else if pos.Generation == "" { + return nil, fmt.Errorf("generation required") } - key, err := litestream.WALSegmentPath(c.Path, pos.Generation, pos.Index, pos.Offset) - if err != nil { - return nil, 
fmt.Errorf("cannot determine wal segment path: %w", err) - } + key := path.Join(c.Path, "generations", pos.Generation, "wal", litestream.FormatIndex(pos.Index), litestream.FormatOffset(pos.Offset)+".wal.lz4") blobURL := c.containerURL.NewBlobURL(key) resp, err := blobURL.Download(ctx, 0, 0, azblob.BlobAccessConditions{}, false, azblob.ClientProvidedKeyOptions{}) @@ -324,11 +318,12 @@ func (c *ReplicaClient) DeleteWALSegments(ctx context.Context, a []litestream.Po } for _, pos := range a { - key, err := litestream.WALSegmentPath(c.Path, pos.Generation, pos.Index, pos.Offset) - if err != nil { - return fmt.Errorf("cannot determine wal segment path: %w", err) + if pos.Generation == "" { + return fmt.Errorf("generation required") } + key := path.Join(c.Path, "generations", pos.Generation, "wal", litestream.FormatIndex(pos.Index), litestream.FormatOffset(pos.Offset)+".wal.lz4") + internal.OperationTotalCounterVec.WithLabelValues(ReplicaClientType, "DELETE").Inc() blobURL := c.containerURL.NewBlobURL(key) @@ -372,24 +367,24 @@ func newSnapshotIterator(ctx context.Context, generation string, client *Replica func (itr *snapshotIterator) fetch() error { defer close(itr.ch) - dir, err := litestream.SnapshotsPath(itr.client.Path, itr.generation) - if err != nil { - return fmt.Errorf("cannot determine snapshots path: %w", err) + if itr.generation == "" { + return fmt.Errorf("generation required") } + prefix := path.Join(itr.client.Path, "generations", itr.generation) + "/" + var marker azblob.Marker for marker.NotDone() { internal.OperationTotalCounterVec.WithLabelValues(ReplicaClientType, "LIST").Inc() - resp, err := itr.client.containerURL.ListBlobsFlatSegment(itr.ctx, marker, azblob.ListBlobsSegmentOptions{Prefix: dir + "/"}) + resp, err := itr.client.containerURL.ListBlobsFlatSegment(itr.ctx, marker, azblob.ListBlobsSegmentOptions{Prefix: prefix}) if err != nil { return err } marker = resp.NextMarker for _, item := range resp.Segment.BlobItems { - key := 
path.Base(item.Name) - index, err := litestream.ParseSnapshotPath(key) + index, err := internal.ParseSnapshotPath(path.Base(item.Name)) if err != nil { continue } @@ -478,24 +473,24 @@ func newWALSegmentIterator(ctx context.Context, generation string, client *Repli func (itr *walSegmentIterator) fetch() error { defer close(itr.ch) - dir, err := litestream.WALPath(itr.client.Path, itr.generation) - if err != nil { - return fmt.Errorf("cannot determine wal path: %w", err) + if itr.generation == "" { + return fmt.Errorf("generation required") } + prefix := path.Join(itr.client.Path, "generations", itr.generation, "wal") var marker azblob.Marker for marker.NotDone() { internal.OperationTotalCounterVec.WithLabelValues(ReplicaClientType, "LIST").Inc() - resp, err := itr.client.containerURL.ListBlobsFlatSegment(itr.ctx, marker, azblob.ListBlobsSegmentOptions{Prefix: dir + "/"}) + resp, err := itr.client.containerURL.ListBlobsFlatSegment(itr.ctx, marker, azblob.ListBlobsSegmentOptions{Prefix: prefix}) if err != nil { return err } marker = resp.NextMarker for _, item := range resp.Segment.BlobItems { - key := path.Base(item.Name) - index, offset, err := litestream.ParseWALSegmentPath(key) + key := strings.TrimPrefix(item.Name, prefix+"/") + index, offset, err := internal.ParseWALSegmentPath(key) if err != nil { continue } diff --git a/db.go b/db.go index dd33d7e..682abec 100644 --- a/db.go +++ b/db.go @@ -16,6 +16,8 @@ import ( "math/rand" "os" "path/filepath" + "regexp" + "strconv" "strings" "sync" "time" @@ -168,7 +170,7 @@ func (db *DB) ShadowWALDir(generation string) string { // Panics if generation is blank or index is negative. 
func (db *DB) ShadowWALPath(generation string, index int) string { assert(index >= 0, "shadow wal index cannot be negative") - return filepath.Join(db.ShadowWALDir(generation), FormatWALPath(index)) + return filepath.Join(db.ShadowWALDir(generation), FormatIndex(index)+".wal") } // CurrentShadowWALPath returns the path to the last shadow WAL in a generation. @@ -191,8 +193,8 @@ func (db *DB) CurrentShadowWALIndex(generation string) (index int, size int64, e // Find highest wal index. for _, fi := range fis { - if v, err := ParseWALPath(fi.Name()); err != nil { - continue // invalid wal filename + if v, err := parseWALPath(fi.Name()); err != nil { + continue // invalid filename } else if v > index { index = v } @@ -584,7 +586,7 @@ func (db *DB) cleanWAL() error { return err } for _, fi := range fis { - if idx, err := ParseWALPath(fi.Name()); err != nil || idx >= min { + if idx, err := parseWALPath(fi.Name()); err != nil || idx >= min { continue } if err := os.Remove(filepath.Join(dir, fi.Name())); err != nil { @@ -928,13 +930,13 @@ func (db *DB) syncWAL(info syncInfo) (newSize int64, err error) { // Parse index of current shadow WAL file. dir, base := filepath.Split(info.shadowWALPath) - index, err := ParseWALPath(base) + index, err := parseWALPath(base) if err != nil { return 0, fmt.Errorf("cannot parse shadow wal filename: %s", base) } // Start a new shadow WAL file with next index. - newShadowWALPath := filepath.Join(dir, FormatWALPath(index+1)) + newShadowWALPath := filepath.Join(dir, formatWALPath(index+1)) newSize, err = db.initShadowWALFile(newShadowWALPath) if err != nil { return 0, fmt.Errorf("cannot init shadow wal file: name=%s err=%w", newShadowWALPath, err) @@ -1298,13 +1300,13 @@ func (db *DB) checkpoint(ctx context.Context, generation, mode string) error { } // Parse index of current shadow WAL file. 
- index, err := ParseWALPath(shadowWALPath) + index, err := parseWALPath(shadowWALPath) if err != nil { return fmt.Errorf("cannot parse shadow wal filename: %s", shadowWALPath) } // Start a new shadow WAL file with next index. - newShadowWALPath := filepath.Join(filepath.Dir(shadowWALPath), FormatWALPath(index+1)) + newShadowWALPath := filepath.Join(filepath.Dir(shadowWALPath), formatWALPath(index+1)) if _, err := db.initShadowWALFile(newShadowWALPath); err != nil { return fmt.Errorf("cannot init shadow wal file: name=%s err=%w", newShadowWALPath, err) } @@ -1481,6 +1483,28 @@ func (db *DB) CRC64(ctx context.Context) (uint64, Pos, error) { return h.Sum64(), pos, nil } +// parseWALPath returns the index for the WAL file. +// Returns an error if the path is not a valid WAL path. +func parseWALPath(s string) (index int, err error) { + s = filepath.Base(s) + + a := walPathRegex.FindStringSubmatch(s) + if a == nil { + return 0, fmt.Errorf("invalid wal path: %s", s) + } + + i64, _ := strconv.ParseUint(a[1], 16, 64) + return int(i64), nil +} + +// formatWALPath formats a WAL filename with a given index. +func formatWALPath(index int) string { + assert(index >= 0, "wal index must be non-negative") + return FormatIndex(index) + ".wal" +} + +var walPathRegex = regexp.MustCompile(`^([0-9a-f]{8})\.wal$`) + // DefaultRestoreParallelism is the default parallelism when downloading WAL files. 
const DefaultRestoreParallelism = 8 diff --git a/file/replica_client.go b/file/replica_client.go index 178797a..8d0da74 100644 --- a/file/replica_client.go +++ b/file/replica_client.go @@ -8,6 +8,7 @@ import ( "os" "path/filepath" "sort" + "strings" "github.com/benbjohnson/litestream" "github.com/benbjohnson/litestream/internal" @@ -84,7 +85,7 @@ func (c *ReplicaClient) SnapshotPath(generation string, index int) (string, erro if err != nil { return "", err } - return filepath.Join(dir, litestream.FormatSnapshotPath(index)), nil + return filepath.Join(dir, litestream.FormatIndex(index)+".snapshot.lz4"), nil } // WALDir returns the path to a generation's WAL directory @@ -102,7 +103,7 @@ func (c *ReplicaClient) WALSegmentPath(generation string, index int, offset int6 if err != nil { return "", err } - return filepath.Join(dir, litestream.FormatWALSegmentPath(index, offset)), nil + return filepath.Join(dir, litestream.FormatIndex(index), fmt.Sprintf("%08x.wal.lz4", offset)), nil } // Generations returns a list of available generation names. @@ -148,7 +149,7 @@ func (c *ReplicaClient) DeleteGeneration(ctx context.Context, generation string) func (c *ReplicaClient) Snapshots(ctx context.Context, generation string) (litestream.SnapshotIterator, error) { dir, err := c.SnapshotsDir(generation) if err != nil { - return nil, fmt.Errorf("cannot determine snapshots path: %w", err) + return nil, err } f, err := os.Open(dir) @@ -168,7 +169,7 @@ func (c *ReplicaClient) Snapshots(ctx context.Context, generation string) (lites infos := make([]litestream.SnapshotInfo, 0, len(fis)) for _, fi := range fis { // Parse index from filename. 
- index, err := litestream.ParseSnapshotPath(fi.Name()) + index, err := internal.ParseSnapshotPath(filepath.Base(fi.Name())) if err != nil { continue } @@ -190,7 +191,7 @@ func (c *ReplicaClient) Snapshots(ctx context.Context, generation string) (lites func (c *ReplicaClient) WriteSnapshot(ctx context.Context, generation string, index int, rd io.Reader) (info litestream.SnapshotInfo, err error) { filename, err := c.SnapshotPath(generation, index) if err != nil { - return info, fmt.Errorf("cannot determine snapshot path: %w", err) + return info, err } var fileInfo, dirInfo os.FileInfo @@ -243,7 +244,7 @@ func (c *ReplicaClient) WriteSnapshot(ctx context.Context, generation string, in func (c *ReplicaClient) SnapshotReader(ctx context.Context, generation string, index int) (io.ReadCloser, error) { filename, err := c.SnapshotPath(generation, index) if err != nil { - return nil, fmt.Errorf("cannot determine snapshot path: %w", err) + return nil, err } return os.Open(filename) } @@ -264,7 +265,7 @@ func (c *ReplicaClient) DeleteSnapshot(ctx context.Context, generation string, i func (c *ReplicaClient) WALSegments(ctx context.Context, generation string) (litestream.WALSegmentIterator, error) { dir, err := c.WALDir(generation) if err != nil { - return nil, fmt.Errorf("cannot determine wal path: %w", err) + return nil, err } f, err := os.Open(dir) @@ -281,33 +282,25 @@ func (c *ReplicaClient) WALSegments(ctx context.Context, generation string) (lit } // Iterate over every file and convert to metadata. - infos := make([]litestream.WALSegmentInfo, 0, len(fis)) + indexes := make([]int, 0, len(fis)) for _, fi := range fis { - // Parse index from filename. 
- index, offset, err := litestream.ParseWALSegmentPath(fi.Name()) - if err != nil { + index, err := litestream.ParseIndex(fi.Name()) + if err != nil || !fi.IsDir() { continue } - - infos = append(infos, litestream.WALSegmentInfo{ - Generation: generation, - Index: index, - Offset: offset, - Size: fi.Size(), - CreatedAt: fi.ModTime().UTC(), - }) + indexes = append(indexes, index) } - sort.Sort(litestream.WALSegmentInfoSlice(infos)) + sort.Ints(indexes) - return litestream.NewWALSegmentInfoSliceIterator(infos), nil + return newWALSegmentIterator(dir, generation, indexes), nil } // WriteWALSegment writes LZ4 compressed data from rd into a file on disk. func (c *ReplicaClient) WriteWALSegment(ctx context.Context, pos litestream.Pos, rd io.Reader) (info litestream.WALSegmentInfo, err error) { filename, err := c.WALSegmentPath(pos.Generation, pos.Index, pos.Offset) if err != nil { - return info, fmt.Errorf("cannot determine wal segment path: %w", err) + return info, err } var fileInfo, dirInfo os.FileInfo @@ -361,7 +354,7 @@ func (c *ReplicaClient) WriteWALSegment(ctx context.Context, pos litestream.Pos, func (c *ReplicaClient) WALSegmentReader(ctx context.Context, pos litestream.Pos) (io.ReadCloser, error) { filename, err := c.WALSegmentPath(pos.Generation, pos.Index, pos.Offset) if err != nil { - return nil, fmt.Errorf("cannot determine wal segment path: %w", err) + return nil, err } return os.Open(filename) } @@ -371,7 +364,7 @@ func (c *ReplicaClient) DeleteWALSegments(ctx context.Context, a []litestream.Po for _, pos := range a { filename, err := c.WALSegmentPath(pos.Generation, pos.Index, pos.Offset) if err != nil { - return fmt.Errorf("cannot determine wal segment path: %w", err) + return err } if err := os.Remove(filename); err != nil && !os.IsNotExist(err) { return err @@ -379,3 +372,97 @@ func (c *ReplicaClient) DeleteWALSegments(ctx context.Context, a []litestream.Po } return nil } + +type walSegmentIterator struct { + dir string + generation string + indexes 
[]int + + infos []litestream.WALSegmentInfo + err error +} + +func newWALSegmentIterator(dir, generation string, indexes []int) *walSegmentIterator { + return &walSegmentIterator{ + dir: dir, + generation: generation, + indexes: indexes, + } +} + +func (itr *walSegmentIterator) Close() (err error) { + return itr.err +} + +func (itr *walSegmentIterator) Next() bool { + // Exit if an error has already occurred. + if itr.err != nil { + return false + } + + for { + // Move to the next segment in cache, if available. + if len(itr.infos) > 1 { + itr.infos = itr.infos[1:] + return true + } + itr.infos = itr.infos[:0] // otherwise clear infos + + // Move to the next index unless this is the first time initializing. + if itr.infos != nil && len(itr.indexes) > 0 { + itr.indexes = itr.indexes[1:] + } + + // If no indexes remain, stop iteration. + if len(itr.indexes) == 0 { + return false + } + + // Read segments into a cache for the current index. + index := itr.indexes[0] + f, err := os.Open(filepath.Join(itr.dir, litestream.FormatIndex(index))) + if err != nil { + itr.err = err + return false + } + defer f.Close() + + fis, err := f.Readdir(-1) + if err != nil { + itr.err = err + return false + } + for _, fi := range fis { + filename := filepath.Base(fi.Name()) + if fi.IsDir() { + continue + } + + offset, err := litestream.ParseOffset(strings.TrimSuffix(filename, ".wal.lz4")) + if err != nil { + continue + } + + itr.infos = append(itr.infos, litestream.WALSegmentInfo{ + Generation: itr.generation, + Index: index, + Offset: offset, + Size: fi.Size(), + CreatedAt: fi.ModTime().UTC(), + }) + } + + if len(itr.infos) > 0 { + return true + } + } +} + +func (itr *walSegmentIterator) Err() error { return itr.err } + +func (itr *walSegmentIterator) WALSegment() litestream.WALSegmentInfo { + if len(itr.infos) == 0 { + return litestream.WALSegmentInfo{} + } + return itr.infos[0] +} diff --git a/file/replica_client_test.go b/file/replica_client_test.go index 94d2e44..bafeefd 100644 --- 
a/file/replica_client_test.go +++ b/file/replica_client_test.go @@ -118,7 +118,7 @@ func TestReplicaClient_WALSegmentPath(t *testing.T) { t.Run("OK", func(t *testing.T) { if got, err := file.NewReplicaClient("/foo").WALSegmentPath("0123456701234567", 1000, 1001); err != nil { t.Fatal(err) - } else if want := "/foo/generations/0123456701234567/wal/000003e8_000003e9.wal.lz4"; got != want { + } else if want := "/foo/generations/0123456701234567/wal/000003e8/000003e9.wal.lz4"; got != want { t.Fatalf("WALPath()=%v, want %v", got, want) } }) diff --git a/gcs/replica_client.go b/gcs/replica_client.go index 7b2b2c6..0a45b2b 100644 --- a/gcs/replica_client.go +++ b/gcs/replica_client.go @@ -68,7 +68,7 @@ func (c *ReplicaClient) Generations(ctx context.Context) ([]string, error) { // Construct query to only pull generation directory names. query := &storage.Query{ Delimiter: "/", - Prefix: litestream.GenerationsPath(c.Path) + "/", + Prefix: path.Join(c.Path, "generations") + "/", } // Loop over results and only build list of generation-formatted names. @@ -96,16 +96,15 @@ func (c *ReplicaClient) Generations(ctx context.Context) ([]string, error) { func (c *ReplicaClient) DeleteGeneration(ctx context.Context, generation string) error { if err := c.Init(ctx); err != nil { return err + } else if generation == "" { + return fmt.Errorf("generation required") } - dir, err := litestream.GenerationPath(c.Path, generation) - if err != nil { - return fmt.Errorf("cannot determine generation path: %w", err) - } + prefix := path.Join(c.Path, "generations", generation) + "/" // Iterate over every object in generation and delete it. 
internal.OperationTotalCounterVec.WithLabelValues(ReplicaClientType, "LIST").Inc() - for it := c.bkt.Objects(ctx, &storage.Query{Prefix: dir + "/"}); ; { + for it := c.bkt.Objects(ctx, &storage.Query{Prefix: prefix}); ; { attrs, err := it.Next() if err == iterator.Done { break @@ -130,24 +129,22 @@ func (c *ReplicaClient) DeleteGeneration(ctx context.Context, generation string) func (c *ReplicaClient) Snapshots(ctx context.Context, generation string) (litestream.SnapshotIterator, error) { if err := c.Init(ctx); err != nil { return nil, err + } else if generation == "" { + return nil, fmt.Errorf("generation required") } - dir, err := litestream.SnapshotsPath(c.Path, generation) - if err != nil { - return nil, fmt.Errorf("cannot determine snapshots path: %w", err) - } - return newSnapshotIterator(generation, c.bkt.Objects(ctx, &storage.Query{Prefix: dir + "/"})), nil + prefix := path.Join(c.Path, "generations", generation) + "/" + return newSnapshotIterator(generation, c.bkt.Objects(ctx, &storage.Query{Prefix: prefix})), nil } // WriteSnapshot writes LZ4 compressed data from rd to the object storage. 
func (c *ReplicaClient) WriteSnapshot(ctx context.Context, generation string, index int, rd io.Reader) (info litestream.SnapshotInfo, err error) { if err := c.Init(ctx); err != nil { return info, err + } else if generation == "" { + return info, fmt.Errorf("generation required") } - key, err := litestream.SnapshotPath(c.Path, generation, index) - if err != nil { - return info, fmt.Errorf("cannot determine snapshot path: %w", err) - } + key := path.Join(c.Path, "generations", generation, "snapshots", litestream.FormatIndex(index)+".snapshot.lz4") startTime := time.Now() w := c.bkt.Object(key).NewWriter(ctx) @@ -177,12 +174,11 @@ func (c *ReplicaClient) WriteSnapshot(ctx context.Context, generation string, in func (c *ReplicaClient) SnapshotReader(ctx context.Context, generation string, index int) (io.ReadCloser, error) { if err := c.Init(ctx); err != nil { return nil, err + } else if generation == "" { + return nil, fmt.Errorf("generation required") } - key, err := litestream.SnapshotPath(c.Path, generation, index) - if err != nil { - return nil, fmt.Errorf("cannot determine snapshot path: %w", err) - } + key := path.Join(c.Path, "generations", generation, "snapshots", litestream.FormatIndex(index)+".snapshot.lz4") r, err := c.bkt.Object(key).NewReader(ctx) if isNotExists(err) { @@ -201,12 +197,11 @@ func (c *ReplicaClient) SnapshotReader(ctx context.Context, generation string, i func (c *ReplicaClient) DeleteSnapshot(ctx context.Context, generation string, index int) error { if err := c.Init(ctx); err != nil { return err + } else if generation == "" { + return fmt.Errorf("generation required") } - key, err := litestream.SnapshotPath(c.Path, generation, index) - if err != nil { - return fmt.Errorf("cannot determine snapshot path: %w", err) - } + key := path.Join(c.Path, "generations", generation, "snapshots", litestream.FormatIndex(index), ".snapshot.lz4") if err := c.bkt.Object(key).Delete(ctx); err != nil && !isNotExists(err) { return fmt.Errorf("cannot delete 
snapshot %q: %w", key, err) @@ -220,24 +215,22 @@ func (c *ReplicaClient) DeleteSnapshot(ctx context.Context, generation string, i func (c *ReplicaClient) WALSegments(ctx context.Context, generation string) (litestream.WALSegmentIterator, error) { if err := c.Init(ctx); err != nil { return nil, err + } else if generation == "" { + return nil, fmt.Errorf("generation required") } - dir, err := litestream.WALPath(c.Path, generation) - if err != nil { - return nil, fmt.Errorf("cannot determine wal path: %w", err) - } - return newWALSegmentIterator(generation, c.bkt.Objects(ctx, &storage.Query{Prefix: dir + "/"})), nil + prefix := path.Join(c.Path, "generations", generation, "wal") + "/" + return newWALSegmentIterator(generation, prefix, c.bkt.Objects(ctx, &storage.Query{Prefix: prefix})), nil } // WriteWALSegment writes LZ4 compressed data from rd into a file on disk. func (c *ReplicaClient) WriteWALSegment(ctx context.Context, pos litestream.Pos, rd io.Reader) (info litestream.WALSegmentInfo, err error) { if err := c.Init(ctx); err != nil { return info, err + } else if pos.Generation == "" { + return info, fmt.Errorf("generation required") } - key, err := litestream.WALSegmentPath(c.Path, pos.Generation, pos.Index, pos.Offset) - if err != nil { - return info, fmt.Errorf("cannot determine wal segment path: %w", err) - } + key := path.Join(c.Path, "generations", pos.Generation, "wal", litestream.FormatIndex(pos.Index), litestream.FormatOffset(pos.Offset)+".wal.lz4") startTime := time.Now() w := c.bkt.Object(key).NewWriter(ctx) @@ -267,12 +260,11 @@ func (c *ReplicaClient) WriteWALSegment(ctx context.Context, pos litestream.Pos, func (c *ReplicaClient) WALSegmentReader(ctx context.Context, pos litestream.Pos) (io.ReadCloser, error) { if err := c.Init(ctx); err != nil { return nil, err + } else if pos.Generation == "" { + return nil, fmt.Errorf("generation required") } - key, err := litestream.WALSegmentPath(c.Path, pos.Generation, pos.Index, pos.Offset) - if err != nil { 
- return nil, fmt.Errorf("cannot determine wal segment path: %w", err) - } + key := path.Join(c.Path, "generations", pos.Generation, "wal", litestream.FormatIndex(pos.Index), litestream.FormatOffset(pos.Offset)+".wal.lz4") r, err := c.bkt.Object(key).NewReader(ctx) if isNotExists(err) { @@ -294,11 +286,11 @@ func (c *ReplicaClient) DeleteWALSegments(ctx context.Context, a []litestream.Po } for _, pos := range a { - key, err := litestream.WALSegmentPath(c.Path, pos.Generation, pos.Index, pos.Offset) - if err != nil { - return fmt.Errorf("cannot determine wal segment path: %w", err) + if pos.Generation == "" { + return fmt.Errorf("generation required") } + key := path.Join(c.Path, "generations", pos.Generation, "wal", litestream.FormatIndex(pos.Index), litestream.FormatOffset(pos.Offset)+".wal.lz4") if err := c.bkt.Object(key).Delete(ctx); err != nil && !isNotExists(err) { return fmt.Errorf("cannot delete wal segment %q: %w", key, err) } @@ -344,7 +336,7 @@ func (itr *snapshotIterator) Next() bool { } // Parse index, otherwise skip to the next object. - index, err := litestream.ParseSnapshotPath(path.Base(attrs.Name)) + index, err := internal.ParseSnapshotPath(path.Base(attrs.Name)) if err != nil { continue } @@ -366,15 +358,17 @@ func (itr *snapshotIterator) Snapshot() litestream.SnapshotInfo { return itr.inf type walSegmentIterator struct { generation string + prefix string it *storage.ObjectIterator info litestream.WALSegmentInfo err error } -func newWALSegmentIterator(generation string, it *storage.ObjectIterator) *walSegmentIterator { +func newWALSegmentIterator(generation, prefix string, it *storage.ObjectIterator) *walSegmentIterator { return &walSegmentIterator{ generation: generation, + prefix: prefix, it: it, } } @@ -400,7 +394,7 @@ func (itr *walSegmentIterator) Next() bool { } // Parse index & offset, otherwise skip to the next object. 
- index, offset, err := litestream.ParseWALSegmentPath(path.Base(attrs.Name)) + index, offset, err := internal.ParseWALSegmentPath(strings.TrimPrefix(attrs.Name, itr.prefix)) if err != nil { continue } diff --git a/internal/internal.go b/internal/internal.go index b22399c..be5027f 100644 --- a/internal/internal.go +++ b/internal/internal.go @@ -1,8 +1,11 @@ package internal import ( + "fmt" "io" "os" + "regexp" + "strconv" "syscall" "github.com/prometheus/client_golang/prometheus" @@ -127,6 +130,33 @@ func MkdirAll(path string, fi os.FileInfo) error { return nil } +// ParseSnapshotPath parses the index from a snapshot filename. Used by path-based replicas. +func ParseSnapshotPath(s string) (index int, err error) { + a := snapshotPathRegex.FindStringSubmatch(s) + if a == nil { + return 0, fmt.Errorf("invalid snapshot path") + } + + i64, _ := strconv.ParseUint(a[1], 16, 64) + return int(i64), nil +} + +var snapshotPathRegex = regexp.MustCompile(`^([0-9a-f]{8})\.snapshot\.lz4$`) + +// ParseWALSegmentPath parses the index/offset from a segment filename. Used by path-based replicas. +func ParseWALSegmentPath(s string) (index int, offset int64, err error) { + a := walSegmentPathRegex.FindStringSubmatch(s) + if a == nil { + return 0, 0, fmt.Errorf("invalid wal segment path") + } + + i64, _ := strconv.ParseUint(a[1], 16, 64) + off64, _ := strconv.ParseUint(a[2], 16, 64) + return int(i64), int64(off64), nil +} + +var walSegmentPathRegex = regexp.MustCompile(`^([0-9a-f]{8})\/([0-9a-f]{8})\.wal\.lz4$`) + // Shared replica metrics. 
 var (
 	OperationTotalCounterVec = promauto.NewCounterVec(prometheus.CounterOpts{
diff --git a/internal/internal_test.go b/internal/internal_test.go
new file mode 100644
index 0000000..a8eda5d
--- /dev/null
+++ b/internal/internal_test.go
@@ -0,0 +1,61 @@
+package internal_test
+
+import (
+	"fmt"
+	"reflect"
+	"testing"
+
+	"github.com/benbjohnson/litestream/internal"
+)
+
+func TestParseSnapshotPath(t *testing.T) {
+	for _, tt := range []struct {
+		s     string
+		index int
+		err   error
+	}{
+		{"00bc614e.snapshot.lz4", 12345678, nil}, // 0x00bc614e == 12345678
+		{"xxxxxxxx.snapshot.lz4", 0, fmt.Errorf("invalid snapshot path")},
+		{"00bc614.snapshot.lz4", 0, fmt.Errorf("invalid snapshot path")}, // only 7 hex digits
+		{"00bc614e.snapshot.lz", 0, fmt.Errorf("invalid snapshot path")},
+		{"00bc614e.snapshot", 0, fmt.Errorf("invalid snapshot path")},
+		{"00bc614e", 0, fmt.Errorf("invalid snapshot path")},
+		{"", 0, fmt.Errorf("invalid snapshot path")},
+	} {
+		t.Run("", func(t *testing.T) {
+			index, err := internal.ParseSnapshotPath(tt.s)
+			if got, want := index, tt.index; got != want {
+				t.Errorf("index=%#v, want %#v", got, want)
+			} else if got, want := err, tt.err; !reflect.DeepEqual(got, want) {
+				t.Errorf("err=%#v, want %#v", got, want)
+			}
+		})
+	}
+}
+
+func TestParseWALSegmentPath(t *testing.T) {
+	for _, tt := range []struct {
+		s      string
+		index  int
+		offset int64
+		err    error
+	}{
+		{"00bc614e/000003e8.wal.lz4", 12345678, 1000, nil}, // index 0x00bc614e, offset 0x3e8 == 1000
+		{"00000000/00000000.wal", 0, 0, fmt.Errorf("invalid wal segment path")},
+		{"00000000/00000000", 0, 0, fmt.Errorf("invalid wal segment path")},
+		{"00000000/", 0, 0, fmt.Errorf("invalid wal segment path")},
+		{"00000000", 0, 0, fmt.Errorf("invalid wal segment path")},
+		{"", 0, 0, fmt.Errorf("invalid wal segment path")},
+	} {
+		t.Run("", func(t *testing.T) {
+			index, offset, err := internal.ParseWALSegmentPath(tt.s)
+			if got, want := index, tt.index; got != want {
+				t.Errorf("index=%#v, want %#v", got, want)
+			} else if got, want := offset, tt.offset; got != want {
+				t.Errorf("offset=%#v, want %#v", got, want)
+			} else if got, want := err, tt.err; !reflect.DeepEqual(got, want) {
+				t.Errorf("err=%#v, want %#v", got, want)
+			}
+		})
+	}
+}
diff --git a/litestream.go b/litestream.go
index f31985b..bd0477c 100644
--- a/litestream.go
+++ b/litestream.go
@@ -7,9 +7,7 @@ import (
 	"fmt"
 	"io"
 	"os"
-	"path"
 	"path/filepath"
-	"regexp"
 	"strconv"
 	"strings"
 	"time"
@@ -384,134 +382,34 @@ func IsGenerationName(s string) bool {
 	return true
 }
 
-// GenerationsPath returns the path to a generation root directory.
-func GenerationsPath(root string) string {
-	return path.Join(root, "generations")
+// FormatIndex formats an index as a zero-padded, lowercase hex value of at least 8 characters.
+func FormatIndex(index int) string {
+	return fmt.Sprintf("%08x", index)
 }
 
-// GenerationPath returns the path to a generation's root directory.
-func GenerationPath(root, generation string) (string, error) {
-	dir := GenerationsPath(root)
-	if generation == "" {
-		return "", fmt.Errorf("generation required")
-	}
-	return path.Join(dir, generation), nil
-}
-
-// SnapshotsPath returns the path to a generation's snapshot directory.
-func SnapshotsPath(root, generation string) (string, error) {
-	dir, err := GenerationPath(root, generation)
+// ParseIndex parses a hex-formatted index into an integer. It returns -1 and an error if s is not an unsigned hex number that fits in 32 bits.
+func ParseIndex(s string) (int, error) {
+	v, err := strconv.ParseUint(s, 16, 32)
 	if err != nil {
-		return "", err
+		return -1, fmt.Errorf("cannot parse index: %q", s)
 	}
-	return path.Join(dir, "snapshots"), nil
+	return int(v), nil
 }
 
-// SnapshotPath returns the path to an uncompressed snapshot file.
-func SnapshotPath(root, generation string, index int) (string, error) {
-	dir, err := SnapshotsPath(root, generation)
+// FormatOffset formats an offset as a zero-padded, lowercase hex value of at least 8 characters.
+func FormatOffset(offset int64) string {
+	return fmt.Sprintf("%08x", offset)
+}
+
+// ParseOffset parses a hex-formatted offset into an integer. It returns -1 and an error if s cannot be parsed.
+func ParseOffset(s string) (int64, error) {
+	v, err := strconv.ParseInt(s, 16, 64) // 64 bits: the result is an int64 and FormatOffset can emit values above 0x7fffffff
 	if err != nil {
-		return "", err
+		return -1, fmt.Errorf("cannot parse offset: %q", s)
 	}
-	return path.Join(dir, FormatSnapshotPath(index)), nil
+	return v, nil
 }
 
-// WALPath returns the path to a generation's WAL directory
-func WALPath(root, generation string) (string, error) {
-	dir, err := GenerationPath(root, generation)
-	if err != nil {
-		return "", err
-	}
-	return path.Join(dir, "wal"), nil
-}
-
-// WALSegmentPath returns the path to a WAL segment file.
-func WALSegmentPath(root, generation string, index int, offset int64) (string, error) {
-	dir, err := WALPath(root, generation)
-	if err != nil {
-		return "", err
-	}
-	return path.Join(dir, FormatWALSegmentPath(index, offset)), nil
-}
-
-// IsSnapshotPath returns true if s is a path to a snapshot file.
-func IsSnapshotPath(s string) bool {
-	return snapshotPathRegex.MatchString(s)
-}
-
-// ParseSnapshotPath returns the index for the snapshot.
-// Returns an error if the path is not a valid snapshot path.
-func ParseSnapshotPath(s string) (index int, err error) {
-	s = filepath.Base(s)
-
-	a := snapshotPathRegex.FindStringSubmatch(s)
-	if a == nil {
-		return 0, fmt.Errorf("invalid snapshot path: %s", s)
-	}
-
-	i64, _ := strconv.ParseUint(a[1], 16, 64)
-	return int(i64), nil
-}
-
-// FormatSnapshotPath formats a snapshot filename with a given index.
-func FormatSnapshotPath(index int) string {
-	assert(index >= 0, "snapshot index must be non-negative")
-	return fmt.Sprintf("%08x%s", index, SnapshotExt)
-}
-
-var snapshotPathRegex = regexp.MustCompile(`^([0-9a-f]{8})\.snapshot\.lz4$`)
-
-// IsWALPath returns true if s is a path to a WAL file.
-func IsWALPath(s string) bool {
-	return walPathRegex.MatchString(s)
-}
-
-// ParseWALPath returns the index for the WAL file.
-// Returns an error if the path is not a valid WAL path.
-func ParseWALPath(s string) (index int, err error) { - s = filepath.Base(s) - - a := walPathRegex.FindStringSubmatch(s) - if a == nil { - return 0, fmt.Errorf("invalid wal path: %s", s) - } - - i64, _ := strconv.ParseUint(a[1], 16, 64) - return int(i64), nil -} - -// FormatWALPath formats a WAL filename with a given index. -func FormatWALPath(index int) string { - assert(index >= 0, "wal index must be non-negative") - return fmt.Sprintf("%08x%s", index, WALExt) -} - -var walPathRegex = regexp.MustCompile(`^([0-9a-f]{8})\.wal$`) - -// ParseWALSegmentPath returns the index & offset for the WAL segment file. -// Returns an error if the path is not a valid wal segment path. -func ParseWALSegmentPath(s string) (index int, offset int64, err error) { - s = filepath.Base(s) - - a := walSegmentPathRegex.FindStringSubmatch(s) - if a == nil { - return 0, 0, fmt.Errorf("invalid wal segment path: %s", s) - } - - i64, _ := strconv.ParseUint(a[1], 16, 64) - off64, _ := strconv.ParseUint(a[2], 16, 64) - return int(i64), int64(off64), nil -} - -// FormatWALSegmentPath formats a WAL segment filename with a given index & offset. -func FormatWALSegmentPath(index int, offset int64) string { - assert(index >= 0, "wal index must be non-negative") - assert(offset >= 0, "wal offset must be non-negative") - return fmt.Sprintf("%08x_%08x%s", index, offset, WALSegmentExt) -} - -var walSegmentPathRegex = regexp.MustCompile(`^([0-9a-f]{8})(?:_([0-9a-f]{8}))\.wal\.lz4$`) - // isHexChar returns true if ch is a lowercase hex character. 
func isHexChar(ch rune) bool { return (ch >= '0' && ch <= '9') || (ch >= 'a' && ch <= 'f') diff --git a/litestream_test.go b/litestream_test.go index 0f1bb85..a03a748 100644 --- a/litestream_test.go +++ b/litestream_test.go @@ -40,94 +40,6 @@ func TestChecksum(t *testing.T) { }) } -func TestGenerationsPath(t *testing.T) { - t.Run("OK", func(t *testing.T) { - if got, want := litestream.GenerationsPath("foo"), "foo/generations"; got != want { - t.Fatalf("GenerationsPath()=%v, want %v", got, want) - } - }) - t.Run("NoPath", func(t *testing.T) { - if got, want := litestream.GenerationsPath(""), "generations"; got != want { - t.Fatalf("GenerationsPath()=%v, want %v", got, want) - } - }) -} - -func TestGenerationPath(t *testing.T) { - t.Run("OK", func(t *testing.T) { - if got, err := litestream.GenerationPath("foo", "0123456701234567"); err != nil { - t.Fatal(err) - } else if want := "foo/generations/0123456701234567"; got != want { - t.Fatalf("GenerationPath()=%v, want %v", got, want) - } - }) - t.Run("ErrNoGeneration", func(t *testing.T) { - if _, err := litestream.GenerationPath("foo", ""); err == nil || err.Error() != `generation required` { - t.Fatalf("expected error: %v", err) - } - }) -} - -func TestSnapshotsPath(t *testing.T) { - t.Run("OK", func(t *testing.T) { - if got, err := litestream.SnapshotsPath("foo", "0123456701234567"); err != nil { - t.Fatal(err) - } else if want := "foo/generations/0123456701234567/snapshots"; got != want { - t.Fatalf("SnapshotsPath()=%v, want %v", got, want) - } - }) - t.Run("ErrNoGeneration", func(t *testing.T) { - if _, err := litestream.SnapshotsPath("foo", ""); err == nil || err.Error() != `generation required` { - t.Fatalf("unexpected error: %v", err) - } - }) -} - -func TestSnapshotPath(t *testing.T) { - t.Run("OK", func(t *testing.T) { - if got, err := litestream.SnapshotPath("foo", "0123456701234567", 1000); err != nil { - t.Fatal(err) - } else if want := "foo/generations/0123456701234567/snapshots/000003e8.snapshot.lz4"; 
got != want { - t.Fatalf("SnapshotPath()=%v, want %v", got, want) - } - }) - t.Run("ErrNoGeneration", func(t *testing.T) { - if _, err := litestream.SnapshotPath("foo", "", 1000); err == nil || err.Error() != `generation required` { - t.Fatalf("unexpected error: %v", err) - } - }) -} - -func TestWALPath(t *testing.T) { - t.Run("OK", func(t *testing.T) { - if got, err := litestream.WALPath("foo", "0123456701234567"); err != nil { - t.Fatal(err) - } else if want := "foo/generations/0123456701234567/wal"; got != want { - t.Fatalf("WALPath()=%v, want %v", got, want) - } - }) - t.Run("ErrNoGeneration", func(t *testing.T) { - if _, err := litestream.WALPath("foo", ""); err == nil || err.Error() != `generation required` { - t.Fatalf("unexpected error: %v", err) - } - }) -} - -func TestWALSegmentPath(t *testing.T) { - t.Run("OK", func(t *testing.T) { - if got, err := litestream.WALSegmentPath("foo", "0123456701234567", 1000, 1001); err != nil { - t.Fatal(err) - } else if want := "foo/generations/0123456701234567/wal/000003e8_000003e9.wal.lz4"; got != want { - t.Fatalf("WALPath()=%v, want %v", got, want) - } - }) - t.Run("ErrNoGeneration", func(t *testing.T) { - if _, err := litestream.WALSegmentPath("foo", "", 1000, 0); err == nil || err.Error() != `generation required` { - t.Fatalf("unexpected error: %v", err) - } - }) -} - func MustDecodeHexString(s string) []byte { b, err := hex.DecodeString(s) if err != nil { diff --git a/replica_client_test.go b/replica_client_test.go index 69f9746..ec2d841 100644 --- a/replica_client_test.go +++ b/replica_client_test.go @@ -177,7 +177,7 @@ func TestReplicaClient_Snapshots(t *testing.T) { if err == nil { err = itr.Close() } - if err == nil || err.Error() != `cannot determine snapshots path: generation required` { + if err == nil || err.Error() != `generation required` { t.Fatalf("unexpected error: %v", err) } }) @@ -204,7 +204,7 @@ func TestReplicaClient_WriteSnapshot(t *testing.T) { RunWithReplicaClient(t, "ErrNoGeneration", func(t 
*testing.T, c litestream.ReplicaClient) { t.Parallel() - if _, err := c.WriteSnapshot(context.Background(), "", 0, nil); err == nil || err.Error() != `cannot determine snapshot path: generation required` { + if _, err := c.WriteSnapshot(context.Background(), "", 0, nil); err == nil || err.Error() != `generation required` { t.Fatalf("unexpected error: %v", err) } }) @@ -242,13 +242,13 @@ func TestReplicaClient_SnapshotReader(t *testing.T) { RunWithReplicaClient(t, "ErrNoGeneration", func(t *testing.T, c litestream.ReplicaClient) { t.Parallel() - if _, err := c.SnapshotReader(context.Background(), "", 1); err == nil || err.Error() != `cannot determine snapshot path: generation required` { + if _, err := c.SnapshotReader(context.Background(), "", 1); err == nil || err.Error() != `generation required` { t.Fatalf("unexpected error: %v", err) } }) } -func TestReplicaClient_WALs(t *testing.T) { +func TestReplicaClient_WALSegments(t *testing.T) { RunWithReplicaClient(t, "OK", func(t *testing.T, c litestream.ReplicaClient) { t.Parallel() @@ -362,7 +362,7 @@ func TestReplicaClient_WALs(t *testing.T) { if err == nil { err = itr.Close() } - if err == nil || err.Error() != `cannot determine wal path: generation required` { + if err == nil || err.Error() != `generation required` { t.Fatalf("unexpected error: %v", err) } }) @@ -389,13 +389,13 @@ func TestReplicaClient_WriteWALSegment(t *testing.T) { RunWithReplicaClient(t, "ErrNoGeneration", func(t *testing.T, c litestream.ReplicaClient) { t.Parallel() - if _, err := c.WriteWALSegment(context.Background(), litestream.Pos{Generation: "", Index: 0, Offset: 0}, nil); err == nil || err.Error() != `cannot determine wal segment path: generation required` { + if _, err := c.WriteWALSegment(context.Background(), litestream.Pos{Generation: "", Index: 0, Offset: 0}, nil); err == nil || err.Error() != `generation required` { t.Fatalf("unexpected error: %v", err) } }) } -func TestReplicaClient_WALReader(t *testing.T) { +func 
TestReplicaClient_WALSegmentReader(t *testing.T) { RunWithReplicaClient(t, "OK", func(t *testing.T, c litestream.ReplicaClient) { t.Parallel() @@ -451,7 +451,7 @@ func TestReplicaClient_DeleteWALSegments(t *testing.T) { RunWithReplicaClient(t, "ErrNoGeneration", func(t *testing.T, c litestream.ReplicaClient) { t.Parallel() - if err := c.DeleteWALSegments(context.Background(), []litestream.Pos{{}}); err == nil || err.Error() != `cannot determine wal segment path: generation required` { + if err := c.DeleteWALSegments(context.Background(), []litestream.Pos{{}}); err == nil || err.Error() != `generation required` { t.Fatalf("unexpected error: %v", err) } }) diff --git a/s3/replica_client.go b/s3/replica_client.go index b68628a..a739a5f 100644 --- a/s3/replica_client.go +++ b/s3/replica_client.go @@ -10,6 +10,7 @@ import ( "os" "path" "regexp" + "strings" "sync" "time" @@ -154,7 +155,7 @@ func (c *ReplicaClient) Generations(ctx context.Context) ([]string, error) { var generations []string if err := c.s3.ListObjectsPagesWithContext(ctx, &s3.ListObjectsInput{ Bucket: aws.String(c.Bucket), - Prefix: aws.String(litestream.GenerationsPath(c.Path) + "/"), + Prefix: aws.String(path.Join(c.Path, "generations") + "/"), Delimiter: aws.String("/"), }, func(page *s3.ListObjectsOutput, lastPage bool) bool { internal.OperationTotalCounterVec.WithLabelValues(ReplicaClientType, "LIST").Inc() @@ -178,18 +179,15 @@ func (c *ReplicaClient) Generations(ctx context.Context) ([]string, error) { func (c *ReplicaClient) DeleteGeneration(ctx context.Context, generation string) error { if err := c.Init(ctx); err != nil { return err - } - - dir, err := litestream.GenerationPath(c.Path, generation) - if err != nil { - return fmt.Errorf("cannot determine generation path: %w", err) + } else if generation == "" { + return fmt.Errorf("generation required") } // Collect all files for the generation. 
var objIDs []*s3.ObjectIdentifier if err := c.s3.ListObjectsPagesWithContext(ctx, &s3.ListObjectsInput{ Bucket: aws.String(c.Bucket), - Prefix: aws.String(dir), + Prefix: aws.String(path.Join(c.Path, "generations", generation)), }, func(page *s3.ListObjectsOutput, lastPage bool) bool { internal.OperationTotalCounterVec.WithLabelValues(ReplicaClientType, "LIST").Inc() @@ -236,12 +234,11 @@ func (c *ReplicaClient) Snapshots(ctx context.Context, generation string) (lites func (c *ReplicaClient) WriteSnapshot(ctx context.Context, generation string, index int, rd io.Reader) (info litestream.SnapshotInfo, err error) { if err := c.Init(ctx); err != nil { return info, err + } else if generation == "" { + return info, fmt.Errorf("generation required") } - key, err := litestream.SnapshotPath(c.Path, generation, index) - if err != nil { - return info, fmt.Errorf("cannot determine snapshot path: %w", err) - } + key := path.Join(c.Path, "generations", generation, "snapshots", litestream.FormatIndex(index)+".snapshot.lz4") startTime := time.Now() rc := internal.NewReadCounter(rd) @@ -270,12 +267,11 @@ func (c *ReplicaClient) WriteSnapshot(ctx context.Context, generation string, in func (c *ReplicaClient) SnapshotReader(ctx context.Context, generation string, index int) (io.ReadCloser, error) { if err := c.Init(ctx); err != nil { return nil, err + } else if generation == "" { + return nil, fmt.Errorf("generation required") } - key, err := litestream.SnapshotPath(c.Path, generation, index) - if err != nil { - return nil, fmt.Errorf("cannot determine snapshot path: %w", err) - } + key := path.Join(c.Path, "generations", generation, "snapshots", litestream.FormatIndex(index)+".snapshot.lz4") out, err := c.s3.GetObjectWithContext(ctx, &s3.GetObjectInput{ Bucket: aws.String(c.Bucket), @@ -296,12 +292,11 @@ func (c *ReplicaClient) SnapshotReader(ctx context.Context, generation string, i func (c *ReplicaClient) DeleteSnapshot(ctx context.Context, generation string, index int) error { if 
err := c.Init(ctx); err != nil { return err + } else if generation == "" { + return fmt.Errorf("generation required") } - key, err := litestream.SnapshotPath(c.Path, generation, index) - if err != nil { - return fmt.Errorf("cannot determine snapshot path: %w", err) - } + key := path.Join(c.Path, "generations", generation, "snapshots", litestream.FormatIndex(index)+".snapshot.lz4") if _, err := c.s3.DeleteObjectsWithContext(ctx, &s3.DeleteObjectsInput{ Bucket: aws.String(c.Bucket), @@ -326,12 +321,11 @@ func (c *ReplicaClient) WALSegments(ctx context.Context, generation string) (lit func (c *ReplicaClient) WriteWALSegment(ctx context.Context, pos litestream.Pos, rd io.Reader) (info litestream.WALSegmentInfo, err error) { if err := c.Init(ctx); err != nil { return info, err + } else if pos.Generation == "" { + return info, fmt.Errorf("generation required") } - key, err := litestream.WALSegmentPath(c.Path, pos.Generation, pos.Index, pos.Offset) - if err != nil { - return info, fmt.Errorf("cannot determine wal segment path: %w", err) - } + key := path.Join(c.Path, "generations", pos.Generation, "wal", litestream.FormatIndex(pos.Index), litestream.FormatOffset(pos.Offset)+".wal.lz4") startTime := time.Now() rc := internal.NewReadCounter(rd) @@ -360,12 +354,11 @@ func (c *ReplicaClient) WriteWALSegment(ctx context.Context, pos litestream.Pos, func (c *ReplicaClient) WALSegmentReader(ctx context.Context, pos litestream.Pos) (io.ReadCloser, error) { if err := c.Init(ctx); err != nil { return nil, err + } else if pos.Generation == "" { + return nil, fmt.Errorf("generation required") } - key, err := litestream.WALSegmentPath(c.Path, pos.Generation, pos.Index, pos.Offset) - if err != nil { - return nil, fmt.Errorf("cannot determine wal segment path: %w", err) - } + key := path.Join(c.Path, "generations", pos.Generation, "wal", litestream.FormatIndex(pos.Index), litestream.FormatOffset(pos.Offset)+".wal.lz4") out, err := c.s3.GetObjectWithContext(ctx, &s3.GetObjectInput{ 
Bucket: aws.String(c.Bucket), @@ -397,10 +390,10 @@ func (c *ReplicaClient) DeleteWALSegments(ctx context.Context, a []litestream.Po // Generate a batch of object IDs for deleting the WAL segments. for i, pos := range a[:n] { - key, err := litestream.WALSegmentPath(c.Path, pos.Generation, pos.Index, pos.Offset) - if err != nil { - return fmt.Errorf("cannot determine wal segment path: %w", err) + if pos.Generation == "" { + return fmt.Errorf("generation required") } + key := path.Join(c.Path, "generations", pos.Generation, "wal", litestream.FormatIndex(pos.Index), litestream.FormatOffset(pos.Offset)+".wal.lz4") objIDs[i] = &s3.ObjectIdentifier{Key: &key} } @@ -498,11 +491,12 @@ func newSnapshotIterator(ctx context.Context, client *ReplicaClient, generation func (itr *snapshotIterator) fetch() error { defer close(itr.ch) - dir, err := litestream.SnapshotsPath(itr.client.Path, itr.generation) - if err != nil { - return fmt.Errorf("cannot determine snapshots path: %w", err) + if itr.generation == "" { + return fmt.Errorf("generation required") } + dir := path.Join(itr.client.Path, "generations", itr.generation, "snapshots") + return itr.client.s3.ListObjectsPagesWithContext(itr.ctx, &s3.ListObjectsInput{ Bucket: aws.String(itr.client.Bucket), Prefix: aws.String(dir + "/"), @@ -511,8 +505,7 @@ func (itr *snapshotIterator) fetch() error { internal.OperationTotalCounterVec.WithLabelValues(ReplicaClientType, "LIST").Inc() for _, obj := range page.Contents { - key := path.Base(*obj.Key) - index, err := litestream.ParseSnapshotPath(key) + index, err := internal.ParseSnapshotPath(path.Base(*obj.Key)) if err != nil { continue } @@ -601,21 +594,20 @@ func newWALSegmentIterator(ctx context.Context, client *ReplicaClient, generatio func (itr *walSegmentIterator) fetch() error { defer close(itr.ch) - dir, err := litestream.WALPath(itr.client.Path, itr.generation) - if err != nil { - return fmt.Errorf("cannot determine wal path: %w", err) + if itr.generation == "" { + return 
fmt.Errorf("generation required") } + prefix := path.Join(itr.client.Path, "generations", itr.generation, "wal") + "/" + return itr.client.s3.ListObjectsPagesWithContext(itr.ctx, &s3.ListObjectsInput{ - Bucket: aws.String(itr.client.Bucket), - Prefix: aws.String(dir + "/"), - Delimiter: aws.String("/"), + Bucket: aws.String(itr.client.Bucket), + Prefix: aws.String(prefix), }, func(page *s3.ListObjectsOutput, lastPage bool) bool { internal.OperationTotalCounterVec.WithLabelValues(ReplicaClientType, "LIST").Inc() for _, obj := range page.Contents { - key := path.Base(*obj.Key) - index, offset, err := litestream.ParseWALSegmentPath(key) + index, offset, err := internal.ParseWALSegmentPath(strings.TrimPrefix(*obj.Key, prefix)) if err != nil { continue } diff --git a/sftp/replica_client.go b/sftp/replica_client.go index 6b082b4..30d8fa8 100644 --- a/sftp/replica_client.go +++ b/sftp/replica_client.go @@ -9,6 +9,7 @@ import ( "os" "path" "sort" + "strings" "sync" "time" @@ -121,7 +122,7 @@ func (c *ReplicaClient) Generations(ctx context.Context) (_ []string, err error) return nil, err } - fis, err := sftpClient.ReadDir(litestream.GenerationsPath(c.Path)) + fis, err := sftpClient.ReadDir(path.Join(c.Path, "generations")) if os.IsNotExist(err) { return nil, nil } else if err != nil { @@ -153,12 +154,11 @@ func (c *ReplicaClient) DeleteGeneration(ctx context.Context, generation string) sftpClient, err := c.Init(ctx) if err != nil { return err + } else if generation == "" { + return fmt.Errorf("generation required") } - dir, err := litestream.GenerationPath(c.Path, generation) - if err != nil { - return fmt.Errorf("cannot determine generation path: %w", err) - } + dir := path.Join(c.Path, "generations", generation) var dirs []string walker := sftpClient.Walk(dir) @@ -198,12 +198,11 @@ func (c *ReplicaClient) Snapshots(ctx context.Context, generation string) (_ lit sftpClient, err := c.Init(ctx) if err != nil { return nil, err + } else if generation == "" { + return nil, 
fmt.Errorf("generation required") } - dir, err := litestream.SnapshotsPath(c.Path, generation) - if err != nil { - return nil, fmt.Errorf("cannot determine snapshots path: %w", err) - } + dir := path.Join(c.Path, "generations", generation, "snapshots") fis, err := sftpClient.ReadDir(dir) if os.IsNotExist(err) { @@ -216,7 +215,7 @@ func (c *ReplicaClient) Snapshots(ctx context.Context, generation string) (_ lit infos := make([]litestream.SnapshotInfo, 0, len(fis)) for _, fi := range fis { // Parse index from filename. - index, err := litestream.ParseSnapshotPath(path.Base(fi.Name())) + index, err := internal.ParseSnapshotPath(path.Base(fi.Name())) if err != nil { continue } @@ -241,12 +240,11 @@ func (c *ReplicaClient) WriteSnapshot(ctx context.Context, generation string, in sftpClient, err := c.Init(ctx) if err != nil { return info, err + } else if generation == "" { + return info, fmt.Errorf("generation required") } - filename, err := litestream.SnapshotPath(c.Path, generation, index) - if err != nil { - return info, fmt.Errorf("cannot determine snapshot path: %w", err) - } + filename := path.Join(c.Path, "generations", generation, "snapshots", litestream.FormatIndex(index)+".snapshot.lz4") startTime := time.Now() if err := sftpClient.MkdirAll(path.Dir(filename)); err != nil { @@ -286,12 +284,11 @@ func (c *ReplicaClient) SnapshotReader(ctx context.Context, generation string, i sftpClient, err := c.Init(ctx) if err != nil { return nil, err + } else if generation == "" { + return nil, fmt.Errorf("generation required") } - filename, err := litestream.SnapshotPath(c.Path, generation, index) - if err != nil { - return nil, fmt.Errorf("cannot determine snapshot path: %w", err) - } + filename := path.Join(c.Path, "generations", generation, "snapshots", litestream.FormatIndex(index)+".snapshot.lz4") f, err := sftpClient.Open(filename) if err != nil { @@ -310,12 +307,11 @@ func (c *ReplicaClient) DeleteSnapshot(ctx context.Context, generation string, i sftpClient, err := 
c.Init(ctx) if err != nil { return err + } else if generation == "" { + return fmt.Errorf("generation required") } - filename, err := litestream.SnapshotPath(c.Path, generation, index) - if err != nil { - return fmt.Errorf("cannot determine snapshot path: %w", err) - } + filename := path.Join(c.Path, "generations", generation, "snapshots", litestream.FormatIndex(index)+".snapshot.lz4") if err := sftpClient.Remove(filename); err != nil && !os.IsNotExist(err) { return fmt.Errorf("cannot delete snapshot %q: %w", filename, err) @@ -332,12 +328,11 @@ func (c *ReplicaClient) WALSegments(ctx context.Context, generation string) (_ l sftpClient, err := c.Init(ctx) if err != nil { return nil, err + } else if generation == "" { + return nil, fmt.Errorf("generation required") } - dir, err := litestream.WALPath(c.Path, generation) - if err != nil { - return nil, fmt.Errorf("cannot determine wal path: %w", err) - } + dir := path.Join(c.Path, "generations", generation, "wal") fis, err := sftpClient.ReadDir(dir) if os.IsNotExist(err) { @@ -347,25 +342,18 @@ func (c *ReplicaClient) WALSegments(ctx context.Context, generation string) (_ l } // Iterate over every file and convert to metadata. - infos := make([]litestream.WALSegmentInfo, 0, len(fis)) + indexes := make([]int, 0, len(fis)) for _, fi := range fis { - index, offset, err := litestream.ParseWALSegmentPath(path.Base(fi.Name())) - if err != nil { + index, err := litestream.ParseIndex(fi.Name()) + if err != nil || !fi.IsDir() { continue } - - infos = append(infos, litestream.WALSegmentInfo{ - Generation: generation, - Index: index, - Offset: offset, - Size: fi.Size(), - CreatedAt: fi.ModTime().UTC(), - }) + indexes = append(indexes, index) } - sort.Sort(litestream.WALSegmentInfoSlice(infos)) + sort.Ints(indexes) - return litestream.NewWALSegmentInfoSliceIterator(infos), nil + return newWALSegmentIterator(ctx, c, dir, generation, indexes), nil } // WriteWALSegment writes LZ4 compressed data from rd into a file on disk. 
@@ -375,12 +363,11 @@ func (c *ReplicaClient) WriteWALSegment(ctx context.Context, pos litestream.Pos, sftpClient, err := c.Init(ctx) if err != nil { return info, err + } else if pos.Generation == "" { + return info, fmt.Errorf("generation required") } - filename, err := litestream.WALSegmentPath(c.Path, pos.Generation, pos.Index, pos.Offset) - if err != nil { - return info, fmt.Errorf("cannot determine wal segment path: %w", err) - } + filename := path.Join(c.Path, "generations", pos.Generation, "wal", litestream.FormatIndex(pos.Index), litestream.FormatOffset(pos.Offset)+".wal.lz4") startTime := time.Now() if err := sftpClient.MkdirAll(path.Dir(filename)); err != nil { @@ -420,12 +407,11 @@ func (c *ReplicaClient) WALSegmentReader(ctx context.Context, pos litestream.Pos sftpClient, err := c.Init(ctx) if err != nil { return nil, err + } else if pos.Generation == "" { + return nil, fmt.Errorf("generation required") } - filename, err := litestream.WALSegmentPath(c.Path, pos.Generation, pos.Index, pos.Offset) - if err != nil { - return nil, fmt.Errorf("cannot determine wal segment path: %w", err) - } + filename := path.Join(c.Path, "generations", pos.Generation, "wal", litestream.FormatIndex(pos.Index), litestream.FormatOffset(pos.Offset)+".wal.lz4") f, err := sftpClient.Open(filename) if err != nil { @@ -447,11 +433,12 @@ func (c *ReplicaClient) DeleteWALSegments(ctx context.Context, a []litestream.Po } for _, pos := range a { - filename, err := litestream.WALSegmentPath(c.Path, pos.Generation, pos.Index, pos.Offset) - if err != nil { - return fmt.Errorf("cannot determine wal segment path: %w", err) + if pos.Generation == "" { + return fmt.Errorf("generation required") } + filename := path.Join(c.Path, "generations", pos.Generation, "wal", litestream.FormatIndex(pos.Index), litestream.FormatOffset(pos.Offset)+".wal.lz4") + if err := sftpClient.Remove(filename); err != nil && !os.IsNotExist(err) { return fmt.Errorf("cannot delete wal segment %q: %w", filename, err) } 
@@ -470,7 +457,7 @@ func (c *ReplicaClient) Cleanup(ctx context.Context) (err error) {
 		return err
 	}
 
-	if err := sftpClient.RemoveDirectory(litestream.GenerationsPath(c.Path)); err != nil && !os.IsNotExist(err) {
+	if err := sftpClient.RemoveDirectory(path.Join(c.Path, "generations")); err != nil && !os.IsNotExist(err) {
 		return fmt.Errorf("cannot delete generations path: %w", err)
 	} else if err := sftpClient.RemoveDirectory(c.Path); err != nil && !os.IsNotExist(err) {
 		return fmt.Errorf("cannot delete path: %w", err)
@@ -493,3 +480,126 @@ func (c *ReplicaClient) resetOnConnError(err error) {
 		c.sshClient = nil
 	}
 }
+
+// walSegmentIterator iterates over the WAL segments of one generation. Segment
+// files are grouped into one directory per index, so the iterator lists a
+// single index directory at a time and caches its segments in infos.
+type walSegmentIterator struct {
+	ctx        context.Context
+	client     *ReplicaClient
+	dir        string
+	generation string
+	indexes    []int
+
+	// loaded is set once the directory of indexes[0] has been listed.
+	loaded bool
+
+	infos []litestream.WALSegmentInfo
+	err   error
+}
+
+// newWALSegmentIterator returns an iterator over the segments stored beneath
+// dir, visiting the per-index sub-directories in the order given by indexes.
+func newWALSegmentIterator(ctx context.Context, client *ReplicaClient, dir, generation string, indexes []int) *walSegmentIterator {
+	return &walSegmentIterator{
+		ctx:        ctx,
+		client:     client,
+		dir:        dir,
+		generation: generation,
+		indexes:    indexes,
+	}
+}
+
+// Close returns the error recorded during iteration, if any.
+func (itr *walSegmentIterator) Close() (err error) {
+	return itr.err
+}
+
+// Next advances to the next WAL segment and reports whether one is available.
+// It returns false once every index has been visited or after an error; the
+// error is then reported by Err and Close.
+func (itr *walSegmentIterator) Next() bool {
+	// Exit if an error has already occurred.
+	if itr.err != nil {
+		return false
+	}
+
+	sftpClient, err := itr.client.Init(itr.ctx)
+	if err != nil {
+		itr.err = err
+		return false
+	}
+
+	for {
+		// Move to the next segment in cache, if available.
+		if len(itr.infos) > 1 {
+			itr.infos = itr.infos[1:]
+			return true
+		}
+		itr.infos = itr.infos[:0] // otherwise clear infos
+
+		// Move to the next index unless this is the first time initializing.
+		// An explicit flag is required here: a directory without any valid
+		// segment leaves infos nil, and testing infos for nil would then
+		// re-read that same directory forever.
+		if itr.loaded && len(itr.indexes) > 0 {
+			itr.indexes = itr.indexes[1:]
+		}
+
+		// If no indexes remain, stop iteration.
+		if len(itr.indexes) == 0 {
+			return false
+		}
+
+		// Read segments into a cache for the current index.
+		index := itr.indexes[0]
+		itr.loaded = true
+		fis, err := sftpClient.ReadDir(path.Join(itr.dir, litestream.FormatIndex(index)))
+		if err != nil {
+			itr.err = err
+			return false
+		}
+
+		for _, fi := range fis {
+			// Skip sub-directories and anything not named "<offset>.wal.lz4".
+			filename := path.Base(fi.Name())
+			if fi.IsDir() || !strings.HasSuffix(filename, ".wal.lz4") {
+				continue
+			}
+
+			offset, err := litestream.ParseOffset(strings.TrimSuffix(filename, ".wal.lz4"))
+			if err != nil {
+				continue
+			}
+
+			itr.infos = append(itr.infos, litestream.WALSegmentInfo{
+				Generation: itr.generation,
+				Index:      index,
+				Offset:     offset,
+				Size:       fi.Size(),
+				CreatedAt:  fi.ModTime().UTC(),
+			})
+		}
+
+		// Do not rely on the listing order; yield segments by ascending offset.
+		sort.Slice(itr.infos, func(i, j int) bool {
+			return itr.infos[i].Offset < itr.infos[j].Offset
+		})
+
+		if len(itr.infos) > 0 {
+			return true
+		}
+	}
+}
+
+// Err returns the error recorded during iteration, if any.
+func (itr *walSegmentIterator) Err() error { return itr.err }
+
+// WALSegment returns the current segment, or the zero value when the iterator
+// is not positioned on a segment.
+func (itr *walSegmentIterator) WALSegment() litestream.WALSegmentInfo {
+	if len(itr.infos) == 0 {
+		return litestream.WALSegmentInfo{}
+	}
+	return itr.infos[0]
+}