diff --git a/cmd/litestream/main.go b/cmd/litestream/main.go index f3bd985..d186f61 100644 --- a/cmd/litestream/main.go +++ b/cmd/litestream/main.go @@ -9,7 +9,6 @@ import ( "log" "net/url" "os" - "os/signal" "os/user" "path" "path/filepath" @@ -87,24 +86,40 @@ func (m *Main) Run(ctx context.Context, args []string) (err error) { // Setup signal handler. ctx, cancel := context.WithCancel(ctx) - ch := signalChan() - go func() { <-ch; cancel() }() + signalCh := signalChan() if err := c.Run(ctx); err != nil { return err } // Wait for signal to stop program. - <-ctx.Done() - signal.Reset() - fmt.Println("signal received, litestream shutting down") + select { + case err = <-c.execCh: + cancel() + fmt.Println("subprocess exited, litestream shutting down") + case sig := <-signalCh: + cancel() + fmt.Println("signal received, litestream shutting down") + + if c.cmd != nil { + fmt.Println("sending signal to exec process") + if err := c.cmd.Process.Signal(sig); err != nil { + return fmt.Errorf("cannot signal exec process: %w", err) + } + + fmt.Println("waiting for exec process to close") + if err := <-c.execCh; err != nil && !strings.HasPrefix(err.Error(), "signal:") { + return fmt.Errorf("cannot wait for exec process: %w", err) + } + } + } // Gracefully close. - if err := c.Close(); err != nil { - return err + if e := c.Close(); e != nil && err == nil { + err = e } fmt.Println("litestream shut down") - return nil + return err case "restore": return (&RestoreCommand{}).Run(ctx, args) @@ -152,6 +167,10 @@ type Config struct { // List of databases to manage. DBs []*DBConfig `yaml:"dbs"` + // Subcommand to execute during replication. + // Litestream will shutdown when subcommand exits. + Exec string `yaml:"exec"` + // Global S3 settings AccessKeyID string `yaml:"access-key-id"` SecretAccessKey string `yaml:"secret-access-key"` diff --git a/cmd/litestream/main_notwindows.go b/cmd/litestream/main_notwindows.go index c7713c0..aaf87a1 100644 --- a/cmd/litestream/main_notwindows.go +++ b/cmd/litestream/main_notwindows.go @@ -20,7 +20,7 @@ func runWindowsService(ctx context.Context) error { } func signalChan() <-chan os.Signal { - ch := make(chan os.Signal, 1) + ch := make(chan os.Signal, 2) signal.Notify(ch, syscall.SIGINT, syscall.SIGTERM) return ch } diff --git a/cmd/litestream/replicate.go b/cmd/litestream/replicate.go index 0dcdfe5..d0dea3e 100644 --- a/cmd/litestream/replicate.go +++ b/cmd/litestream/replicate.go @@ -9,6 +9,7 @@ import ( "net/http" _ "net/http/pprof" "os" + "os/exec" "github.com/benbjohnson/litestream" "github.com/benbjohnson/litestream/abs" @@ -16,11 +17,15 @@ import ( "github.com/benbjohnson/litestream/gcs" "github.com/benbjohnson/litestream/s3" "github.com/benbjohnson/litestream/sftp" + "github.com/mattn/go-shellwords" "github.com/prometheus/client_golang/prometheus/promhttp" ) // ReplicateCommand represents a command that continuously replicates SQLite databases. type ReplicateCommand struct { + cmd *exec.Cmd // subcommand + execCh chan error // subcommand error channel + Config Config // List of managed databases specified in the config. @@ -28,12 +33,15 @@ type ReplicateCommand struct { } func NewReplicateCommand() *ReplicateCommand { - return &ReplicateCommand{} + return &ReplicateCommand{ + execCh: make(chan error), + } } // ParseFlags parses the CLI flags and loads the configuration file. func (c *ReplicateCommand) ParseFlags(ctx context.Context, args []string) (err error) { fs := flag.NewFlagSet("litestream-replicate", flag.ContinueOnError) + execFlag := fs.String("exec", "", "execute subcommand") tracePath := fs.String("trace", "", "trace path") configPath, noExpandEnv := registerConfigFlag(fs) fs.Usage = c.Usage @@ -67,6 +75,11 @@ func (c *ReplicateCommand) ParseFlags(ctx context.Context, args []string) (err e } } + // Override config exec command, if specified. + if *execFlag != "" { + c.Config.Exec = *execFlag + } + // Enable trace logging. if *tracePath != "" { f, err := os.Create(*tracePath) @@ -85,6 +98,7 @@ func (c *ReplicateCommand) Run(ctx context.Context) (err error) { // Display version information. log.Printf("litestream %s", Version) + // Setup databases. if len(c.Config.DBs) == 0 { log.Println("no databases specified in configuration") } @@ -141,6 +155,23 @@ func (c *ReplicateCommand) Run(ctx context.Context) (err error) { }() } + // Parse exec commands args & start subprocess. + if c.Config.Exec != "" { + execArgs, err := shellwords.Parse(c.Config.Exec) + if err != nil { + return fmt.Errorf("cannot parse exec command: %w", err) + } + + c.cmd = exec.CommandContext(ctx, execArgs[0], execArgs[1:]...) + c.cmd.Env = os.Environ() + c.cmd.Stdout = os.Stdout + c.cmd.Stderr = os.Stderr + if err := c.cmd.Start(); err != nil { + return fmt.Errorf("cannot start exec command: %w", err) + } + go func() { c.execCh <- c.cmd.Wait() }() + } + return nil } @@ -178,6 +209,10 @@ Arguments: Specifies the configuration file. Defaults to %s + -exec CMD + Executes a subcommand. Litestream will exit when the child + process exits. Useful for simple process management. + -no-expand-env Disables environment variable expansion in configuration file. diff --git a/go.mod b/go.mod index 5ed2361..1c5a0e8 100644 --- a/go.mod +++ b/go.mod @@ -8,6 +8,7 @@ require ( github.com/Azure/go-autorest/autorest v0.9.0 // indirect github.com/aws/aws-sdk-go v1.27.0 github.com/davecgh/go-spew v1.1.1 + github.com/mattn/go-shellwords v1.0.11 // indirect github.com/mattn/go-sqlite3 v1.14.5 github.com/pierrec/lz4/v4 v4.1.3 github.com/pkg/sftp v1.13.0 // indirect diff --git a/go.sum b/go.sum index dc2265e..bda8ce9 100644 --- a/go.sum +++ b/go.sum @@ -276,6 +276,8 @@ github.com/mattn/go-ieproxy v0.0.1/go.mod h1:pYabZ6IHcRpFh7vIaLfK7rdcWgFEb3SFJ6/ github.com/mattn/go-isatty v0.0.3/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNxMWT7Zi4= github.com/mattn/go-isatty v0.0.4/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNxMWT7Zi4= github.com/mattn/go-runewidth v0.0.2/go.mod h1:LwmH8dsx7+W8Uxz3IHJYH5QSwggIsqBzpuz5H//U1FU= +github.com/mattn/go-shellwords v1.0.11 h1:vCoR9VPpsk/TZFW2JwK5I9S0xdrtUq2bph6/YjEPnaw= +github.com/mattn/go-shellwords v1.0.11/go.mod h1:EZzvwXDESEeg03EKmM+RmDnNOPKG4lLtQsUlTZDWQ8Y= github.com/mattn/go-sqlite3 v1.14.5 h1:1IdxlwTNazvbKJQSxoJ5/9ECbEeaTTyeU7sEAZ5KKTQ= github.com/mattn/go-sqlite3 v1.14.5/go.mod h1:WVKg1VTActs4Qso6iwGbiFih2UIHo0ENGwNd0Lj+XmI= github.com/matttproud/golang_protobuf_extensions v1.0.1 h1:4hp9jkHxhMHkqkrB3Ix0jegS5sx/RkqARlsWZ6pIwiU=