diff --git a/cmd/litestream/main.go b/cmd/litestream/main.go index c8a7850..7762732 100644 --- a/cmd/litestream/main.go +++ b/cmd/litestream/main.go @@ -2,15 +2,18 @@ package main import ( "context" + "errors" "flag" "fmt" "io/ioutil" "log" "net/url" "os" + "os/signal" "os/user" "path" "path/filepath" + "regexp" "strings" "time" @@ -25,14 +28,17 @@ var ( Version = "(development build)" ) +// errStop is a terminal error for indicating program should quit. +var errStop = errors.New("stop") + func main() { log.SetFlags(0) m := NewMain() - if err := m.Run(context.Background(), os.Args[1:]); err == flag.ErrHelp { + if err := m.Run(context.Background(), os.Args[1:]); err == flag.ErrHelp || err == errStop { os.Exit(1) } else if err != nil { - fmt.Fprintln(os.Stderr, err) + log.Println(err) os.Exit(1) } } @@ -47,6 +53,14 @@ func NewMain() *Main { // Run executes the program. func (m *Main) Run(ctx context.Context, args []string) (err error) { + // Execute replication command if running as a Windows service. + if isService, err := isWindowsService(); err != nil { + return err + } else if isService { + return runWindowsService(ctx) + } + + // Extract command name. var cmd string if len(args) > 0 { cmd, args = args[0], args[1:] @@ -58,7 +72,28 @@ func (m *Main) Run(ctx context.Context, args []string) (err error) { case "generations": return (&GenerationsCommand{}).Run(ctx, args) case "replicate": - return (&ReplicateCommand{}).Run(ctx, args) + c := NewReplicateCommand() + if err := c.ParseFlags(ctx, args); err != nil { + return err + } + + // Setup signal handler. + ctx, cancel := context.WithCancel(ctx) + ch := make(chan os.Signal, 1) + signal.Notify(ch, os.Interrupt) + go func() { <-ch; cancel() }() + + if err := c.Run(ctx); err != nil { + return err + } + + // Wait for signal to stop program. + <-ctx.Done() + signal.Reset() + + // Gracefully close. + return c.Close() + case "restore": return (&RestoreCommand{}).Run(ctx, args) case "snapshots": @@ -222,8 +257,7 @@ func ParseReplicaURL(s string) (scheme, host, urlpath string, err error) { // isURL returns true if s can be parsed and has a scheme. func isURL(s string) bool { - u, err := url.Parse(s) - return err == nil && u.Scheme != "" + return regexp.MustCompile(`^\w+:\/\/`).MatchString(s) } // ReplicaType returns the type based on the type field or extracted from the URL. @@ -242,7 +276,7 @@ func DefaultConfigPath() string { if v := os.Getenv("LITESTREAM_CONFIG"); v != "" { return v } - return "/etc/litestream.yml" + return defaultConfigPath } func registerConfigFlag(fs *flag.FlagSet, p *string) { diff --git a/cmd/litestream/main_notwindows.go b/cmd/litestream/main_notwindows.go new file mode 100644 index 0000000..8304106 --- /dev/null +++ b/cmd/litestream/main_notwindows.go @@ -0,0 +1,17 @@ +// +build !windows + +package main + +import ( + "context" +) + +const defaultConfigPath = "/etc/litestream.yml" + +func isWindowsService() (bool, error) { + return false, nil +} + +func runWindowsService(ctx context.Context) error { + panic("cannot run windows service as unix process") +} diff --git a/cmd/litestream/main_windows.go b/cmd/litestream/main_windows.go new file mode 100644 index 0000000..31f1a0f --- /dev/null +++ b/cmd/litestream/main_windows.go @@ -0,0 +1,105 @@ +// +build windows + +package main + +import ( + "context" + "io" + "log" + "os" + + "golang.org/x/sys/windows" + "golang.org/x/sys/windows/svc" + "golang.org/x/sys/windows/svc/eventlog" +) + +const defaultConfigPath = `C:\Litestream\litestream.yml` + +// serviceName is the Windows Service name. +const serviceName = "Litestream" + +// isWindowsService returns true if currently executing within a Windows service. +func isWindowsService() (bool, error) { + return svc.IsWindowsService() +} + +func runWindowsService(ctx context.Context) error { + // Attempt to install new log service. This will fail if already installed. + // We don't log the error because we don't have anywhere to log until we open the log. + _ = eventlog.InstallAsEventCreate(serviceName, eventlog.Error|eventlog.Warning|eventlog.Info) + + elog, err := eventlog.Open(serviceName) + if err != nil { + return err + } + defer elog.Close() + + // Set eventlog as log writer while running. + log.SetOutput((*eventlogWriter)(elog)) + defer log.SetOutput(os.Stderr) + + log.Print("Litestream service starting") + + if err := svc.Run(serviceName, &windowsService{ctx: ctx}); err != nil { + return errStop + } + + log.Print("Litestream service stopped") + return nil +} + +// windowsService is an interface adapter for svc.Handler. +type windowsService struct { + ctx context.Context +} + +func (s *windowsService) Execute(args []string, r <-chan svc.ChangeRequest, statusCh chan<- svc.Status) (svcSpecificEC bool, exitCode uint32) { + var err error + + // Notify Windows that the service is starting up. + statusCh <- svc.Status{State: svc.StartPending} + + // Instantiate replication command and load configuration. + c := NewReplicateCommand() + if c.Config, err = ReadConfigFile(DefaultConfigPath()); err != nil { + log.Printf("cannot load configuration: %s", err) + return true, 1 + } + + // Execute replication command. + if err := c.Run(s.ctx); err != nil { + log.Printf("cannot replicate: %s", err) + statusCh <- svc.Status{State: svc.StopPending} + return true, 2 + } + + // Notify Windows that the service is now running. + statusCh <- svc.Status{State: svc.Running, Accepts: svc.AcceptStop} + + for { + select { + case req := <-r: + switch req.Cmd { + case svc.Stop: + c.Close() + statusCh <- svc.Status{State: svc.StopPending} + return false, windows.NO_ERROR + case svc.Interrogate: + statusCh <- req.CurrentStatus + default: + log.Printf("Litestream service received unexpected change request cmd: %d", req.Cmd) + } + } + } +} + +// Ensure implementation implements io.Writer interface. +var _ io.Writer = (*eventlogWriter)(nil) + +// eventlogWriter is an adapter for using eventlog.Log as an io.Writer. +type eventlogWriter eventlog.Log + +func (w *eventlogWriter) Write(p []byte) (n int, err error) { + elog := (*eventlog.Log)(w) + return 0, elog.Info(1, string(p)) +} diff --git a/cmd/litestream/replicate.go b/cmd/litestream/replicate.go index bfe1686..3ae515a 100644 --- a/cmd/litestream/replicate.go +++ b/cmd/litestream/replicate.go @@ -10,7 +10,6 @@ import ( "net/http" _ "net/http/pprof" "os" - "os/signal" "time" "github.com/benbjohnson/litestream" @@ -27,8 +26,12 @@ type ReplicateCommand struct { DBs []*litestream.DB } -// Run loads all databases specified in the configuration. -func (c *ReplicateCommand) Run(ctx context.Context, args []string) (err error) { +func NewReplicateCommand() *ReplicateCommand { + return &ReplicateCommand{} +} + +// ParseFlags parses the CLI flags and loads the configuration file. +func (c *ReplicateCommand) ParseFlags(ctx context.Context, args []string) (err error) { fs := flag.NewFlagSet("litestream-replicate", flag.ContinueOnError) tracePath := fs.String("trace", "", "trace path") registerConfigFlag(fs, &c.ConfigPath) @@ -38,7 +41,6 @@ func (c *ReplicateCommand) Run(ctx context.Context, args []string) (err error) { } // Load configuration or use CLI args to build db/replica. - var config Config if fs.NArg() == 1 { return fmt.Errorf("must specify at least one replica URL for %s", fs.Arg(0)) } else if fs.NArg() > 1 { @@ -49,10 +51,9 @@ func (c *ReplicateCommand) Run(ctx context.Context, args []string) (err error) { SyncInterval: 1 * time.Second, }) } - config.DBs = []*DBConfig{dbConfig} + c.Config.DBs = []*DBConfig{dbConfig} } else if c.ConfigPath != "" { - config, err = ReadConfigFile(c.ConfigPath) - if err != nil { + if c.Config, err = ReadConfigFile(c.ConfigPath); err != nil { return err } } else { @@ -69,21 +70,20 @@ func (c *ReplicateCommand) Run(ctx context.Context, args []string) (err error) { litestream.Tracef = log.New(f, "", log.LstdFlags|log.LUTC|log.Lshortfile).Printf } - // Setup signal handler. - ctx, cancel := context.WithCancel(ctx) - ch := make(chan os.Signal, 1) - signal.Notify(ch, os.Interrupt) - go func() { <-ch; cancel() }() + return nil +} +// Run loads all databases specified in the configuration. +func (c *ReplicateCommand) Run(ctx context.Context) (err error) { // Display version information. - fmt.Printf("litestream %s\n", Version) + log.Printf("litestream %s", Version) - if len(config.DBs) == 0 { - fmt.Println("no databases specified in configuration") + if len(c.Config.DBs) == 0 { + log.Println("no databases specified in configuration") } - for _, dbConfig := range config.DBs { - db, err := newDBFromConfig(&config, dbConfig) + for _, dbConfig := range c.Config.DBs { + db, err := newDBFromConfig(&c.Config, dbConfig) if err != nil { return err } @@ -97,41 +97,31 @@ func (c *ReplicateCommand) Run(ctx context.Context, args []string) (err error) { // Notify user that initialization is done. for _, db := range c.DBs { - fmt.Printf("initialized db: %s\n", db.Path()) + log.Printf("initialized db: %s", db.Path()) for _, r := range db.Replicas { switch r := r.(type) { case *litestream.FileReplica: - fmt.Printf("replicating to: name=%q type=%q path=%q\n", r.Name(), r.Type(), r.Path()) + log.Printf("replicating to: name=%q type=%q path=%q", r.Name(), r.Type(), r.Path()) case *s3.Replica: - fmt.Printf("replicating to: name=%q type=%q bucket=%q path=%q region=%q\n", r.Name(), r.Type(), r.Bucket, r.Path, r.Region) + log.Printf("replicating to: name=%q type=%q bucket=%q path=%q region=%q", r.Name(), r.Type(), r.Bucket, r.Path, r.Region) default: - fmt.Printf("replicating to: name=%q type=%q\n", r.Name(), r.Type()) + log.Printf("replicating to: name=%q type=%q", r.Name(), r.Type()) } } } // Serve metrics over HTTP if enabled. - if config.Addr != "" { - _, port, _ := net.SplitHostPort(config.Addr) - fmt.Printf("serving metrics on http://localhost:%s/metrics\n", port) + if c.Config.Addr != "" { + _, port, _ := net.SplitHostPort(c.Config.Addr) + log.Printf("serving metrics on http://localhost:%s/metrics", port) go func() { http.Handle("/metrics", promhttp.Handler()) - if err := http.ListenAndServe(config.Addr, nil); err != nil { + if err := http.ListenAndServe(c.Config.Addr, nil); err != nil { log.Printf("cannot start metrics server: %s", err) } }() } - // Wait for signal to stop program. - <-ctx.Done() - signal.Reset() - - // Gracefully close - if err := c.Close(); err != nil { - fmt.Fprintln(os.Stderr, err) - os.Exit(1) - } - return nil } @@ -139,12 +129,13 @@ func (c *ReplicateCommand) Run(ctx context.Context, args []string) (err error) { func (c *ReplicateCommand) Close() (err error) { for _, db := range c.DBs { if e := db.SoftClose(); e != nil { - fmt.Printf("error closing db: path=%s err=%s\n", db.Path(), e) + log.Printf("error closing db: path=%s err=%s", db.Path(), e) if err == nil { err = e } } } + // TODO(windows): Clear DBs return err } diff --git a/etc/build.ps1 b/etc/build.ps1 new file mode 100644 index 0000000..d8e78a2 --- /dev/null +++ b/etc/build.ps1 @@ -0,0 +1,17 @@ +[CmdletBinding()] +Param ( + [Parameter(Mandatory = $true)] + [String] $Version +) +$ErrorActionPreference = "Stop" + +# Update working directory. +Push-Location $PSScriptRoot +Trap { + Pop-Location +} + +Invoke-Expression "candle.exe -nologo -arch x64 -ext WixUtilExtension -out litestream.wixobj -dVersion=`"$Version`" litestream.wxs" +Invoke-Expression "light.exe -nologo -spdb -ext WixUtilExtension -out `"litestream-${Version}.msi`" litestream.wixobj" + +Pop-Location diff --git a/etc/litestream.wxs b/etc/litestream.wxs new file mode 100644 index 0000000..034c966 --- /dev/null +++ b/etc/litestream.wxs @@ -0,0 +1,89 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/go.mod b/go.mod index 127151a..90c25b7 100644 --- a/go.mod +++ b/go.mod @@ -8,5 +8,6 @@ require ( github.com/mattn/go-sqlite3 v1.14.5 github.com/pierrec/lz4/v4 v4.1.3 github.com/prometheus/client_golang v1.9.0 + golang.org/x/sys v0.0.0-20201214210602-f9fddec55a1e gopkg.in/yaml.v2 v2.4.0 )