Group replica wal segments by index

This commit changes the replica path format to group segments within
a single index in the same directory. This is to eventually add the
ability to seek to a record on file-based systems without having
to iterate over the records. The DB shadow WAL will also be changed
to this same format to support live replicas.
This commit is contained in:
Ben Johnson
2021-06-12 10:20:22 -06:00
parent 55c17b9d8e
commit d2cef2f16b
12 changed files with 497 additions and 419 deletions

View File

@@ -102,7 +102,7 @@ func (c *ReplicaClient) Generations(ctx context.Context) ([]string, error) {
internal.OperationTotalCounterVec.WithLabelValues(ReplicaClientType, "LIST").Inc() internal.OperationTotalCounterVec.WithLabelValues(ReplicaClientType, "LIST").Inc()
resp, err := c.containerURL.ListBlobsHierarchySegment(ctx, marker, "/", azblob.ListBlobsSegmentOptions{ resp, err := c.containerURL.ListBlobsHierarchySegment(ctx, marker, "/", azblob.ListBlobsSegmentOptions{
Prefix: litestream.GenerationsPath(c.Path) + "/", Prefix: path.Join(c.Path, "generations") + "/",
}) })
if err != nil { if err != nil {
return nil, err return nil, err
@@ -125,18 +125,17 @@ func (c *ReplicaClient) Generations(ctx context.Context) ([]string, error) {
func (c *ReplicaClient) DeleteGeneration(ctx context.Context, generation string) error { func (c *ReplicaClient) DeleteGeneration(ctx context.Context, generation string) error {
if err := c.Init(ctx); err != nil { if err := c.Init(ctx); err != nil {
return err return err
} else if generation == "" {
return fmt.Errorf("generation required")
} }
dir, err := litestream.GenerationPath(c.Path, generation) prefix := path.Join(c.Path, "generations", generation) + "/"
if err != nil {
return fmt.Errorf("cannot determine generation path: %w", err)
}
var marker azblob.Marker var marker azblob.Marker
for marker.NotDone() { for marker.NotDone() {
internal.OperationTotalCounterVec.WithLabelValues(ReplicaClientType, "LIST").Inc() internal.OperationTotalCounterVec.WithLabelValues(ReplicaClientType, "LIST").Inc()
resp, err := c.containerURL.ListBlobsFlatSegment(ctx, marker, azblob.ListBlobsSegmentOptions{Prefix: dir + "/"}) resp, err := c.containerURL.ListBlobsFlatSegment(ctx, marker, azblob.ListBlobsSegmentOptions{Prefix: prefix})
if err != nil { if err != nil {
return err return err
} }
@@ -171,12 +170,11 @@ func (c *ReplicaClient) Snapshots(ctx context.Context, generation string) (lites
func (c *ReplicaClient) WriteSnapshot(ctx context.Context, generation string, index int, rd io.Reader) (info litestream.SnapshotInfo, err error) { func (c *ReplicaClient) WriteSnapshot(ctx context.Context, generation string, index int, rd io.Reader) (info litestream.SnapshotInfo, err error) {
if err := c.Init(ctx); err != nil { if err := c.Init(ctx); err != nil {
return info, err return info, err
} else if generation == "" {
return info, fmt.Errorf("generation required")
} }
key, err := litestream.SnapshotPath(c.Path, generation, index) key := path.Join(c.Path, "generations", generation, "snapshots", litestream.FormatIndex(index)+".snapshot.lz4")
if err != nil {
return info, fmt.Errorf("cannot determine snapshot path: %w", err)
}
startTime := time.Now() startTime := time.Now()
rc := internal.NewReadCounter(rd) rc := internal.NewReadCounter(rd)
@@ -206,12 +204,11 @@ func (c *ReplicaClient) WriteSnapshot(ctx context.Context, generation string, in
func (c *ReplicaClient) SnapshotReader(ctx context.Context, generation string, index int) (io.ReadCloser, error) { func (c *ReplicaClient) SnapshotReader(ctx context.Context, generation string, index int) (io.ReadCloser, error) {
if err := c.Init(ctx); err != nil { if err := c.Init(ctx); err != nil {
return nil, err return nil, err
} else if generation == "" {
return nil, fmt.Errorf("generation required")
} }
key, err := litestream.SnapshotPath(c.Path, generation, index) key := path.Join(c.Path, "generations", generation, "snapshots", litestream.FormatIndex(index)+".snapshot.lz4")
if err != nil {
return nil, fmt.Errorf("cannot determine snapshot path: %w", err)
}
blobURL := c.containerURL.NewBlobURL(key) blobURL := c.containerURL.NewBlobURL(key)
resp, err := blobURL.Download(ctx, 0, 0, azblob.BlobAccessConditions{}, false, azblob.ClientProvidedKeyOptions{}) resp, err := blobURL.Download(ctx, 0, 0, azblob.BlobAccessConditions{}, false, azblob.ClientProvidedKeyOptions{})
@@ -231,12 +228,11 @@ func (c *ReplicaClient) SnapshotReader(ctx context.Context, generation string, i
func (c *ReplicaClient) DeleteSnapshot(ctx context.Context, generation string, index int) error { func (c *ReplicaClient) DeleteSnapshot(ctx context.Context, generation string, index int) error {
if err := c.Init(ctx); err != nil { if err := c.Init(ctx); err != nil {
return err return err
} else if generation == "" {
return fmt.Errorf("generation required")
} }
key, err := litestream.SnapshotPath(c.Path, generation, index) key := path.Join(c.Path, "generations", generation, "snapshots", litestream.FormatIndex(index)+".snapshot.lz4")
if err != nil {
return fmt.Errorf("cannot determine snapshot path: %w", err)
}
internal.OperationTotalCounterVec.WithLabelValues(ReplicaClientType, "DELETE").Inc() internal.OperationTotalCounterVec.WithLabelValues(ReplicaClientType, "DELETE").Inc()
@@ -261,12 +257,11 @@ func (c *ReplicaClient) WALSegments(ctx context.Context, generation string) (lit
func (c *ReplicaClient) WriteWALSegment(ctx context.Context, pos litestream.Pos, rd io.Reader) (info litestream.WALSegmentInfo, err error) { func (c *ReplicaClient) WriteWALSegment(ctx context.Context, pos litestream.Pos, rd io.Reader) (info litestream.WALSegmentInfo, err error) {
if err := c.Init(ctx); err != nil { if err := c.Init(ctx); err != nil {
return info, err return info, err
} else if pos.Generation == "" {
return info, fmt.Errorf("generation required")
} }
key, err := litestream.WALSegmentPath(c.Path, pos.Generation, pos.Index, pos.Offset) key := path.Join(c.Path, "generations", pos.Generation, "wal", litestream.FormatIndex(pos.Index), litestream.FormatOffset(pos.Offset)+".wal.lz4")
if err != nil {
return info, fmt.Errorf("cannot determine wal segment path: %w", err)
}
startTime := time.Now() startTime := time.Now()
rc := internal.NewReadCounter(rd) rc := internal.NewReadCounter(rd)
@@ -296,12 +291,11 @@ func (c *ReplicaClient) WriteWALSegment(ctx context.Context, pos litestream.Pos,
func (c *ReplicaClient) WALSegmentReader(ctx context.Context, pos litestream.Pos) (io.ReadCloser, error) { func (c *ReplicaClient) WALSegmentReader(ctx context.Context, pos litestream.Pos) (io.ReadCloser, error) {
if err := c.Init(ctx); err != nil { if err := c.Init(ctx); err != nil {
return nil, err return nil, err
} else if pos.Generation == "" {
return nil, fmt.Errorf("generation required")
} }
key, err := litestream.WALSegmentPath(c.Path, pos.Generation, pos.Index, pos.Offset) key := path.Join(c.Path, "generations", pos.Generation, "wal", litestream.FormatIndex(pos.Index), litestream.FormatOffset(pos.Offset)+".wal.lz4")
if err != nil {
return nil, fmt.Errorf("cannot determine wal segment path: %w", err)
}
blobURL := c.containerURL.NewBlobURL(key) blobURL := c.containerURL.NewBlobURL(key)
resp, err := blobURL.Download(ctx, 0, 0, azblob.BlobAccessConditions{}, false, azblob.ClientProvidedKeyOptions{}) resp, err := blobURL.Download(ctx, 0, 0, azblob.BlobAccessConditions{}, false, azblob.ClientProvidedKeyOptions{})
@@ -324,11 +318,12 @@ func (c *ReplicaClient) DeleteWALSegments(ctx context.Context, a []litestream.Po
} }
for _, pos := range a { for _, pos := range a {
key, err := litestream.WALSegmentPath(c.Path, pos.Generation, pos.Index, pos.Offset) if pos.Generation == "" {
if err != nil { return fmt.Errorf("generation required")
return fmt.Errorf("cannot determine wal segment path: %w", err)
} }
key := path.Join(c.Path, "generations", pos.Generation, "wal", litestream.FormatIndex(pos.Index), litestream.FormatOffset(pos.Offset)+".wal.lz4")
internal.OperationTotalCounterVec.WithLabelValues(ReplicaClientType, "DELETE").Inc() internal.OperationTotalCounterVec.WithLabelValues(ReplicaClientType, "DELETE").Inc()
blobURL := c.containerURL.NewBlobURL(key) blobURL := c.containerURL.NewBlobURL(key)
@@ -372,24 +367,24 @@ func newSnapshotIterator(ctx context.Context, generation string, client *Replica
func (itr *snapshotIterator) fetch() error { func (itr *snapshotIterator) fetch() error {
defer close(itr.ch) defer close(itr.ch)
dir, err := litestream.SnapshotsPath(itr.client.Path, itr.generation) if itr.generation == "" {
if err != nil { return fmt.Errorf("generation required")
return fmt.Errorf("cannot determine snapshots path: %w", err)
} }
prefix := path.Join(itr.client.Path, "generations", itr.generation) + "/"
var marker azblob.Marker var marker azblob.Marker
for marker.NotDone() { for marker.NotDone() {
internal.OperationTotalCounterVec.WithLabelValues(ReplicaClientType, "LIST").Inc() internal.OperationTotalCounterVec.WithLabelValues(ReplicaClientType, "LIST").Inc()
resp, err := itr.client.containerURL.ListBlobsFlatSegment(itr.ctx, marker, azblob.ListBlobsSegmentOptions{Prefix: dir + "/"}) resp, err := itr.client.containerURL.ListBlobsFlatSegment(itr.ctx, marker, azblob.ListBlobsSegmentOptions{Prefix: prefix})
if err != nil { if err != nil {
return err return err
} }
marker = resp.NextMarker marker = resp.NextMarker
for _, item := range resp.Segment.BlobItems { for _, item := range resp.Segment.BlobItems {
key := path.Base(item.Name) index, err := internal.ParseSnapshotPath(path.Base(item.Name))
index, err := litestream.ParseSnapshotPath(key)
if err != nil { if err != nil {
continue continue
} }
@@ -478,24 +473,24 @@ func newWALSegmentIterator(ctx context.Context, generation string, client *Repli
func (itr *walSegmentIterator) fetch() error { func (itr *walSegmentIterator) fetch() error {
defer close(itr.ch) defer close(itr.ch)
dir, err := litestream.WALPath(itr.client.Path, itr.generation) if itr.generation == "" {
if err != nil { return fmt.Errorf("generation required")
return fmt.Errorf("cannot determine wal path: %w", err)
} }
prefix := path.Join(itr.client.Path, "generations", itr.generation, "wal")
var marker azblob.Marker var marker azblob.Marker
for marker.NotDone() { for marker.NotDone() {
internal.OperationTotalCounterVec.WithLabelValues(ReplicaClientType, "LIST").Inc() internal.OperationTotalCounterVec.WithLabelValues(ReplicaClientType, "LIST").Inc()
resp, err := itr.client.containerURL.ListBlobsFlatSegment(itr.ctx, marker, azblob.ListBlobsSegmentOptions{Prefix: dir + "/"}) resp, err := itr.client.containerURL.ListBlobsFlatSegment(itr.ctx, marker, azblob.ListBlobsSegmentOptions{Prefix: prefix})
if err != nil { if err != nil {
return err return err
} }
marker = resp.NextMarker marker = resp.NextMarker
for _, item := range resp.Segment.BlobItems { for _, item := range resp.Segment.BlobItems {
key := path.Base(item.Name) key := strings.TrimPrefix(item.Name, prefix+"/")
index, offset, err := litestream.ParseWALSegmentPath(key) index, offset, err := internal.ParseWALSegmentPath(key)
if err != nil { if err != nil {
continue continue
} }

40
db.go
View File

@@ -16,6 +16,8 @@ import (
"math/rand" "math/rand"
"os" "os"
"path/filepath" "path/filepath"
"regexp"
"strconv"
"strings" "strings"
"sync" "sync"
"time" "time"
@@ -168,7 +170,7 @@ func (db *DB) ShadowWALDir(generation string) string {
// Panics if generation is blank or index is negative. // Panics if generation is blank or index is negative.
func (db *DB) ShadowWALPath(generation string, index int) string { func (db *DB) ShadowWALPath(generation string, index int) string {
assert(index >= 0, "shadow wal index cannot be negative") assert(index >= 0, "shadow wal index cannot be negative")
return filepath.Join(db.ShadowWALDir(generation), FormatWALPath(index)) return filepath.Join(db.ShadowWALDir(generation), FormatIndex(index)+".wal")
} }
// CurrentShadowWALPath returns the path to the last shadow WAL in a generation. // CurrentShadowWALPath returns the path to the last shadow WAL in a generation.
@@ -191,8 +193,8 @@ func (db *DB) CurrentShadowWALIndex(generation string) (index int, size int64, e
// Find highest wal index. // Find highest wal index.
for _, fi := range fis { for _, fi := range fis {
if v, err := ParseWALPath(fi.Name()); err != nil { if v, err := parseWALPath(fi.Name()); err != nil {
continue // invalid wal filename continue // invalid filename
} else if v > index { } else if v > index {
index = v index = v
} }
@@ -584,7 +586,7 @@ func (db *DB) cleanWAL() error {
return err return err
} }
for _, fi := range fis { for _, fi := range fis {
if idx, err := ParseWALPath(fi.Name()); err != nil || idx >= min { if idx, err := parseWALPath(fi.Name()); err != nil || idx >= min {
continue continue
} }
if err := os.Remove(filepath.Join(dir, fi.Name())); err != nil { if err := os.Remove(filepath.Join(dir, fi.Name())); err != nil {
@@ -928,13 +930,13 @@ func (db *DB) syncWAL(info syncInfo) (newSize int64, err error) {
// Parse index of current shadow WAL file. // Parse index of current shadow WAL file.
dir, base := filepath.Split(info.shadowWALPath) dir, base := filepath.Split(info.shadowWALPath)
index, err := ParseWALPath(base) index, err := parseWALPath(base)
if err != nil { if err != nil {
return 0, fmt.Errorf("cannot parse shadow wal filename: %s", base) return 0, fmt.Errorf("cannot parse shadow wal filename: %s", base)
} }
// Start a new shadow WAL file with next index. // Start a new shadow WAL file with next index.
newShadowWALPath := filepath.Join(dir, FormatWALPath(index+1)) newShadowWALPath := filepath.Join(dir, formatWALPath(index+1))
newSize, err = db.initShadowWALFile(newShadowWALPath) newSize, err = db.initShadowWALFile(newShadowWALPath)
if err != nil { if err != nil {
return 0, fmt.Errorf("cannot init shadow wal file: name=%s err=%w", newShadowWALPath, err) return 0, fmt.Errorf("cannot init shadow wal file: name=%s err=%w", newShadowWALPath, err)
@@ -1298,13 +1300,13 @@ func (db *DB) checkpoint(ctx context.Context, generation, mode string) error {
} }
// Parse index of current shadow WAL file. // Parse index of current shadow WAL file.
index, err := ParseWALPath(shadowWALPath) index, err := parseWALPath(shadowWALPath)
if err != nil { if err != nil {
return fmt.Errorf("cannot parse shadow wal filename: %s", shadowWALPath) return fmt.Errorf("cannot parse shadow wal filename: %s", shadowWALPath)
} }
// Start a new shadow WAL file with next index. // Start a new shadow WAL file with next index.
newShadowWALPath := filepath.Join(filepath.Dir(shadowWALPath), FormatWALPath(index+1)) newShadowWALPath := filepath.Join(filepath.Dir(shadowWALPath), formatWALPath(index+1))
if _, err := db.initShadowWALFile(newShadowWALPath); err != nil { if _, err := db.initShadowWALFile(newShadowWALPath); err != nil {
return fmt.Errorf("cannot init shadow wal file: name=%s err=%w", newShadowWALPath, err) return fmt.Errorf("cannot init shadow wal file: name=%s err=%w", newShadowWALPath, err)
} }
@@ -1481,6 +1483,28 @@ func (db *DB) CRC64(ctx context.Context) (uint64, Pos, error) {
return h.Sum64(), pos, nil return h.Sum64(), pos, nil
} }
// parseWALPath returns the index for the WAL file.
// Returns an error if the path is not a valid WAL path.
func parseWALPath(s string) (index int, err error) {
s = filepath.Base(s)
a := walPathRegex.FindStringSubmatch(s)
if a == nil {
return 0, fmt.Errorf("invalid wal path: %s", s)
}
i64, _ := strconv.ParseUint(a[1], 16, 64)
return int(i64), nil
}
// formatWALPath formats a WAL filename with a given index.
func formatWALPath(index int) string {
assert(index >= 0, "wal index must be non-negative")
return FormatIndex(index) + ".wal"
}
var walPathRegex = regexp.MustCompile(`^([0-9a-f]{8})\.wal$`)
// DefaultRestoreParallelism is the default parallelism when downloading WAL files. // DefaultRestoreParallelism is the default parallelism when downloading WAL files.
const DefaultRestoreParallelism = 8 const DefaultRestoreParallelism = 8

View File

@@ -8,6 +8,7 @@ import (
"os" "os"
"path/filepath" "path/filepath"
"sort" "sort"
"strings"
"github.com/benbjohnson/litestream" "github.com/benbjohnson/litestream"
"github.com/benbjohnson/litestream/internal" "github.com/benbjohnson/litestream/internal"
@@ -84,7 +85,7 @@ func (c *ReplicaClient) SnapshotPath(generation string, index int) (string, erro
if err != nil { if err != nil {
return "", err return "", err
} }
return filepath.Join(dir, litestream.FormatSnapshotPath(index)), nil return filepath.Join(dir, litestream.FormatIndex(index)+".snapshot.lz4"), nil
} }
// WALDir returns the path to a generation's WAL directory // WALDir returns the path to a generation's WAL directory
@@ -102,7 +103,7 @@ func (c *ReplicaClient) WALSegmentPath(generation string, index int, offset int6
if err != nil { if err != nil {
return "", err return "", err
} }
return filepath.Join(dir, litestream.FormatWALSegmentPath(index, offset)), nil return filepath.Join(dir, litestream.FormatIndex(index), fmt.Sprintf("%08x.wal.lz4", offset)), nil
} }
// Generations returns a list of available generation names. // Generations returns a list of available generation names.
@@ -168,7 +169,7 @@ func (c *ReplicaClient) Snapshots(ctx context.Context, generation string) (lites
infos := make([]litestream.SnapshotInfo, 0, len(fis)) infos := make([]litestream.SnapshotInfo, 0, len(fis))
for _, fi := range fis { for _, fi := range fis {
// Parse index from filename. // Parse index from filename.
index, err := litestream.ParseSnapshotPath(fi.Name()) index, err := internal.ParseSnapshotPath(filepath.Base(fi.Name()))
if err != nil { if err != nil {
continue continue
} }
@@ -281,26 +282,18 @@ func (c *ReplicaClient) WALSegments(ctx context.Context, generation string) (lit
} }
// Iterate over every file and convert to metadata. // Iterate over every file and convert to metadata.
infos := make([]litestream.WALSegmentInfo, 0, len(fis)) indexes := make([]int, 0, len(fis))
for _, fi := range fis { for _, fi := range fis {
// Parse index from filename. index, err := litestream.ParseIndex(fi.Name())
index, offset, err := litestream.ParseWALSegmentPath(fi.Name()) if err != nil || !fi.IsDir() {
if err != nil {
continue continue
} }
indexes = append(indexes, index)
infos = append(infos, litestream.WALSegmentInfo{
Generation: generation,
Index: index,
Offset: offset,
Size: fi.Size(),
CreatedAt: fi.ModTime().UTC(),
})
} }
sort.Sort(litestream.WALSegmentInfoSlice(infos)) sort.Ints(indexes)
return litestream.NewWALSegmentInfoSliceIterator(infos), nil return newWALSegmentIterator(dir, generation, indexes), nil
} }
// WriteWALSegment writes LZ4 compressed data from rd into a file on disk. // WriteWALSegment writes LZ4 compressed data from rd into a file on disk.
@@ -379,3 +372,97 @@ func (c *ReplicaClient) DeleteWALSegments(ctx context.Context, a []litestream.Po
} }
return nil return nil
} }
type walSegmentIterator struct {
dir string
generation string
indexes []int
infos []litestream.WALSegmentInfo
err error
}
func newWALSegmentIterator(dir, generation string, indexes []int) *walSegmentIterator {
return &walSegmentIterator{
dir: dir,
generation: generation,
indexes: indexes,
}
}
func (itr *walSegmentIterator) Close() (err error) {
return itr.err
}
func (itr *walSegmentIterator) Next() bool {
// Exit if an error has already occurred.
if itr.err != nil {
return false
}
for {
// Move to the next segment in cache, if available.
if len(itr.infos) > 1 {
itr.infos = itr.infos[1:]
return true
}
itr.infos = itr.infos[:0] // otherwise clear infos
// Move to the next index unless this is the first time initializing.
if itr.infos != nil && len(itr.indexes) > 0 {
itr.indexes = itr.indexes[1:]
}
// If no indexes remain, stop iteration.
if len(itr.indexes) == 0 {
return false
}
// Read segments into a cache for the current index.
index := itr.indexes[0]
f, err := os.Open(filepath.Join(itr.dir, litestream.FormatIndex(index)))
if err != nil {
itr.err = err
return false
}
defer f.Close()
fis, err := f.Readdir(-1)
if err != nil {
itr.err = err
return false
}
for _, fi := range fis {
filename := filepath.Base(fi.Name())
if fi.IsDir() {
continue
}
offset, err := litestream.ParseOffset(strings.TrimSuffix(filename, ".wal.lz4"))
if err != nil {
continue
}
itr.infos = append(itr.infos, litestream.WALSegmentInfo{
Generation: itr.generation,
Index: index,
Offset: offset,
Size: fi.Size(),
CreatedAt: fi.ModTime().UTC(),
})
}
if len(itr.infos) > 0 {
return true
}
}
}
func (itr *walSegmentIterator) Err() error { return itr.err }
func (itr *walSegmentIterator) WALSegment() litestream.WALSegmentInfo {
if len(itr.infos) == 0 {
return litestream.WALSegmentInfo{}
}
return itr.infos[0]
}

View File

@@ -118,7 +118,7 @@ func TestReplicaClient_WALSegmentPath(t *testing.T) {
t.Run("OK", func(t *testing.T) { t.Run("OK", func(t *testing.T) {
if got, err := file.NewReplicaClient("/foo").WALSegmentPath("0123456701234567", 1000, 1001); err != nil { if got, err := file.NewReplicaClient("/foo").WALSegmentPath("0123456701234567", 1000, 1001); err != nil {
t.Fatal(err) t.Fatal(err)
} else if want := "/foo/generations/0123456701234567/wal/000003e8_000003e9.wal.lz4"; got != want { } else if want := "/foo/generations/0123456701234567/wal/000003e8/000003e9.wal.lz4"; got != want {
t.Fatalf("WALPath()=%v, want %v", got, want) t.Fatalf("WALPath()=%v, want %v", got, want)
} }
}) })

View File

@@ -68,7 +68,7 @@ func (c *ReplicaClient) Generations(ctx context.Context) ([]string, error) {
// Construct query to only pull generation directory names. // Construct query to only pull generation directory names.
query := &storage.Query{ query := &storage.Query{
Delimiter: "/", Delimiter: "/",
Prefix: litestream.GenerationsPath(c.Path) + "/", Prefix: path.Join(c.Path, "generations") + "/",
} }
// Loop over results and only build list of generation-formatted names. // Loop over results and only build list of generation-formatted names.
@@ -96,16 +96,15 @@ func (c *ReplicaClient) Generations(ctx context.Context) ([]string, error) {
func (c *ReplicaClient) DeleteGeneration(ctx context.Context, generation string) error { func (c *ReplicaClient) DeleteGeneration(ctx context.Context, generation string) error {
if err := c.Init(ctx); err != nil { if err := c.Init(ctx); err != nil {
return err return err
} else if generation == "" {
return fmt.Errorf("generation required")
} }
dir, err := litestream.GenerationPath(c.Path, generation) prefix := path.Join(c.Path, "generations", generation) + "/"
if err != nil {
return fmt.Errorf("cannot determine generation path: %w", err)
}
// Iterate over every object in generation and delete it. // Iterate over every object in generation and delete it.
internal.OperationTotalCounterVec.WithLabelValues(ReplicaClientType, "LIST").Inc() internal.OperationTotalCounterVec.WithLabelValues(ReplicaClientType, "LIST").Inc()
for it := c.bkt.Objects(ctx, &storage.Query{Prefix: dir + "/"}); ; { for it := c.bkt.Objects(ctx, &storage.Query{Prefix: prefix}); ; {
attrs, err := it.Next() attrs, err := it.Next()
if err == iterator.Done { if err == iterator.Done {
break break
@@ -130,24 +129,22 @@ func (c *ReplicaClient) DeleteGeneration(ctx context.Context, generation string)
func (c *ReplicaClient) Snapshots(ctx context.Context, generation string) (litestream.SnapshotIterator, error) { func (c *ReplicaClient) Snapshots(ctx context.Context, generation string) (litestream.SnapshotIterator, error) {
if err := c.Init(ctx); err != nil { if err := c.Init(ctx); err != nil {
return nil, err return nil, err
} else if generation == "" {
return nil, fmt.Errorf("generation required")
} }
dir, err := litestream.SnapshotsPath(c.Path, generation) prefix := path.Join(c.Path, "generations", generation) + "/"
if err != nil { return newSnapshotIterator(generation, c.bkt.Objects(ctx, &storage.Query{Prefix: prefix})), nil
return nil, fmt.Errorf("cannot determine snapshots path: %w", err)
}
return newSnapshotIterator(generation, c.bkt.Objects(ctx, &storage.Query{Prefix: dir + "/"})), nil
} }
// WriteSnapshot writes LZ4 compressed data from rd to the object storage. // WriteSnapshot writes LZ4 compressed data from rd to the object storage.
func (c *ReplicaClient) WriteSnapshot(ctx context.Context, generation string, index int, rd io.Reader) (info litestream.SnapshotInfo, err error) { func (c *ReplicaClient) WriteSnapshot(ctx context.Context, generation string, index int, rd io.Reader) (info litestream.SnapshotInfo, err error) {
if err := c.Init(ctx); err != nil { if err := c.Init(ctx); err != nil {
return info, err return info, err
} else if generation == "" {
return info, fmt.Errorf("generation required")
} }
key, err := litestream.SnapshotPath(c.Path, generation, index) key := path.Join(c.Path, "generations", generation, "snapshots", litestream.FormatIndex(index)+".snapshot.lz4")
if err != nil {
return info, fmt.Errorf("cannot determine snapshot path: %w", err)
}
startTime := time.Now() startTime := time.Now()
w := c.bkt.Object(key).NewWriter(ctx) w := c.bkt.Object(key).NewWriter(ctx)
@@ -177,12 +174,11 @@ func (c *ReplicaClient) WriteSnapshot(ctx context.Context, generation string, in
func (c *ReplicaClient) SnapshotReader(ctx context.Context, generation string, index int) (io.ReadCloser, error) { func (c *ReplicaClient) SnapshotReader(ctx context.Context, generation string, index int) (io.ReadCloser, error) {
if err := c.Init(ctx); err != nil { if err := c.Init(ctx); err != nil {
return nil, err return nil, err
} else if generation == "" {
return nil, fmt.Errorf("generation required")
} }
key, err := litestream.SnapshotPath(c.Path, generation, index) key := path.Join(c.Path, "generations", generation, "snapshots", litestream.FormatIndex(index)+".snapshot.lz4")
if err != nil {
return nil, fmt.Errorf("cannot determine snapshot path: %w", err)
}
r, err := c.bkt.Object(key).NewReader(ctx) r, err := c.bkt.Object(key).NewReader(ctx)
if isNotExists(err) { if isNotExists(err) {
@@ -201,12 +197,11 @@ func (c *ReplicaClient) SnapshotReader(ctx context.Context, generation string, i
func (c *ReplicaClient) DeleteSnapshot(ctx context.Context, generation string, index int) error { func (c *ReplicaClient) DeleteSnapshot(ctx context.Context, generation string, index int) error {
if err := c.Init(ctx); err != nil { if err := c.Init(ctx); err != nil {
return err return err
} else if generation == "" {
return fmt.Errorf("generation required")
} }
key, err := litestream.SnapshotPath(c.Path, generation, index) key := path.Join(c.Path, "generations", generation, "snapshots", litestream.FormatIndex(index), ".snapshot.lz4")
if err != nil {
return fmt.Errorf("cannot determine snapshot path: %w", err)
}
if err := c.bkt.Object(key).Delete(ctx); err != nil && !isNotExists(err) { if err := c.bkt.Object(key).Delete(ctx); err != nil && !isNotExists(err) {
return fmt.Errorf("cannot delete snapshot %q: %w", key, err) return fmt.Errorf("cannot delete snapshot %q: %w", key, err)
@@ -220,24 +215,22 @@ func (c *ReplicaClient) DeleteSnapshot(ctx context.Context, generation string, i
func (c *ReplicaClient) WALSegments(ctx context.Context, generation string) (litestream.WALSegmentIterator, error) { func (c *ReplicaClient) WALSegments(ctx context.Context, generation string) (litestream.WALSegmentIterator, error) {
if err := c.Init(ctx); err != nil { if err := c.Init(ctx); err != nil {
return nil, err return nil, err
} else if generation == "" {
return nil, fmt.Errorf("generation required")
} }
dir, err := litestream.WALPath(c.Path, generation) prefix := path.Join(c.Path, "generations", generation, "wal") + "/"
if err != nil { return newWALSegmentIterator(generation, prefix, c.bkt.Objects(ctx, &storage.Query{Prefix: prefix})), nil
return nil, fmt.Errorf("cannot determine wal path: %w", err)
}
return newWALSegmentIterator(generation, c.bkt.Objects(ctx, &storage.Query{Prefix: dir + "/"})), nil
} }
// WriteWALSegment writes LZ4 compressed data from rd into a file on disk. // WriteWALSegment writes LZ4 compressed data from rd into a file on disk.
func (c *ReplicaClient) WriteWALSegment(ctx context.Context, pos litestream.Pos, rd io.Reader) (info litestream.WALSegmentInfo, err error) { func (c *ReplicaClient) WriteWALSegment(ctx context.Context, pos litestream.Pos, rd io.Reader) (info litestream.WALSegmentInfo, err error) {
if err := c.Init(ctx); err != nil { if err := c.Init(ctx); err != nil {
return info, err return info, err
} else if pos.Generation == "" {
return info, fmt.Errorf("generation required")
} }
key, err := litestream.WALSegmentPath(c.Path, pos.Generation, pos.Index, pos.Offset) key := path.Join(c.Path, "generations", pos.Generation, "wal", litestream.FormatIndex(pos.Index), litestream.FormatOffset(pos.Offset)+".wal.lz4")
if err != nil {
return info, fmt.Errorf("cannot determine wal segment path: %w", err)
}
startTime := time.Now() startTime := time.Now()
w := c.bkt.Object(key).NewWriter(ctx) w := c.bkt.Object(key).NewWriter(ctx)
@@ -267,12 +260,11 @@ func (c *ReplicaClient) WriteWALSegment(ctx context.Context, pos litestream.Pos,
func (c *ReplicaClient) WALSegmentReader(ctx context.Context, pos litestream.Pos) (io.ReadCloser, error) { func (c *ReplicaClient) WALSegmentReader(ctx context.Context, pos litestream.Pos) (io.ReadCloser, error) {
if err := c.Init(ctx); err != nil { if err := c.Init(ctx); err != nil {
return nil, err return nil, err
} else if pos.Generation == "" {
return nil, fmt.Errorf("generation required")
} }
key, err := litestream.WALSegmentPath(c.Path, pos.Generation, pos.Index, pos.Offset) key := path.Join(c.Path, "generations", pos.Generation, "wal", litestream.FormatIndex(pos.Index), litestream.FormatOffset(pos.Offset)+".wal.lz4")
if err != nil {
return nil, fmt.Errorf("cannot determine wal segment path: %w", err)
}
r, err := c.bkt.Object(key).NewReader(ctx) r, err := c.bkt.Object(key).NewReader(ctx)
if isNotExists(err) { if isNotExists(err) {
@@ -294,11 +286,11 @@ func (c *ReplicaClient) DeleteWALSegments(ctx context.Context, a []litestream.Po
} }
for _, pos := range a { for _, pos := range a {
key, err := litestream.WALSegmentPath(c.Path, pos.Generation, pos.Index, pos.Offset) if pos.Generation == "" {
if err != nil { return fmt.Errorf("generation required")
return fmt.Errorf("cannot determine wal segment path: %w", err)
} }
key := path.Join(c.Path, "generations", pos.Generation, "wal", litestream.FormatIndex(pos.Index), litestream.FormatOffset(pos.Offset)+".wal.lz4")
if err := c.bkt.Object(key).Delete(ctx); err != nil && !isNotExists(err) { if err := c.bkt.Object(key).Delete(ctx); err != nil && !isNotExists(err) {
return fmt.Errorf("cannot delete wal segment %q: %w", key, err) return fmt.Errorf("cannot delete wal segment %q: %w", key, err)
} }
@@ -344,7 +336,7 @@ func (itr *snapshotIterator) Next() bool {
} }
// Parse index, otherwise skip to the next object. // Parse index, otherwise skip to the next object.
index, err := litestream.ParseSnapshotPath(path.Base(attrs.Name)) index, err := internal.ParseSnapshotPath(path.Base(attrs.Name))
if err != nil { if err != nil {
continue continue
} }
@@ -366,15 +358,17 @@ func (itr *snapshotIterator) Snapshot() litestream.SnapshotInfo { return itr.inf
type walSegmentIterator struct { type walSegmentIterator struct {
generation string generation string
prefix string
it *storage.ObjectIterator it *storage.ObjectIterator
info litestream.WALSegmentInfo info litestream.WALSegmentInfo
err error err error
} }
func newWALSegmentIterator(generation string, it *storage.ObjectIterator) *walSegmentIterator { func newWALSegmentIterator(generation, prefix string, it *storage.ObjectIterator) *walSegmentIterator {
return &walSegmentIterator{ return &walSegmentIterator{
generation: generation, generation: generation,
prefix: prefix,
it: it, it: it,
} }
} }
@@ -400,7 +394,7 @@ func (itr *walSegmentIterator) Next() bool {
} }
// Parse index & offset, otherwise skip to the next object. // Parse index & offset, otherwise skip to the next object.
index, offset, err := litestream.ParseWALSegmentPath(path.Base(attrs.Name)) index, offset, err := internal.ParseWALSegmentPath(strings.TrimPrefix(attrs.Name, itr.prefix))
if err != nil { if err != nil {
continue continue
} }

View File

@@ -1,8 +1,11 @@
package internal package internal
import ( import (
"fmt"
"io" "io"
"os" "os"
"regexp"
"strconv"
"syscall" "syscall"
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
@@ -127,6 +130,33 @@ func MkdirAll(path string, fi os.FileInfo) error {
return nil return nil
} }
// ParseSnapshotPath parses the index from a snapshot filename. Used by path-based replicas.
func ParseSnapshotPath(s string) (index int, err error) {
a := snapshotPathRegex.FindStringSubmatch(s)
if a == nil {
return 0, fmt.Errorf("invalid snapshot path")
}
i64, _ := strconv.ParseUint(a[1], 16, 64)
return int(i64), nil
}
var snapshotPathRegex = regexp.MustCompile(`^([0-9a-f]{8})\.snapshot\.lz4$`)
// ParseWALSegmentPath parses the index/offset from a segment filename. Used by path-based replicas.
func ParseWALSegmentPath(s string) (index int, offset int64, err error) {
a := walSegmentPathRegex.FindStringSubmatch(s)
if a == nil {
return 0, 0, fmt.Errorf("invalid wal segment path")
}
i64, _ := strconv.ParseUint(a[1], 16, 64)
off64, _ := strconv.ParseUint(a[2], 16, 64)
return int(i64), int64(off64), nil
}
var walSegmentPathRegex = regexp.MustCompile(`^([0-9a-f]{8})\/([0-9a-f]{8})\.wal\.lz4$`)
// Shared replica metrics. // Shared replica metrics.
var ( var (
OperationTotalCounterVec = promauto.NewCounterVec(prometheus.CounterOpts{ OperationTotalCounterVec = promauto.NewCounterVec(prometheus.CounterOpts{

61
internal/internal_test.go Normal file
View File

@@ -0,0 +1,61 @@
package internal_test
import (
"fmt"
"reflect"
"testing"
"github.com/benbjohnson/litestream/internal"
)
func TestParseSnapshotPath(t *testing.T) {
for _, tt := range []struct {
s string
index int
err error
}{
{"00bc614e.snapshot.lz4", 12345678, nil},
{"xxxxxxxx.snapshot.lz4", 0, fmt.Errorf("invalid snapshot path")},
{"00bc614.snapshot.lz4", 0, fmt.Errorf("invalid snapshot path")},
{"00bc614e.snapshot.lz", 0, fmt.Errorf("invalid snapshot path")},
{"00bc614e.snapshot", 0, fmt.Errorf("invalid snapshot path")},
{"00bc614e", 0, fmt.Errorf("invalid snapshot path")},
{"", 0, fmt.Errorf("invalid snapshot path")},
} {
t.Run("", func(t *testing.T) {
index, err := internal.ParseSnapshotPath(tt.s)
if got, want := index, tt.index; got != want {
t.Errorf("index=%#v, want %#v", got, want)
} else if got, want := err, tt.err; !reflect.DeepEqual(got, want) {
t.Errorf("err=%#v, want %#v", got, want)
}
})
}
}
func TestParseWALSegmentPath(t *testing.T) {
for _, tt := range []struct {
s string
index int
offset int64
err error
}{
{"00bc614e/000003e8.wal.lz4", 12345678, 1000, nil},
{"00000000/00000000.wal", 0, 0, fmt.Errorf("invalid wal segment path")},
{"00000000/00000000", 0, 0, fmt.Errorf("invalid wal segment path")},
{"00000000/", 0, 0, fmt.Errorf("invalid wal segment path")},
{"00000000", 0, 0, fmt.Errorf("invalid wal segment path")},
{"", 0, 0, fmt.Errorf("invalid wal segment path")},
} {
t.Run("", func(t *testing.T) {
index, offset, err := internal.ParseWALSegmentPath(tt.s)
if got, want := index, tt.index; got != want {
t.Errorf("index=%#v, want %#v", got, want)
} else if got, want := offset, tt.offset; got != want {
t.Errorf("offset=%#v, want %#v", got, want)
} else if got, want := err, tt.err; !reflect.DeepEqual(got, want) {
t.Errorf("err=%#v, want %#v", got, want)
}
})
}
}

View File

@@ -7,9 +7,7 @@ import (
"fmt" "fmt"
"io" "io"
"os" "os"
"path"
"path/filepath" "path/filepath"
"regexp"
"strconv" "strconv"
"strings" "strings"
"time" "time"
@@ -384,134 +382,34 @@ func IsGenerationName(s string) bool {
return true return true
} }
// GenerationsPath returns the path to a generation root directory. // FormatIndex formats an index as an 8-character hex value.
func GenerationsPath(root string) string { func FormatIndex(index int) string {
return path.Join(root, "generations") return fmt.Sprintf("%08x", index)
} }
// GenerationPath returns the path to a generation's root directory. // ParseIndex parses a hex-formatted index into an integer.
func GenerationPath(root, generation string) (string, error) { func ParseIndex(s string) (int, error) {
dir := GenerationsPath(root) v, err := strconv.ParseUint(s, 16, 32)
if generation == "" {
return "", fmt.Errorf("generation required")
}
return path.Join(dir, generation), nil
}
// SnapshotsPath returns the path to a generation's snapshot directory.
func SnapshotsPath(root, generation string) (string, error) {
dir, err := GenerationPath(root, generation)
if err != nil { if err != nil {
return "", err return -1, fmt.Errorf("cannot parse index: %q", s)
} }
return path.Join(dir, "snapshots"), nil return int(v), nil
} }
// SnapshotPath returns the path to an uncompressed snapshot file. // FormatOffset formats an offset as an 8-character hex value.
func SnapshotPath(root, generation string, index int) (string, error) { func FormatOffset(offset int64) string {
dir, err := SnapshotsPath(root, generation) return fmt.Sprintf("%08x", offset)
}
// ParseOffset parses a hex-formatted offset into an integer.
func ParseOffset(s string) (int64, error) {
v, err := strconv.ParseInt(s, 16, 32)
if err != nil { if err != nil {
return "", err return -1, fmt.Errorf("cannot parse index: %q", s)
} }
return path.Join(dir, FormatSnapshotPath(index)), nil return v, nil
} }
// WALPath returns the path to a generation's WAL directory
func WALPath(root, generation string) (string, error) {
dir, err := GenerationPath(root, generation)
if err != nil {
return "", err
}
return path.Join(dir, "wal"), nil
}
// WALSegmentPath returns the path to a WAL segment file.
func WALSegmentPath(root, generation string, index int, offset int64) (string, error) {
dir, err := WALPath(root, generation)
if err != nil {
return "", err
}
return path.Join(dir, FormatWALSegmentPath(index, offset)), nil
}
// IsSnapshotPath returns true if s is a path to a snapshot file.
func IsSnapshotPath(s string) bool {
return snapshotPathRegex.MatchString(s)
}
// ParseSnapshotPath returns the index for the snapshot.
// Returns an error if the path is not a valid snapshot path.
func ParseSnapshotPath(s string) (index int, err error) {
s = filepath.Base(s)
a := snapshotPathRegex.FindStringSubmatch(s)
if a == nil {
return 0, fmt.Errorf("invalid snapshot path: %s", s)
}
i64, _ := strconv.ParseUint(a[1], 16, 64)
return int(i64), nil
}
// FormatSnapshotPath formats a snapshot filename with a given index.
func FormatSnapshotPath(index int) string {
assert(index >= 0, "snapshot index must be non-negative")
return fmt.Sprintf("%08x%s", index, SnapshotExt)
}
var snapshotPathRegex = regexp.MustCompile(`^([0-9a-f]{8})\.snapshot\.lz4$`)
// IsWALPath returns true if s is a path to a WAL file.
func IsWALPath(s string) bool {
return walPathRegex.MatchString(s)
}
// ParseWALPath returns the index for the WAL file.
// Returns an error if the path is not a valid WAL path.
func ParseWALPath(s string) (index int, err error) {
s = filepath.Base(s)
a := walPathRegex.FindStringSubmatch(s)
if a == nil {
return 0, fmt.Errorf("invalid wal path: %s", s)
}
i64, _ := strconv.ParseUint(a[1], 16, 64)
return int(i64), nil
}
// FormatWALPath formats a WAL filename with a given index.
func FormatWALPath(index int) string {
assert(index >= 0, "wal index must be non-negative")
return fmt.Sprintf("%08x%s", index, WALExt)
}
var walPathRegex = regexp.MustCompile(`^([0-9a-f]{8})\.wal$`)
// ParseWALSegmentPath returns the index & offset for the WAL segment file.
// Returns an error if the path is not a valid wal segment path.
func ParseWALSegmentPath(s string) (index int, offset int64, err error) {
s = filepath.Base(s)
a := walSegmentPathRegex.FindStringSubmatch(s)
if a == nil {
return 0, 0, fmt.Errorf("invalid wal segment path: %s", s)
}
i64, _ := strconv.ParseUint(a[1], 16, 64)
off64, _ := strconv.ParseUint(a[2], 16, 64)
return int(i64), int64(off64), nil
}
// FormatWALSegmentPath formats a WAL segment filename with a given index & offset.
func FormatWALSegmentPath(index int, offset int64) string {
assert(index >= 0, "wal index must be non-negative")
assert(offset >= 0, "wal offset must be non-negative")
return fmt.Sprintf("%08x_%08x%s", index, offset, WALSegmentExt)
}
var walSegmentPathRegex = regexp.MustCompile(`^([0-9a-f]{8})(?:_([0-9a-f]{8}))\.wal\.lz4$`)
// isHexChar returns true if ch is a lowercase hex character. // isHexChar returns true if ch is a lowercase hex character.
func isHexChar(ch rune) bool { func isHexChar(ch rune) bool {
return (ch >= '0' && ch <= '9') || (ch >= 'a' && ch <= 'f') return (ch >= '0' && ch <= '9') || (ch >= 'a' && ch <= 'f')

View File

@@ -40,94 +40,6 @@ func TestChecksum(t *testing.T) {
}) })
} }
func TestGenerationsPath(t *testing.T) {
t.Run("OK", func(t *testing.T) {
if got, want := litestream.GenerationsPath("foo"), "foo/generations"; got != want {
t.Fatalf("GenerationsPath()=%v, want %v", got, want)
}
})
t.Run("NoPath", func(t *testing.T) {
if got, want := litestream.GenerationsPath(""), "generations"; got != want {
t.Fatalf("GenerationsPath()=%v, want %v", got, want)
}
})
}
func TestGenerationPath(t *testing.T) {
t.Run("OK", func(t *testing.T) {
if got, err := litestream.GenerationPath("foo", "0123456701234567"); err != nil {
t.Fatal(err)
} else if want := "foo/generations/0123456701234567"; got != want {
t.Fatalf("GenerationPath()=%v, want %v", got, want)
}
})
t.Run("ErrNoGeneration", func(t *testing.T) {
if _, err := litestream.GenerationPath("foo", ""); err == nil || err.Error() != `generation required` {
t.Fatalf("expected error: %v", err)
}
})
}
func TestSnapshotsPath(t *testing.T) {
t.Run("OK", func(t *testing.T) {
if got, err := litestream.SnapshotsPath("foo", "0123456701234567"); err != nil {
t.Fatal(err)
} else if want := "foo/generations/0123456701234567/snapshots"; got != want {
t.Fatalf("SnapshotsPath()=%v, want %v", got, want)
}
})
t.Run("ErrNoGeneration", func(t *testing.T) {
if _, err := litestream.SnapshotsPath("foo", ""); err == nil || err.Error() != `generation required` {
t.Fatalf("unexpected error: %v", err)
}
})
}
func TestSnapshotPath(t *testing.T) {
t.Run("OK", func(t *testing.T) {
if got, err := litestream.SnapshotPath("foo", "0123456701234567", 1000); err != nil {
t.Fatal(err)
} else if want := "foo/generations/0123456701234567/snapshots/000003e8.snapshot.lz4"; got != want {
t.Fatalf("SnapshotPath()=%v, want %v", got, want)
}
})
t.Run("ErrNoGeneration", func(t *testing.T) {
if _, err := litestream.SnapshotPath("foo", "", 1000); err == nil || err.Error() != `generation required` {
t.Fatalf("unexpected error: %v", err)
}
})
}
func TestWALPath(t *testing.T) {
t.Run("OK", func(t *testing.T) {
if got, err := litestream.WALPath("foo", "0123456701234567"); err != nil {
t.Fatal(err)
} else if want := "foo/generations/0123456701234567/wal"; got != want {
t.Fatalf("WALPath()=%v, want %v", got, want)
}
})
t.Run("ErrNoGeneration", func(t *testing.T) {
if _, err := litestream.WALPath("foo", ""); err == nil || err.Error() != `generation required` {
t.Fatalf("unexpected error: %v", err)
}
})
}
func TestWALSegmentPath(t *testing.T) {
t.Run("OK", func(t *testing.T) {
if got, err := litestream.WALSegmentPath("foo", "0123456701234567", 1000, 1001); err != nil {
t.Fatal(err)
} else if want := "foo/generations/0123456701234567/wal/000003e8_000003e9.wal.lz4"; got != want {
t.Fatalf("WALPath()=%v, want %v", got, want)
}
})
t.Run("ErrNoGeneration", func(t *testing.T) {
if _, err := litestream.WALSegmentPath("foo", "", 1000, 0); err == nil || err.Error() != `generation required` {
t.Fatalf("unexpected error: %v", err)
}
})
}
func MustDecodeHexString(s string) []byte { func MustDecodeHexString(s string) []byte {
b, err := hex.DecodeString(s) b, err := hex.DecodeString(s)
if err != nil { if err != nil {

View File

@@ -177,7 +177,7 @@ func TestReplicaClient_Snapshots(t *testing.T) {
if err == nil { if err == nil {
err = itr.Close() err = itr.Close()
} }
if err == nil || err.Error() != `cannot determine snapshots path: generation required` { if err == nil || err.Error() != `generation required` {
t.Fatalf("unexpected error: %v", err) t.Fatalf("unexpected error: %v", err)
} }
}) })
@@ -204,7 +204,7 @@ func TestReplicaClient_WriteSnapshot(t *testing.T) {
RunWithReplicaClient(t, "ErrNoGeneration", func(t *testing.T, c litestream.ReplicaClient) { RunWithReplicaClient(t, "ErrNoGeneration", func(t *testing.T, c litestream.ReplicaClient) {
t.Parallel() t.Parallel()
if _, err := c.WriteSnapshot(context.Background(), "", 0, nil); err == nil || err.Error() != `cannot determine snapshot path: generation required` { if _, err := c.WriteSnapshot(context.Background(), "", 0, nil); err == nil || err.Error() != `generation required` {
t.Fatalf("unexpected error: %v", err) t.Fatalf("unexpected error: %v", err)
} }
}) })
@@ -242,13 +242,13 @@ func TestReplicaClient_SnapshotReader(t *testing.T) {
RunWithReplicaClient(t, "ErrNoGeneration", func(t *testing.T, c litestream.ReplicaClient) { RunWithReplicaClient(t, "ErrNoGeneration", func(t *testing.T, c litestream.ReplicaClient) {
t.Parallel() t.Parallel()
if _, err := c.SnapshotReader(context.Background(), "", 1); err == nil || err.Error() != `cannot determine snapshot path: generation required` { if _, err := c.SnapshotReader(context.Background(), "", 1); err == nil || err.Error() != `generation required` {
t.Fatalf("unexpected error: %v", err) t.Fatalf("unexpected error: %v", err)
} }
}) })
} }
func TestReplicaClient_WALs(t *testing.T) { func TestReplicaClient_WALSegments(t *testing.T) {
RunWithReplicaClient(t, "OK", func(t *testing.T, c litestream.ReplicaClient) { RunWithReplicaClient(t, "OK", func(t *testing.T, c litestream.ReplicaClient) {
t.Parallel() t.Parallel()
@@ -362,7 +362,7 @@ func TestReplicaClient_WALs(t *testing.T) {
if err == nil { if err == nil {
err = itr.Close() err = itr.Close()
} }
if err == nil || err.Error() != `cannot determine wal path: generation required` { if err == nil || err.Error() != `generation required` {
t.Fatalf("unexpected error: %v", err) t.Fatalf("unexpected error: %v", err)
} }
}) })
@@ -389,13 +389,13 @@ func TestReplicaClient_WriteWALSegment(t *testing.T) {
RunWithReplicaClient(t, "ErrNoGeneration", func(t *testing.T, c litestream.ReplicaClient) { RunWithReplicaClient(t, "ErrNoGeneration", func(t *testing.T, c litestream.ReplicaClient) {
t.Parallel() t.Parallel()
if _, err := c.WriteWALSegment(context.Background(), litestream.Pos{Generation: "", Index: 0, Offset: 0}, nil); err == nil || err.Error() != `cannot determine wal segment path: generation required` { if _, err := c.WriteWALSegment(context.Background(), litestream.Pos{Generation: "", Index: 0, Offset: 0}, nil); err == nil || err.Error() != `generation required` {
t.Fatalf("unexpected error: %v", err) t.Fatalf("unexpected error: %v", err)
} }
}) })
} }
func TestReplicaClient_WALReader(t *testing.T) { func TestReplicaClient_WALSegmentReader(t *testing.T) {
RunWithReplicaClient(t, "OK", func(t *testing.T, c litestream.ReplicaClient) { RunWithReplicaClient(t, "OK", func(t *testing.T, c litestream.ReplicaClient) {
t.Parallel() t.Parallel()
@@ -451,7 +451,7 @@ func TestReplicaClient_DeleteWALSegments(t *testing.T) {
RunWithReplicaClient(t, "ErrNoGeneration", func(t *testing.T, c litestream.ReplicaClient) { RunWithReplicaClient(t, "ErrNoGeneration", func(t *testing.T, c litestream.ReplicaClient) {
t.Parallel() t.Parallel()
if err := c.DeleteWALSegments(context.Background(), []litestream.Pos{{}}); err == nil || err.Error() != `cannot determine wal segment path: generation required` { if err := c.DeleteWALSegments(context.Background(), []litestream.Pos{{}}); err == nil || err.Error() != `generation required` {
t.Fatalf("unexpected error: %v", err) t.Fatalf("unexpected error: %v", err)
} }
}) })

View File

@@ -10,6 +10,7 @@ import (
"os" "os"
"path" "path"
"regexp" "regexp"
"strings"
"sync" "sync"
"time" "time"
@@ -154,7 +155,7 @@ func (c *ReplicaClient) Generations(ctx context.Context) ([]string, error) {
var generations []string var generations []string
if err := c.s3.ListObjectsPagesWithContext(ctx, &s3.ListObjectsInput{ if err := c.s3.ListObjectsPagesWithContext(ctx, &s3.ListObjectsInput{
Bucket: aws.String(c.Bucket), Bucket: aws.String(c.Bucket),
Prefix: aws.String(litestream.GenerationsPath(c.Path) + "/"), Prefix: aws.String(path.Join(c.Path, "generations") + "/"),
Delimiter: aws.String("/"), Delimiter: aws.String("/"),
}, func(page *s3.ListObjectsOutput, lastPage bool) bool { }, func(page *s3.ListObjectsOutput, lastPage bool) bool {
internal.OperationTotalCounterVec.WithLabelValues(ReplicaClientType, "LIST").Inc() internal.OperationTotalCounterVec.WithLabelValues(ReplicaClientType, "LIST").Inc()
@@ -178,18 +179,15 @@ func (c *ReplicaClient) Generations(ctx context.Context) ([]string, error) {
func (c *ReplicaClient) DeleteGeneration(ctx context.Context, generation string) error { func (c *ReplicaClient) DeleteGeneration(ctx context.Context, generation string) error {
if err := c.Init(ctx); err != nil { if err := c.Init(ctx); err != nil {
return err return err
} } else if generation == "" {
return fmt.Errorf("generation required")
dir, err := litestream.GenerationPath(c.Path, generation)
if err != nil {
return fmt.Errorf("cannot determine generation path: %w", err)
} }
// Collect all files for the generation. // Collect all files for the generation.
var objIDs []*s3.ObjectIdentifier var objIDs []*s3.ObjectIdentifier
if err := c.s3.ListObjectsPagesWithContext(ctx, &s3.ListObjectsInput{ if err := c.s3.ListObjectsPagesWithContext(ctx, &s3.ListObjectsInput{
Bucket: aws.String(c.Bucket), Bucket: aws.String(c.Bucket),
Prefix: aws.String(dir), Prefix: aws.String(path.Join(c.Path, "generations", generation)),
}, func(page *s3.ListObjectsOutput, lastPage bool) bool { }, func(page *s3.ListObjectsOutput, lastPage bool) bool {
internal.OperationTotalCounterVec.WithLabelValues(ReplicaClientType, "LIST").Inc() internal.OperationTotalCounterVec.WithLabelValues(ReplicaClientType, "LIST").Inc()
@@ -236,12 +234,11 @@ func (c *ReplicaClient) Snapshots(ctx context.Context, generation string) (lites
func (c *ReplicaClient) WriteSnapshot(ctx context.Context, generation string, index int, rd io.Reader) (info litestream.SnapshotInfo, err error) { func (c *ReplicaClient) WriteSnapshot(ctx context.Context, generation string, index int, rd io.Reader) (info litestream.SnapshotInfo, err error) {
if err := c.Init(ctx); err != nil { if err := c.Init(ctx); err != nil {
return info, err return info, err
} else if generation == "" {
return info, fmt.Errorf("generation required")
} }
key, err := litestream.SnapshotPath(c.Path, generation, index) key := path.Join(c.Path, "generations", generation, "snapshots", litestream.FormatIndex(index)+".snapshot.lz4")
if err != nil {
return info, fmt.Errorf("cannot determine snapshot path: %w", err)
}
startTime := time.Now() startTime := time.Now()
rc := internal.NewReadCounter(rd) rc := internal.NewReadCounter(rd)
@@ -270,12 +267,11 @@ func (c *ReplicaClient) WriteSnapshot(ctx context.Context, generation string, in
func (c *ReplicaClient) SnapshotReader(ctx context.Context, generation string, index int) (io.ReadCloser, error) { func (c *ReplicaClient) SnapshotReader(ctx context.Context, generation string, index int) (io.ReadCloser, error) {
if err := c.Init(ctx); err != nil { if err := c.Init(ctx); err != nil {
return nil, err return nil, err
} else if generation == "" {
return nil, fmt.Errorf("generation required")
} }
key, err := litestream.SnapshotPath(c.Path, generation, index) key := path.Join(c.Path, "generations", generation, "snapshots", litestream.FormatIndex(index)+".snapshot.lz4")
if err != nil {
return nil, fmt.Errorf("cannot determine snapshot path: %w", err)
}
out, err := c.s3.GetObjectWithContext(ctx, &s3.GetObjectInput{ out, err := c.s3.GetObjectWithContext(ctx, &s3.GetObjectInput{
Bucket: aws.String(c.Bucket), Bucket: aws.String(c.Bucket),
@@ -296,12 +292,11 @@ func (c *ReplicaClient) SnapshotReader(ctx context.Context, generation string, i
func (c *ReplicaClient) DeleteSnapshot(ctx context.Context, generation string, index int) error { func (c *ReplicaClient) DeleteSnapshot(ctx context.Context, generation string, index int) error {
if err := c.Init(ctx); err != nil { if err := c.Init(ctx); err != nil {
return err return err
} else if generation == "" {
return fmt.Errorf("generation required")
} }
key, err := litestream.SnapshotPath(c.Path, generation, index) key := path.Join(c.Path, "generations", generation, "snapshots", litestream.FormatIndex(index)+".snapshot.lz4")
if err != nil {
return fmt.Errorf("cannot determine snapshot path: %w", err)
}
if _, err := c.s3.DeleteObjectsWithContext(ctx, &s3.DeleteObjectsInput{ if _, err := c.s3.DeleteObjectsWithContext(ctx, &s3.DeleteObjectsInput{
Bucket: aws.String(c.Bucket), Bucket: aws.String(c.Bucket),
@@ -326,12 +321,11 @@ func (c *ReplicaClient) WALSegments(ctx context.Context, generation string) (lit
func (c *ReplicaClient) WriteWALSegment(ctx context.Context, pos litestream.Pos, rd io.Reader) (info litestream.WALSegmentInfo, err error) { func (c *ReplicaClient) WriteWALSegment(ctx context.Context, pos litestream.Pos, rd io.Reader) (info litestream.WALSegmentInfo, err error) {
if err := c.Init(ctx); err != nil { if err := c.Init(ctx); err != nil {
return info, err return info, err
} else if pos.Generation == "" {
return info, fmt.Errorf("generation required")
} }
key, err := litestream.WALSegmentPath(c.Path, pos.Generation, pos.Index, pos.Offset) key := path.Join(c.Path, "generations", pos.Generation, "wal", litestream.FormatIndex(pos.Index), litestream.FormatOffset(pos.Offset)+".wal.lz4")
if err != nil {
return info, fmt.Errorf("cannot determine wal segment path: %w", err)
}
startTime := time.Now() startTime := time.Now()
rc := internal.NewReadCounter(rd) rc := internal.NewReadCounter(rd)
@@ -360,12 +354,11 @@ func (c *ReplicaClient) WriteWALSegment(ctx context.Context, pos litestream.Pos,
func (c *ReplicaClient) WALSegmentReader(ctx context.Context, pos litestream.Pos) (io.ReadCloser, error) { func (c *ReplicaClient) WALSegmentReader(ctx context.Context, pos litestream.Pos) (io.ReadCloser, error) {
if err := c.Init(ctx); err != nil { if err := c.Init(ctx); err != nil {
return nil, err return nil, err
} else if pos.Generation == "" {
return nil, fmt.Errorf("generation required")
} }
key, err := litestream.WALSegmentPath(c.Path, pos.Generation, pos.Index, pos.Offset) key := path.Join(c.Path, "generations", pos.Generation, "wal", litestream.FormatIndex(pos.Index), litestream.FormatOffset(pos.Offset)+".wal.lz4")
if err != nil {
return nil, fmt.Errorf("cannot determine wal segment path: %w", err)
}
out, err := c.s3.GetObjectWithContext(ctx, &s3.GetObjectInput{ out, err := c.s3.GetObjectWithContext(ctx, &s3.GetObjectInput{
Bucket: aws.String(c.Bucket), Bucket: aws.String(c.Bucket),
@@ -397,10 +390,10 @@ func (c *ReplicaClient) DeleteWALSegments(ctx context.Context, a []litestream.Po
// Generate a batch of object IDs for deleting the WAL segments. // Generate a batch of object IDs for deleting the WAL segments.
for i, pos := range a[:n] { for i, pos := range a[:n] {
key, err := litestream.WALSegmentPath(c.Path, pos.Generation, pos.Index, pos.Offset) if pos.Generation == "" {
if err != nil { return fmt.Errorf("generation required")
return fmt.Errorf("cannot determine wal segment path: %w", err)
} }
key := path.Join(c.Path, "generations", pos.Generation, "wal", litestream.FormatIndex(pos.Index), litestream.FormatOffset(pos.Offset)+".wal.lz4")
objIDs[i] = &s3.ObjectIdentifier{Key: &key} objIDs[i] = &s3.ObjectIdentifier{Key: &key}
} }
@@ -498,11 +491,12 @@ func newSnapshotIterator(ctx context.Context, client *ReplicaClient, generation
func (itr *snapshotIterator) fetch() error { func (itr *snapshotIterator) fetch() error {
defer close(itr.ch) defer close(itr.ch)
dir, err := litestream.SnapshotsPath(itr.client.Path, itr.generation) if itr.generation == "" {
if err != nil { return fmt.Errorf("generation required")
return fmt.Errorf("cannot determine snapshots path: %w", err)
} }
dir := path.Join(itr.client.Path, "generations", itr.generation, "snapshots")
return itr.client.s3.ListObjectsPagesWithContext(itr.ctx, &s3.ListObjectsInput{ return itr.client.s3.ListObjectsPagesWithContext(itr.ctx, &s3.ListObjectsInput{
Bucket: aws.String(itr.client.Bucket), Bucket: aws.String(itr.client.Bucket),
Prefix: aws.String(dir + "/"), Prefix: aws.String(dir + "/"),
@@ -511,8 +505,7 @@ func (itr *snapshotIterator) fetch() error {
internal.OperationTotalCounterVec.WithLabelValues(ReplicaClientType, "LIST").Inc() internal.OperationTotalCounterVec.WithLabelValues(ReplicaClientType, "LIST").Inc()
for _, obj := range page.Contents { for _, obj := range page.Contents {
key := path.Base(*obj.Key) index, err := internal.ParseSnapshotPath(path.Base(*obj.Key))
index, err := litestream.ParseSnapshotPath(key)
if err != nil { if err != nil {
continue continue
} }
@@ -601,21 +594,20 @@ func newWALSegmentIterator(ctx context.Context, client *ReplicaClient, generatio
func (itr *walSegmentIterator) fetch() error { func (itr *walSegmentIterator) fetch() error {
defer close(itr.ch) defer close(itr.ch)
dir, err := litestream.WALPath(itr.client.Path, itr.generation) if itr.generation == "" {
if err != nil { return fmt.Errorf("generation required")
return fmt.Errorf("cannot determine wal path: %w", err)
} }
prefix := path.Join(itr.client.Path, "generations", itr.generation, "wal") + "/"
return itr.client.s3.ListObjectsPagesWithContext(itr.ctx, &s3.ListObjectsInput{ return itr.client.s3.ListObjectsPagesWithContext(itr.ctx, &s3.ListObjectsInput{
Bucket: aws.String(itr.client.Bucket), Bucket: aws.String(itr.client.Bucket),
Prefix: aws.String(dir + "/"), Prefix: aws.String(prefix),
Delimiter: aws.String("/"),
}, func(page *s3.ListObjectsOutput, lastPage bool) bool { }, func(page *s3.ListObjectsOutput, lastPage bool) bool {
internal.OperationTotalCounterVec.WithLabelValues(ReplicaClientType, "LIST").Inc() internal.OperationTotalCounterVec.WithLabelValues(ReplicaClientType, "LIST").Inc()
for _, obj := range page.Contents { for _, obj := range page.Contents {
key := path.Base(*obj.Key) index, offset, err := internal.ParseWALSegmentPath(strings.TrimPrefix(*obj.Key, prefix))
index, offset, err := litestream.ParseWALSegmentPath(key)
if err != nil { if err != nil {
continue continue
} }

View File

@@ -9,6 +9,7 @@ import (
"os" "os"
"path" "path"
"sort" "sort"
"strings"
"sync" "sync"
"time" "time"
@@ -121,7 +122,7 @@ func (c *ReplicaClient) Generations(ctx context.Context) (_ []string, err error)
return nil, err return nil, err
} }
fis, err := sftpClient.ReadDir(litestream.GenerationsPath(c.Path)) fis, err := sftpClient.ReadDir(path.Join(c.Path, "generations"))
if os.IsNotExist(err) { if os.IsNotExist(err) {
return nil, nil return nil, nil
} else if err != nil { } else if err != nil {
@@ -153,12 +154,11 @@ func (c *ReplicaClient) DeleteGeneration(ctx context.Context, generation string)
sftpClient, err := c.Init(ctx) sftpClient, err := c.Init(ctx)
if err != nil { if err != nil {
return err return err
} else if generation == "" {
return fmt.Errorf("generation required")
} }
dir, err := litestream.GenerationPath(c.Path, generation) dir := path.Join(c.Path, "generations", generation)
if err != nil {
return fmt.Errorf("cannot determine generation path: %w", err)
}
var dirs []string var dirs []string
walker := sftpClient.Walk(dir) walker := sftpClient.Walk(dir)
@@ -198,12 +198,11 @@ func (c *ReplicaClient) Snapshots(ctx context.Context, generation string) (_ lit
sftpClient, err := c.Init(ctx) sftpClient, err := c.Init(ctx)
if err != nil { if err != nil {
return nil, err return nil, err
} else if generation == "" {
return nil, fmt.Errorf("generation required")
} }
dir, err := litestream.SnapshotsPath(c.Path, generation) dir := path.Join(c.Path, "generations", generation, "snapshots")
if err != nil {
return nil, fmt.Errorf("cannot determine snapshots path: %w", err)
}
fis, err := sftpClient.ReadDir(dir) fis, err := sftpClient.ReadDir(dir)
if os.IsNotExist(err) { if os.IsNotExist(err) {
@@ -216,7 +215,7 @@ func (c *ReplicaClient) Snapshots(ctx context.Context, generation string) (_ lit
infos := make([]litestream.SnapshotInfo, 0, len(fis)) infos := make([]litestream.SnapshotInfo, 0, len(fis))
for _, fi := range fis { for _, fi := range fis {
// Parse index from filename. // Parse index from filename.
index, err := litestream.ParseSnapshotPath(path.Base(fi.Name())) index, err := internal.ParseSnapshotPath(path.Base(fi.Name()))
if err != nil { if err != nil {
continue continue
} }
@@ -241,12 +240,11 @@ func (c *ReplicaClient) WriteSnapshot(ctx context.Context, generation string, in
sftpClient, err := c.Init(ctx) sftpClient, err := c.Init(ctx)
if err != nil { if err != nil {
return info, err return info, err
} else if generation == "" {
return info, fmt.Errorf("generation required")
} }
filename, err := litestream.SnapshotPath(c.Path, generation, index) filename := path.Join(c.Path, "generations", generation, "snapshots", litestream.FormatIndex(index)+".snapshot.lz4")
if err != nil {
return info, fmt.Errorf("cannot determine snapshot path: %w", err)
}
startTime := time.Now() startTime := time.Now()
if err := sftpClient.MkdirAll(path.Dir(filename)); err != nil { if err := sftpClient.MkdirAll(path.Dir(filename)); err != nil {
@@ -286,12 +284,11 @@ func (c *ReplicaClient) SnapshotReader(ctx context.Context, generation string, i
sftpClient, err := c.Init(ctx) sftpClient, err := c.Init(ctx)
if err != nil { if err != nil {
return nil, err return nil, err
} else if generation == "" {
return nil, fmt.Errorf("generation required")
} }
filename, err := litestream.SnapshotPath(c.Path, generation, index) filename := path.Join(c.Path, "generations", generation, "snapshots", litestream.FormatIndex(index)+".snapshot.lz4")
if err != nil {
return nil, fmt.Errorf("cannot determine snapshot path: %w", err)
}
f, err := sftpClient.Open(filename) f, err := sftpClient.Open(filename)
if err != nil { if err != nil {
@@ -310,12 +307,11 @@ func (c *ReplicaClient) DeleteSnapshot(ctx context.Context, generation string, i
sftpClient, err := c.Init(ctx) sftpClient, err := c.Init(ctx)
if err != nil { if err != nil {
return err return err
} else if generation == "" {
return fmt.Errorf("generation required")
} }
filename, err := litestream.SnapshotPath(c.Path, generation, index) filename := path.Join(c.Path, "generations", generation, "snapshots", litestream.FormatIndex(index)+".snapshot.lz4")
if err != nil {
return fmt.Errorf("cannot determine snapshot path: %w", err)
}
if err := sftpClient.Remove(filename); err != nil && !os.IsNotExist(err) { if err := sftpClient.Remove(filename); err != nil && !os.IsNotExist(err) {
return fmt.Errorf("cannot delete snapshot %q: %w", filename, err) return fmt.Errorf("cannot delete snapshot %q: %w", filename, err)
@@ -332,12 +328,11 @@ func (c *ReplicaClient) WALSegments(ctx context.Context, generation string) (_ l
sftpClient, err := c.Init(ctx) sftpClient, err := c.Init(ctx)
if err != nil { if err != nil {
return nil, err return nil, err
} else if generation == "" {
return nil, fmt.Errorf("generation required")
} }
dir, err := litestream.WALPath(c.Path, generation) dir := path.Join(c.Path, "generations", generation, "wal")
if err != nil {
return nil, fmt.Errorf("cannot determine wal path: %w", err)
}
fis, err := sftpClient.ReadDir(dir) fis, err := sftpClient.ReadDir(dir)
if os.IsNotExist(err) { if os.IsNotExist(err) {
@@ -347,25 +342,18 @@ func (c *ReplicaClient) WALSegments(ctx context.Context, generation string) (_ l
} }
// Iterate over every file and convert to metadata. // Iterate over every file and convert to metadata.
infos := make([]litestream.WALSegmentInfo, 0, len(fis)) indexes := make([]int, 0, len(fis))
for _, fi := range fis { for _, fi := range fis {
index, offset, err := litestream.ParseWALSegmentPath(path.Base(fi.Name())) index, err := litestream.ParseIndex(fi.Name())
if err != nil { if err != nil || !fi.IsDir() {
continue continue
} }
indexes = append(indexes, index)
infos = append(infos, litestream.WALSegmentInfo{
Generation: generation,
Index: index,
Offset: offset,
Size: fi.Size(),
CreatedAt: fi.ModTime().UTC(),
})
} }
sort.Sort(litestream.WALSegmentInfoSlice(infos)) sort.Ints(indexes)
return litestream.NewWALSegmentInfoSliceIterator(infos), nil return newWALSegmentIterator(ctx, c, dir, generation, indexes), nil
} }
// WriteWALSegment writes LZ4 compressed data from rd into a file on disk. // WriteWALSegment writes LZ4 compressed data from rd into a file on disk.
@@ -375,12 +363,11 @@ func (c *ReplicaClient) WriteWALSegment(ctx context.Context, pos litestream.Pos,
sftpClient, err := c.Init(ctx) sftpClient, err := c.Init(ctx)
if err != nil { if err != nil {
return info, err return info, err
} else if pos.Generation == "" {
return info, fmt.Errorf("generation required")
} }
filename, err := litestream.WALSegmentPath(c.Path, pos.Generation, pos.Index, pos.Offset) filename := path.Join(c.Path, "generations", pos.Generation, "wal", litestream.FormatIndex(pos.Index), litestream.FormatOffset(pos.Offset)+".wal.lz4")
if err != nil {
return info, fmt.Errorf("cannot determine wal segment path: %w", err)
}
startTime := time.Now() startTime := time.Now()
if err := sftpClient.MkdirAll(path.Dir(filename)); err != nil { if err := sftpClient.MkdirAll(path.Dir(filename)); err != nil {
@@ -420,12 +407,11 @@ func (c *ReplicaClient) WALSegmentReader(ctx context.Context, pos litestream.Pos
sftpClient, err := c.Init(ctx) sftpClient, err := c.Init(ctx)
if err != nil { if err != nil {
return nil, err return nil, err
} else if pos.Generation == "" {
return nil, fmt.Errorf("generation required")
} }
filename, err := litestream.WALSegmentPath(c.Path, pos.Generation, pos.Index, pos.Offset) filename := path.Join(c.Path, "generations", pos.Generation, "wal", litestream.FormatIndex(pos.Index), litestream.FormatOffset(pos.Offset)+".wal.lz4")
if err != nil {
return nil, fmt.Errorf("cannot determine wal segment path: %w", err)
}
f, err := sftpClient.Open(filename) f, err := sftpClient.Open(filename)
if err != nil { if err != nil {
@@ -447,11 +433,12 @@ func (c *ReplicaClient) DeleteWALSegments(ctx context.Context, a []litestream.Po
} }
for _, pos := range a { for _, pos := range a {
filename, err := litestream.WALSegmentPath(c.Path, pos.Generation, pos.Index, pos.Offset) if pos.Generation == "" {
if err != nil { return fmt.Errorf("generation required")
return fmt.Errorf("cannot determine wal segment path: %w", err)
} }
filename := path.Join(c.Path, "generations", pos.Generation, "wal", litestream.FormatIndex(pos.Index), litestream.FormatOffset(pos.Offset)+".wal.lz4")
if err := sftpClient.Remove(filename); err != nil && !os.IsNotExist(err) { if err := sftpClient.Remove(filename); err != nil && !os.IsNotExist(err) {
return fmt.Errorf("cannot delete wal segment %q: %w", filename, err) return fmt.Errorf("cannot delete wal segment %q: %w", filename, err)
} }
@@ -470,7 +457,7 @@ func (c *ReplicaClient) Cleanup(ctx context.Context) (err error) {
return err return err
} }
if err := sftpClient.RemoveDirectory(litestream.GenerationsPath(c.Path)); err != nil && !os.IsNotExist(err) { if err := sftpClient.RemoveDirectory(path.Join(c.Path, "generations")); err != nil && !os.IsNotExist(err) {
return fmt.Errorf("cannot delete generations path: %w", err) return fmt.Errorf("cannot delete generations path: %w", err)
} else if err := sftpClient.RemoveDirectory(c.Path); err != nil && !os.IsNotExist(err) { } else if err := sftpClient.RemoveDirectory(c.Path); err != nil && !os.IsNotExist(err) {
return fmt.Errorf("cannot delete path: %w", err) return fmt.Errorf("cannot delete path: %w", err)
@@ -493,3 +480,101 @@ func (c *ReplicaClient) resetOnConnError(err error) {
c.sshClient = nil c.sshClient = nil
} }
} }
type walSegmentIterator struct {
ctx context.Context
client *ReplicaClient
dir string
generation string
indexes []int
infos []litestream.WALSegmentInfo
err error
}
func newWALSegmentIterator(ctx context.Context, client *ReplicaClient, dir, generation string, indexes []int) *walSegmentIterator {
return &walSegmentIterator{
ctx: ctx,
client: client,
dir: dir,
generation: generation,
indexes: indexes,
}
}
func (itr *walSegmentIterator) Close() (err error) {
return itr.err
}
func (itr *walSegmentIterator) Next() bool {
sftpClient, err := itr.client.Init(itr.ctx)
if err != nil {
itr.err = err
return false
}
// Exit if an error has already occurred.
if itr.err != nil {
return false
}
for {
// Move to the next segment in cache, if available.
if len(itr.infos) > 1 {
itr.infos = itr.infos[1:]
return true
}
itr.infos = itr.infos[:0] // otherwise clear infos
// Move to the next index unless this is the first time initializing.
if itr.infos != nil && len(itr.indexes) > 0 {
itr.indexes = itr.indexes[1:]
}
// If no indexes remain, stop iteration.
if len(itr.indexes) == 0 {
return false
}
// Read segments into a cache for the current index.
index := itr.indexes[0]
fis, err := sftpClient.ReadDir(path.Join(itr.dir, litestream.FormatIndex(index)))
if err != nil {
itr.err = err
return false
}
for _, fi := range fis {
filename := path.Base(fi.Name())
if fi.IsDir() {
continue
}
offset, err := litestream.ParseOffset(strings.TrimSuffix(filename, ".wal.lz4"))
if err != nil {
continue
}
itr.infos = append(itr.infos, litestream.WALSegmentInfo{
Generation: itr.generation,
Index: index,
Offset: offset,
Size: fi.Size(),
CreatedAt: fi.ModTime().UTC(),
})
}
if len(itr.infos) > 0 {
return true
}
}
}
func (itr *walSegmentIterator) Err() error { return itr.err }
func (itr *walSegmentIterator) WALSegment() litestream.WALSegmentInfo {
if len(itr.infos) == 0 {
return litestream.WALSegmentInfo{}
}
return itr.infos[0]
}