Files
litestream/file_replica_client.go
Ben Johnson 3f0ec9fa9f Refactor Restore()
This commit refactors out the complexity of downloading ordered WAL
files in parallel to a type called `WALDownloader`. This makes it
easier to test the restore separately from the download.
2022-01-04 15:03:59 -07:00

459 lines
11 KiB
Go

package litestream
import (
"context"
"fmt"
"io"
"io/ioutil"
"os"
"path/filepath"
"sort"
"strings"
"github.com/benbjohnson/litestream/internal"
)
// FileReplicaClientType is the client type for file replica clients.
const FileReplicaClientType = "file"
var _ ReplicaClient = (*FileReplicaClient)(nil)
// FileReplicaClient is a client for writing snapshots & WAL segments to disk.
type FileReplicaClient struct {
path string // destination path
// File info
FileMode os.FileMode
DirMode os.FileMode
Uid, Gid int
}
// NewFileReplicaClient returns a new instance of FileReplicaClient.
func NewFileReplicaClient(path string) *FileReplicaClient {
return &FileReplicaClient{
path: path,
FileMode: 0600,
DirMode: 0700,
}
}
// Type returns "file" as the client type.
func (c *FileReplicaClient) Type() string {
return FileReplicaClientType
}
// Path returns the destination path to replicate the database to.
func (c *FileReplicaClient) Path() string {
return c.path
}
// GenerationsDir returns the path to a generation root directory.
func (c *FileReplicaClient) GenerationsDir() (string, error) {
if c.path == "" {
return "", fmt.Errorf("file replica path required")
}
return filepath.Join(c.path, "generations"), nil
}
// GenerationDir returns the path to a generation's root directory.
func (c *FileReplicaClient) GenerationDir(generation string) (string, error) {
dir, err := c.GenerationsDir()
if err != nil {
return "", err
} else if generation == "" {
return "", fmt.Errorf("generation required")
}
return filepath.Join(dir, generation), nil
}
// SnapshotsDir returns the path to a generation's snapshot directory.
func (c *FileReplicaClient) SnapshotsDir(generation string) (string, error) {
dir, err := c.GenerationDir(generation)
if err != nil {
return "", err
}
return filepath.Join(dir, "snapshots"), nil
}
// SnapshotPath returns the path to an uncompressed snapshot file.
func (c *FileReplicaClient) SnapshotPath(generation string, index int) (string, error) {
dir, err := c.SnapshotsDir(generation)
if err != nil {
return "", err
}
return filepath.Join(dir, FormatIndex(index)+".snapshot.lz4"), nil
}
// WALDir returns the path to a generation's WAL directory
func (c *FileReplicaClient) WALDir(generation string) (string, error) {
dir, err := c.GenerationDir(generation)
if err != nil {
return "", err
}
return filepath.Join(dir, "wal"), nil
}
// WALSegmentPath returns the path to a WAL segment file.
func (c *FileReplicaClient) WALSegmentPath(generation string, index int, offset int64) (string, error) {
dir, err := c.WALDir(generation)
if err != nil {
return "", err
}
return filepath.Join(dir, FormatIndex(index), fmt.Sprintf("%08x.wal.lz4", offset)), nil
}
// Generations returns a list of available generation names.
func (c *FileReplicaClient) Generations(ctx context.Context) ([]string, error) {
root, err := c.GenerationsDir()
if err != nil {
return nil, fmt.Errorf("cannot determine generations path: %w", err)
}
fis, err := ioutil.ReadDir(root)
if os.IsNotExist(err) {
return nil, nil
} else if err != nil {
return nil, err
}
var generations []string
for _, fi := range fis {
if !IsGenerationName(fi.Name()) {
continue
} else if !fi.IsDir() {
continue
}
generations = append(generations, fi.Name())
}
return generations, nil
}
// DeleteGeneration deletes all snapshots & WAL segments within a generation.
func (c *FileReplicaClient) DeleteGeneration(ctx context.Context, generation string) error {
dir, err := c.GenerationDir(generation)
if err != nil {
return fmt.Errorf("cannot determine generation path: %w", err)
}
if err := os.RemoveAll(dir); err != nil && !os.IsNotExist(err) {
return err
}
return nil
}
// Snapshots returns an iterator over all available snapshots for a generation.
func (c *FileReplicaClient) Snapshots(ctx context.Context, generation string) (SnapshotIterator, error) {
dir, err := c.SnapshotsDir(generation)
if err != nil {
return nil, err
}
f, err := os.Open(dir)
if os.IsNotExist(err) {
return NewSnapshotInfoSliceIterator(nil), nil
} else if err != nil {
return nil, err
}
defer f.Close()
fis, err := f.Readdir(-1)
if err != nil {
return nil, err
}
// Iterate over every file and convert to metadata.
infos := make([]SnapshotInfo, 0, len(fis))
for _, fi := range fis {
// Parse index from filename.
index, err := internal.ParseSnapshotPath(filepath.Base(fi.Name()))
if err != nil {
continue
}
infos = append(infos, SnapshotInfo{
Generation: generation,
Index: index,
Size: fi.Size(),
CreatedAt: fi.ModTime().UTC(),
})
}
sort.Sort(SnapshotInfoSlice(infos))
return NewSnapshotInfoSliceIterator(infos), nil
}
// WriteSnapshot writes LZ4 compressed data from rd into a file on disk.
func (c *FileReplicaClient) WriteSnapshot(ctx context.Context, generation string, index int, rd io.Reader) (info SnapshotInfo, err error) {
filename, err := c.SnapshotPath(generation, index)
if err != nil {
return info, err
}
// Ensure parent directory exists.
if err := internal.MkdirAll(filepath.Dir(filename), c.DirMode, c.Uid, c.Gid); err != nil {
return info, err
}
// Write snapshot to temporary file next to destination path.
f, err := internal.CreateFile(filename+".tmp", c.FileMode, c.Uid, c.Gid)
if err != nil {
return info, err
}
defer f.Close()
if _, err := io.Copy(f, rd); err != nil {
return info, err
} else if err := f.Sync(); err != nil {
return info, err
} else if err := f.Close(); err != nil {
return info, err
}
// Build metadata.
fi, err := os.Stat(filename + ".tmp")
if err != nil {
return info, err
}
info = SnapshotInfo{
Generation: generation,
Index: index,
Size: fi.Size(),
CreatedAt: fi.ModTime().UTC(),
}
// Move snapshot to final path when it has been fully written & synced to disk.
if err := os.Rename(filename+".tmp", filename); err != nil {
return info, err
}
return info, nil
}
// SnapshotReader returns a reader for snapshot data at the given generation/index.
// Returns os.ErrNotExist if no matching index is found.
func (c *FileReplicaClient) SnapshotReader(ctx context.Context, generation string, index int) (io.ReadCloser, error) {
filename, err := c.SnapshotPath(generation, index)
if err != nil {
return nil, err
}
return os.Open(filename)
}
// DeleteSnapshot deletes a snapshot with the given generation & index.
func (c *FileReplicaClient) DeleteSnapshot(ctx context.Context, generation string, index int) error {
filename, err := c.SnapshotPath(generation, index)
if err != nil {
return fmt.Errorf("cannot determine snapshot path: %w", err)
}
if err := os.Remove(filename); err != nil && !os.IsNotExist(err) {
return err
}
return nil
}
// WALSegments returns an iterator over all available WAL files for a generation.
func (c *FileReplicaClient) WALSegments(ctx context.Context, generation string) (WALSegmentIterator, error) {
dir, err := c.WALDir(generation)
if err != nil {
return nil, err
}
f, err := os.Open(dir)
if os.IsNotExist(err) {
return NewWALSegmentInfoSliceIterator(nil), nil
} else if err != nil {
return nil, err
}
defer f.Close()
fis, err := f.Readdir(-1)
if err != nil {
return nil, err
}
// Iterate over every file and convert to metadata.
indexes := make([]int, 0, len(fis))
for _, fi := range fis {
index, err := ParseIndex(fi.Name())
if err != nil || !fi.IsDir() {
continue
}
indexes = append(indexes, index)
}
sort.Ints(indexes)
return newFileWALSegmentIterator(dir, generation, indexes), nil
}
// WriteWALSegment writes LZ4 compressed data from rd into a file on disk.
func (c *FileReplicaClient) WriteWALSegment(ctx context.Context, pos Pos, rd io.Reader) (info WALSegmentInfo, err error) {
filename, err := c.WALSegmentPath(pos.Generation, pos.Index, pos.Offset)
if err != nil {
return info, err
}
// Ensure parent directory exists.
if err := internal.MkdirAll(filepath.Dir(filename), c.DirMode, c.Uid, c.Gid); err != nil {
return info, err
}
// Write WAL segment to temporary file next to destination path.
f, err := internal.CreateFile(filename+".tmp", c.FileMode, c.Uid, c.Gid)
if err != nil {
return info, err
}
defer f.Close()
if _, err := io.Copy(f, rd); err != nil {
return info, err
} else if err := f.Sync(); err != nil {
return info, err
} else if err := f.Close(); err != nil {
return info, err
}
// Build metadata.
fi, err := os.Stat(filename + ".tmp")
if err != nil {
return info, err
}
info = WALSegmentInfo{
Generation: pos.Generation,
Index: pos.Index,
Offset: pos.Offset,
Size: fi.Size(),
CreatedAt: fi.ModTime().UTC(),
}
// Move WAL segment to final path when it has been written & synced to disk.
if err := os.Rename(filename+".tmp", filename); err != nil {
return info, err
}
return info, nil
}
// WALSegmentReader returns a reader for a section of WAL data at the given position.
// Returns os.ErrNotExist if no matching index/offset is found.
func (c *FileReplicaClient) WALSegmentReader(ctx context.Context, pos Pos) (io.ReadCloser, error) {
filename, err := c.WALSegmentPath(pos.Generation, pos.Index, pos.Offset)
if err != nil {
return nil, err
}
return os.Open(filename)
}
// DeleteWALSegments deletes WAL segments at the given positions.
func (c *FileReplicaClient) DeleteWALSegments(ctx context.Context, a []Pos) error {
for _, pos := range a {
filename, err := c.WALSegmentPath(pos.Generation, pos.Index, pos.Offset)
if err != nil {
return err
}
if err := os.Remove(filename); err != nil && !os.IsNotExist(err) {
return err
}
}
return nil
}
type fileWalSegmentIterator struct {
dir string
generation string
indexes []int
infos []WALSegmentInfo
err error
}
func newFileWALSegmentIterator(dir, generation string, indexes []int) *fileWalSegmentIterator {
return &fileWalSegmentIterator{
dir: dir,
generation: generation,
indexes: indexes,
}
}
func (itr *fileWalSegmentIterator) Close() (err error) {
return itr.err
}
func (itr *fileWalSegmentIterator) Next() bool {
// Exit if an error has already occurred.
if itr.err != nil {
return false
}
for {
// Move to the next segment in cache, if available.
if len(itr.infos) > 1 {
itr.infos = itr.infos[1:]
return true
}
itr.infos = itr.infos[:0] // otherwise clear infos
// If no indexes remain, stop iteration.
if len(itr.indexes) == 0 {
return false
}
// Read segments into a cache for the current index.
index := itr.indexes[0]
itr.indexes = itr.indexes[1:]
f, err := os.Open(filepath.Join(itr.dir, FormatIndex(index)))
if err != nil {
itr.err = err
return false
}
defer f.Close()
fis, err := f.Readdir(-1)
if err != nil {
itr.err = err
return false
} else if err := f.Close(); err != nil {
itr.err = err
return false
}
for _, fi := range fis {
filename := filepath.Base(fi.Name())
if fi.IsDir() {
continue
}
offset, err := ParseOffset(strings.TrimSuffix(filename, ".wal.lz4"))
if err != nil {
continue
}
itr.infos = append(itr.infos, WALSegmentInfo{
Generation: itr.generation,
Index: index,
Offset: offset,
Size: fi.Size(),
CreatedAt: fi.ModTime().UTC(),
})
}
// Ensure segments are sorted within index.
sort.Sort(WALSegmentInfoSlice(itr.infos))
if len(itr.infos) > 0 {
return true
}
}
}
func (itr *fileWalSegmentIterator) Err() error { return itr.err }
func (itr *fileWalSegmentIterator) WALSegment() WALSegmentInfo {
if len(itr.infos) == 0 {
return WALSegmentInfo{}
}
return itr.infos[0]
}