diff --git a/db.go b/db.go index df5bce5..20bbefb 100644 --- a/db.go +++ b/db.go @@ -129,7 +129,7 @@ func (db *DB) CurrentShadowWALIndex(generation string) (int, error) { if !strings.HasSuffix(fi.Name(), WALExt) { continue } - if v, err := ParseWALFilename(filepath.Base(fi.Name())); err != nil { + if v, _, _, err := ParseWALPath(fi.Name()); err != nil { continue // invalid wal filename } else if v > index { index = v @@ -640,13 +640,13 @@ func (db *DB) syncWAL(info syncInfo) (newSize int64, err error) { // Parse index of current shadow WAL file. dir, base := filepath.Split(info.shadowWALPath) - index, err := ParseWALFilename(base) + index, _, _, err := ParseWALPath(base) if err != nil { return 0, fmt.Errorf("cannot parse shadow wal filename: %s", base) } // Start a new shadow WAL file with next index. - newShadowWALPath := filepath.Join(dir, FormatWALFilename(index+1)) + newShadowWALPath := filepath.Join(dir, FormatWALPath(index+1)) if err := db.initShadowWALFile(newShadowWALPath); err != nil { return 0, fmt.Errorf("cannot init shadow wal file: name=%s err=%w", newShadowWALPath, err) } @@ -962,14 +962,13 @@ func (db *DB) checkpoint(info syncInfo, force bool) error { } // Parse index of current shadow WAL file. - dir, base := filepath.Split(info.shadowWALPath) - index, err := ParseWALFilename(base) + index, _, _, err := ParseWALPath(info.shadowWALPath) if err != nil { - return fmt.Errorf("cannot parse shadow wal filename: %s", base) + return fmt.Errorf("cannot parse shadow wal filename: %s", info.shadowWALPath) } // Start a new shadow WAL file with next index. - newShadowWALPath := filepath.Join(dir, FormatWALFilename(index+1)) + newShadowWALPath := filepath.Join(filepath.Dir(info.shadowWALPath), FormatWALPath(index+1)) if err := db.initShadowWALFile(newShadowWALPath); err != nil { return fmt.Errorf("cannot init shadow wal file: name=%s err=%w", newShadowWALPath, err) } diff --git a/litestream.go b/litestream.go index c5fae8b..d41a7d1 100644 --- a/litestream.go +++ b/litestream.go @@ -119,19 +119,6 @@ func readFileAt(filename string, offset, n int64) ([]byte, error) { return buf, nil } -func ParseWALFilename(name string) (index int, err error) { - v, err := strconv.ParseInt(strings.TrimSuffix(name, WALExt), 16, 64) - if err != nil { - return 0, fmt.Errorf("invalid wal filename: %q", name) - } - return int(v), nil -} - -func FormatWALFilename(index int) string { - assert(index >= 0, "wal index must be non-negative") - return fmt.Sprintf("%016x%s", index, WALExt) -} - // removeTmpFiles recursively finds and removes .tmp files. func removeTmpFiles(root string) error { return filepath.Walk(root, func(path string, info os.FileInfo, err error) error { @@ -167,6 +154,8 @@ func IsSnapshotPath(s string) bool { // ParseSnapshotPath returns the index for the snapshot. // Returns an error if the path is not a valid snapshot path. func ParseSnapshotPath(s string) (index int, typ, ext string, err error) { + s = filepath.Base(s) + a := snapshotPathRegex.FindStringSubmatch(s) if a == nil { return 0, "", "", fmt.Errorf("invalid snapshot path: %s", s) @@ -186,6 +175,8 @@ func IsWALPath(s string) bool { // ParseWALPath returns the index & offset for the WAL file. // Returns an error if the path is not a valid snapshot path. func ParseWALPath(s string) (index int, offset int64, ext string, err error) { + s = filepath.Base(s) + a := walPathRegex.FindStringSubmatch(s) if a == nil { return 0, 0, "", fmt.Errorf("invalid wal path: %s", s) @@ -196,6 +187,19 @@ func ParseWALPath(s string) (index int, offset int64, ext string, err error) { return int(i64), int64(off64), a[3], nil } +// FormatWALPath formats a WAL filename with a given index. +func FormatWALPath(index int) string { + assert(index >= 0, "wal index must be non-negative") + return fmt.Sprintf("%016x%s", index, WALExt) +} + +// FormatWALPathWithOffset formats a WAL filename with a given index & offset. +func FormatWALPathWithOffset(index int, offset int64) string { + assert(index >= 0, "wal index must be non-negative") + assert(offset >= 0, "wal offset must be non-negative") + return fmt.Sprintf("%016x_%016x%s", index, offset, WALExt) +} + var walPathRegex = regexp.MustCompile(`^([0-9a-f]{16})(?:_([0-9a-f]{16}))?(.wal(?:.gz)?)$`) // isHexChar returns true if ch is a lowercase hex character. diff --git a/replicator.go b/replicator.go index 86419d8..acce54c 100644 --- a/replicator.go +++ b/replicator.go @@ -10,7 +10,7 @@ import ( "os" "path/filepath" "sort" - "strings" + "strconv" "sync" "time" ) @@ -51,6 +51,17 @@ type Replica interface { WALReader(ctx context.Context, generation string, index int) (io.ReadCloser, error) } +// GenerationStats represents high level stats for a single generation. +type GenerationStats struct { + // Count of snapshot & WAL files. + SnapshotN int + WALN int + + // Time range for the earliest snapshot & latest WAL file update. + CreatedAt time.Time + UpdatedAt time.Time +} + var _ Replica = (*FileReplica)(nil) // FileReplica is a replica that replicates a DB to a local file path. @@ -104,9 +115,52 @@ func (r *FileReplica) WALDir(generation string) string { return filepath.Join(r.dst, "generations", generation, "wal") } +// WALSubdir returns the directory used for grouping WAL files. +func (r *FileReplica) WALSubdir(generation string, index int) string { + return filepath.Join(r.WALDir(generation), fmt.Sprintf("%016x", uint64(index)&walDirMask)) +} + +// WALSubdirNames returns a list of all WAL subdirectory group names. +func (r *FileReplica) WALSubdirNames(generation string) ([]string, error) { + fis, err := ioutil.ReadDir(r.WALDir(generation)) + if err != nil && !os.IsNotExist(err) { + return nil, err + } + + var names []string + for _, fi := range fis { + if _, err := strconv.ParseUint(fi.Name(), 16, 64); err != nil { + continue + } + names = append(names, fi.Name()) + } + return names, nil +} + +// MaxWALSubdirName returns the highest WAL subdirectory group name. +func (r *FileReplica) MaxWALSubdirName(generation string) (string, error) { + fis, err := ioutil.ReadDir(r.WALDir(generation)) + if err != nil { + return "", err + } + + var name string + for _, fi := range fis { + if _, err := strconv.ParseUint(fi.Name(), 16, 64); err != nil { + continue + } else if name == "" || fi.Name() > name { + name = fi.Name() + } + } + if name == "" { + return "", os.ErrNotExist + } + return name, nil +} + // WALPath returns the path to a WAL file. func (r *FileReplica) WALPath(generation string, index int) string { - return filepath.Join(r.WALDir(generation), fmt.Sprintf("%016x.wal", index)) + return filepath.Join(r.WALSubdir(generation, index), fmt.Sprintf("%016x.wal", index)) } // Generations returns a list of available generation names. @@ -184,37 +238,37 @@ func (r *FileReplica) snapshotStats(generation string) (n int, min, max time.Tim } func (r *FileReplica) walStats(generation string) (n int, min, max time.Time, err error) { - fis, err := ioutil.ReadDir(r.WALDir(generation)) - if os.IsNotExist(err) { - return n, min, max, nil - } else if err != nil { + names, err := r.WALSubdirNames(generation) + if err != nil { return n, min, max, err } - for _, fi := range fis { - if !IsWALPath(fi.Name()) { - continue + for _, name := range names { + fis, err := ioutil.ReadDir(filepath.Join(r.WALDir(generation), name)) + if os.IsNotExist(err) { + return n, min, max, nil + } else if err != nil { + return n, min, max, err } - modTime := fi.ModTime().UTC() - n++ - if min.IsZero() || modTime.Before(min) { - min = modTime - } - if max.IsZero() || modTime.After(max) { - max = modTime + for _, fi := range fis { + if !IsWALPath(fi.Name()) { + continue + } + modTime := fi.ModTime().UTC() + + n++ + if min.IsZero() || modTime.Before(min) { + min = modTime + } + if max.IsZero() || modTime.After(max) { + max = modTime + } } } return n, min, max, nil } -type GenerationStats struct { - SnapshotN int - WALN int - CreatedAt time.Time - UpdatedAt time.Time -} - // Start starts replication for a given generation. func (r *FileReplica) Start(ctx context.Context) { // Stop previous replication. @@ -305,9 +359,16 @@ func (r *FileReplica) pos() (pos Pos, err error) { } pos.Generation = generation + // Find highest WAL subdirectory group. + subdir, err := r.MaxWALSubdirName(generation) + if os.IsNotExist(err) { + return pos, nil // no replicated wal, start at beginning of generation + } else if err != nil { + return pos, err + } + // Find the max WAL file. - dir := r.WALDir(generation) - fis, err := ioutil.ReadDir(dir) + fis, err := ioutil.ReadDir(filepath.Join(r.WALDir(generation), subdir)) if os.IsNotExist(err) { return pos, nil // no replicated wal, start at beginning of generation } else if err != nil { @@ -316,17 +377,10 @@ func (r *FileReplica) pos() (pos Pos, err error) { index := -1 for _, fi := range fis { - name := fi.Name() - name = strings.TrimSuffix(name, ".gz") - - if !strings.HasSuffix(name, WALExt) { - continue - } - - if v, err := ParseWALFilename(filepath.Base(name)); err != nil { + if idx, _, _, err := ParseWALPath(fi.Name()); err != nil { continue // invalid wal filename - } else if index == -1 || v > index { - index = v + } else if index == -1 || idx > index { + index = idx } } if index == -1 { @@ -335,7 +389,7 @@ func (r *FileReplica) pos() (pos Pos, err error) { pos.Index = index // Determine current offset. - fi, err := os.Stat(filepath.Join(dir, FormatWALFilename(pos.Index))) + fi, err := os.Stat(r.WALPath(pos.Generation, pos.Index)) if err != nil { return pos, err } @@ -446,8 +500,7 @@ func (r *FileReplica) syncNext(ctx context.Context, pos Pos) (_ Pos, err error) // compress gzips all WAL files before the current one. func (r *FileReplica) compress(ctx context.Context, generation string) error { - dir := r.WALDir(generation) - filenames, err := filepath.Glob(filepath.Join(dir, "*.wal")) + filenames, err := filepath.Glob(filepath.Join(r.WALDir(generation), "**/*.wal")) if err != nil { return err } else if len(filenames) <= 1 { @@ -513,31 +566,38 @@ func (r *FileReplica) SnapshotIndexAt(ctx context.Context, generation string, ti // Returns the highest index for a WAL file that occurs before timestamp. // If timestamp is zero, returns the highest WAL index. func (r *FileReplica) WALIndexAt(ctx context.Context, generation string, timestamp time.Time) (int, error) { - fis, err := ioutil.ReadDir(r.WALDir(generation)) - if os.IsNotExist(err) { - return 0, nil - } else if err != nil { + names, err := r.WALSubdirNames(generation) + if err != nil { return 0, err } - index := -1 - for _, fi := range fis { - // Read index from snapshot filename. - idx, _, _, err := ParseWALPath(fi.Name()) - if err != nil { - continue // not a snapshot, skip - } else if !timestamp.IsZero() && fi.ModTime().After(timestamp) { - continue // after timestamp, skip - } else if idx < index { - continue // earlier index, skip + // TODO: Optimize to only read the last group if no timestamp specified. + // TODO: Perform binary search to find correct timestamp. + + var index int + for _, name := range names { + fis, err := ioutil.ReadDir(filepath.Join(r.WALDir(generation), name)) + if os.IsNotExist(err) { + return 0, nil + } else if err != nil { + return 0, err } - index = idx + for _, fi := range fis { + // Read index from snapshot filename. + idx, _, _, err := ParseWALPath(fi.Name()) + if err != nil { + continue // not a snapshot, skip + } else if !timestamp.IsZero() && fi.ModTime().After(timestamp) { + continue // after timestamp, skip + } else if idx < index { + continue // earlier index, skip + } + + index = idx + } } - if index == -1 { - return 0, nil - } return index, nil } @@ -638,3 +698,9 @@ func compressFile(src, dst string) error { // Move compressed file to final location. return os.Rename(dst+".tmp", dst) } + +// walDirMask is a mask used to group 64K wal files into a directory. +const ( + walDirFileN = 0x10000 + walDirMask = uint64(0xFFFFFFFFFFFFFFFF ^ (walDirFileN - 1)) +)