Add simple subprocess execution

This commit adds the ability to run a subcommand through Litestream.
Shutting down the subcommand will cause Litestream to gracefully
shutdown. Litestream will forward interrupt signals and wait for
the subprocess to shutdown.
This commit is contained in:
Ben Johnson
2021-05-24 18:55:58 -06:00
parent c06997789b
commit 8fb9c910f0
5 changed files with 68 additions and 11 deletions

View File

@@ -9,7 +9,6 @@ import (
"log" "log"
"net/url" "net/url"
"os" "os"
"os/signal"
"os/user" "os/user"
"path" "path"
"path/filepath" "path/filepath"
@@ -87,24 +86,40 @@ func (m *Main) Run(ctx context.Context, args []string) (err error) {
// Setup signal handler. // Setup signal handler.
ctx, cancel := context.WithCancel(ctx) ctx, cancel := context.WithCancel(ctx)
ch := signalChan() signalCh := signalChan()
go func() { <-ch; cancel() }()
if err := c.Run(ctx); err != nil { if err := c.Run(ctx); err != nil {
return err return err
} }
// Wait for signal to stop program. // Wait for signal to stop program.
<-ctx.Done() select {
signal.Reset() case err = <-c.execCh:
cancel()
fmt.Println("subprocess exited, litestream shutting down")
case sig := <-signalCh:
cancel()
fmt.Println("signal received, litestream shutting down") fmt.Println("signal received, litestream shutting down")
if c.cmd != nil {
fmt.Println("sending signal to exec process")
if err := c.cmd.Process.Signal(sig); err != nil {
return fmt.Errorf("cannot signal exec process: %w", err)
}
fmt.Println("waiting for exec process to close")
if err := <-c.execCh; err != nil && !strings.HasPrefix(err.Error(), "signal:") {
return fmt.Errorf("cannot wait for exec process: %w", err)
}
}
}
// Gracefully close. // Gracefully close.
if err := c.Close(); err != nil { if e := c.Close(); e != nil && err == nil {
return err err = e
} }
fmt.Println("litestream shut down") fmt.Println("litestream shut down")
return nil return err
case "restore": case "restore":
return (&RestoreCommand{}).Run(ctx, args) return (&RestoreCommand{}).Run(ctx, args)
@@ -152,6 +167,10 @@ type Config struct {
// List of databases to manage. // List of databases to manage.
DBs []*DBConfig `yaml:"dbs"` DBs []*DBConfig `yaml:"dbs"`
// Subcommand to execute during replication.
// Litestream will shutdown when subcommand exits.
Exec string `yaml:"exec"`
// Global S3 settings // Global S3 settings
AccessKeyID string `yaml:"access-key-id"` AccessKeyID string `yaml:"access-key-id"`
SecretAccessKey string `yaml:"secret-access-key"` SecretAccessKey string `yaml:"secret-access-key"`

View File

@@ -20,7 +20,7 @@ func runWindowsService(ctx context.Context) error {
} }
func signalChan() <-chan os.Signal { func signalChan() <-chan os.Signal {
ch := make(chan os.Signal, 1) ch := make(chan os.Signal, 2)
signal.Notify(ch, syscall.SIGINT, syscall.SIGTERM) signal.Notify(ch, syscall.SIGINT, syscall.SIGTERM)
return ch return ch
} }

View File

@@ -9,6 +9,7 @@ import (
"net/http" "net/http"
_ "net/http/pprof" _ "net/http/pprof"
"os" "os"
"os/exec"
"github.com/benbjohnson/litestream" "github.com/benbjohnson/litestream"
"github.com/benbjohnson/litestream/abs" "github.com/benbjohnson/litestream/abs"
@@ -16,11 +17,15 @@ import (
"github.com/benbjohnson/litestream/gcs" "github.com/benbjohnson/litestream/gcs"
"github.com/benbjohnson/litestream/s3" "github.com/benbjohnson/litestream/s3"
"github.com/benbjohnson/litestream/sftp" "github.com/benbjohnson/litestream/sftp"
"github.com/mattn/go-shellwords"
"github.com/prometheus/client_golang/prometheus/promhttp" "github.com/prometheus/client_golang/prometheus/promhttp"
) )
// ReplicateCommand represents a command that continuously replicates SQLite databases. // ReplicateCommand represents a command that continuously replicates SQLite databases.
type ReplicateCommand struct { type ReplicateCommand struct {
cmd *exec.Cmd // subcommand
execCh chan error // subcommand error channel
Config Config Config Config
// List of managed databases specified in the config. // List of managed databases specified in the config.
@@ -28,12 +33,15 @@ type ReplicateCommand struct {
} }
func NewReplicateCommand() *ReplicateCommand { func NewReplicateCommand() *ReplicateCommand {
return &ReplicateCommand{} return &ReplicateCommand{
execCh: make(chan error),
}
} }
// ParseFlags parses the CLI flags and loads the configuration file. // ParseFlags parses the CLI flags and loads the configuration file.
func (c *ReplicateCommand) ParseFlags(ctx context.Context, args []string) (err error) { func (c *ReplicateCommand) ParseFlags(ctx context.Context, args []string) (err error) {
fs := flag.NewFlagSet("litestream-replicate", flag.ContinueOnError) fs := flag.NewFlagSet("litestream-replicate", flag.ContinueOnError)
execFlag := fs.String("exec", "", "execute subcommand")
tracePath := fs.String("trace", "", "trace path") tracePath := fs.String("trace", "", "trace path")
configPath, noExpandEnv := registerConfigFlag(fs) configPath, noExpandEnv := registerConfigFlag(fs)
fs.Usage = c.Usage fs.Usage = c.Usage
@@ -67,6 +75,11 @@ func (c *ReplicateCommand) ParseFlags(ctx context.Context, args []string) (err e
} }
} }
// Override config exec command, if specified.
if *execFlag != "" {
c.Config.Exec = *execFlag
}
// Enable trace logging. // Enable trace logging.
if *tracePath != "" { if *tracePath != "" {
f, err := os.Create(*tracePath) f, err := os.Create(*tracePath)
@@ -85,6 +98,7 @@ func (c *ReplicateCommand) Run(ctx context.Context) (err error) {
// Display version information. // Display version information.
log.Printf("litestream %s", Version) log.Printf("litestream %s", Version)
// Setup databases.
if len(c.Config.DBs) == 0 { if len(c.Config.DBs) == 0 {
log.Println("no databases specified in configuration") log.Println("no databases specified in configuration")
} }
@@ -141,6 +155,23 @@ func (c *ReplicateCommand) Run(ctx context.Context) (err error) {
}() }()
} }
// Parse exec commands args & start subprocess.
if c.Config.Exec != "" {
execArgs, err := shellwords.Parse(c.Config.Exec)
if err != nil {
return fmt.Errorf("cannot parse exec command: %w", err)
}
c.cmd = exec.CommandContext(ctx, execArgs[0], execArgs[1:]...)
c.cmd.Env = os.Environ()
c.cmd.Stdout = os.Stdout
c.cmd.Stderr = os.Stderr
if err := c.cmd.Start(); err != nil {
return fmt.Errorf("cannot start exec command: %w", err)
}
go func() { c.execCh <- c.cmd.Wait() }()
}
return nil return nil
} }
@@ -178,6 +209,10 @@ Arguments:
Specifies the configuration file. Specifies the configuration file.
Defaults to %s Defaults to %s
-exec CMD
Executes a subcommand. Litestream will exit when the child
process exits. Useful for simple process management.
-no-expand-env -no-expand-env
Disables environment variable expansion in configuration file. Disables environment variable expansion in configuration file.

1
go.mod
View File

@@ -8,6 +8,7 @@ require (
github.com/Azure/go-autorest/autorest v0.9.0 // indirect github.com/Azure/go-autorest/autorest v0.9.0 // indirect
github.com/aws/aws-sdk-go v1.27.0 github.com/aws/aws-sdk-go v1.27.0
github.com/davecgh/go-spew v1.1.1 github.com/davecgh/go-spew v1.1.1
github.com/mattn/go-shellwords v1.0.11 // indirect
github.com/mattn/go-sqlite3 v1.14.5 github.com/mattn/go-sqlite3 v1.14.5
github.com/pierrec/lz4/v4 v4.1.3 github.com/pierrec/lz4/v4 v4.1.3
github.com/pkg/sftp v1.13.0 // indirect github.com/pkg/sftp v1.13.0 // indirect

2
go.sum
View File

@@ -276,6 +276,8 @@ github.com/mattn/go-ieproxy v0.0.1/go.mod h1:pYabZ6IHcRpFh7vIaLfK7rdcWgFEb3SFJ6/
github.com/mattn/go-isatty v0.0.3/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNxMWT7Zi4= github.com/mattn/go-isatty v0.0.3/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNxMWT7Zi4=
github.com/mattn/go-isatty v0.0.4/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNxMWT7Zi4= github.com/mattn/go-isatty v0.0.4/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNxMWT7Zi4=
github.com/mattn/go-runewidth v0.0.2/go.mod h1:LwmH8dsx7+W8Uxz3IHJYH5QSwggIsqBzpuz5H//U1FU= github.com/mattn/go-runewidth v0.0.2/go.mod h1:LwmH8dsx7+W8Uxz3IHJYH5QSwggIsqBzpuz5H//U1FU=
github.com/mattn/go-shellwords v1.0.11 h1:vCoR9VPpsk/TZFW2JwK5I9S0xdrtUq2bph6/YjEPnaw=
github.com/mattn/go-shellwords v1.0.11/go.mod h1:EZzvwXDESEeg03EKmM+RmDnNOPKG4lLtQsUlTZDWQ8Y=
github.com/mattn/go-sqlite3 v1.14.5 h1:1IdxlwTNazvbKJQSxoJ5/9ECbEeaTTyeU7sEAZ5KKTQ= github.com/mattn/go-sqlite3 v1.14.5 h1:1IdxlwTNazvbKJQSxoJ5/9ECbEeaTTyeU7sEAZ5KKTQ=
github.com/mattn/go-sqlite3 v1.14.5/go.mod h1:WVKg1VTActs4Qso6iwGbiFih2UIHo0ENGwNd0Lj+XmI= github.com/mattn/go-sqlite3 v1.14.5/go.mod h1:WVKg1VTActs4Qso6iwGbiFih2UIHo0ENGwNd0Lj+XmI=
github.com/matttproud/golang_protobuf_extensions v1.0.1 h1:4hp9jkHxhMHkqkrB3Ix0jegS5sx/RkqARlsWZ6pIwiU= github.com/matttproud/golang_protobuf_extensions v1.0.1 h1:4hp9jkHxhMHkqkrB3Ix0jegS5sx/RkqARlsWZ6pIwiU=