Implement streaming WAL segment iterator

Currently, WALSegmentIterator implementations read to the end of
the end of their list of segments and return EOF. This commit adds
the ability to push additional segments to in-process iterators and
notify their callers that new segments are available. This is only
implemented for the file-based iterator but other segment iterators
may get this implementation in the future or have a wrapping
iterator provide a polling-based implementation.
This commit is contained in:
Ben Johnson
2022-02-11 13:43:50 -07:00
parent 006e4b7155
commit 8589111717
6 changed files with 400 additions and 77 deletions

View File

@@ -9,6 +9,7 @@ import (
"path/filepath"
"sort"
"strings"
"sync"
"github.com/benbjohnson/litestream/internal"
)
@@ -285,7 +286,7 @@ func (c *FileReplicaClient) WALSegments(ctx context.Context, generation string)
sort.Ints(indexes)
return newFileWALSegmentIterator(dir, generation, indexes), nil
return NewFileWALSegmentIterator(dir, generation, indexes), nil
}
// WriteWALSegment writes LZ4 compressed data from rd into a file on disk.
@@ -360,33 +361,74 @@ func (c *FileReplicaClient) DeleteWALSegments(ctx context.Context, a []Pos) erro
return nil
}
type fileWalSegmentIterator struct {
type FileWALSegmentIterator struct {
mu sync.Mutex
notifyCh chan struct{}
closeFunc func() error
dir string
generation string
indexes []int
infos []WALSegmentInfo
err error
buffered bool
infos []WALSegmentInfo
err error
}
func newFileWALSegmentIterator(dir, generation string, indexes []int) *fileWalSegmentIterator {
return &fileWalSegmentIterator{
func NewFileWALSegmentIterator(dir, generation string, indexes []int) *FileWALSegmentIterator {
return &FileWALSegmentIterator{
dir: dir,
generation: generation,
indexes: indexes,
notifyCh: make(chan struct{}, 1),
}
}
func (itr *fileWalSegmentIterator) Close() (err error) {
return itr.err
func (itr *FileWALSegmentIterator) Close() (err error) {
if itr.closeFunc != nil {
if e := itr.closeFunc(); e != nil && err == nil {
err = e
}
}
if e := itr.Err(); e != nil && err == nil {
err = e
}
return err
}
func (itr *fileWalSegmentIterator) Next() bool {
func (itr *FileWALSegmentIterator) NotifyCh() <-chan struct{} {
return itr.notifyCh
}
// Generation returns the generation this iterator was initialized with.
func (itr *FileWALSegmentIterator) Generation() string {
return itr.generation
}
// Indexes returns the pending indexes. Only used for testing.
func (itr *FileWALSegmentIterator) Indexes() []int {
itr.mu.Lock()
defer itr.mu.Unlock()
return itr.indexes
}
func (itr *FileWALSegmentIterator) Next() bool {
itr.mu.Lock()
defer itr.mu.Unlock()
// Exit if an error has already occurred.
if itr.err != nil {
return false
}
// Read first info, if buffered.
if itr.buffered {
itr.buffered = false
return true
}
for {
// Move to the next segment in cache, if available.
if len(itr.infos) > 1 {
@@ -448,11 +490,94 @@ func (itr *fileWalSegmentIterator) Next() bool {
}
}
func (itr *fileWalSegmentIterator) Err() error { return itr.err }
// SetErr sets the error on the iterator and notifies it of the change.
func (itr *FileWALSegmentIterator) SetErr(err error) {
itr.mu.Lock()
defer itr.mu.Unlock()
if itr.err == nil {
itr.err = err
}
select {
case itr.notifyCh <- struct{}{}:
default:
}
}
// Err returns the first error that occurs on the iterator.
func (itr *FileWALSegmentIterator) Err() error {
itr.mu.Lock()
defer itr.mu.Unlock()
return itr.err
}
func (itr *FileWALSegmentIterator) WALSegment() WALSegmentInfo {
itr.mu.Lock()
defer itr.mu.Unlock()
func (itr *fileWalSegmentIterator) WALSegment() WALSegmentInfo {
if len(itr.infos) == 0 {
return WALSegmentInfo{}
}
return itr.infos[0]
}
// Append add an additional WAL segment to the end of the iterator. This
// function expects that info will always be later than all previous infos
// that the iterator has or has seen.
func (itr *FileWALSegmentIterator) Append(info WALSegmentInfo) error {
itr.mu.Lock()
defer itr.mu.Unlock()
if itr.err != nil {
return itr.err
} else if itr.generation != info.Generation {
return fmt.Errorf("generation mismatch")
}
// If the info has an index that is still waiting to be read from disk into
// the cache then simply append it to the end of the indices.
//
// If we have no pending indices, then append to the end of the infos. If
// we don't have either then just append to the infos and avoid validation.
if len(itr.indexes) > 0 {
maxIndex := itr.indexes[len(itr.indexes)-1]
if info.Index < maxIndex {
return fmt.Errorf("appended index %q below max index %q", FormatIndex(info.Index), FormatIndex(maxIndex))
} else if info.Index > maxIndex+1 {
return fmt.Errorf("appended index %q skips index %q", FormatIndex(info.Index), FormatIndex(maxIndex+1))
} else if info.Index == maxIndex+1 {
itr.indexes = append(itr.indexes, info.Index)
}
// NOTE: no-op if segment index matches the current last index
} else if len(itr.infos) > 0 {
lastInfo := itr.infos[len(itr.infos)-1]
if info.Index < lastInfo.Index {
return fmt.Errorf("appended index %q below current index %q", FormatIndex(info.Index), FormatIndex(lastInfo.Index))
} else if info.Index > lastInfo.Index+1 {
return fmt.Errorf("appended index %q skips next index %q", FormatIndex(info.Index), FormatIndex(lastInfo.Index+1))
} else if info.Index == lastInfo.Index+1 {
itr.indexes = append(itr.indexes, info.Index)
} else {
// If the index matches the current infos, verify its offset and append.
if info.Offset < lastInfo.Offset {
return fmt.Errorf("appended offset %s/%s before last offset %s/%s", FormatIndex(info.Index), FormatOffset(info.Offset), FormatIndex(lastInfo.Index), FormatOffset(lastInfo.Offset))
} else if info.Offset == lastInfo.Offset {
return fmt.Errorf("duplicate offset %s/%s appended", FormatIndex(info.Index), FormatOffset(info.Offset))
}
itr.infos = append(itr.infos, info)
}
} else {
itr.buffered = true
itr.infos = append(itr.infos, info)
}
// Signal that a new segment is available.
select {
case itr.notifyCh <- struct{}{}:
default:
}
return nil
}