Group replica wal segments by index
This commit changes the replica path format to group segments within a single index in the same directory. This is to eventually add the ability to seek to a record on file-based systems without having to iterate over the records. The DB shadow WAL will also be changed to this same format to support live replicas.
This commit is contained in:
@@ -9,6 +9,7 @@ import (
|
||||
"os"
|
||||
"path"
|
||||
"sort"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
@@ -121,7 +122,7 @@ func (c *ReplicaClient) Generations(ctx context.Context) (_ []string, err error)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
fis, err := sftpClient.ReadDir(litestream.GenerationsPath(c.Path))
|
||||
fis, err := sftpClient.ReadDir(path.Join(c.Path, "generations"))
|
||||
if os.IsNotExist(err) {
|
||||
return nil, nil
|
||||
} else if err != nil {
|
||||
@@ -153,12 +154,11 @@ func (c *ReplicaClient) DeleteGeneration(ctx context.Context, generation string)
|
||||
sftpClient, err := c.Init(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
} else if generation == "" {
|
||||
return fmt.Errorf("generation required")
|
||||
}
|
||||
|
||||
dir, err := litestream.GenerationPath(c.Path, generation)
|
||||
if err != nil {
|
||||
return fmt.Errorf("cannot determine generation path: %w", err)
|
||||
}
|
||||
dir := path.Join(c.Path, "generations", generation)
|
||||
|
||||
var dirs []string
|
||||
walker := sftpClient.Walk(dir)
|
||||
@@ -198,12 +198,11 @@ func (c *ReplicaClient) Snapshots(ctx context.Context, generation string) (_ lit
|
||||
sftpClient, err := c.Init(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
} else if generation == "" {
|
||||
return nil, fmt.Errorf("generation required")
|
||||
}
|
||||
|
||||
dir, err := litestream.SnapshotsPath(c.Path, generation)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cannot determine snapshots path: %w", err)
|
||||
}
|
||||
dir := path.Join(c.Path, "generations", generation, "snapshots")
|
||||
|
||||
fis, err := sftpClient.ReadDir(dir)
|
||||
if os.IsNotExist(err) {
|
||||
@@ -216,7 +215,7 @@ func (c *ReplicaClient) Snapshots(ctx context.Context, generation string) (_ lit
|
||||
infos := make([]litestream.SnapshotInfo, 0, len(fis))
|
||||
for _, fi := range fis {
|
||||
// Parse index from filename.
|
||||
index, err := litestream.ParseSnapshotPath(path.Base(fi.Name()))
|
||||
index, err := internal.ParseSnapshotPath(path.Base(fi.Name()))
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
@@ -241,12 +240,11 @@ func (c *ReplicaClient) WriteSnapshot(ctx context.Context, generation string, in
|
||||
sftpClient, err := c.Init(ctx)
|
||||
if err != nil {
|
||||
return info, err
|
||||
} else if generation == "" {
|
||||
return info, fmt.Errorf("generation required")
|
||||
}
|
||||
|
||||
filename, err := litestream.SnapshotPath(c.Path, generation, index)
|
||||
if err != nil {
|
||||
return info, fmt.Errorf("cannot determine snapshot path: %w", err)
|
||||
}
|
||||
filename := path.Join(c.Path, "generations", generation, "snapshots", litestream.FormatIndex(index)+".snapshot.lz4")
|
||||
startTime := time.Now()
|
||||
|
||||
if err := sftpClient.MkdirAll(path.Dir(filename)); err != nil {
|
||||
@@ -286,12 +284,11 @@ func (c *ReplicaClient) SnapshotReader(ctx context.Context, generation string, i
|
||||
sftpClient, err := c.Init(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
} else if generation == "" {
|
||||
return nil, fmt.Errorf("generation required")
|
||||
}
|
||||
|
||||
filename, err := litestream.SnapshotPath(c.Path, generation, index)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cannot determine snapshot path: %w", err)
|
||||
}
|
||||
filename := path.Join(c.Path, "generations", generation, "snapshots", litestream.FormatIndex(index)+".snapshot.lz4")
|
||||
|
||||
f, err := sftpClient.Open(filename)
|
||||
if err != nil {
|
||||
@@ -310,12 +307,11 @@ func (c *ReplicaClient) DeleteSnapshot(ctx context.Context, generation string, i
|
||||
sftpClient, err := c.Init(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
} else if generation == "" {
|
||||
return fmt.Errorf("generation required")
|
||||
}
|
||||
|
||||
filename, err := litestream.SnapshotPath(c.Path, generation, index)
|
||||
if err != nil {
|
||||
return fmt.Errorf("cannot determine snapshot path: %w", err)
|
||||
}
|
||||
filename := path.Join(c.Path, "generations", generation, "snapshots", litestream.FormatIndex(index)+".snapshot.lz4")
|
||||
|
||||
if err := sftpClient.Remove(filename); err != nil && !os.IsNotExist(err) {
|
||||
return fmt.Errorf("cannot delete snapshot %q: %w", filename, err)
|
||||
@@ -332,12 +328,11 @@ func (c *ReplicaClient) WALSegments(ctx context.Context, generation string) (_ l
|
||||
sftpClient, err := c.Init(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
} else if generation == "" {
|
||||
return nil, fmt.Errorf("generation required")
|
||||
}
|
||||
|
||||
dir, err := litestream.WALPath(c.Path, generation)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cannot determine wal path: %w", err)
|
||||
}
|
||||
dir := path.Join(c.Path, "generations", generation, "wal")
|
||||
|
||||
fis, err := sftpClient.ReadDir(dir)
|
||||
if os.IsNotExist(err) {
|
||||
@@ -347,25 +342,18 @@ func (c *ReplicaClient) WALSegments(ctx context.Context, generation string) (_ l
|
||||
}
|
||||
|
||||
// Iterate over every file and convert to metadata.
|
||||
infos := make([]litestream.WALSegmentInfo, 0, len(fis))
|
||||
indexes := make([]int, 0, len(fis))
|
||||
for _, fi := range fis {
|
||||
index, offset, err := litestream.ParseWALSegmentPath(path.Base(fi.Name()))
|
||||
if err != nil {
|
||||
index, err := litestream.ParseIndex(fi.Name())
|
||||
if err != nil || !fi.IsDir() {
|
||||
continue
|
||||
}
|
||||
|
||||
infos = append(infos, litestream.WALSegmentInfo{
|
||||
Generation: generation,
|
||||
Index: index,
|
||||
Offset: offset,
|
||||
Size: fi.Size(),
|
||||
CreatedAt: fi.ModTime().UTC(),
|
||||
})
|
||||
indexes = append(indexes, index)
|
||||
}
|
||||
|
||||
sort.Sort(litestream.WALSegmentInfoSlice(infos))
|
||||
sort.Ints(indexes)
|
||||
|
||||
return litestream.NewWALSegmentInfoSliceIterator(infos), nil
|
||||
return newWALSegmentIterator(ctx, c, dir, generation, indexes), nil
|
||||
}
|
||||
|
||||
// WriteWALSegment writes LZ4 compressed data from rd into a file on disk.
|
||||
@@ -375,12 +363,11 @@ func (c *ReplicaClient) WriteWALSegment(ctx context.Context, pos litestream.Pos,
|
||||
sftpClient, err := c.Init(ctx)
|
||||
if err != nil {
|
||||
return info, err
|
||||
} else if pos.Generation == "" {
|
||||
return info, fmt.Errorf("generation required")
|
||||
}
|
||||
|
||||
filename, err := litestream.WALSegmentPath(c.Path, pos.Generation, pos.Index, pos.Offset)
|
||||
if err != nil {
|
||||
return info, fmt.Errorf("cannot determine wal segment path: %w", err)
|
||||
}
|
||||
filename := path.Join(c.Path, "generations", pos.Generation, "wal", litestream.FormatIndex(pos.Index), litestream.FormatOffset(pos.Offset)+".wal.lz4")
|
||||
startTime := time.Now()
|
||||
|
||||
if err := sftpClient.MkdirAll(path.Dir(filename)); err != nil {
|
||||
@@ -420,12 +407,11 @@ func (c *ReplicaClient) WALSegmentReader(ctx context.Context, pos litestream.Pos
|
||||
sftpClient, err := c.Init(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
} else if pos.Generation == "" {
|
||||
return nil, fmt.Errorf("generation required")
|
||||
}
|
||||
|
||||
filename, err := litestream.WALSegmentPath(c.Path, pos.Generation, pos.Index, pos.Offset)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cannot determine wal segment path: %w", err)
|
||||
}
|
||||
filename := path.Join(c.Path, "generations", pos.Generation, "wal", litestream.FormatIndex(pos.Index), litestream.FormatOffset(pos.Offset)+".wal.lz4")
|
||||
|
||||
f, err := sftpClient.Open(filename)
|
||||
if err != nil {
|
||||
@@ -447,11 +433,12 @@ func (c *ReplicaClient) DeleteWALSegments(ctx context.Context, a []litestream.Po
|
||||
}
|
||||
|
||||
for _, pos := range a {
|
||||
filename, err := litestream.WALSegmentPath(c.Path, pos.Generation, pos.Index, pos.Offset)
|
||||
if err != nil {
|
||||
return fmt.Errorf("cannot determine wal segment path: %w", err)
|
||||
if pos.Generation == "" {
|
||||
return fmt.Errorf("generation required")
|
||||
}
|
||||
|
||||
filename := path.Join(c.Path, "generations", pos.Generation, "wal", litestream.FormatIndex(pos.Index), litestream.FormatOffset(pos.Offset)+".wal.lz4")
|
||||
|
||||
if err := sftpClient.Remove(filename); err != nil && !os.IsNotExist(err) {
|
||||
return fmt.Errorf("cannot delete wal segment %q: %w", filename, err)
|
||||
}
|
||||
@@ -470,7 +457,7 @@ func (c *ReplicaClient) Cleanup(ctx context.Context) (err error) {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := sftpClient.RemoveDirectory(litestream.GenerationsPath(c.Path)); err != nil && !os.IsNotExist(err) {
|
||||
if err := sftpClient.RemoveDirectory(path.Join(c.Path, "generations")); err != nil && !os.IsNotExist(err) {
|
||||
return fmt.Errorf("cannot delete generations path: %w", err)
|
||||
} else if err := sftpClient.RemoveDirectory(c.Path); err != nil && !os.IsNotExist(err) {
|
||||
return fmt.Errorf("cannot delete path: %w", err)
|
||||
@@ -493,3 +480,101 @@ func (c *ReplicaClient) resetOnConnError(err error) {
|
||||
c.sshClient = nil
|
||||
}
|
||||
}
|
||||
|
||||
type walSegmentIterator struct {
|
||||
ctx context.Context
|
||||
client *ReplicaClient
|
||||
dir string
|
||||
generation string
|
||||
indexes []int
|
||||
|
||||
infos []litestream.WALSegmentInfo
|
||||
err error
|
||||
}
|
||||
|
||||
func newWALSegmentIterator(ctx context.Context, client *ReplicaClient, dir, generation string, indexes []int) *walSegmentIterator {
|
||||
return &walSegmentIterator{
|
||||
ctx: ctx,
|
||||
client: client,
|
||||
dir: dir,
|
||||
generation: generation,
|
||||
indexes: indexes,
|
||||
}
|
||||
}
|
||||
|
||||
func (itr *walSegmentIterator) Close() (err error) {
|
||||
return itr.err
|
||||
}
|
||||
|
||||
func (itr *walSegmentIterator) Next() bool {
|
||||
sftpClient, err := itr.client.Init(itr.ctx)
|
||||
if err != nil {
|
||||
itr.err = err
|
||||
return false
|
||||
}
|
||||
|
||||
// Exit if an error has already occurred.
|
||||
if itr.err != nil {
|
||||
return false
|
||||
}
|
||||
|
||||
for {
|
||||
// Move to the next segment in cache, if available.
|
||||
if len(itr.infos) > 1 {
|
||||
itr.infos = itr.infos[1:]
|
||||
return true
|
||||
}
|
||||
itr.infos = itr.infos[:0] // otherwise clear infos
|
||||
|
||||
// Move to the next index unless this is the first time initializing.
|
||||
if itr.infos != nil && len(itr.indexes) > 0 {
|
||||
itr.indexes = itr.indexes[1:]
|
||||
}
|
||||
|
||||
// If no indexes remain, stop iteration.
|
||||
if len(itr.indexes) == 0 {
|
||||
return false
|
||||
}
|
||||
|
||||
// Read segments into a cache for the current index.
|
||||
index := itr.indexes[0]
|
||||
fis, err := sftpClient.ReadDir(path.Join(itr.dir, litestream.FormatIndex(index)))
|
||||
if err != nil {
|
||||
itr.err = err
|
||||
return false
|
||||
}
|
||||
|
||||
for _, fi := range fis {
|
||||
filename := path.Base(fi.Name())
|
||||
if fi.IsDir() {
|
||||
continue
|
||||
}
|
||||
|
||||
offset, err := litestream.ParseOffset(strings.TrimSuffix(filename, ".wal.lz4"))
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
|
||||
itr.infos = append(itr.infos, litestream.WALSegmentInfo{
|
||||
Generation: itr.generation,
|
||||
Index: index,
|
||||
Offset: offset,
|
||||
Size: fi.Size(),
|
||||
CreatedAt: fi.ModTime().UTC(),
|
||||
})
|
||||
}
|
||||
|
||||
if len(itr.infos) > 0 {
|
||||
return true
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (itr *walSegmentIterator) Err() error { return itr.err }
|
||||
|
||||
func (itr *walSegmentIterator) WALSegment() litestream.WALSegmentInfo {
|
||||
if len(itr.infos) == 0 {
|
||||
return litestream.WALSegmentInfo{}
|
||||
}
|
||||
return itr.infos[0]
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user