Allow global AWS settings in config.
This commit is contained in:
@@ -36,7 +36,7 @@ func (c *DatabasesCommand) Run(ctx context.Context, args []string) (err error) {
|
|||||||
w := tabwriter.NewWriter(os.Stdout, 0, 8, 1, '\t', 0)
|
w := tabwriter.NewWriter(os.Stdout, 0, 8, 1, '\t', 0)
|
||||||
fmt.Fprintln(w, "path\treplicas")
|
fmt.Fprintln(w, "path\treplicas")
|
||||||
for _, dbConfig := range config.DBs {
|
for _, dbConfig := range config.DBs {
|
||||||
db, err := newDBFromConfig(dbConfig)
|
db, err := newDBFromConfig(&config, dbConfig)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -48,7 +48,7 @@ func (c *GenerationsCommand) Run(ctx context.Context, args []string) (err error)
|
|||||||
if dbConfig == nil {
|
if dbConfig == nil {
|
||||||
return fmt.Errorf("database not found in config: %s", dbPath)
|
return fmt.Errorf("database not found in config: %s", dbPath)
|
||||||
}
|
}
|
||||||
db, err := newDBFromConfig(dbConfig)
|
db, err := newDBFromConfig(&config, dbConfig)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -106,6 +106,12 @@ type Config struct {
|
|||||||
|
|
||||||
// List of databases to manage.
|
// List of databases to manage.
|
||||||
DBs []*DBConfig `yaml:"dbs"`
|
DBs []*DBConfig `yaml:"dbs"`
|
||||||
|
|
||||||
|
// Global S3 settings
|
||||||
|
AccessKeyID string `yaml:"access-key-id"`
|
||||||
|
SecretAccessKey string `yaml:"secret-access-key"`
|
||||||
|
Region string `yaml:"region"`
|
||||||
|
Bucket string `yaml:"bucket"`
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Config) Normalize() error {
|
func (c *Config) Normalize() error {
|
||||||
@@ -245,13 +251,13 @@ func registerConfigFlag(fs *flag.FlagSet, p *string) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// newDBFromConfig instantiates a DB based on a configuration.
|
// newDBFromConfig instantiates a DB based on a configuration.
|
||||||
func newDBFromConfig(config *DBConfig) (*litestream.DB, error) {
|
func newDBFromConfig(c *Config, dbc *DBConfig) (*litestream.DB, error) {
|
||||||
// Initialize database with given path.
|
// Initialize database with given path.
|
||||||
db := litestream.NewDB(config.Path)
|
db := litestream.NewDB(dbc.Path)
|
||||||
|
|
||||||
// Instantiate and attach replicas.
|
// Instantiate and attach replicas.
|
||||||
for _, rconfig := range config.Replicas {
|
for _, rc := range dbc.Replicas {
|
||||||
r, err := newReplicaFromConfig(db, rconfig)
|
r, err := newReplicaFromConfig(db, c, dbc, rc)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@@ -262,57 +268,77 @@ func newDBFromConfig(config *DBConfig) (*litestream.DB, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// newReplicaFromConfig instantiates a replica for a DB based on a config.
|
// newReplicaFromConfig instantiates a replica for a DB based on a config.
|
||||||
func newReplicaFromConfig(db *litestream.DB, config *ReplicaConfig) (litestream.Replica, error) {
|
func newReplicaFromConfig(db *litestream.DB, c *Config, dbc *DBConfig, rc *ReplicaConfig) (litestream.Replica, error) {
|
||||||
switch config.Type {
|
switch rc.Type {
|
||||||
case "", "file":
|
case "", "file":
|
||||||
return newFileReplicaFromConfig(db, config)
|
return newFileReplicaFromConfig(db, c, dbc, rc)
|
||||||
case "s3":
|
case "s3":
|
||||||
return newS3ReplicaFromConfig(db, config)
|
return newS3ReplicaFromConfig(db, c, dbc, rc)
|
||||||
default:
|
default:
|
||||||
return nil, fmt.Errorf("unknown replica type in config: %q", config.Type)
|
return nil, fmt.Errorf("unknown replica type in config: %q", rc.Type)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// newFileReplicaFromConfig returns a new instance of FileReplica build from config.
|
// newFileReplicaFromConfig returns a new instance of FileReplica build from config.
|
||||||
func newFileReplicaFromConfig(db *litestream.DB, config *ReplicaConfig) (*litestream.FileReplica, error) {
|
func newFileReplicaFromConfig(db *litestream.DB, c *Config, dbc *DBConfig, rc *ReplicaConfig) (*litestream.FileReplica, error) {
|
||||||
if config.Path == "" {
|
if rc.Path == "" {
|
||||||
return nil, fmt.Errorf("%s: file replica path required", db.Path())
|
return nil, fmt.Errorf("%s: file replica path required", db.Path())
|
||||||
}
|
}
|
||||||
|
|
||||||
r := litestream.NewFileReplica(db, config.Name, config.Path)
|
r := litestream.NewFileReplica(db, rc.Name, rc.Path)
|
||||||
if v := config.Retention; v > 0 {
|
if v := rc.Retention; v > 0 {
|
||||||
r.Retention = v
|
r.Retention = v
|
||||||
}
|
}
|
||||||
if v := config.RetentionCheckInterval; v > 0 {
|
if v := rc.RetentionCheckInterval; v > 0 {
|
||||||
r.RetentionCheckInterval = v
|
r.RetentionCheckInterval = v
|
||||||
}
|
}
|
||||||
return r, nil
|
return r, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// newS3ReplicaFromConfig returns a new instance of S3Replica build from config.
|
// newS3ReplicaFromConfig returns a new instance of S3Replica build from config.
|
||||||
func newS3ReplicaFromConfig(db *litestream.DB, config *ReplicaConfig) (*s3.Replica, error) {
|
func newS3ReplicaFromConfig(db *litestream.DB, c *Config, dbc *DBConfig, rc *ReplicaConfig) (*s3.Replica, error) {
|
||||||
if config.AccessKeyID == "" {
|
// Use global or replica-specific S3 settings.
|
||||||
|
accessKeyID := c.AccessKeyID
|
||||||
|
if v := rc.AccessKeyID; v != "" {
|
||||||
|
accessKeyID = v
|
||||||
|
}
|
||||||
|
secretAccessKey := c.SecretAccessKey
|
||||||
|
if v := rc.SecretAccessKey; v != "" {
|
||||||
|
secretAccessKey = v
|
||||||
|
}
|
||||||
|
bucket := c.Bucket
|
||||||
|
if v := rc.Bucket; v != "" {
|
||||||
|
bucket = v
|
||||||
|
}
|
||||||
|
region := c.Region
|
||||||
|
if v := rc.Region; v != "" {
|
||||||
|
region = v
|
||||||
|
}
|
||||||
|
|
||||||
|
// Ensure required settings are set.
|
||||||
|
if accessKeyID == "" {
|
||||||
return nil, fmt.Errorf("%s: s3 access key id required", db.Path())
|
return nil, fmt.Errorf("%s: s3 access key id required", db.Path())
|
||||||
} else if config.SecretAccessKey == "" {
|
} else if secretAccessKey == "" {
|
||||||
return nil, fmt.Errorf("%s: s3 secret access key required", db.Path())
|
return nil, fmt.Errorf("%s: s3 secret access key required", db.Path())
|
||||||
} else if config.Bucket == "" {
|
} else if bucket == "" {
|
||||||
return nil, fmt.Errorf("%s: s3 bucket required", db.Path())
|
return nil, fmt.Errorf("%s: s3 bucket required", db.Path())
|
||||||
}
|
}
|
||||||
|
|
||||||
r := s3.NewReplica(db, config.Name)
|
// Build replica.
|
||||||
r.AccessKeyID = config.AccessKeyID
|
r := s3.NewReplica(db, rc.Name)
|
||||||
r.SecretAccessKey = config.SecretAccessKey
|
r.AccessKeyID = accessKeyID
|
||||||
r.Region = config.Region
|
r.SecretAccessKey = secretAccessKey
|
||||||
r.Bucket = config.Bucket
|
r.Region = region
|
||||||
r.Path = config.Path
|
r.Bucket = bucket
|
||||||
|
r.Path = rc.Path
|
||||||
|
|
||||||
if v := config.Retention; v > 0 {
|
if v := rc.Retention; v > 0 {
|
||||||
r.Retention = v
|
r.Retention = v
|
||||||
}
|
}
|
||||||
if v := config.RetentionCheckInterval; v > 0 {
|
if v := rc.RetentionCheckInterval; v > 0 {
|
||||||
r.RetentionCheckInterval = v
|
r.RetentionCheckInterval = v
|
||||||
}
|
}
|
||||||
if v := config.SyncInterval; v > 0 {
|
if v := rc.SyncInterval; v > 0 {
|
||||||
r.SyncInterval = v
|
r.SyncInterval = v
|
||||||
}
|
}
|
||||||
return r, nil
|
return r, nil
|
||||||
|
|||||||
@@ -63,7 +63,7 @@ func (c *ReplicateCommand) Run(ctx context.Context, args []string) (err error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
for _, dbConfig := range config.DBs {
|
for _, dbConfig := range config.DBs {
|
||||||
db, err := newDBFromConfig(dbConfig)
|
db, err := newDBFromConfig(&config, dbConfig)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -74,7 +74,7 @@ func (c *RestoreCommand) Run(ctx context.Context, args []string) (err error) {
|
|||||||
if dbConfig == nil {
|
if dbConfig == nil {
|
||||||
return fmt.Errorf("database not found in config: %s", dbPath)
|
return fmt.Errorf("database not found in config: %s", dbPath)
|
||||||
}
|
}
|
||||||
db, err := newDBFromConfig(dbConfig)
|
db, err := newDBFromConfig(&config, dbConfig)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -49,7 +49,7 @@ func (c *SnapshotsCommand) Run(ctx context.Context, args []string) (err error) {
|
|||||||
if dbConfig == nil {
|
if dbConfig == nil {
|
||||||
return fmt.Errorf("database not found in config: %s", dbPath)
|
return fmt.Errorf("database not found in config: %s", dbPath)
|
||||||
}
|
}
|
||||||
db, err := newDBFromConfig(dbConfig)
|
db, err := newDBFromConfig(&config, dbConfig)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -61,7 +61,7 @@ func (c *ValidateCommand) Run(ctx context.Context, args []string) (err error) {
|
|||||||
if dbConfig == nil {
|
if dbConfig == nil {
|
||||||
return fmt.Errorf("database not found in config: %s", dbPath)
|
return fmt.Errorf("database not found in config: %s", dbPath)
|
||||||
}
|
}
|
||||||
db, err := newDBFromConfig(dbConfig)
|
db, err := newDBFromConfig(&config, dbConfig)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -50,7 +50,7 @@ func (c *WALCommand) Run(ctx context.Context, args []string) (err error) {
|
|||||||
if dbConfig == nil {
|
if dbConfig == nil {
|
||||||
return fmt.Errorf("database not found in config: %s", dbPath)
|
return fmt.Errorf("database not found in config: %s", dbPath)
|
||||||
}
|
}
|
||||||
db, err := newDBFromConfig(dbConfig)
|
db, err := newDBFromConfig(&config, dbConfig)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user