diff --git a/db.go b/db.go index 4362692..f042110 100644 --- a/db.go +++ b/db.go @@ -22,7 +22,7 @@ import ( // Default DB settings. const ( DefaultMonitorInterval = 1 * time.Second - DefaultCheckpointInterval = 5 * time.Second // 1 * time.Minute + DefaultCheckpointInterval = 1 * time.Minute DefaultMinCheckpointPageN = 1000 ) @@ -108,10 +108,15 @@ func (db *DB) GenerationPath(generation string) string { return filepath.Join(db.MetaPath(), "generations", generation) } +// ShadowWALDir returns the path of the shadow wal directory. +func (db *DB) ShadowWALDir(generation string) string { + return filepath.Join(db.GenerationPath(generation), "wal") +} + // ShadowWALPath returns the path of a single shadow WAL file. func (db *DB) ShadowWALPath(generation string, index int) string { assert(index >= 0, "shadow wal index cannot be negative") - return filepath.Join(db.GenerationPath(generation), "wal", fmt.Sprintf("%016x", index)+WALExt) + return filepath.Join(db.ShadowWALDir(generation), FormatWALPath(index)) } // CurrentShadowWALPath returns the path to the last shadow WAL in a generation. @@ -147,6 +152,30 @@ func (db *DB) CurrentShadowWALIndex(generation string) (int, error) { return index, nil } +// Pos returns the current position of the database. +func (db *DB) Pos() (Pos, error) { + generation, err := db.CurrentGeneration() + if err != nil { + return Pos{}, err + } else if generation == "" { + return Pos{}, nil + } + + index, err := db.CurrentShadowWALIndex(generation) + if err != nil { + return Pos{}, err + } + + fi, err := os.Stat(db.ShadowWALPath(generation, index)) + if os.IsNotExist(err) { + return Pos{Generation: generation, Index: index}, nil + } else if err != nil { + return Pos{}, err + } + + return Pos{Generation: generation, Index: index, Offset: fi.Size()}, nil +} + // Notify returns a channel that closes when the shadow WAL changes. 
func (db *DB) Notify() <-chan struct{} { db.mu.RLock() @@ -292,8 +321,16 @@ func (db *DB) Init() (err error) { return nil } -// clean removes old generations. +// clean removes old generations & WAL files. func (db *DB) clean() error { + if err := db.cleanGenerations(); err != nil { + return err + } + return db.cleanWAL() +} + +// cleanGenerations removes old generations. +func (db *DB) cleanGenerations() error { generation, err := db.CurrentGeneration() if err != nil { return err @@ -320,6 +357,50 @@ func (db *DB) clean() error { return nil } +// cleanWAL removes WAL files that have been replicated. +func (db *DB) cleanWAL() error { + generation, err := db.CurrentGeneration() + if err != nil { + return err + } + + // Determine lowest index that's been replicated to all replicas. + min := -1 + for _, r := range db.Replicas { + pos := r.Pos() + if pos.Generation != generation { + pos = Pos{} // different generation, reset index to zero + } + if min == -1 || pos.Index < min { + min = pos.Index + } + } + + // Skip if our lowest index is too small. + if min <= 0 { + return nil + } + min-- // Keep an extra WAL file. + + // Remove all WAL files for the generation before the lowest index. + dir := db.ShadowWALDir(generation) + fis, err := ioutil.ReadDir(dir) + if os.IsNotExist(err) { + return nil + } else if err != nil { + return err + } + for _, fi := range fis { + if idx, _, _, err := ParseWALPath(fi.Name()); err != nil || idx >= min { + continue + } + if err := os.Remove(filepath.Join(dir, fi.Name())); err != nil { + return err + } + } + return nil +} + // SoftClose closes everything but the underlying db connection. This method // is available because the binary needs to avoid closing the database on exit // to prevent autocheckpointing. @@ -528,6 +609,11 @@ func (db *DB) Sync() (err error) { } } + // Clean up any old files. + if err := db.clean(); err != nil { + return fmt.Errorf("cannot clean: %w", err) + } + // Notify replicas of WAL changes. 
if changed { close(db.notify) diff --git a/replicator.go b/replicator.go index acce54c..99a2174 100644 --- a/replicator.go +++ b/replicator.go @@ -36,6 +36,9 @@ type Replica interface { // snapshot & WAL files as well as the time range covered. GenerationStats(ctx context.Context, generation string) (GenerationStats, error) + // Returns the last replication position. + Pos() Pos + // Returns the highest index for a snapshot within a generation that occurs // before timestamp. If timestamp is zero, returns the latest snapshot. SnapshotIndexAt(ctx context.Context, generation string, timestamp time.Time) (int, error) @@ -70,9 +73,10 @@ type FileReplica struct { name string // replica name, optional dst string // destination path - // mu sync.RWMutex - wg sync.WaitGroup + mu sync.RWMutex + pos Pos // last position + wg sync.WaitGroup ctx context.Context cancel func() } @@ -100,6 +104,13 @@ func (r *FileReplica) Type() string { return "file" } +// Pos returns the last successfully replicated position. +func (r *FileReplica) Pos() Pos { + r.mu.RLock() + defer r.mu.RUnlock() + return r.pos +} + // SnapshotDir returns the path to a generation's snapshot directory. func (r *FileReplica) SnapshotDir(generation string) string { return filepath.Join(r.dst, "generations", generation, "snapshots") @@ -110,6 +121,27 @@ func (r *FileReplica) SnapshotPath(generation string, index int) string { return filepath.Join(r.SnapshotDir(generation), fmt.Sprintf("%016x.snapshot.gz", index)) } +// MaxSnapshotIndex returns the highest index for the snapshots. 
+func (r *FileReplica) MaxSnapshotIndex(generation string) (int, error) { + fis, err := ioutil.ReadDir(r.SnapshotDir(generation)) + if err != nil { + return 0, err + } + + index := -1 + for _, fi := range fis { + if idx, _, _, err := ParseSnapshotPath(fi.Name()); err != nil { + continue + } else if index == -1 || idx > index { + index = idx + } + } + if index == -1 { + return 0, fmt.Errorf("no snapshots found") + } + return index, nil +} + // WALDir returns the path to a generation's WAL directory func (r *FileReplica) WALDir(generation string) string { return filepath.Join(r.dst, "generations", generation, "wal") @@ -300,8 +332,6 @@ func (r *FileReplica) monitor(ctx context.Context) { close(ch) var notify <-chan struct{} = ch - var pos Pos - var err error for { select { case <-ctx.Done(): @@ -312,67 +342,38 @@ func (r *FileReplica) monitor(ctx context.Context) { // Fetch new notify channel before replicating data. notify = r.db.Notify() - // Determine position, if necessary. - if pos.IsZero() { - if pos, err = r.pos(); err != nil { - log.Printf("%s(%s): cannot determine position: %s", r.db.Path(), r.Name(), err) - continue - } else if pos.IsZero() { - log.Printf("%s(%s): no generation, waiting for data", r.db.Path(), r.Name()) - continue - } - } - - // If we have no replicated WALs, start from last index in shadow WAL. - if pos.Index == 0 && pos.Offset == 0 { - if pos.Index, err = r.db.CurrentShadowWALIndex(pos.Generation); err != nil { - log.Printf("%s(%s): cannot determine latest shadow wal index: %s", r.db.Path(), r.Name(), err) - continue - } - } - // Synchronize the shadow wal into the replication directory. - if pos, err = r.sync(ctx, pos); err != nil { + if err := r.Sync(ctx); err != nil { log.Printf("%s(%s): sync error: %s", r.db.Path(), r.Name(), err) continue } - - // Gzip any old WAL files. 
- if pos.Generation != "" { - if err := r.compress(ctx, pos.Generation); err != nil { - log.Printf("%s(%s): compress error: %s", r.db.Path(), r.Name(), err) - continue - } - } } } -// pos returns the position for the replica for the current generation. +// calcPos returns the position for the replica for the current generation. // Returns a zero value if there is no active generation. -func (r *FileReplica) pos() (pos Pos, err error) { - // Find the current generation from the DB. Return zero pos if no generation. - generation, err := r.db.CurrentGeneration() - if err != nil { - return pos, err - } else if generation == "" { - return pos, nil // empty position - } +func (r *FileReplica) calcPos(generation string) (pos Pos, err error) { pos.Generation = generation + // Find maximum snapshot index. + if pos.Index, err = r.MaxSnapshotIndex(generation); err != nil { + return Pos{}, err + } + // Find highest WAL subdirectory group. subdir, err := r.MaxWALSubdirName(generation) if os.IsNotExist(err) { - return pos, nil // no replicated wal, start at beginning of generation + return pos, nil // no replicated wal, start at snapshot index } else if err != nil { - return pos, err + return Pos{}, err } - // Find the max WAL file. + // Find the max WAL file within WAL group. fis, err := ioutil.ReadDir(filepath.Join(r.WALDir(generation), subdir)) if os.IsNotExist(err) { - return pos, nil // no replicated wal, start at beginning of generation + return pos, nil // no replicated wal, start at snapshot index. } else if err != nil { - return pos, err + return Pos{}, err } index := -1 @@ -384,14 +385,14 @@ func (r *FileReplica) pos() (pos Pos, err error) { } } if index == -1 { - return pos, nil // wal directory exists but no wal files, return beginning pos + return pos, nil // wal directory exists but no wal files, return snapshot position } pos.Index = index // Determine current offset. 
fi, err := os.Stat(r.WALPath(pos.Generation, pos.Index)) if err != nil { - return pos, err + return Pos{}, err } pos.Offset = fi.Size() @@ -441,61 +442,95 @@ func (r *FileReplica) snapshotN(generation string) (int, error) { return n, nil } -func (r *FileReplica) sync(ctx context.Context, pos Pos) (_ Pos, err error) { +func (r *FileReplica) Sync(ctx context.Context) (err error) { + // Find current position of database. + dpos, err := r.db.Pos() + if err != nil { + return fmt.Errorf("cannot determine current generation: %w", err) + } else if dpos.IsZero() { + return fmt.Errorf("no generation, waiting for data") + } + generation := dpos.Generation + + // Create snapshot if no snapshots exist for generation. + if n, err := r.snapshotN(generation); err != nil { + return err + } else if n == 0 { + if err := r.snapshot(ctx, generation, dpos.Index); err != nil { + return err + } + } + + // Determine position, if necessary. + if r.Pos().IsZero() { + pos, err := r.calcPos(generation) + if err != nil { + return fmt.Errorf("cannot determine replica position: %w", err) + } + + r.mu.Lock() + r.pos = pos + r.mu.Unlock() + } + // Read all WAL files since the last position. for { - if pos, err = r.syncNext(ctx, pos); err == io.EOF { - return pos, nil + if err = r.syncWAL(ctx); err == io.EOF { + break } else if err != nil { - return pos, err + return err } } + + // Gzip any old WAL files. 
+ if generation != "" { + if err := r.compress(ctx, generation); err != nil { + return fmt.Errorf("cannot compress: %w", err) + } + } + + return nil } -func (r *FileReplica) syncNext(ctx context.Context, pos Pos) (_ Pos, err error) { - rd, err := r.db.ShadowWALReader(pos) +func (r *FileReplica) syncWAL(ctx context.Context) (err error) { + rd, err := r.db.ShadowWALReader(r.Pos()) if err == io.EOF { - return pos, err + return err } else if err != nil { - return pos, fmt.Errorf("wal reader: %w", err) + return fmt.Errorf("wal reader: %w", err) } defer rd.Close() - // Create snapshot if no snapshots exist. - if n, err := r.snapshotN(rd.Pos().Generation); err != nil { - return pos, err - } else if n == 0 { - if err := r.snapshot(ctx, rd.Pos().Generation, rd.Pos().Index); err != nil { - return pos, err - } - } - // Ensure parent directory exists for WAL file. filename := r.WALPath(rd.Pos().Generation, rd.Pos().Index) if err := os.MkdirAll(filepath.Dir(filename), 0700); err != nil { - return pos, err + return err } // Create a temporary file to write into so we don't have partial writes. w, err := os.OpenFile(filename, os.O_RDWR|os.O_CREATE, 0600) if err != nil { - return pos, err + return err } defer w.Close() // Seek, copy & sync WAL contents. if _, err := w.Seek(rd.Pos().Offset, io.SeekStart); err != nil { - return pos, err + return err } else if _, err := io.Copy(w, rd); err != nil { - return pos, err + return err } else if err := w.Sync(); err != nil { - return pos, err + return err } else if err := w.Close(); err != nil { - return pos, err + return err } - // Return ending position of the reader. - return rd.Pos(), nil + // Save last replicated position. + r.mu.Lock() + r.pos = rd.Pos() + r.mu.Unlock() + + return nil } // compress gzips all WAL files before the current one.