From 41448ceb897bb52428493bb3f8b68082baf503d0 Mon Sep 17 00:00:00 2001 From: Ben Johnson Date: Tue, 29 Dec 2020 12:49:23 -0700 Subject: [PATCH] Rename replicator to replica --- cmd/litestream/generations.go | 6 +-- cmd/litestream/main.go | 26 ++++++------ db.go | 34 ++++++++-------- replicator.go | 77 +++++++++++++++++------------------ 4 files changed, 71 insertions(+), 72 deletions(-) diff --git a/cmd/litestream/generations.go b/cmd/litestream/generations.go index c900536..7085e21 100644 --- a/cmd/litestream/generations.go +++ b/cmd/litestream/generations.go @@ -70,15 +70,15 @@ func (c *GenerationsCommand) Run(ctx context.Context, args []string) (err error) return err } - // Iterate over each replicator in the database. - for _, r := range db.Replicators { + // Iterate over each replica in the database. + for _, r := range db.Replicas { generations, err := r.Generations(ctx) if err != nil { log.Printf("%s: cannot list generations", r.Name(), err) continue } - // Iterate over each generation for the replicator. + // Iterate over each generation for the replica. for _, generation := range generations { stats, err := r.GenerationStats(ctx, generation) if err != nil { diff --git a/cmd/litestream/main.go b/cmd/litestream/main.go index b729ed5..621f971 100644 --- a/cmd/litestream/main.go +++ b/cmd/litestream/main.go @@ -134,8 +134,8 @@ type DBConfig struct { type ReplicaConfig struct { Type string `yaml:"type"` // "file", "s3" - Name string `yaml:"name"` // name of replicator, optional. - Path string `yaml:"path"` // used for file replicators + Name string `yaml:"name"` // name of replica, optional. + Path string `yaml:"path"` // used for file replicas } func registerConfigFlag(fs *flag.FlagSet, p *string) { @@ -147,32 +147,32 @@ func newDBFromConfig(config *DBConfig) (*litestream.DB, error) { // Initialize database with given path. db := litestream.NewDB(config.Path) - // Instantiate and attach replicators. + // Instantiate and attach replicas. for _, rconfig := range config.Replicas { - r, err := newReplicatorFromConfig(db, rconfig) + r, err := newReplicaFromConfig(db, rconfig) if err != nil { return nil, err } - db.Replicators = append(db.Replicators, r) + db.Replicas = append(db.Replicas, r) } return db, nil } -// newReplicatorFromConfig instantiates a replicator for a DB based on a config. -func newReplicatorFromConfig(db *litestream.DB, config *ReplicaConfig) (litestream.Replicator, error) { +// newReplicaFromConfig instantiates a replica for a DB based on a config. +func newReplicaFromConfig(db *litestream.DB, config *ReplicaConfig) (litestream.Replica, error) { switch config.Type { case "", "file": - return newFileReplicatorFromConfig(db, config) + return newFileReplicaFromConfig(db, config) default: - return nil, fmt.Errorf("unknown replicator type in config: %q", config.Type) + return nil, fmt.Errorf("unknown replica type in config: %q", config.Type) } } -// newFileReplicatorFromConfig returns a new instance of FileReplicator build from config. -func newFileReplicatorFromConfig(db *litestream.DB, config *ReplicaConfig) (*litestream.FileReplicator, error) { +// newFileReplicaFromConfig returns a new instance of FileReplica build from config. +func newFileReplicaFromConfig(db *litestream.DB, config *ReplicaConfig) (*litestream.FileReplica, error) { if config.Path == "" { - return nil, fmt.Errorf("file replicator path require for db %q", db.Path()) + return nil, fmt.Errorf("file replica path require for db %q", db.Path()) } - return litestream.NewFileReplicator(db, config.Name, config.Path), nil + return litestream.NewFileReplica(db, config.Name, config.Path), nil } diff --git a/db.go b/db.go index 42f17d1..7974949 100644 --- a/db.go +++ b/db.go @@ -52,9 +52,9 @@ type DB struct { // unbounded if there are always read transactions occurring. MaxCheckpointPageN int - // List of replicators for the database. + // List of replicas for the database. // Must be set before calling Open(). - Replicators []Replicator + Replicas []Replica // Frequency at which to perform db sync. MonitorInterval time.Duration @@ -154,11 +154,11 @@ func (db *DB) PageSize() int { } func (db *DB) Open() (err error) { - // Validate that all replicator names are unique. + // Validate that all replica names are unique. m := make(map[string]struct{}) - for _, r := range db.Replicators { + for _, r := range db.Replicas { if _, ok := m[r.Name()]; ok { - return fmt.Errorf("duplicate replicator name: %q", r.Name()) + return fmt.Errorf("duplicate replica name: %q", r.Name()) } m[r.Name()] = struct{}{} } @@ -276,7 +276,7 @@ func (db *DB) Init() (err error) { } // Start replication. - for _, r := range db.Replicators { + for _, r := range db.Replicas { r.Start(db.ctx) } @@ -318,8 +318,8 @@ func (db *DB) SoftClose() (err error) { db.cancel() db.wg.Wait() - // Ensure replicators all stop replicating. - for _, r := range db.Replicators { + // Ensure replicas all stop replicating. + for _, r := range db.Replicas { r.Stop() } @@ -387,7 +387,7 @@ func (db *DB) CurrentGeneration() (string, error) { } // createGeneration starts a new generation by creating the generation -// directory, snapshotting to each replicator, and updating the current +// directory, snapshotting to each replica, and updating the current // generation name. func (db *DB) createGeneration() (string, error) { // Generate random generation hex name. @@ -516,7 +516,7 @@ func (db *DB) Sync() (err error) { } } - // Notify replicators of WAL changes. + // Notify replicas of WAL changes. if changed { close(db.notify) db.notify = make(chan struct{}) @@ -1084,14 +1084,14 @@ func (db *DB) Restore(ctx context.Context, opt RestoreOptions) error { return nil } -func (db *DB) restoreTarget(ctx context.Context, opt RestoreOptions, logger *log.Logger) (Replicator, string, error) { +func (db *DB) restoreTarget(ctx context.Context, opt RestoreOptions, logger *log.Logger) (Replica, string, error) { var target struct { - replicator Replicator + replica Replica generation string stats GenerationStats } - for _, r := range db.Replicators { + for _, r := range db.Replicas { // Skip replica if it does not match filter. if opt.ReplicaName != "" && r.Name() != opt.ReplicaName { continue @@ -1127,7 +1127,7 @@ func (db *DB) restoreTarget(ctx context.Context, opt RestoreOptions, logger *log continue } - target.replicator = r + target.replica = r target.generation = generation target.stats = stats } @@ -1138,11 +1138,11 @@ func (db *DB) restoreTarget(ctx context.Context, opt RestoreOptions, logger *log return nil, "", fmt.Errorf("no matching backups found") } - return target.replicator, target.generation, nil + return target.replica, target.generation, nil } // restoreSnapshot copies a snapshot from the replica to a file. -func (db *DB) restoreSnapshot(ctx context.Context, r Replicator, generation string, index int, filename string) error { +func (db *DB) restoreSnapshot(ctx context.Context, r Replica, generation string, index int, filename string) error { if err := os.MkdirAll(filepath.Dir(filename), 0700); err != nil { return err } @@ -1170,7 +1170,7 @@ func (db *DB) restoreSnapshot(ctx context.Context, r Replicator, generation stri } // restoreWAL copies a WAL file from the replica to the local WAL and forces checkpoint. -func (db *DB) restoreWAL(ctx context.Context, r Replicator, generation string, index int, dbPath string) error { +func (db *DB) restoreWAL(ctx context.Context, r Replica, generation string, index int, dbPath string) error { // Open handle to destination WAL path. f, err := os.Create(dbPath + "-wal") if err != nil { diff --git a/replicator.go b/replicator.go index d10e236..c8334b8 100644 --- a/replicator.go +++ b/replicator.go @@ -15,13 +15,12 @@ import ( "time" ) -// Replicator represents a method for replicating the snapshot & WAL data to -// a remote destination. -type Replicator interface { - // The name of the replicator. Defaults to type if no name specified. +// Replica represents a remote destination to replicate the database & WAL. +type Replica interface { + // The name of the replica. Defaults to type if no name specified. Name() string - // String identifier for the type of replicator ("file", "s3", etc). + // String identifier for the type of replica ("file", "s3", etc). Type() string // Starts replicating in a background goroutine. @@ -30,7 +29,7 @@ type Replicator interface { // Stops all replication processing. Blocks until processing stopped. Stop() - // Returns a list of generation names for the replicator. + // Returns a list of generation names for the replica. Generations(ctx context.Context) ([]string, error) // Returns basic information about a generation including the number of @@ -52,12 +51,12 @@ type Replicator interface { WALReader(ctx context.Context, generation string, index int) (io.ReadCloser, error) } -var _ Replicator = (*FileReplicator)(nil) +var _ Replica = (*FileReplica)(nil) -// FileReplicator is a replicator that replicates a DB to a local file path. -type FileReplicator struct { +// FileReplica is a replica that replicates a DB to a local file path. +type FileReplica struct { db *DB // source database - name string // replicator name, optional + name string // replica name, optional dst string // destination path // mu sync.RWMutex @@ -67,9 +66,9 @@ type FileReplicator struct { cancel func() } -// NewFileReplicator returns a new instance of FileReplicator. -func NewFileReplicator(db *DB, name, dst string) *FileReplicator { - return &FileReplicator{ +// NewFileReplica returns a new instance of FileReplica. +func NewFileReplica(db *DB, name, dst string) *FileReplica { + return &FileReplica{ db: db, name: name, dst: dst, @@ -77,41 +76,41 @@ func NewFileReplicator(db *DB, name, dst string) *FileReplicator { } } -// Name returns the name of the replicator. Returns the type if no name set. -func (r *FileReplicator) Name() string { +// Name returns the name of the replica. Returns the type if no name set. +func (r *FileReplica) Name() string { if r.name != "" { return r.name } return r.Type() } -// Type returns the type of replicator. -func (r *FileReplicator) Type() string { +// Type returns the type of replica. +func (r *FileReplica) Type() string { return "file" } // SnapshotDir returns the path to a generation's snapshot directory. -func (r *FileReplicator) SnapshotDir(generation string) string { +func (r *FileReplica) SnapshotDir(generation string) string { return filepath.Join(r.dst, "generations", generation, "snapshots") } // SnapshotPath returns the path to a snapshot file. -func (r *FileReplicator) SnapshotPath(generation string, index int) string { +func (r *FileReplica) SnapshotPath(generation string, index int) string { return filepath.Join(r.SnapshotDir(generation), fmt.Sprintf("%016x.snapshot.gz", index)) } // WALDir returns the path to a generation's WAL directory -func (r *FileReplicator) WALDir(generation string) string { +func (r *FileReplica) WALDir(generation string) string { return filepath.Join(r.dst, "generations", generation, "wal") } // WALPath returns the path to a WAL file. -func (r *FileReplicator) WALPath(generation string, index int) string { +func (r *FileReplica) WALPath(generation string, index int) string { return filepath.Join(r.WALDir(generation), fmt.Sprintf("%016x.wal", index)) } // Generations returns a list of available generation names. -func (r *FileReplicator) Generations(ctx context.Context) ([]string, error) { +func (r *FileReplica) Generations(ctx context.Context) ([]string, error) { fis, err := ioutil.ReadDir(filepath.Join(r.dst, "generations")) if os.IsNotExist(err) { return nil, nil @@ -132,7 +131,7 @@ func (r *FileReplicator) Generations(ctx context.Context) ([]string, error) { } // GenerationStats returns stats for a generation. -func (r *FileReplicator) GenerationStats(ctx context.Context, generation string) (stats GenerationStats, err error) { +func (r *FileReplica) GenerationStats(ctx context.Context, generation string) (stats GenerationStats, err error) { // Determine stats for all snapshots. n, min, max, err := r.snapshotStats(generation) if err != nil { @@ -159,7 +158,7 @@ func (r *FileReplicator) GenerationStats(ctx context.Context, generation string) return stats, nil } -func (r *FileReplicator) snapshotStats(generation string) (n int, min, max time.Time, err error) { +func (r *FileReplica) snapshotStats(generation string) (n int, min, max time.Time, err error) { fis, err := ioutil.ReadDir(r.SnapshotDir(generation)) if os.IsNotExist(err) { return n, min, max, nil @@ -184,7 +183,7 @@ func (r *FileReplicator) snapshotStats(generation string) (n int, min, max time. return n, min, max, nil } -func (r *FileReplicator) walStats(generation string) (n int, min, max time.Time, err error) { +func (r *FileReplica) walStats(generation string) (n int, min, max time.Time, err error) { fis, err := ioutil.ReadDir(r.WALDir(generation)) if os.IsNotExist(err) { return n, min, max, nil @@ -217,7 +216,7 @@ type GenerationStats struct { } // Start starts replication for a given generation. -func (r *FileReplicator) Start(ctx context.Context) { +func (r *FileReplica) Start(ctx context.Context) { // Stop previous replication. r.Stop() @@ -230,13 +229,13 @@ func (r *FileReplicator) Start(ctx context.Context) { } // Stop cancels any outstanding replication and blocks until finished. -func (r *FileReplicator) Stop() { +func (r *FileReplica) Stop() { r.cancel() r.wg.Wait() } // monitor runs in a separate goroutine and continuously replicates the DB. -func (r *FileReplicator) monitor(ctx context.Context) { +func (r *FileReplica) monitor(ctx context.Context) { // Clear old temporary files that my have been left from a crash. if err := removeTmpFiles(r.dst); err != nil { log.Printf("%s(%s): cannot remove tmp files: %w", r.db.Path(), r.Name(), err) @@ -294,9 +293,9 @@ func (r *FileReplicator) monitor(ctx context.Context) { } } -// pos returns the position for the replicator for the current generation. +// pos returns the position for the replica for the current generation. // Returns a zero value if there is no active generation. -func (r *FileReplicator) pos() (pos Pos, err error) { +func (r *FileReplica) pos() (pos Pos, err error) { // Find the current generation from the DB. Return zero pos if no generation. generation, err := r.db.CurrentGeneration() if err != nil { @@ -346,7 +345,7 @@ func (r *FileReplicator) pos() (pos Pos, err error) { } // snapshot copies the entire database to the replica path. -func (r *FileReplicator) snapshot(ctx context.Context, generation string, index int) error { +func (r *FileReplica) snapshot(ctx context.Context, generation string, index int) error { // Acquire a read lock on the database during snapshot to prevent checkpoints. tx, err := r.db.db.Begin() if err != nil { @@ -371,7 +370,7 @@ func (r *FileReplicator) snapshot(ctx context.Context, generation string, index } // snapshotN returns the number of snapshots for a generation. -func (r *FileReplicator) snapshotN(generation string) (int, error) { +func (r *FileReplica) snapshotN(generation string) (int, error) { fis, err := ioutil.ReadDir(r.SnapshotDir(generation)) if os.IsNotExist(err) { return 0, nil @@ -388,7 +387,7 @@ func (r *FileReplicator) snapshotN(generation string) (int, error) { return n, nil } -func (r *FileReplicator) sync(ctx context.Context, pos Pos) (_ Pos, err error) { +func (r *FileReplica) sync(ctx context.Context, pos Pos) (_ Pos, err error) { // Read all WAL files since the last position. for { if pos, err = r.syncNext(ctx, pos); err == io.EOF { @@ -399,7 +398,7 @@ func (r *FileReplicator) sync(ctx context.Context, pos Pos) (_ Pos, err error) { } } -func (r *FileReplicator) syncNext(ctx context.Context, pos Pos) (_ Pos, err error) { +func (r *FileReplica) syncNext(ctx context.Context, pos Pos) (_ Pos, err error) { rd, err := r.db.ShadowWALReader(pos) if err == io.EOF { return pos, err @@ -446,7 +445,7 @@ func (r *FileReplicator) syncNext(ctx context.Context, pos Pos) (_ Pos, err erro } // compress gzips all WAL files before the current one. -func (r *FileReplicator) compress(ctx context.Context, generation string) error { +func (r *FileReplica) compress(ctx context.Context, generation string) error { dir := r.WALDir(generation) filenames, err := filepath.Glob(filepath.Join(dir, "*.wal")) if err != nil { @@ -480,7 +479,7 @@ func (r *FileReplicator) compress(ctx context.Context, generation string) error // SnapsotIndexAt returns the highest index for a snapshot within a generation // that occurs before timestamp. If timestamp is zero, returns the latest snapshot. -func (r *FileReplicator) SnapshotIndexAt(ctx context.Context, generation string, timestamp time.Time) (int, error) { +func (r *FileReplica) SnapshotIndexAt(ctx context.Context, generation string, timestamp time.Time) (int, error) { fis, err := ioutil.ReadDir(r.SnapshotDir(generation)) if os.IsNotExist(err) { return 0, ErrNoSnapshots @@ -513,7 +512,7 @@ func (r *FileReplicator) SnapshotIndexAt(ctx context.Context, generation string, // Returns the highest index for a WAL file that occurs before timestamp. // If timestamp is zero, returns the highest WAL index. -func (r *FileReplicator) WALIndexAt(ctx context.Context, generation string, timestamp time.Time) (int, error) { +func (r *FileReplica) WALIndexAt(ctx context.Context, generation string, timestamp time.Time) (int, error) { fis, err := ioutil.ReadDir(r.WALDir(generation)) if os.IsNotExist(err) { return 0, nil @@ -544,7 +543,7 @@ func (r *FileReplicator) WALIndexAt(ctx context.Context, generation string, time // SnapshotReader returns a reader for snapshot data at the given generation/index. // Returns os.ErrNotExist if no matching index is found. -func (r *FileReplicator) SnapshotReader(ctx context.Context, generation string, index int) (io.ReadCloser, error) { +func (r *FileReplica) SnapshotReader(ctx context.Context, generation string, index int) (io.ReadCloser, error) { dir := r.SnapshotDir(generation) fis, err := ioutil.ReadDir(dir) if err != nil { @@ -581,7 +580,7 @@ func (r *FileReplicator) SnapshotReader(ctx context.Context, generation string, // WALReader returns a reader for WAL data at the given index. // Returns os.ErrNotExist if no matching index is found. -func (r *FileReplicator) WALReader(ctx context.Context, generation string, index int) (io.ReadCloser, error) { +func (r *FileReplica) WALReader(ctx context.Context, generation string, index int) (io.ReadCloser, error) { filename := r.WALPath(generation, index) // Attempt to read uncompressed file first.