diff --git a/db.go b/db.go index a8c59de..7821f62 100644 --- a/db.go +++ b/db.go @@ -45,15 +45,17 @@ const BusyTimeout = 1 * time.Second // DB represents a managed instance of a SQLite database in the file system. type DB struct { - mu sync.RWMutex - path string // part to database - db *sql.DB // target database - f *os.File // long-running db file descriptor - rtx *sql.Tx // long running read transaction - pos Pos // cached position - pageSize int // page size, in bytes - notifyCh chan struct{} // notifies DB of changes - walNotify chan struct{} // closes on WAL change + mu sync.RWMutex + path string // part to database + db *sql.DB // target database + f *os.File // long-running db file descriptor + rtx *sql.Tx // long running read transaction + pos Pos // cached position + pageSize int // page size, in bytes + notifyCh chan struct{} // notifies DB of changes + + // Iterators used to stream new WAL changes to replicas + itrs map[*FileWALSegmentIterator]struct{} // Cached salt & checksum from current shadow header. hdr []byte @@ -111,9 +113,10 @@ type DB struct { // NewDB returns a new instance of DB for a given path. func NewDB(path string) *DB { db := &DB{ - path: path, - notifyCh: make(chan struct{}, 1), - walNotify: make(chan struct{}), + path: path, + notifyCh: make(chan struct{}, 1), + + itrs: make(map[*FileWALSegmentIterator]struct{}), MinCheckpointPageN: DefaultMinCheckpointPageN, MaxCheckpointPageN: DefaultMaxCheckpointPageN, @@ -245,7 +248,7 @@ func (db *DB) invalidatePos(ctx context.Context) error { } // Iterate over all segments to find the last one. - itr, err := db.WALSegments(context.Background(), generation) + itr, err := db.walSegments(context.Background(), generation, false) if err != nil { return err } @@ -363,13 +366,6 @@ func (db *DB) NotifyCh() chan<- struct{} { return db.notifyCh } -// WALNotify returns a channel that closes when the shadow WAL changes. -func (db *DB) WALNotify() <-chan struct{} { - db.mu.RLock() - defer db.mu.RUnlock() - return db.walNotify -} - // PageSize returns the page size of the underlying database. // Only valid after database exists & Init() has successfully run. func (db *DB) PageSize() int { @@ -440,6 +436,14 @@ func (db *DB) Close() (err error) { } } + // Remove all iterators. + db.mu.Lock() + for itr := range db.itrs { + itr.SetErr(ErrDBClosed) + delete(db.itrs, itr) + } + db.mu.Unlock() + // Release the read lock to allow other applications to handle checkpointing. if db.rtx != nil { if e := db.releaseReadLock(); e != nil && err == nil { @@ -833,7 +837,6 @@ func (db *DB) sync(ctx context.Context) (err error) { return fmt.Errorf("invalidate: %w", err) } } - origPos := db.pos // If sync fails, reset position & cache. defer func() { @@ -934,12 +937,6 @@ func (db *DB) sync(ctx context.Context) (err error) { db.shadowWALIndexGauge.Set(float64(db.pos.Index)) db.shadowWALSizeGauge.Set(float64(db.pos.Offset)) - // Notify replicas of WAL changes. - if db.pos != origPos { - close(db.walNotify) - db.walNotify = make(chan struct{}) - } - return nil } @@ -1263,7 +1260,8 @@ func (db *DB) writeWALSegment(ctx context.Context, pos Pos, rd io.Reader) error } defer f.Close() - if _, err := io.Copy(f, rd); err != nil { + n, err := io.Copy(f, rd) + if err != nil { return err } else if err := f.Sync(); err != nil { return err @@ -1276,14 +1274,47 @@ func (db *DB) writeWALSegment(ctx context.Context, pos Pos, rd io.Reader) error return err } + // Generate + info := WALSegmentInfo{ + Generation: pos.Generation, + Index: pos.Index, + Offset: pos.Offset, + Size: n, + CreatedAt: time.Now(), + } + + // Notify all managed segment iterators. + for itr := range db.itrs { + // Notify iterators of generation change. + if itr.Generation() != pos.Generation { + itr.SetErr(ErrGenerationChanged) + delete(db.itrs, itr) + continue + } + + // Attempt to append segment to end of iterator. + // On error, mark it on the iterator and remove from future notifications. + if err := itr.Append(info); err != nil { + itr.SetErr(fmt.Errorf("cannot append wal segment: %w", err)) + delete(db.itrs, itr) + continue + } + } + return nil } // WALSegments returns an iterator over all available WAL files for a generation. -func (db *DB) WALSegments(ctx context.Context, generation string) (WALSegmentIterator, error) { +func (db *DB) WALSegments(ctx context.Context, generation string) (*FileWALSegmentIterator, error) { + db.mu.Lock() + defer db.mu.Unlock() + return db.walSegments(ctx, generation, true) +} + +func (db *DB) walSegments(ctx context.Context, generation string, managed bool) (*FileWALSegmentIterator, error) { ents, err := os.ReadDir(db.ShadowWALDir(generation)) if os.IsNotExist(err) { - return NewWALSegmentInfoSliceIterator(nil), nil + return NewFileWALSegmentIterator(db.ShadowWALDir(generation), generation, nil), nil } else if err != nil { return nil, err } @@ -1300,7 +1331,27 @@ func (db *DB) WALSegments(ctx context.Context, generation string) (WALSegmentIte sort.Ints(indexes) - return newFileWALSegmentIterator(db.ShadowWALDir(generation), generation, indexes), nil + itr := NewFileWALSegmentIterator(db.ShadowWALDir(generation), generation, indexes) + + // Managed iterators will have new segments pushed to them. + if managed { + itr.closeFunc = func() error { + return db.CloseWALSegmentIterator(itr) + } + + db.itrs[itr] = struct{}{} + } + + return itr, nil +} + +// CloseWALSegmentIterator removes itr from the list of managed iterators. +func (db *DB) CloseWALSegmentIterator(itr *FileWALSegmentIterator) error { + db.mu.Lock() + defer db.mu.Unlock() + + delete(db.itrs, itr) + return nil } // SQLite WAL constants diff --git a/file_replica_client.go b/file_replica_client.go index 2eef73a..dc323a3 100644 --- a/file_replica_client.go +++ b/file_replica_client.go @@ -9,6 +9,7 @@ import ( "path/filepath" "sort" "strings" + "sync" "github.com/benbjohnson/litestream/internal" ) @@ -285,7 +286,7 @@ func (c *FileReplicaClient) WALSegments(ctx context.Context, generation string) sort.Ints(indexes) - return newFileWALSegmentIterator(dir, generation, indexes), nil + return NewFileWALSegmentIterator(dir, generation, indexes), nil } // WriteWALSegment writes LZ4 compressed data from rd into a file on disk. @@ -360,33 +361,74 @@ func (c *FileReplicaClient) DeleteWALSegments(ctx context.Context, a []Pos) erro return nil } -type fileWalSegmentIterator struct { +type FileWALSegmentIterator struct { + mu sync.Mutex + notifyCh chan struct{} + closeFunc func() error + dir string generation string indexes []int - infos []WALSegmentInfo - err error + buffered bool + infos []WALSegmentInfo + err error } -func newFileWALSegmentIterator(dir, generation string, indexes []int) *fileWalSegmentIterator { - return &fileWalSegmentIterator{ +func NewFileWALSegmentIterator(dir, generation string, indexes []int) *FileWALSegmentIterator { + return &FileWALSegmentIterator{ dir: dir, generation: generation, indexes: indexes, + + notifyCh: make(chan struct{}, 1), } } -func (itr *fileWalSegmentIterator) Close() (err error) { - return itr.err +func (itr *FileWALSegmentIterator) Close() (err error) { + if itr.closeFunc != nil { + if e := itr.closeFunc(); e != nil && err == nil { + err = e + } + } + + if e := itr.Err(); e != nil && err == nil { + err = e + } + return err } -func (itr *fileWalSegmentIterator) Next() bool { +func (itr *FileWALSegmentIterator) NotifyCh() <-chan struct{} { + return itr.notifyCh +} + +// Generation returns the generation this iterator was initialized with. +func (itr *FileWALSegmentIterator) Generation() string { + return itr.generation +} + +// Indexes returns the pending indexes. Only used for testing. +func (itr *FileWALSegmentIterator) Indexes() []int { + itr.mu.Lock() + defer itr.mu.Unlock() + return itr.indexes +} + +func (itr *FileWALSegmentIterator) Next() bool { + itr.mu.Lock() + defer itr.mu.Unlock() + // Exit if an error has already occurred. if itr.err != nil { return false } + // Read first info, if buffered. + if itr.buffered { + itr.buffered = false + return true + } + for { // Move to the next segment in cache, if available. if len(itr.infos) > 1 { @@ -448,11 +490,94 @@ func (itr *fileWalSegmentIterator) Next() bool { } } -func (itr *fileWalSegmentIterator) Err() error { return itr.err } +// SetErr sets the error on the iterator and notifies it of the change. +func (itr *FileWALSegmentIterator) SetErr(err error) { + itr.mu.Lock() + defer itr.mu.Unlock() + if itr.err == nil { + itr.err = err + } + + select { + case itr.notifyCh <- struct{}{}: + default: + } +} + +// Err returns the first error that occurs on the iterator. +func (itr *FileWALSegmentIterator) Err() error { + itr.mu.Lock() + defer itr.mu.Unlock() + return itr.err +} + +func (itr *FileWALSegmentIterator) WALSegment() WALSegmentInfo { + itr.mu.Lock() + defer itr.mu.Unlock() -func (itr *fileWalSegmentIterator) WALSegment() WALSegmentInfo { if len(itr.infos) == 0 { return WALSegmentInfo{} } return itr.infos[0] } + +// Append add an additional WAL segment to the end of the iterator. This +// function expects that info will always be later than all previous infos +// that the iterator has or has seen. +func (itr *FileWALSegmentIterator) Append(info WALSegmentInfo) error { + itr.mu.Lock() + defer itr.mu.Unlock() + + if itr.err != nil { + return itr.err + } else if itr.generation != info.Generation { + return fmt.Errorf("generation mismatch") + } + + // If the info has an index that is still waiting to be read from disk into + // the cache then simply append it to the end of the indices. + // + // If we have no pending indices, then append to the end of the infos. If + // we don't have either then just append to the infos and avoid validation. + if len(itr.indexes) > 0 { + maxIndex := itr.indexes[len(itr.indexes)-1] + + if info.Index < maxIndex { + return fmt.Errorf("appended index %q below max index %q", FormatIndex(info.Index), FormatIndex(maxIndex)) + } else if info.Index > maxIndex+1 { + return fmt.Errorf("appended index %q skips index %q", FormatIndex(info.Index), FormatIndex(maxIndex+1)) + } else if info.Index == maxIndex+1 { + itr.indexes = append(itr.indexes, info.Index) + } + // NOTE: no-op if segment index matches the current last index + + } else if len(itr.infos) > 0 { + lastInfo := itr.infos[len(itr.infos)-1] + if info.Index < lastInfo.Index { + return fmt.Errorf("appended index %q below current index %q", FormatIndex(info.Index), FormatIndex(lastInfo.Index)) + } else if info.Index > lastInfo.Index+1 { + return fmt.Errorf("appended index %q skips next index %q", FormatIndex(info.Index), FormatIndex(lastInfo.Index+1)) + } else if info.Index == lastInfo.Index+1 { + itr.indexes = append(itr.indexes, info.Index) + } else { + // If the index matches the current infos, verify its offset and append. + if info.Offset < lastInfo.Offset { + return fmt.Errorf("appended offset %s/%s before last offset %s/%s", FormatIndex(info.Index), FormatOffset(info.Offset), FormatIndex(lastInfo.Index), FormatOffset(lastInfo.Offset)) + } else if info.Offset == lastInfo.Offset { + return fmt.Errorf("duplicate offset %s/%s appended", FormatIndex(info.Index), FormatOffset(info.Offset)) + } + itr.infos = append(itr.infos, info) + } + } else { + itr.buffered = true + itr.infos = append(itr.infos, info) + } + + // Signal that a new segment is available. + select { + case itr.notifyCh <- struct{}{}: + default: + } + + return nil +} diff --git a/file_replica_client_test.go b/file_replica_client_test.go index 1a1405f..a821ff2 100644 --- a/file_replica_client_test.go +++ b/file_replica_client_test.go @@ -1,6 +1,7 @@ package litestream_test import ( + "reflect" "testing" "github.com/benbjohnson/litestream" @@ -133,3 +134,133 @@ func TestReplicaClient_WALSegmentPath(t *testing.T) { } }) } + +func TestFileWALSegmentIterator_Append(t *testing.T) { + t.Run("Empty", func(t *testing.T) { + itr := litestream.NewFileWALSegmentIterator(t.TempDir(), "0123456789abcdef", nil) + if err := itr.Append(litestream.WALSegmentInfo{Generation: "0123456789abcdef", Index: 0, Offset: 0}); err != nil { + t.Fatal(err) + } + + select { + case <-itr.NotifyCh(): + default: + t.Fatal("expected notification") + } + + if !itr.Next() { + t.Fatal("expected next") + } else if got, want := itr.WALSegment(), (litestream.WALSegmentInfo{Generation: "0123456789abcdef", Index: 0, Offset: 0}); got != want { + t.Fatalf("info=%#v, want %#v", got, want) + } + }) + + t.Run("MultiOffset", func(t *testing.T) { + itr := litestream.NewFileWALSegmentIterator(t.TempDir(), "0123456789abcdef", nil) + if err := itr.Append(litestream.WALSegmentInfo{Generation: "0123456789abcdef", Index: 0, Offset: 0}); err != nil { + t.Fatal(err) + } else if err := itr.Append(litestream.WALSegmentInfo{Generation: "0123456789abcdef", Index: 0, Offset: 1}); err != nil { + t.Fatal(err) + } + + select { + case <-itr.NotifyCh(): + default: + t.Fatal("expected notification") + } + + if !itr.Next() { + t.Fatal("expected next") + } else if got, want := itr.WALSegment(), (litestream.WALSegmentInfo{Generation: "0123456789abcdef", Index: 0, Offset: 0}); got != want { + t.Fatalf("info=%#v, want %#v", got, want) + } + + if !itr.Next() { + t.Fatal("expected next") + } else if got, want := itr.WALSegment(), (litestream.WALSegmentInfo{Generation: "0123456789abcdef", Index: 0, Offset: 1}); got != want { + t.Fatalf("info=%#v, want %#v", got, want) + } + }) + + t.Run("MultiIndex", func(t *testing.T) { + itr := litestream.NewFileWALSegmentIterator(t.TempDir(), "0123456789abcdef", nil) + if err := itr.Append(litestream.WALSegmentInfo{Generation: "0123456789abcdef", Index: 0, Offset: 0}); err != nil { + t.Fatal(err) + } else if err := itr.Append(litestream.WALSegmentInfo{Generation: "0123456789abcdef", Index: 1, Offset: 0}); err != nil { + t.Fatal(err) + } else if err := itr.Append(litestream.WALSegmentInfo{Generation: "0123456789abcdef", Index: 1, Offset: 1}); err != nil { + t.Fatal(err) + } else if err := itr.Append(litestream.WALSegmentInfo{Generation: "0123456789abcdef", Index: 2, Offset: 0}); err != nil { + t.Fatal(err) + } + + if got, want := itr.Indexes(), []int{1, 2}; !reflect.DeepEqual(got, want) { + t.Fatalf("indexes=%v, want %v", got, want) + } + }) + + t.Run("ErrGenerationMismatch", func(t *testing.T) { + itr := litestream.NewFileWALSegmentIterator(t.TempDir(), "0000000000000000", nil) + if err := itr.Append(litestream.WALSegmentInfo{Generation: "0123456789abcdef", Index: 0, Offset: 0}); err == nil || err.Error() != `generation mismatch` { + t.Fatalf("unexpected error: %s", err) + } + }) + + t.Run("ErrBelowMaxIndex", func(t *testing.T) { + itr := litestream.NewFileWALSegmentIterator(t.TempDir(), "0123456789abcdef", nil) + if err := itr.Append(litestream.WALSegmentInfo{Generation: "0123456789abcdef", Index: 0, Offset: 0}); err != nil { + t.Fatal(err) + } else if err := itr.Append(litestream.WALSegmentInfo{Generation: "0123456789abcdef", Index: 1, Offset: 0}); err != nil { + t.Fatal(err) + } else if err := itr.Append(litestream.WALSegmentInfo{Generation: "0123456789abcdef", Index: 0, Offset: 0}); err == nil || err.Error() != `appended index "0000000000000000" below max index "0000000000000001"` { + t.Fatalf("unexpected error: %s", err) + } + }) + + t.Run("ErrAboveMaxIndex", func(t *testing.T) { + itr := litestream.NewFileWALSegmentIterator(t.TempDir(), "0123456789abcdef", nil) + if err := itr.Append(litestream.WALSegmentInfo{Generation: "0123456789abcdef", Index: 0, Offset: 0}); err != nil { + t.Fatal(err) + } else if err := itr.Append(litestream.WALSegmentInfo{Generation: "0123456789abcdef", Index: 1, Offset: 0}); err != nil { + t.Fatal(err) + } else if err := itr.Append(litestream.WALSegmentInfo{Generation: "0123456789abcdef", Index: 3, Offset: 0}); err == nil || err.Error() != `appended index "0000000000000003" skips index "0000000000000002"` { + t.Fatalf("unexpected error: %s", err) + } + }) + + t.Run("ErrBelowCurrentIndex", func(t *testing.T) { + itr := litestream.NewFileWALSegmentIterator(t.TempDir(), "0123456789abcdef", nil) + if err := itr.Append(litestream.WALSegmentInfo{Generation: "0123456789abcdef", Index: 1, Offset: 0}); err != nil { + t.Fatal(err) + } else if err := itr.Append(litestream.WALSegmentInfo{Generation: "0123456789abcdef", Index: 0, Offset: 0}); err == nil || err.Error() != `appended index "0000000000000000" below current index "0000000000000001"` { + t.Fatalf("unexpected error: %s", err) + } + }) + + t.Run("ErrSkipsNextIndex", func(t *testing.T) { + itr := litestream.NewFileWALSegmentIterator(t.TempDir(), "0123456789abcdef", nil) + if err := itr.Append(litestream.WALSegmentInfo{Generation: "0123456789abcdef", Index: 0, Offset: 0}); err != nil { + t.Fatal(err) + } else if err := itr.Append(litestream.WALSegmentInfo{Generation: "0123456789abcdef", Index: 2, Offset: 0}); err == nil || err.Error() != `appended index "0000000000000002" skips next index "0000000000000001"` { + t.Fatalf("unexpected error: %s", err) + } + }) + + t.Run("ErrBelowOffset", func(t *testing.T) { + itr := litestream.NewFileWALSegmentIterator(t.TempDir(), "0123456789abcdef", nil) + if err := itr.Append(litestream.WALSegmentInfo{Generation: "0123456789abcdef", Index: 0, Offset: 5}); err != nil { + t.Fatal(err) + } else if err := itr.Append(litestream.WALSegmentInfo{Generation: "0123456789abcdef", Index: 0, Offset: 4}); err == nil || err.Error() != `appended offset 0000000000000000/0000000000000004 before last offset 0000000000000000/0000000000000005` { + t.Fatalf("unexpected error: %s", err) + } + }) + + t.Run("ErrDuplicateOffset", func(t *testing.T) { + itr := litestream.NewFileWALSegmentIterator(t.TempDir(), "0123456789abcdef", nil) + if err := itr.Append(litestream.WALSegmentInfo{Generation: "0123456789abcdef", Index: 0, Offset: 5}); err != nil { + t.Fatal(err) + } else if err := itr.Append(litestream.WALSegmentInfo{Generation: "0123456789abcdef", Index: 0, Offset: 5}); err == nil || err.Error() != `duplicate offset 0000000000000000/0000000000000005 appended` { + t.Fatalf("unexpected error: %s", err) + } + }) +} diff --git a/litestream.go b/litestream.go index c91c648..d8cb9e8 100644 --- a/litestream.go +++ b/litestream.go @@ -39,10 +39,12 @@ const ( // Litestream errors. var ( - ErrNoGeneration = errors.New("no generation available") - ErrNoSnapshots = errors.New("no snapshots available") - ErrNoWALSegments = errors.New("no wal segments available") - ErrChecksumMismatch = errors.New("invalid replica, checksum mismatch") + ErrDBClosed = errors.New("database closed") + ErrNoGeneration = errors.New("no generation available") + ErrGenerationChanged = errors.New("generation changed") + ErrNoSnapshots = errors.New("no snapshots available") + ErrNoWALSegments = errors.New("no wal segments available") + ErrChecksumMismatch = errors.New("invalid replica, checksum mismatch") ) var ( diff --git a/replica.go b/replica.go index 50e822e..6541fea 100644 --- a/replica.go +++ b/replica.go @@ -33,6 +33,7 @@ type Replica struct { mu sync.RWMutex pos Pos // current replicated position + itr *FileWALSegmentIterator muf sync.Mutex f *os.File // long-running file descriptor to avoid non-OFD lock issues @@ -126,6 +127,11 @@ func (r *Replica) Start(ctx context.Context) { func (r *Replica) Stop() { r.cancel() r.wg.Wait() + + if r.itr != nil { + r.itr.Close() + r.itr = nil + } } // Close will close the DB file descriptor which could release locks on @@ -155,10 +161,24 @@ func (r *Replica) Sync(ctx context.Context) (err error) { // Find current position of database. dpos := r.db.Pos() if dpos.IsZero() { - return fmt.Errorf("no generation, waiting for data") + return ErrNoGeneration } generation := dpos.Generation + // Close out iterator if the generation has changed. + if r.itr != nil && r.itr.Generation() != generation { + _ = r.itr.Close() + r.itr = nil + } + + // Ensure we obtain a WAL iterator before we snapshot so we don't miss any segments. + resetItr := r.itr == nil + if resetItr { + if r.itr, err = r.db.WALSegments(ctx, generation); err != nil { + return fmt.Errorf("wal segments: %w", err) + } + } + // Create snapshot if no snapshots exist for generation. snapshotN, err := r.snapshotN(generation) if err != nil { @@ -174,7 +194,7 @@ func (r *Replica) Sync(ctx context.Context) (err error) { replicaSnapshotTotalGaugeVec.WithLabelValues(r.db.Path(), r.Name()).Set(float64(snapshotN)) // Determine position, if necessary. - if r.Pos().Generation != generation { + if resetItr { pos, err := r.calcPos(ctx, generation) if err != nil { return fmt.Errorf("cannot determine replica position: %s", err) @@ -196,16 +216,11 @@ func (r *Replica) Sync(ctx context.Context) (err error) { func (r *Replica) syncWAL(ctx context.Context) (err error) { pos := r.Pos() - itr, err := r.db.WALSegments(ctx, pos.Generation) - if err != nil { - return err - } - defer itr.Close() - // Group segments by index. var segments [][]WALSegmentInfo - for itr.Next() { - info := itr.WALSegment() + for r.itr.Next() { + info := r.itr.WALSegment() + if cmp, err := ComparePos(pos, info.Pos()); err != nil { return fmt.Errorf("compare pos: %w", err) } else if cmp == 1 { @@ -624,38 +639,39 @@ func (r *Replica) deleteWALSegmentsBeforeIndex(ctx context.Context, generation s // monitor runs in a separate goroutine and continuously replicates the DB. func (r *Replica) monitor(ctx context.Context) { - ticker := time.NewTicker(r.SyncInterval) - defer ticker.Stop() + timer := time.NewTimer(r.SyncInterval) + defer timer.Stop() - // Continuously check for new data to replicate. - ch := make(chan struct{}) - close(ch) - var notify <-chan struct{} = ch + for { + if err := r.Sync(ctx); ctx.Err() != nil { + return + } else if err != nil && err != ErrNoGeneration { + r.Logger.Printf("monitor error: %s", err) + } - for initial := true; ; initial = false { - // Enforce a minimum time between synchronization. - if !initial { + // Wait for a change to the WAL iterator. + if r.itr != nil { select { case <-ctx.Done(): return - case <-ticker.C: + case <-r.itr.NotifyCh(): } } - // Wait for changes to the database. + // Wait for the sync interval to collect additional changes. + timer.Reset(r.SyncInterval) select { case <-ctx.Done(): return - case <-notify: + case <-timer.C: } - // Fetch new notify channel before replicating data. - notify = r.db.WALNotify() - - // Synchronize the shadow wal into the replication directory. - if err := r.Sync(ctx); err != nil { - r.Logger.Printf("monitor error: %s", err) - continue + // Flush any additional notifications from the WAL iterator. + if r.itr != nil { + select { + case <-r.itr.NotifyCh(): + default: + } } } } diff --git a/replica_client_test.go b/replica_client_test.go index fedf9f2..83117b2 100644 --- a/replica_client_test.go +++ b/replica_client_test.go @@ -489,8 +489,6 @@ func TestFindMaxIndexByGeneration(t *testing.T) { }) } -func TestRestoreSnapshot(t *testing.T) { t.Skip("TODO") } - func TestRestore(t *testing.T) { t.Run("OK", func(t *testing.T) { testDir := filepath.Join("testdata", "restore", "ok")