Group WAL files in file replica

This commit is contained in:
Ben Johnson
2020-12-29 16:40:28 -07:00
parent ddf85f7150
commit 42a33cccf4
3 changed files with 145 additions and 76 deletions

13
db.go
View File

@@ -129,7 +129,7 @@ func (db *DB) CurrentShadowWALIndex(generation string) (int, error) {
if !strings.HasSuffix(fi.Name(), WALExt) { if !strings.HasSuffix(fi.Name(), WALExt) {
continue continue
} }
if v, err := ParseWALFilename(filepath.Base(fi.Name())); err != nil { if v, _, _, err := ParseWALPath(fi.Name()); err != nil {
continue // invalid wal filename continue // invalid wal filename
} else if v > index { } else if v > index {
index = v index = v
@@ -640,13 +640,13 @@ func (db *DB) syncWAL(info syncInfo) (newSize int64, err error) {
// Parse index of current shadow WAL file. // Parse index of current shadow WAL file.
dir, base := filepath.Split(info.shadowWALPath) dir, base := filepath.Split(info.shadowWALPath)
index, err := ParseWALFilename(base) index, _, _, err := ParseWALPath(base)
if err != nil { if err != nil {
return 0, fmt.Errorf("cannot parse shadow wal filename: %s", base) return 0, fmt.Errorf("cannot parse shadow wal filename: %s", base)
} }
// Start a new shadow WAL file with next index. // Start a new shadow WAL file with next index.
newShadowWALPath := filepath.Join(dir, FormatWALFilename(index+1)) newShadowWALPath := filepath.Join(dir, FormatWALPath(index+1))
if err := db.initShadowWALFile(newShadowWALPath); err != nil { if err := db.initShadowWALFile(newShadowWALPath); err != nil {
return 0, fmt.Errorf("cannot init shadow wal file: name=%s err=%w", newShadowWALPath, err) return 0, fmt.Errorf("cannot init shadow wal file: name=%s err=%w", newShadowWALPath, err)
} }
@@ -962,14 +962,13 @@ func (db *DB) checkpoint(info syncInfo, force bool) error {
} }
// Parse index of current shadow WAL file. // Parse index of current shadow WAL file.
dir, base := filepath.Split(info.shadowWALPath) index, _, _, err := ParseWALPath(info.shadowWALPath)
index, err := ParseWALFilename(base)
if err != nil { if err != nil {
return fmt.Errorf("cannot parse shadow wal filename: %s", base) return fmt.Errorf("cannot parse shadow wal filename: %s", info.shadowWALPath)
} }
// Start a new shadow WAL file with next index. // Start a new shadow WAL file with next index.
newShadowWALPath := filepath.Join(dir, FormatWALFilename(index+1)) newShadowWALPath := filepath.Join(filepath.Dir(info.shadowWALPath), FormatWALPath(index+1))
if err := db.initShadowWALFile(newShadowWALPath); err != nil { if err := db.initShadowWALFile(newShadowWALPath); err != nil {
return fmt.Errorf("cannot init shadow wal file: name=%s err=%w", newShadowWALPath, err) return fmt.Errorf("cannot init shadow wal file: name=%s err=%w", newShadowWALPath, err)
} }

View File

@@ -119,19 +119,6 @@ func readFileAt(filename string, offset, n int64) ([]byte, error) {
return buf, nil return buf, nil
} }
// ParseWALFilename returns the index encoded in a WAL filename.
//
// The name must be a hexadecimal index immediately followed by WALExt.
// Any other input yields an "invalid wal filename" error.
func ParseWALFilename(name string) (index int, err error) {
	// Require the extension explicitly: TrimSuffix alone would silently
	// accept a bare hex string that has no extension at all.
	if !strings.HasSuffix(name, WALExt) {
		return 0, fmt.Errorf("invalid wal filename: %q", name)
	}

	// Parse as unsigned, with a bit size one below the platform int, so that
	// signed input ("-1", "+1") is rejected and the value always fits in int.
	v, err := strconv.ParseUint(strings.TrimSuffix(name, WALExt), 16, strconv.IntSize-1)
	if err != nil {
		return 0, fmt.Errorf("invalid wal filename: %q", name)
	}
	return int(v), nil
}
// FormatWALFilename builds a WAL filename from index: the index rendered as
// sixteen zero-padded lowercase hex digits, followed by WALExt.
// A non-negative index is asserted before formatting.
func FormatWALFilename(index int) string {
	assert(index >= 0, "wal index must be non-negative")

	hexIndex := fmt.Sprintf("%016x", index)
	return hexIndex + WALExt
}
// removeTmpFiles recursively finds and removes .tmp files. // removeTmpFiles recursively finds and removes .tmp files.
func removeTmpFiles(root string) error { func removeTmpFiles(root string) error {
return filepath.Walk(root, func(path string, info os.FileInfo, err error) error { return filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
@@ -167,6 +154,8 @@ func IsSnapshotPath(s string) bool {
// ParseSnapshotPath returns the index for the snapshot. // ParseSnapshotPath returns the index for the snapshot.
// Returns an error if the path is not a valid snapshot path. // Returns an error if the path is not a valid snapshot path.
func ParseSnapshotPath(s string) (index int, typ, ext string, err error) { func ParseSnapshotPath(s string) (index int, typ, ext string, err error) {
s = filepath.Base(s)
a := snapshotPathRegex.FindStringSubmatch(s) a := snapshotPathRegex.FindStringSubmatch(s)
if a == nil { if a == nil {
return 0, "", "", fmt.Errorf("invalid snapshot path: %s", s) return 0, "", "", fmt.Errorf("invalid snapshot path: %s", s)
@@ -186,6 +175,8 @@ func IsWALPath(s string) bool {
// ParseWALPath returns the index & offset for the WAL file. // ParseWALPath returns the index & offset for the WAL file.
// Returns an error if the path is not a valid WAL path. // Returns an error if the path is not a valid WAL path.
func ParseWALPath(s string) (index int, offset int64, ext string, err error) { func ParseWALPath(s string) (index int, offset int64, ext string, err error) {
s = filepath.Base(s)
a := walPathRegex.FindStringSubmatch(s) a := walPathRegex.FindStringSubmatch(s)
if a == nil { if a == nil {
return 0, 0, "", fmt.Errorf("invalid wal path: %s", s) return 0, 0, "", fmt.Errorf("invalid wal path: %s", s)
@@ -196,6 +187,19 @@ func ParseWALPath(s string) (index int, offset int64, ext string, err error) {
return int(i64), int64(off64), a[3], nil return int(i64), int64(off64), a[3], nil
} }
// FormatWALPath returns the WAL filename for index: sixteen zero-padded
// lowercase hex digits followed by WALExt. The index is asserted to be
// non-negative.
func FormatWALPath(index int) string {
	assert(index >= 0, "wal index must be non-negative")

	hexIndex := fmt.Sprintf("%016x", index)
	return hexIndex + WALExt
}
// FormatWALPathWithOffset returns the WAL filename that carries both an index
// and an offset: two sixteen-digit zero-padded lowercase hex fields joined by
// an underscore, followed by WALExt. Both values are asserted non-negative.
func FormatWALPathWithOffset(index int, offset int64) string {
	assert(index >= 0, "wal index must be non-negative")
	assert(offset >= 0, "wal offset must be non-negative")

	stem := fmt.Sprintf("%016x_%016x", index, offset)
	return stem + WALExt
}
// walPathRegex matches a WAL filename: a 16-digit lowercase hex index, an
// optional "_" plus 16-digit lowercase hex offset, and a ".wal" or ".wal.gz"
// extension. The dots are escaped so that only a literal "." is accepted.
var walPathRegex = regexp.MustCompile(`^([0-9a-f]{16})(?:_([0-9a-f]{16}))?(\.wal(?:\.gz)?)$`)
// isHexChar returns true if ch is a lowercase hex character. // isHexChar returns true if ch is a lowercase hex character.

View File

@@ -10,7 +10,7 @@ import (
"os" "os"
"path/filepath" "path/filepath"
"sort" "sort"
"strings" "strconv"
"sync" "sync"
"time" "time"
) )
@@ -51,6 +51,17 @@ type Replica interface {
WALReader(ctx context.Context, generation string, index int) (io.ReadCloser, error) WALReader(ctx context.Context, generation string, index int) (io.ReadCloser, error)
} }
// GenerationStats holds summary figures for one generation.
type GenerationStats struct {
	// Number of snapshot files and of WAL files, respectively.
	SnapshotN, WALN int

	// Time of the earliest snapshot and of the latest WAL file update.
	CreatedAt, UpdatedAt time.Time
}
var _ Replica = (*FileReplica)(nil) var _ Replica = (*FileReplica)(nil)
// FileReplica is a replica that replicates a DB to a local file path. // FileReplica is a replica that replicates a DB to a local file path.
@@ -104,9 +115,52 @@ func (r *FileReplica) WALDir(generation string) string {
return filepath.Join(r.dst, "generations", generation, "wal") return filepath.Join(r.dst, "generations", generation, "wal")
} }
// WALSubdir returns the group directory that holds the WAL file for index.
// The directory name is index with walDirMask applied, written as sixteen
// zero-padded hex digits, and it sits directly under WALDir(generation).
func (r *FileReplica) WALSubdir(generation string, index int) string {
	group := uint64(index) & walDirMask
	name := fmt.Sprintf("%016x", group)
	return filepath.Join(r.WALDir(generation), name)
}
// WALSubdirNames returns the names of all WAL group subdirectories found
// directly under WALDir(generation), in the order ioutil.ReadDir yields them.
//
// An entry counts as a group only if it is a directory and its name parses as
// a hexadecimal number. A missing WAL directory is not an error: it yields a
// nil slice. Any other read error is returned as-is.
func (r *FileReplica) WALSubdirNames(generation string) ([]string, error) {
	fis, err := ioutil.ReadDir(r.WALDir(generation))
	if err != nil && !os.IsNotExist(err) {
		return nil, err
	}

	var names []string
	for _, fi := range fis {
		// Callers list the contents of every returned name, so a regular
		// file that merely looks like a hex number must not be reported.
		if !fi.IsDir() {
			continue
		}
		if _, err := strconv.ParseUint(fi.Name(), 16, 64); err != nil {
			continue
		}
		names = append(names, fi.Name())
	}
	return names, nil
}
// MaxWALSubdirName returns the name of the WAL group subdirectory with the
// highest numeric value under WALDir(generation).
//
// Only directories whose names parse as hexadecimal numbers are considered,
// and candidates are ordered by their parsed value rather than as strings so
// that names of differing width still compare correctly. Errors from reading
// the WAL directory are returned unchanged; os.ErrNotExist is returned when
// the directory contains no group subdirectory.
func (r *FileReplica) MaxWALSubdirName(generation string) (string, error) {
	fis, err := ioutil.ReadDir(r.WALDir(generation))
	if err != nil {
		return "", err
	}

	var (
		name     string
		maxValue uint64
		found    bool
	)
	for _, fi := range fis {
		// Callers list the contents of the returned name, so it must be a directory.
		if !fi.IsDir() {
			continue
		}
		v, err := strconv.ParseUint(fi.Name(), 16, 64)
		if err != nil {
			continue
		}
		if !found || v > maxValue {
			name, maxValue, found = fi.Name(), v, true
		}
	}

	if !found {
		return "", os.ErrNotExist
	}
	return name, nil
}
// WALPath returns the path to a WAL file. // WALPath returns the path to a WAL file.
func (r *FileReplica) WALPath(generation string, index int) string { func (r *FileReplica) WALPath(generation string, index int) string {
return filepath.Join(r.WALDir(generation), fmt.Sprintf("%016x.wal", index)) return filepath.Join(r.WALSubdir(generation, index), fmt.Sprintf("%016x.wal", index))
} }
// Generations returns a list of available generation names. // Generations returns a list of available generation names.
@@ -184,37 +238,37 @@ func (r *FileReplica) snapshotStats(generation string) (n int, min, max time.Tim
} }
func (r *FileReplica) walStats(generation string) (n int, min, max time.Time, err error) { func (r *FileReplica) walStats(generation string) (n int, min, max time.Time, err error) {
fis, err := ioutil.ReadDir(r.WALDir(generation)) names, err := r.WALSubdirNames(generation)
if os.IsNotExist(err) { if err != nil {
return n, min, max, nil
} else if err != nil {
return n, min, max, err return n, min, max, err
} }
for _, fi := range fis { for _, name := range names {
if !IsWALPath(fi.Name()) { fis, err := ioutil.ReadDir(filepath.Join(r.WALDir(generation), name))
continue if os.IsNotExist(err) {
return n, min, max, nil
} else if err != nil {
return n, min, max, err
} }
modTime := fi.ModTime().UTC()
n++ for _, fi := range fis {
if min.IsZero() || modTime.Before(min) { if !IsWALPath(fi.Name()) {
min = modTime continue
} }
if max.IsZero() || modTime.After(max) { modTime := fi.ModTime().UTC()
max = modTime
n++
if min.IsZero() || modTime.Before(min) {
min = modTime
}
if max.IsZero() || modTime.After(max) {
max = modTime
}
} }
} }
return n, min, max, nil return n, min, max, nil
} }
// GenerationStats holds summary figures for one generation.
type GenerationStats struct {
	// Number of snapshot files and of WAL files, respectively.
	SnapshotN, WALN int

	// Time of the earliest snapshot and of the latest WAL file update.
	CreatedAt, UpdatedAt time.Time
}
// Start starts replication for a given generation. // Start starts replication for a given generation.
func (r *FileReplica) Start(ctx context.Context) { func (r *FileReplica) Start(ctx context.Context) {
// Stop previous replication. // Stop previous replication.
@@ -305,9 +359,16 @@ func (r *FileReplica) pos() (pos Pos, err error) {
} }
pos.Generation = generation pos.Generation = generation
// Find highest WAL subdirectory group.
subdir, err := r.MaxWALSubdirName(generation)
if os.IsNotExist(err) {
return pos, nil // no replicated wal, start at beginning of generation
} else if err != nil {
return pos, err
}
// Find the max WAL file. // Find the max WAL file.
dir := r.WALDir(generation) fis, err := ioutil.ReadDir(filepath.Join(r.WALDir(generation), subdir))
fis, err := ioutil.ReadDir(dir)
if os.IsNotExist(err) { if os.IsNotExist(err) {
return pos, nil // no replicated wal, start at beginning of generation return pos, nil // no replicated wal, start at beginning of generation
} else if err != nil { } else if err != nil {
@@ -316,17 +377,10 @@ func (r *FileReplica) pos() (pos Pos, err error) {
index := -1 index := -1
for _, fi := range fis { for _, fi := range fis {
name := fi.Name() if idx, _, _, err := ParseWALPath(fi.Name()); err != nil {
name = strings.TrimSuffix(name, ".gz")
if !strings.HasSuffix(name, WALExt) {
continue
}
if v, err := ParseWALFilename(filepath.Base(name)); err != nil {
continue // invalid wal filename continue // invalid wal filename
} else if index == -1 || v > index { } else if index == -1 || idx > index {
index = v index = idx
} }
} }
if index == -1 { if index == -1 {
@@ -335,7 +389,7 @@ func (r *FileReplica) pos() (pos Pos, err error) {
pos.Index = index pos.Index = index
// Determine current offset. // Determine current offset.
fi, err := os.Stat(filepath.Join(dir, FormatWALFilename(pos.Index))) fi, err := os.Stat(r.WALPath(pos.Generation, pos.Index))
if err != nil { if err != nil {
return pos, err return pos, err
} }
@@ -446,8 +500,7 @@ func (r *FileReplica) syncNext(ctx context.Context, pos Pos) (_ Pos, err error)
// compress gzips all WAL files before the current one. // compress gzips all WAL files before the current one.
func (r *FileReplica) compress(ctx context.Context, generation string) error { func (r *FileReplica) compress(ctx context.Context, generation string) error {
dir := r.WALDir(generation) filenames, err := filepath.Glob(filepath.Join(r.WALDir(generation), "**/*.wal"))
filenames, err := filepath.Glob(filepath.Join(dir, "*.wal"))
if err != nil { if err != nil {
return err return err
} else if len(filenames) <= 1 { } else if len(filenames) <= 1 {
@@ -513,31 +566,38 @@ func (r *FileReplica) SnapshotIndexAt(ctx context.Context, generation string, ti
// Returns the highest index for a WAL file that occurs before timestamp. // Returns the highest index for a WAL file that occurs before timestamp.
// If timestamp is zero, returns the highest WAL index. // If timestamp is zero, returns the highest WAL index.
func (r *FileReplica) WALIndexAt(ctx context.Context, generation string, timestamp time.Time) (int, error) { func (r *FileReplica) WALIndexAt(ctx context.Context, generation string, timestamp time.Time) (int, error) {
fis, err := ioutil.ReadDir(r.WALDir(generation)) names, err := r.WALSubdirNames(generation)
if os.IsNotExist(err) { if err != nil {
return 0, nil
} else if err != nil {
return 0, err return 0, err
} }
index := -1 // TODO: Optimize to only read the last group if no timestamp specified.
for _, fi := range fis { // TODO: Perform binary search to find correct timestamp.
// Read index from snapshot filename.
idx, _, _, err := ParseWALPath(fi.Name()) var index int
if err != nil { for _, name := range names {
continue // not a snapshot, skip fis, err := ioutil.ReadDir(filepath.Join(r.WALDir(generation), name))
} else if !timestamp.IsZero() && fi.ModTime().After(timestamp) { if os.IsNotExist(err) {
continue // after timestamp, skip return 0, nil
} else if idx < index { } else if err != nil {
continue // earlier index, skip return 0, err
} }
index = idx for _, fi := range fis {
// Read index from snapshot filename.
idx, _, _, err := ParseWALPath(fi.Name())
if err != nil {
continue // not a snapshot, skip
} else if !timestamp.IsZero() && fi.ModTime().After(timestamp) {
continue // after timestamp, skip
} else if idx < index {
continue // earlier index, skip
}
index = idx
}
} }
if index == -1 {
return 0, nil
}
return index, nil return index, nil
} }
@@ -638,3 +698,9 @@ func compressFile(src, dst string) error {
// Move compressed file to final location. // Move compressed file to final location.
return os.Rename(dst+".tmp", dst) return os.Rename(dst+".tmp", dst)
} }
// Constants used to group WAL files into subdirectories by index, so that
// indexes sharing the same upper bits land in the same directory.
const (
// walDirFileN is the number of consecutive WAL indexes covered by one group (0x10000 = 64K).
walDirFileN = 0x10000
// walDirMask clears the low bits of an index (those below walDirFileN),
// leaving the value used to name the index's group directory.
walDirMask = uint64(0xFFFFFFFFFFFFFFFF ^ (walDirFileN - 1))
)