Files
litestream/cmd/litestream/main.go
2021-07-23 07:46:21 -06:00

720 lines
19 KiB
Go

package main
import (
"context"
"errors"
"flag"
"fmt"
"io/ioutil"
"log"
"net/url"
"os"
"os/signal"
"os/user"
"path"
"path/filepath"
"regexp"
"strconv"
"strings"
"time"
"github.com/benbjohnson/litestream"
"github.com/benbjohnson/litestream/abs"
"github.com/benbjohnson/litestream/file"
"github.com/benbjohnson/litestream/gcs"
"github.com/benbjohnson/litestream/s3"
"github.com/benbjohnson/litestream/sftp"
_ "github.com/mattn/go-sqlite3"
"gopkg.in/yaml.v2"
)
// Build information and process-wide sentinel errors.
var (
	// Version identifies the build. It defaults to a development placeholder.
	Version = "(development build)"

	// errStop is a terminal error signalling that the program should quit.
	errStop = errors.New("stop")
)
// main is the program entry point. It runs the command named on the command
// line and exits with status 1 if that command returns any error.
func main() {
	log.SetFlags(0)

	err := NewMain().Run(context.Background(), os.Args[1:])
	switch {
	case err == nil:
		return
	case errors.Is(err, flag.ErrHelp), errors.Is(err, errStop):
		// Help/stop sentinels carry no message worth logging. errors.Is is
		// used so the sentinels are still recognized when wrapped.
		os.Exit(1)
	default:
		log.Println(err)
		os.Exit(1)
	}
}
// Main is the top-level program. It carries no state of its own.
type Main struct{}

// NewMain allocates a Main that is ready to Run.
func NewMain() *Main {
	return new(Main)
}
// Run executes the program.
//
// The first element of args selects the subcommand and the remaining elements
// are handed to it. With no command, "help", or a leading flag, the usage text
// is printed and flag.ErrHelp is returned. Any other unrecognized command
// yields an "unknown command" error.
//
// The "replicate" command blocks until the context is done, the exec
// subprocess exits, or one of notifySignals is received, and then closes the
// replicate command before returning.
func (m *Main) Run(ctx context.Context, args []string) (err error) {
	// Execute replication command if running as a Windows service.
	if isService, err := isWindowsService(); err != nil {
		return err
	} else if isService {
		return runWindowsService(ctx)
	}

	// Copy "LITESTREAM" environment credentials.
	applyLitestreamEnv()

	// Extract command name.
	var cmd string
	if len(args) > 0 {
		cmd, args = args[0], args[1:]
	}

	switch cmd {
	case "databases":
		return (&DatabasesCommand{}).Run(ctx, args)
	case "generations":
		return (&GenerationsCommand{}).Run(ctx, args)
	case "replicate":
		c := NewReplicateCommand()
		if err := c.ParseFlags(ctx, args); err != nil {
			return err
		}

		// Setup signal handler.
		ctx, cancel := context.WithCancel(ctx)
		// Release the derived context on every return path, including the
		// early error returns below. Calling cancel repeatedly is harmless.
		defer cancel()
		signalCh := make(chan os.Signal, 1)
		signal.Notify(signalCh, notifySignals...)

		if err := c.Run(ctx); err != nil {
			return err
		}

		// Wait for signal to stop program.
		select {
		case <-ctx.Done():
			fmt.Println("context done, litestream shutting down")
		case err = <-c.execCh:
			// The subprocess result becomes the function's return value.
			cancel()
			fmt.Println("subprocess exited, litestream shutting down")
		case sig := <-signalCh:
			cancel()
			fmt.Println("signal received, litestream shutting down")

			if c.cmd != nil {
				// Forward the signal to the exec subprocess and wait for it.
				fmt.Println("sending signal to exec process")
				if err := c.cmd.Process.Signal(sig); err != nil {
					return fmt.Errorf("cannot signal exec process: %w", err)
				}

				fmt.Println("waiting for exec process to close")
				// An error starting with "signal:" is the expected outcome of
				// the signal just sent, so it is not reported as a failure.
				if err := <-c.execCh; err != nil && !strings.HasPrefix(err.Error(), "signal:") {
					return fmt.Errorf("cannot wait for exec process: %w", err)
				}
			}
		}

		// Gracefully close. A close error is only reported when no earlier
		// error is already pending.
		if e := c.Close(); e != nil && err == nil {
			err = e
		}
		fmt.Println("litestream shut down")
		return err

	case "restore":
		return (&RestoreCommand{}).Run(ctx, args)
	case "snapshots":
		return (&SnapshotsCommand{}).Run(ctx, args)
	case "version":
		return (&VersionCommand{}).Run(ctx, args)
	case "wal":
		return (&WALCommand{}).Run(ctx, args)
	default:
		if cmd == "" || cmd == "help" || strings.HasPrefix(cmd, "-") {
			m.Usage()
			return flag.ErrHelp
		}
		return fmt.Errorf("litestream %s: unknown command", cmd)
	}
}
// Usage writes the top-level help text, listing every command, to STDOUT.
func (m *Main) Usage() {
	const help = `litestream is a tool for replicating SQLite databases.
Usage:
litestream <command> [arguments]
The commands are:
databases list databases specified in config file
generations list available generations for a database
replicate runs a server to replicate databases
restore recovers database backup from a replica
snapshots list available snapshots for a database
version prints the binary version
wal list available WAL files for a database
`
	fmt.Println(help)
}
// Config represents a configuration file for the litestream daemon.
// The key used for each field in the file is given by its yaml struct tag.
type Config struct {
// Bind address for serving metrics.
Addr string `yaml:"addr"`
// List of databases to manage.
DBs []*DBConfig `yaml:"dbs"`
// Subcommand to execute during replication.
// Litestream will shutdown when subcommand exits.
Exec string `yaml:"exec"`
// Global S3 settings. These act as defaults: propagateGlobalSettings copies
// them into every replica config that leaves the same field empty.
AccessKeyID string `yaml:"access-key-id"`
SecretAccessKey string `yaml:"secret-access-key"`
}
// propagateGlobalSettings fills in the access key ID and secret access key of
// every replica config that left them empty, using the global values on c.
// Values already set on a replica are never overwritten.
func (c *Config) propagateGlobalSettings() {
	for i := range c.DBs {
		replicas := c.DBs[i].Replicas
		for j := range replicas {
			replica := replicas[j]
			if replica.AccessKeyID == "" {
				replica.AccessKeyID = c.AccessKeyID
			}
			if replica.SecretAccessKey == "" {
				replica.SecretAccessKey = c.SecretAccessKey
			}
		}
	}
}
// DefaultConfig returns a Config holding the default settings, which are
// currently the zero value of every field.
func DefaultConfig() Config {
	var cfg Config
	return cfg
}
// DBConfig looks up a database configuration by exact path match.
// It returns the first match, or nil when no configured database has that path.
func (c *Config) DBConfig(path string) *DBConfig {
	for i := range c.DBs {
		if candidate := c.DBs[i]; candidate.Path == path {
			return candidate
		}
	}
	return nil
}
// ReadConfigFile loads and decodes the YAML config stored at filename.
// The filename is expanded first (home prefix, absolute path). When expandEnv
// is true, environment variable references in the file contents are
// substituted before decoding. After decoding, every database path is
// expanded and global settings are copied down to the replica configs.
//
// A missing file is reported as a "config file not found" error.
func ReadConfigFile(filename string, expandEnv bool) (_ Config, err error) {
	config := DefaultConfig()

	// Resolve the location of the config file.
	if filename, err = expand(filename); err != nil {
		return config, err
	}

	// Load the raw contents.
	data, err := ioutil.ReadFile(filename)
	switch {
	case os.IsNotExist(err):
		return config, fmt.Errorf("config file not found: %s", filename)
	case err != nil:
		return config, err
	}

	// Substitute environment variables, if enabled.
	if expandEnv {
		data = []byte(os.ExpandEnv(string(data)))
	}

	if err := yaml.Unmarshal(data, &config); err != nil {
		return config, err
	}

	// Normalize database paths.
	for i := range config.DBs {
		db := config.DBs[i]
		if db.Path, err = expand(db.Path); err != nil {
			return config, err
		}
	}

	// Propagate settings from global config to replica configs.
	config.propagateGlobalSettings()

	return config, nil
}
// DBConfig represents the configuration for a single database.
// The pointer-typed fields are optional: NewDBFromConfig only overrides the
// database's own default when the corresponding pointer is non-nil.
type DBConfig struct {
// Path of the database file. It is passed through expand before use.
Path string `yaml:"path"`
// Optional overrides copied onto the litestream.DB fields of the same name.
MonitorInterval *time.Duration `yaml:"monitor-interval"`
CheckpointInterval *time.Duration `yaml:"checkpoint-interval"`
MinCheckpointPageN *int `yaml:"min-checkpoint-page-count"`
MaxCheckpointPageN *int `yaml:"max-checkpoint-page-count"`
// Replicas to instantiate and attach to the database.
Replicas []*ReplicaConfig `yaml:"replicas"`
}
// NewDBFromConfig builds a litestream.DB from dbc.
// The database path is expanded, each optional setting present in dbc
// replaces the DB's default, and one replica is created and attached for
// every entry in dbc.Replicas. The first error encountered aborts the build.
func NewDBFromConfig(dbc *DBConfig) (*litestream.DB, error) {
	absPath, err := expand(dbc.Path)
	if err != nil {
		return nil, err
	}

	db := litestream.NewDB(absPath)

	// Apply only the settings the configuration actually provides.
	if v := dbc.MonitorInterval; v != nil {
		db.MonitorInterval = *v
	}
	if v := dbc.CheckpointInterval; v != nil {
		db.CheckpointInterval = *v
	}
	if v := dbc.MinCheckpointPageN; v != nil {
		db.MinCheckpointPageN = *v
	}
	if v := dbc.MaxCheckpointPageN; v != nil {
		db.MaxCheckpointPageN = *v
	}

	// Create and attach the replicas in configuration order.
	for _, replicaConfig := range dbc.Replicas {
		replica, err := NewReplicaFromConfig(replicaConfig, db)
		if err != nil {
			return nil, err
		}
		db.Replicas = append(db.Replicas, replica)
	}

	return db, nil
}
// ReplicaConfig represents the configuration for a single replica in a database.
// A replica is described either by URL or by its individual fields; the
// per-type constructors reject configs that set both URL and Path.
type ReplicaConfig struct {
Type string `yaml:"type"` // "file", "s3", "gcs", "abs" or "sftp"; see ReplicaType for how it is resolved.
Name string `yaml:"name"` // name of replica, optional.
Path string `yaml:"path"`
URL string `yaml:"url"`
// Optional overrides; a nil pointer leaves the replica's own default in place.
Retention *time.Duration `yaml:"retention"`
RetentionCheckInterval *time.Duration `yaml:"retention-check-interval"`
SyncInterval *time.Duration `yaml:"sync-interval"`
SnapshotInterval *time.Duration `yaml:"snapshot-interval"`
ValidationInterval *time.Duration `yaml:"validation-interval"`
// S3 settings
// Bucket is also read by the GCS and ABS constructors, and Endpoint by the ABS one.
AccessKeyID string `yaml:"access-key-id"`
SecretAccessKey string `yaml:"secret-access-key"`
Region string `yaml:"region"`
Bucket string `yaml:"bucket"`
Endpoint string `yaml:"endpoint"`
ForcePathStyle *bool `yaml:"force-path-style"`
SkipVerify bool `yaml:"skip-verify"`
// ABS settings
AccountName string `yaml:"account-name"`
AccountKey string `yaml:"account-key"`
// SFTP settings
Host string `yaml:"host"`
User string `yaml:"user"`
Password string `yaml:"password"`
KeyPath string `yaml:"key-path"`
}
// NewReplicaFromConfig instantiates a replica for db based on the config c.
//
// The optional interval and retention settings in c override the replica's
// defaults when present. The replica client is chosen by c.ReplicaType(),
// which prefers the URL scheme over the explicit type field. An error is
// returned when c.Path holds a URL, when the resolved type is unknown, or
// when the client for that type cannot be built.
func NewReplicaFromConfig(c *ReplicaConfig, db *litestream.DB) (_ *litestream.Replica, err error) {
	// Ensure user did not specify URL in path.
	if isURL(c.Path) {
		return nil, fmt.Errorf("replica path cannot be a url, please use the 'url' field instead: %s", c.Path)
	}

	// Build replica.
	r := litestream.NewReplica(db, c.Name)
	if v := c.Retention; v != nil {
		r.Retention = *v
	}
	if v := c.RetentionCheckInterval; v != nil {
		r.RetentionCheckInterval = *v
	}
	if v := c.SyncInterval; v != nil {
		r.SyncInterval = *v
	}
	if v := c.SnapshotInterval; v != nil {
		r.SnapshotInterval = *v
	}
	if v := c.ValidationInterval; v != nil {
		r.ValidationInterval = *v
	}

	// Build and set client on replica.
	switch typ := c.ReplicaType(); typ {
	case "file":
		if r.Client, err = newFileReplicaClientFromConfig(c, r); err != nil {
			return nil, err
		}
	case "s3":
		if r.Client, err = newS3ReplicaClientFromConfig(c, r); err != nil {
			return nil, err
		}
	case "gcs":
		if r.Client, err = newGCSReplicaClientFromConfig(c, r); err != nil {
			return nil, err
		}
	case "abs":
		if r.Client, err = newABSReplicaClientFromConfig(c, r); err != nil {
			return nil, err
		}
	case "sftp":
		if r.Client, err = newSFTPReplicaClientFromConfig(c, r); err != nil {
			return nil, err
		}
	default:
		// Report the resolved type rather than c.Type: when the type comes
		// from the URL scheme, c.Type may be empty or unrelated.
		return nil, fmt.Errorf("unknown replica type in config: %q", typ)
	}

	return r, nil
}
// newFileReplicaClientFromConfig builds a file.ReplicaClient for replica r.
// The destination comes from either c.Path or c.URL (never both), is
// required to be non-empty, and is expanded before use.
func newFileReplicaClientFromConfig(c *ReplicaConfig, r *litestream.Replica) (_ *file.ReplicaClient, err error) {
	// The URL and path fields are mutually exclusive.
	if c.URL != "" && c.Path != "" {
		return nil, fmt.Errorf("cannot specify url & path for file replica")
	}

	// Take the directory from the URL when one is given.
	dir := c.Path
	if c.URL != "" {
		_, _, urlPath, perr := ParseReplicaURL(c.URL)
		if perr != nil {
			return nil, perr
		}
		dir = urlPath
	}

	// A destination must come from one of the two fields.
	if dir == "" {
		return nil, fmt.Errorf("file replica path required")
	}

	// Expand home prefix and resolve to an absolute path.
	resolved, err := expand(dir)
	if err != nil {
		return nil, err
	}

	// Create the client and link it back to its replica.
	client := file.NewReplicaClient(resolved)
	client.Replica = r
	return client, nil
}
// newS3ReplicaClientFromConfig builds an s3.ReplicaClient from c.
// Settings may be given as individual fields or through a URL, but a URL may
// not be combined with an explicit path or bucket. A bucket is required.
// The replica r is accepted for signature parity with the other client
// constructors and is not used here.
func newS3ReplicaClientFromConfig(c *ReplicaConfig, r *litestream.Replica) (_ *s3.ReplicaClient, err error) {
	hasURL := c.URL != ""

	// Reject a URL combined with the parts it would otherwise provide.
	switch {
	case hasURL && c.Path != "":
		return nil, fmt.Errorf("cannot specify url & path for s3 replica")
	case hasURL && c.Bucket != "":
		return nil, fmt.Errorf("cannot specify url & bucket for s3 replica")
	}

	bucket, path := c.Bucket, c.Path
	region, endpoint := c.Region, c.Endpoint

	// Default to path style whenever an endpoint is explicitly set. This works
	// because the only service to not use path style is AWS, which does not
	// use an endpoint. An explicit force-path-style setting replaces the default.
	forcePathStyle := endpoint != ""
	if c.ForcePathStyle != nil {
		forcePathStyle = *c.ForcePathStyle
	}

	// Fill in whatever is still unset from the URL, if one was given.
	if hasURL {
		_, host, upath, perr := ParseReplicaURL(c.URL)
		if perr != nil {
			return nil, perr
		}
		ubucket, uregion, uendpoint, uforcePathStyle := s3.ParseHost(host)

		if path == "" {
			path = upath
		}
		if bucket == "" {
			bucket = ubucket
		}
		if region == "" {
			region = uregion
		}
		if endpoint == "" {
			endpoint = uendpoint
		}
		// NOTE(review): a URL-derived true also wins over an explicit
		// force-path-style: false — confirm that this is intended.
		forcePathStyle = forcePathStyle || uforcePathStyle
	}

	// A bucket is mandatory.
	if bucket == "" {
		return nil, fmt.Errorf("bucket required for s3 replica")
	}

	// Assemble the client.
	client := s3.NewReplicaClient()
	client.AccessKeyID = c.AccessKeyID
	client.SecretAccessKey = c.SecretAccessKey
	client.Bucket = bucket
	client.Path = path
	client.Region = region
	client.Endpoint = endpoint
	client.ForcePathStyle = forcePathStyle
	client.SkipVerify = c.SkipVerify
	return client, nil
}
// newGCSReplicaClientFromConfig builds a gcs.ReplicaClient from c.
// The bucket and path come either from the individual fields or from a URL
// (whose host is used as the bucket); a URL may not be combined with an
// explicit path or bucket. A bucket is required. The replica r is not used.
func newGCSReplicaClientFromConfig(c *ReplicaConfig, r *litestream.Replica) (_ *gcs.ReplicaClient, err error) {
	hasURL := c.URL != ""

	// Reject a URL combined with the parts it would otherwise provide.
	switch {
	case hasURL && c.Path != "":
		return nil, fmt.Errorf("cannot specify url & path for gcs replica")
	case hasURL && c.Bucket != "":
		return nil, fmt.Errorf("cannot specify url & bucket for gcs replica")
	}

	bucket, path := c.Bucket, c.Path

	// Fill in whatever is still unset from the URL, if one was given.
	if hasURL {
		_, uhost, upath, perr := ParseReplicaURL(c.URL)
		if perr != nil {
			return nil, perr
		}
		if path == "" {
			path = upath
		}
		if bucket == "" {
			bucket = uhost
		}
	}

	// A bucket is mandatory.
	if bucket == "" {
		return nil, fmt.Errorf("bucket required for gcs replica")
	}

	// Assemble the client.
	client := gcs.NewReplicaClient()
	client.Bucket = bucket
	client.Path = path
	return client, nil
}
// newABSReplicaClientFromConfig builds an abs.ReplicaClient from c.
// Explicit fields are applied first; a URL, when given, supplies the account
// name (URL user), bucket (URL host) and path for any of those still empty.
// A URL may not be combined with an explicit path or bucket, and a bucket is
// required. The replica r is not used.
func newABSReplicaClientFromConfig(c *ReplicaConfig, r *litestream.Replica) (_ *abs.ReplicaClient, err error) {
	hasURL := c.URL != ""

	// Reject a URL combined with the parts it would otherwise provide.
	switch {
	case hasURL && c.Path != "":
		return nil, fmt.Errorf("cannot specify url & path for abs replica")
	case hasURL && c.Bucket != "":
		return nil, fmt.Errorf("cannot specify url & bucket for abs replica")
	}

	// Start from the explicitly configured values.
	client := abs.NewReplicaClient()
	client.AccountName, client.AccountKey = c.AccountName, c.AccountKey
	client.Bucket, client.Path = c.Bucket, c.Path
	client.Endpoint = c.Endpoint

	// Fill in whatever is still unset from the URL, if one was given.
	if hasURL {
		parsed, perr := url.Parse(c.URL)
		if perr != nil {
			return nil, perr
		}

		if info := parsed.User; info != nil && client.AccountName == "" {
			client.AccountName = info.Username()
		}
		if client.Bucket == "" {
			client.Bucket = parsed.Host
		}
		if client.Path == "" {
			// Clean the URL path and drop its leading slash.
			cleaned := path.Clean(parsed.Path)
			client.Path = strings.TrimPrefix(cleaned, "/")
		}
	}

	// A bucket is mandatory.
	if client.Bucket == "" {
		return nil, fmt.Errorf("bucket required for abs replica")
	}

	return client, nil
}
// newSFTPReplicaClientFromConfig builds an sftp.ReplicaClient from c.
// Host, user, password and path come either from the individual fields or
// from a URL; a URL may not be combined with an explicit path or host.
// Host and user are required. The replica r is not used.
func newSFTPReplicaClientFromConfig(c *ReplicaConfig, r *litestream.Replica) (_ *sftp.ReplicaClient, err error) {
	hasURL := c.URL != ""

	// Reject a URL combined with the parts it would otherwise provide.
	switch {
	case hasURL && c.Path != "":
		return nil, fmt.Errorf("cannot specify url & path for sftp replica")
	case hasURL && c.Host != "":
		return nil, fmt.Errorf("cannot specify url & host for sftp replica")
	}

	host, username, password, remotePath := c.Host, c.User, c.Password, c.Path

	// Fill in whatever is still unset from the URL, if one was given.
	if hasURL {
		parsed, perr := url.Parse(c.URL)
		if perr != nil {
			return nil, perr
		}

		if host == "" {
			host = parsed.Host
		}
		if info := parsed.User; info != nil {
			if username == "" {
				username = info.Username()
			}
			if password == "" {
				// A URL without a password simply leaves it empty.
				password, _ = info.Password()
			}
		}
		if remotePath == "" {
			remotePath = parsed.Path
		}
	}

	// Host and user are mandatory.
	switch {
	case host == "":
		return nil, fmt.Errorf("host required for sftp replica")
	case username == "":
		return nil, fmt.Errorf("user required for sftp replica")
	}

	// Assemble the client.
	client := sftp.NewReplicaClient()
	client.Host = host
	client.User = username
	client.Password = password
	client.Path = remotePath
	client.KeyPath = c.KeyPath
	return client, nil
}
// applyLitestreamEnv mirrors the "LITESTREAM"-prefixed credential variables
// onto their "AWS"-prefixed equivalents, since the "AWS" prefix can be
// confusing when talking to a non-AWS S3-compatible service. An AWS variable
// that is already present in the environment is left untouched.
func applyLitestreamEnv() {
	aliases := [...][2]string{
		{"LITESTREAM_ACCESS_KEY_ID", "AWS_ACCESS_KEY_ID"},
		{"LITESTREAM_SECRET_ACCESS_KEY", "AWS_SECRET_ACCESS_KEY"},
	}

	for _, alias := range aliases {
		source, target := alias[0], alias[1]

		value, found := os.LookupEnv(source)
		if !found {
			continue
		}
		if _, taken := os.LookupEnv(target); taken {
			continue
		}
		os.Setenv(target, value)
	}
}
// ParseReplicaURL splits a replica URL into its scheme, host and path.
//
// For "file" URLs the host is always empty and the path is the cleaned URL
// with its scheme removed. For every other scheme the path is cleaned and its
// leading slash is dropped. A URL without a scheme returns the parsed host and
// path together with a "scheme required" error.
func ParseReplicaURL(s string) (scheme, host, urlpath string, err error) {
	u, err := url.Parse(s)
	if err != nil {
		return "", "", "", err
	}

	if u.Scheme == "" {
		return u.Scheme, u.Host, u.Path, fmt.Errorf("replica url scheme required: %s", s)
	}

	if u.Scheme != "file" {
		cleaned := path.Clean(u.Path)
		return u.Scheme, u.Host, strings.TrimPrefix(cleaned, "/"), nil
	}

	// File URL: remember the scheme, then render the URL without it so that
	// everything after "file:" becomes the path.
	scheme = u.Scheme
	u.Scheme = ""
	return scheme, "", path.Clean(u.String()), nil
}
// urlSchemePattern matches a string that starts with one or more word
// characters followed by "://". It is compiled once at package
// initialization instead of on every isURL call.
var urlSchemePattern = regexp.MustCompile(`^\w+:\/\/`)

// isURL reports whether s begins with a scheme prefix such as "s3://".
// Only the prefix is inspected; the rest of s is not validated as a URL.
func isURL(s string) bool {
	return urlSchemePattern.MatchString(s)
}
// ReplicaType resolves the replica type. The URL scheme takes precedence,
// then the explicit type field, and "file" is the fallback when neither is
// set. A URL that cannot be parsed simply contributes no scheme.
func (c *ReplicaConfig) ReplicaType() string {
	// The parse error is deliberately ignored; only the scheme matters here.
	if scheme, _, _, _ := ParseReplicaURL(c.URL); scheme != "" {
		return scheme
	}
	if c.Type == "" {
		return "file"
	}
	return c.Type
}
// DefaultConfigPath returns the config path to use when none is given: the
// value of the LITESTREAM_CONFIG environment variable when it is non-empty,
// otherwise defaultConfigPath.
func DefaultConfigPath() string {
	override := os.Getenv("LITESTREAM_CONFIG")
	if override == "" {
		return defaultConfigPath
	}
	return override
}
// registerConfigFlag defines the "config" and "no-expand-env" flags on fs and
// returns pointers to the values they will hold once fs has been parsed.
func registerConfigFlag(fs *flag.FlagSet) (configPath *string, noExpandEnv *bool) {
	configPath = fs.String("config", "", "config path")
	noExpandEnv = fs.Bool("no-expand-env", false, "do not expand env vars in config")
	return configPath, noExpandEnv
}
// expand converts s into an absolute path. A lone "~", or a leading "~"
// followed by the OS path separator, is replaced with the current user's home
// directory; any other input is made absolute with filepath.Abs. An error is
// returned when the current user cannot be looked up or has no home directory.
func expand(s string) (string, error) {
	homePrefix := "~" + string(os.PathSeparator)
	bareTilde := s == "~"

	// No home directory prefix: just make the path absolute.
	if !bareTilde && !strings.HasPrefix(s, homePrefix) {
		return filepath.Abs(s)
	}

	// Find the current user's home directory.
	current, err := user.Current()
	if err != nil {
		return "", err
	}
	if current.HomeDir == "" {
		return "", fmt.Errorf("cannot expand path %s, no home directory available", s)
	}

	// Swap the tilde for the home directory.
	if bareTilde {
		return current.HomeDir, nil
	}
	rest := strings.TrimPrefix(s, homePrefix)
	return filepath.Join(current.HomeDir, rest), nil
}
// indexVar lets the flag package read an index flag written in hexadecimal.
type indexVar int

// Compile-time check that *indexVar satisfies flag.Value.
var _ flag.Value = (*indexVar)(nil)

// String formats the value as hexadecimal, zero-padded to 8 characters.
func (v *indexVar) String() string {
	n := int(*v)
	return fmt.Sprintf("%08x", n)
}

// Set parses s as a hexadecimal number that fits in a signed 32-bit integer
// and stores it in v. Any parse failure is reported with a generic
// "invalid hexadecimal format" error and leaves v unchanged.
func (v *indexVar) Set(s string) error {
	parsed, err := strconv.ParseInt(s, 16, 32)
	if err == nil {
		*v = indexVar(parsed)
		return nil
	}
	return fmt.Errorf("invalid hexadecimal format")
}