From d1ac03bd8cfab43aea826c6c0bc89935ae4a8148 Mon Sep 17 00:00:00 2001 From: Ben Johnson Date: Mon, 24 May 2021 14:26:31 -0600 Subject: [PATCH] Add SFTP replica type --- .github/workflows/test.yml | 14 + cmd/litestream/main.go | 61 +++++ cmd/litestream/replicate.go | 3 + go.mod | 2 + go.sum | 11 + replica_client_test.go | 34 ++- sftp/replica_client.go | 511 ++++++++++++++++++++++++++++++++++++ 7 files changed, 635 insertions(+), 1 deletion(-) create mode 100644 sftp/replica_client.go diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index a07ee68..4d3122b 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -23,6 +23,12 @@ jobs: env: GOOGLE_APPLICATION_CREDENTIALS: ${{secrets.GOOGLE_APPLICATION_CREDENTIALS}} + - name: Extract SSH key + run: 'echo "$LITESTREAM_SFTP_KEY" > /opt/id_ed25519' + shell: bash + env: + LITESTREAM_SFTP_KEY: ${{secrets.LITESTREAM_SFTP_KEY}} + - name: Run unit tests run: go test -v ./... @@ -46,3 +52,11 @@ jobs: LITESTREAM_ABS_ACCOUNT_NAME: ${{ secrets.LITESTREAM_ABS_ACCOUNT_NAME }} LITESTREAM_ABS_ACCOUNT_KEY: ${{ secrets.LITESTREAM_ABS_ACCOUNT_KEY }} LITESTREAM_ABS_BUCKET: ${{ secrets.LITESTREAM_ABS_BUCKET }} + + - name: Run sftp tests + run: go test -v -run=TestReplicaClient . -integration sftp + env: + LITESTREAM_SFTP_HOST: ${{ secrets.LITESTREAM_SFTP_HOST }} + LITESTREAM_SFTP_USER: ${{ secrets.LITESTREAM_SFTP_USER }} + LITESTREAM_SFTP_KEY_PATH: /opt/id_ed25519 + LITESTREAM_SFTP_PATH: ${{ secrets.LITESTREAM_SFTP_PATH }} diff --git a/cmd/litestream/main.go b/cmd/litestream/main.go index 31436c9..97ff50b 100644 --- a/cmd/litestream/main.go +++ b/cmd/litestream/main.go @@ -23,6 +23,7 @@ import ( "github.com/benbjohnson/litestream/file" "github.com/benbjohnson/litestream/gcs" "github.com/benbjohnson/litestream/s3" + "github.com/benbjohnson/litestream/sftp" _ "github.com/mattn/go-sqlite3" "gopkg.in/yaml.v2" ) @@ -297,6 +298,12 @@ type ReplicaConfig struct { // ABS settings AccountName string `yaml:"account-name"` AccountKey string `yaml:"account-key"` + + // SFTP settings + Host string `yaml:"host"` + User string `yaml:"user"` + Password string `yaml:"password"` + KeyPath string `yaml:"key-path"` } // NewReplicaFromConfig instantiates a replica for a DB based on a config. @@ -344,6 +351,10 @@ func NewReplicaFromConfig(c *ReplicaConfig, db *litestream.DB) (_ *litestream.Re if r.Client, err = newABSReplicaClientFromConfig(c, r); err != nil { return nil, err } + case "sftp": + if r.Client, err = newSFTPReplicaClientFromConfig(c, r); err != nil { + return nil, err + } default: return nil, fmt.Errorf("unknown replica type in config: %q", c.Type) } @@ -526,6 +537,56 @@ func newABSReplicaClientFromConfig(c *ReplicaConfig, r *litestream.Replica) (_ * return client, nil } +// newSFTPReplicaClientFromConfig returns a new instance of sftp.ReplicaClient built from config. +func newSFTPReplicaClientFromConfig(c *ReplicaConfig, r *litestream.Replica) (_ *sftp.ReplicaClient, err error) { + // Ensure URL & constituent parts are not both specified. + if c.URL != "" && c.Path != "" { + return nil, fmt.Errorf("cannot specify url & path for sftp replica") + } else if c.URL != "" && c.Host != "" { + return nil, fmt.Errorf("cannot specify url & host for sftp replica") + } + + host, user, password, path := c.Host, c.User, c.Password, c.Path + + // Apply settings from URL, if specified. + if c.URL != "" { + u, err := url.Parse(c.URL) + if err != nil { + return nil, err + } + + // Only apply URL parts to field that have not been overridden. + if host == "" { + host = u.Host + } + if user == "" && u.User != nil { + user = u.User.Username() + } + if password == "" && u.User != nil { + password, _ = u.User.Password() + } + if path == "" { + path = u.Path + } + } + + // Ensure required settings are set. + if host == "" { + return nil, fmt.Errorf("host required for sftp replica") + } else if user == "" { + return nil, fmt.Errorf("user required for sftp replica") + } + + // Build replica. + client := sftp.NewReplicaClient() + client.Host = host + client.User = user + client.Password = password + client.Path = path + client.KeyPath = c.KeyPath + return client, nil +} + // applyLitestreamEnv copies "LITESTREAM" prefixed environment variables to // their AWS counterparts as the "AWS" prefix can be confusing when using a // non-AWS S3-compatible service. diff --git a/cmd/litestream/replicate.go b/cmd/litestream/replicate.go index 478f585..0dcdfe5 100644 --- a/cmd/litestream/replicate.go +++ b/cmd/litestream/replicate.go @@ -15,6 +15,7 @@ import ( "github.com/benbjohnson/litestream/file" "github.com/benbjohnson/litestream/gcs" "github.com/benbjohnson/litestream/s3" + "github.com/benbjohnson/litestream/sftp" "github.com/prometheus/client_golang/prometheus/promhttp" ) @@ -114,6 +115,8 @@ func (c *ReplicateCommand) Run(ctx context.Context) (err error) { log.Printf("replicating to: name=%q type=%q bucket=%q path=%q sync-interval=%s", r.Name(), client.Type(), client.Bucket, client.Path, r.SyncInterval) case *abs.ReplicaClient: log.Printf("replicating to: name=%q type=%q bucket=%q path=%q endpoint=%q sync-interval=%s", r.Name(), client.Type(), client.Bucket, client.Path, client.Endpoint, r.SyncInterval) + case *sftp.ReplicaClient: + log.Printf("replicating to: name=%q type=%q host=%q user=%q path=%q sync-interval=%s", r.Name(), client.Type(), client.Host, client.User, client.Path, r.SyncInterval) default: log.Printf("replicating to: name=%q type=%q", r.Name(), client.Type()) } diff --git a/go.mod b/go.mod index d81acc2..5ed2361 100644 --- a/go.mod +++ b/go.mod @@ -10,7 +10,9 @@ require ( github.com/davecgh/go-spew v1.1.1 github.com/mattn/go-sqlite3 v1.14.5 github.com/pierrec/lz4/v4 v4.1.3 + github.com/pkg/sftp v1.13.0 // indirect github.com/prometheus/client_golang v1.9.0 + golang.org/x/crypto v0.0.0-20210513164829-c07d793c2f9a // indirect golang.org/x/sync v0.0.0-20210220032951-036812b2e83c golang.org/x/sys v0.0.0-20210412220455-f1c623a9e750 google.golang.org/api v0.45.0 diff --git a/go.sum b/go.sum index 8e762c6..dc2265e 100644 --- a/go.sum +++ b/go.sum @@ -261,6 +261,8 @@ github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvW github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/konsorten/go-windows-terminal-sequences v1.0.3/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= +github.com/kr/fs v0.1.0 h1:Jskdu9ieNAYnjxsi0LbQp1ulIKZV1LAFgK1tWhpZgl8= +github.com/kr/fs v0.1.0/go.mod h1:FFnZGqtBN9Gxj7eW1uZ42v5BccTP0vu6NEaFoC2HwRg= github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= @@ -327,8 +329,11 @@ github.com/pierrec/lz4/v4 v4.1.3 h1:/dvQpkb0o1pVlSgKNQqfkavlnXaIK+hJ0LXsKRUN9D4= github.com/pierrec/lz4/v4 v4.1.3/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/profile v1.2.1/go.mod h1:hJw3o1OdXxsrSjjVksARp5W95eeEaEfptyVZyv6JUPA= +github.com/pkg/sftp v1.13.0 h1:Riw6pgOKK41foc1I1Uu03CjvbLZDXeGpInycM4shXoI= +github.com/pkg/sftp v1.13.0/go.mod h1:41g+FIPlQUTDCveupEmEA65IoiQFrtgCeDopC4ajGIM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/posener/complete v1.1.1/go.mod h1:em0nMJCgc9GFtwrmVmEMR/ZL6WyhyjMBndrE9hABlRI= github.com/prometheus/client_golang v0.9.1/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw= @@ -420,6 +425,10 @@ golang.org/x/crypto v0.0.0-20190605123033-f99c8df09eb5/go.mod h1:yigFU9vqHzYiE8U golang.org/x/crypto v0.0.0-20190701094942-4def268fd1a4/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/crypto v0.0.0-20201221181555-eec23a3978ad h1:DN0cp81fZ3njFcrLCytUHRSUkqBjfTo4Tx9RJTWs0EY= +golang.org/x/crypto v0.0.0-20201221181555-eec23a3978ad/go.mod h1:jdWPYTVW3xRLrWPugEBEK3UY2ZEsg3UU495nc5E+M+I= +golang.org/x/crypto v0.0.0-20210513164829-c07d793c2f9a h1:kr2P4QFmQr29mSLA43kwrOcgcReGTfbE9N577tCTuBc= +golang.org/x/crypto v0.0.0-20210513164829-c07d793c2f9a/go.mod h1:P+XmwS30IXTQdn5tA2iutPOUgjI07+tq3H3K9MVA1s8= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190510132918-efd6b22b2522/go.mod h1:ZjyILWgesfNpC6sMxTJOJm9Kp84zZh5NQWvqDGG3Qr8= @@ -542,6 +551,7 @@ golang.org/x/sys v0.0.0-20190624142023-c5567b49c5d0/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20190726091711-fc99dfbffb4e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190826190057-c7b8b68b1456/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191001151750-bb3f8db39f24/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191112214154-59a1497f0cea/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191204072324-ce4227a45e2e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191220142924-d4481acd189f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= @@ -577,6 +587,7 @@ golang.org/x/sys v0.0.0-20210315160823-c6e025ad8005/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20210320140829-1e4c9ba3b0c4/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210412220455-f1c623a9e750 h1:ZBu6861dZq7xBnG1bn5SRU0vA8nx42at4+kP07FMTog= golang.org/x/sys v0.0.0-20210412220455-f1c623a9e750/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= diff --git a/replica_client_test.go b/replica_client_test.go index 07d68bf..69f9746 100644 --- a/replica_client_test.go +++ b/replica_client_test.go @@ -19,6 +19,7 @@ import ( "github.com/benbjohnson/litestream/file" "github.com/benbjohnson/litestream/gcs" "github.com/benbjohnson/litestream/s3" + "github.com/benbjohnson/litestream/sftp" ) func init() { @@ -57,6 +58,15 @@ var ( absPath = flag.String("abs-path", os.Getenv("LITESTREAM_ABS_PATH"), "") ) +// SFTP settings +var ( + sftpHost = flag.String("sftp-host", os.Getenv("LITESTREAM_SFTP_HOST"), "") + sftpUser = flag.String("sftp-user", os.Getenv("LITESTREAM_SFTP_USER"), "") + sftpPassword = flag.String("sftp-password", os.Getenv("LITESTREAM_SFTP_PASSWORD"), "") + sftpKeyPath = flag.String("sftp-key-path", os.Getenv("LITESTREAM_SFTP_KEY_PATH"), "") + sftpPath = flag.String("sftp-path", os.Getenv("LITESTREAM_SFTP_PATH"), "") +) + func TestReplicaClient_Generations(t *testing.T) { RunWithReplicaClient(t, "OK", func(t *testing.T, c litestream.ReplicaClient) { t.Parallel() @@ -474,6 +484,8 @@ func NewReplicaClient(tb testing.TB, typ string) litestream.ReplicaClient { return NewGCSReplicaClient(tb) case abs.ReplicaClientType: return NewABSReplicaClient(tb) + case sftp.ReplicaClientType: + return NewSFTPReplicaClient(tb) default: tb.Fatalf("invalid replica client type: %q", typ) return nil @@ -524,13 +536,26 @@ func NewABSReplicaClient(tb testing.TB) *abs.ReplicaClient { return c } +// NewSFTPReplicaClient returns a new client for integration testing. +func NewSFTPReplicaClient(tb testing.TB) *sftp.ReplicaClient { + tb.Helper() + + c := sftp.NewReplicaClient() + c.Host = *sftpHost + c.User = *sftpUser + c.Password = *sftpPassword + c.KeyPath = *sftpKeyPath + c.Path = path.Join(*sftpPath, fmt.Sprintf("%016x", rand.Uint64())) + return c +} + // MustDeleteAll deletes all objects under the client's path. func MustDeleteAll(tb testing.TB, c litestream.ReplicaClient) { tb.Helper() generations, err := c.Generations(context.Background()) if err != nil { - tb.Fatal(err) + tb.Fatalf("cannot list generations for deletion: %s", err) } for _, generation := range generations { @@ -538,4 +563,11 @@ func MustDeleteAll(tb testing.TB, c litestream.ReplicaClient) { tb.Fatalf("cannot delete generation: %s", err) } } + + switch c := c.(type) { + case *sftp.ReplicaClient: + if err := c.Cleanup(context.Background()); err != nil { + tb.Fatalf("cannot cleanup sftp: %s", err) + } + } } diff --git a/sftp/replica_client.go b/sftp/replica_client.go new file mode 100644 index 0000000..19290b4 --- /dev/null +++ b/sftp/replica_client.go @@ -0,0 +1,511 @@ +package sftp + +import ( + "context" + "errors" + "fmt" + "io" + "net" + "os" + "path" + "sort" + "sync" + "time" + + "github.com/benbjohnson/litestream" + "github.com/pkg/sftp" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" + "golang.org/x/crypto/ssh" +) + +// TODO(sftp): Add public key support + +// ReplicaClientType is the client type for this package. +const ReplicaClientType = "sftp" + +// Default settings for replica client. +const ( + DefaultDialTimeout = 30 * time.Second +) + +var _ litestream.ReplicaClient = (*ReplicaClient)(nil) + +// ReplicaClient is a client for writing snapshots & WAL segments to disk. +type ReplicaClient struct { + mu sync.Mutex + sshClient *ssh.Client + sftpClient *sftp.Client + + // SFTP connection info + Host string + User string + Password string + Path string + KeyPath string + DialTimeout time.Duration +} + +// NewReplicaClient returns a new instance of ReplicaClient. +func NewReplicaClient() *ReplicaClient { + return &ReplicaClient{ + DialTimeout: DefaultDialTimeout, + } +} + +// Type returns "gcs" as the client type. +func (c *ReplicaClient) Type() string { + return ReplicaClientType +} + +// Init initializes the connection to GCS. No-op if already initialized. +func (c *ReplicaClient) Init(ctx context.Context) (_ *sftp.Client, err error) { + c.mu.Lock() + defer c.mu.Unlock() + + if c.sftpClient != nil { + return c.sftpClient, nil + } + + if c.User == "" { + return nil, fmt.Errorf("sftp user required") + } + + // Build SSH configuration & auth methods + config := &ssh.ClientConfig{ + User: c.User, + HostKeyCallback: ssh.InsecureIgnoreHostKey(), + BannerCallback: ssh.BannerDisplayStderr(), + } + if c.Password != "" { + config.Auth = append(config.Auth, ssh.Password(c.Password)) + } + + if c.KeyPath != "" { + buf, err := os.ReadFile(c.KeyPath) + if err != nil { + return nil, fmt.Errorf("cannot read sftp key path: %w", err) + } + + signer, err := ssh.ParsePrivateKey(buf) + if err != nil { + return nil, fmt.Errorf("cannot parse sftp key path: %w", err) + } + config.Auth = append(config.Auth, ssh.PublicKeys(signer)) + } + + // Append standard port, if necessary. + host := c.Host + if _, _, err := net.SplitHostPort(c.Host); err != nil { + host = net.JoinHostPort(c.Host, "22") + } + + // Connect via SSH. + if c.sshClient, err = ssh.Dial("tcp", host, config); err != nil { + return nil, err + } + + // Wrap connection with an SFTP client. + if c.sftpClient, err = sftp.NewClient(c.sshClient); err != nil { + c.sshClient.Close() + c.sshClient = nil + return nil, err + } + + return c.sftpClient, nil +} + +// Generations returns a list of available generation names. +func (c *ReplicaClient) Generations(ctx context.Context) (_ []string, err error) { + defer func() { c.resetOnConnError(err) }() + + sftpClient, err := c.Init(ctx) + if err != nil { + return nil, err + } + + fis, err := sftpClient.ReadDir(litestream.GenerationsPath(c.Path)) + if os.IsNotExist(err) { + return nil, nil + } else if err != nil { + return nil, err + } + + var generations []string + for _, fi := range fis { + if !fi.IsDir() { + continue + } + + name := path.Base(fi.Name()) + if !litestream.IsGenerationName(name) { + continue + } + generations = append(generations, name) + } + + sort.Strings(generations) + + return generations, nil +} + +// DeleteGeneration deletes all snapshots & WAL segments within a generation. +func (c *ReplicaClient) DeleteGeneration(ctx context.Context, generation string) (err error) { + defer func() { c.resetOnConnError(err) }() + + sftpClient, err := c.Init(ctx) + if err != nil { + return err + } + + dir, err := litestream.GenerationPath(c.Path, generation) + if err != nil { + return fmt.Errorf("cannot determine generation path: %w", err) + } + + var dirs []string + walker := sftpClient.Walk(dir) + for walker.Step() { + if err := walker.Err(); err != nil { + return fmt.Errorf("cannot walk path %q: %w", walker.Path(), err) + } + if walker.Stat().IsDir() { + dirs = append(dirs, walker.Path()) + continue + } + + if err := sftpClient.Remove(walker.Path()); err != nil && !os.IsNotExist(err) { + return fmt.Errorf("cannot delete file %q: %w", walker.Path(), err) + } + + operationTotalCounterVec.WithLabelValues("DELETE").Inc() + } + + // Remove directories in reverse order after they have been emptied. + for i := len(dirs) - 1; i >= 0; i-- { + filename := dirs[i] + if err := sftpClient.RemoveDirectory(filename); err != nil && !os.IsNotExist(err) { + return fmt.Errorf("cannot delete directory %q: %w", filename, err) + } + } + + // log.Printf("%s(%s): retainer: deleting generation: %s", r.db.Path(), r.Name(), generation) + + return nil +} + +// Snapshots returns an iterator over all available snapshots for a generation. +func (c *ReplicaClient) Snapshots(ctx context.Context, generation string) (_ litestream.SnapshotIterator, err error) { + defer func() { c.resetOnConnError(err) }() + + sftpClient, err := c.Init(ctx) + if err != nil { + return nil, err + } + + dir, err := litestream.SnapshotsPath(c.Path, generation) + if err != nil { + return nil, fmt.Errorf("cannot determine snapshots path: %w", err) + } + + fis, err := sftpClient.ReadDir(dir) + if os.IsNotExist(err) { + return litestream.NewSnapshotInfoSliceIterator(nil), nil + } else if err != nil { + return nil, err + } + + // Iterate over every file and convert to metadata. + infos := make([]litestream.SnapshotInfo, 0, len(fis)) + for _, fi := range fis { + // Parse index from filename. + index, err := litestream.ParseSnapshotPath(path.Base(fi.Name())) + if err != nil { + continue + } + + infos = append(infos, litestream.SnapshotInfo{ + Generation: generation, + Index: index, + Size: fi.Size(), + CreatedAt: fi.ModTime().UTC(), + }) + } + + sort.Sort(litestream.SnapshotInfoSlice(infos)) + + return litestream.NewSnapshotInfoSliceIterator(infos), nil +} + +// WriteSnapshot writes LZ4 compressed data from rd to the object storage. +func (c *ReplicaClient) WriteSnapshot(ctx context.Context, generation string, index int, rd io.Reader) (info litestream.SnapshotInfo, err error) { + defer func() { c.resetOnConnError(err) }() + + sftpClient, err := c.Init(ctx) + if err != nil { + return info, err + } + + filename, err := litestream.SnapshotPath(c.Path, generation, index) + if err != nil { + return info, fmt.Errorf("cannot determine snapshot path: %w", err) + } + startTime := time.Now() + + if err := sftpClient.MkdirAll(path.Dir(filename)); err != nil { + return info, fmt.Errorf("cannot make parent wal segment directory %q: %w", path.Dir(filename), err) + } + + f, err := sftpClient.OpenFile(filename, os.O_WRONLY|os.O_CREATE|os.O_TRUNC) + if err != nil { + return info, fmt.Errorf("cannot open snapshot file for writing: %w", err) + } + defer f.Close() + + n, err := io.Copy(f, rd) + if err != nil { + return info, err + } else if err := f.Close(); err != nil { + return info, err + } + + operationTotalCounterVec.WithLabelValues("PUT").Inc() + operationBytesCounterVec.WithLabelValues("PUT").Add(float64(n)) + + // log.Printf("%s(%s): snapshot: creating %s/%08x t=%s", r.db.Path(), r.Name(), generation, index, time.Since(startTime).Truncate(time.Millisecond)) + + return litestream.SnapshotInfo{ + Generation: generation, + Index: index, + Size: n, + CreatedAt: startTime.UTC(), + }, nil +} + +// SnapshotReader returns a reader for snapshot data at the given generation/index. +func (c *ReplicaClient) SnapshotReader(ctx context.Context, generation string, index int) (_ io.ReadCloser, err error) { + defer func() { c.resetOnConnError(err) }() + + sftpClient, err := c.Init(ctx) + if err != nil { + return nil, err + } + + filename, err := litestream.SnapshotPath(c.Path, generation, index) + if err != nil { + return nil, fmt.Errorf("cannot determine snapshot path: %w", err) + } + + f, err := sftpClient.Open(filename) + if err != nil { + return nil, err + } + + operationTotalCounterVec.WithLabelValues("GET").Inc() + + return f, nil +} + +// DeleteSnapshot deletes a snapshot with the given generation & index. +func (c *ReplicaClient) DeleteSnapshot(ctx context.Context, generation string, index int) (err error) { + defer func() { c.resetOnConnError(err) }() + + sftpClient, err := c.Init(ctx) + if err != nil { + return err + } + + filename, err := litestream.SnapshotPath(c.Path, generation, index) + if err != nil { + return fmt.Errorf("cannot determine snapshot path: %w", err) + } + + if err := sftpClient.Remove(filename); err != nil && !os.IsNotExist(err) { + return fmt.Errorf("cannot delete snapshot %q: %w", filename, err) + } + + operationTotalCounterVec.WithLabelValues("DELETE").Inc() + return nil +} + +// WALSegments returns an iterator over all available WAL files for a generation. +func (c *ReplicaClient) WALSegments(ctx context.Context, generation string) (_ litestream.WALSegmentIterator, err error) { + defer func() { c.resetOnConnError(err) }() + + sftpClient, err := c.Init(ctx) + if err != nil { + return nil, err + } + + dir, err := litestream.WALPath(c.Path, generation) + if err != nil { + return nil, fmt.Errorf("cannot determine wal path: %w", err) + } + + fis, err := sftpClient.ReadDir(dir) + if os.IsNotExist(err) { + return litestream.NewWALSegmentInfoSliceIterator(nil), nil + } else if err != nil { + return nil, err + } + + // Iterate over every file and convert to metadata. + infos := make([]litestream.WALSegmentInfo, 0, len(fis)) + for _, fi := range fis { + index, offset, err := litestream.ParseWALSegmentPath(path.Base(fi.Name())) + if err != nil { + continue + } + + infos = append(infos, litestream.WALSegmentInfo{ + Generation: generation, + Index: index, + Offset: offset, + Size: fi.Size(), + CreatedAt: fi.ModTime().UTC(), + }) + } + + sort.Sort(litestream.WALSegmentInfoSlice(infos)) + + return litestream.NewWALSegmentInfoSliceIterator(infos), nil +} + +// WriteWALSegment writes LZ4 compressed data from rd into a file on disk. +func (c *ReplicaClient) WriteWALSegment(ctx context.Context, pos litestream.Pos, rd io.Reader) (info litestream.WALSegmentInfo, err error) { + defer func() { c.resetOnConnError(err) }() + + sftpClient, err := c.Init(ctx) + if err != nil { + return info, err + } + + filename, err := litestream.WALSegmentPath(c.Path, pos.Generation, pos.Index, pos.Offset) + if err != nil { + return info, fmt.Errorf("cannot determine wal segment path: %w", err) + } + startTime := time.Now() + + if err := sftpClient.MkdirAll(path.Dir(filename)); err != nil { + return info, fmt.Errorf("cannot make parent snapshot directory %q: %w", path.Dir(filename), err) + } + + f, err := sftpClient.OpenFile(filename, os.O_WRONLY|os.O_CREATE|os.O_TRUNC) + if err != nil { + return info, fmt.Errorf("cannot open snapshot file for writing: %w", err) + } + defer f.Close() + + n, err := io.Copy(f, rd) + if err != nil { + return info, err + } else if err := f.Close(); err != nil { + return info, err + } + + operationTotalCounterVec.WithLabelValues("PUT").Inc() + operationBytesCounterVec.WithLabelValues("PUT").Add(float64(n)) + + return litestream.WALSegmentInfo{ + Generation: pos.Generation, + Index: pos.Index, + Offset: pos.Offset, + Size: n, + CreatedAt: startTime.UTC(), + }, nil +} + +// WALSegmentReader returns a reader for a section of WAL data at the given index. +// Returns os.ErrNotExist if no matching index/offset is found. +func (c *ReplicaClient) WALSegmentReader(ctx context.Context, pos litestream.Pos) (_ io.ReadCloser, err error) { + defer func() { c.resetOnConnError(err) }() + + sftpClient, err := c.Init(ctx) + if err != nil { + return nil, err + } + + filename, err := litestream.WALSegmentPath(c.Path, pos.Generation, pos.Index, pos.Offset) + if err != nil { + return nil, fmt.Errorf("cannot determine wal segment path: %w", err) + } + + f, err := sftpClient.Open(filename) + if err != nil { + return nil, err + } + + operationTotalCounterVec.WithLabelValues("GET").Inc() + + return f, nil +} + +// DeleteWALSegments deletes WAL segments with at the given positions. +func (c *ReplicaClient) DeleteWALSegments(ctx context.Context, a []litestream.Pos) (err error) { + defer func() { c.resetOnConnError(err) }() + + sftpClient, err := c.Init(ctx) + if err != nil { + return err + } + + for _, pos := range a { + filename, err := litestream.WALSegmentPath(c.Path, pos.Generation, pos.Index, pos.Offset) + if err != nil { + return fmt.Errorf("cannot determine wal segment path: %w", err) + } + + if err := sftpClient.Remove(filename); err != nil && !os.IsNotExist(err) { + return fmt.Errorf("cannot delete wal segment %q: %w", filename, err) + } + operationTotalCounterVec.WithLabelValues("DELETE").Inc() + } + + return nil +} + +// Cleanup deletes path & generations directories after empty. +func (c *ReplicaClient) Cleanup(ctx context.Context) (err error) { + defer func() { c.resetOnConnError(err) }() + + sftpClient, err := c.Init(ctx) + if err != nil { + return err + } + + if err := sftpClient.RemoveDirectory(litestream.GenerationsPath(c.Path)); err != nil && !os.IsNotExist(err) { + return fmt.Errorf("cannot delete generations path: %w", err) + } else if err := sftpClient.RemoveDirectory(c.Path); err != nil && !os.IsNotExist(err) { + return fmt.Errorf("cannot delete path: %w", err) + } + return nil +} + +// resetOnConnError closes & clears the client if a connection error occurs. +func (c *ReplicaClient) resetOnConnError(err error) { + if !errors.Is(err, sftp.ErrSSHFxConnectionLost) { + return + } + + if c.sftpClient != nil { + c.sftpClient.Close() + c.sftpClient = nil + } + if c.sshClient != nil { + c.sshClient.Close() + c.sshClient = nil + } +} + +// SFTP metrics. +var ( + operationTotalCounterVec = promauto.NewCounterVec(prometheus.CounterOpts{ + Name: "litestream_sftp_operation_total", + Help: "The number of SFTP operations performed", + }, []string{"type"}) + + operationBytesCounterVec = promauto.NewCounterVec(prometheus.CounterOpts{ + Name: "litestream_sftp_operation_bytes", + Help: "The number of bytes used by SFTP operations", + }, []string{"type"}) +)