diff --git a/db.go b/db.go index a2396ec..c1ac7da 100644 --- a/db.go +++ b/db.go @@ -677,8 +677,6 @@ func (db *DB) initReplica(pageSize int) (err error) { return fmt.Errorf("enable wal failed, mode=%q", mode) } - // TODO: Set page size. - if _, err := db.db.ExecContext(db.ctx, `CREATE TABLE IF NOT EXISTS _litestream (id INTEGER)`); err != nil { return fmt.Errorf("create _litestream table: %w", err) } else if _, err := db.db.ExecContext(db.ctx, `PRAGMA wal_checkpoint(TRUNCATE)`); err != nil { diff --git a/http/client.go b/http/client.go new file mode 100644 index 0000000..5c5ae75 --- /dev/null +++ b/http/client.go @@ -0,0 +1,135 @@ +package http + +import ( + "context" + "fmt" + "io" + "net/http" + "net/url" + "strconv" + + "github.com/benbjohnson/litestream" +) + +// Client represents an client for a streaming Litestream HTTP server. +type Client struct { + // Upstream endpoint + URL string + + // Path of database on upstream server. + Path string + + // Underlying HTTP client + HTTPClient *http.Client +} + +// NewClient returns an instance of Client. +func NewClient(rawurl, path string) *Client { + return &Client{ + URL: rawurl, + Path: path, + HTTPClient: http.DefaultClient, + } +} + +// Stream returns a snapshot and continuous stream of WAL updates. +func (c *Client) Stream(ctx context.Context) (litestream.StreamReader, error) { + u, err := url.Parse(c.URL) + if err != nil { + return nil, fmt.Errorf("invalid client URL: %w", err) + } else if u.Scheme != "http" && u.Scheme != "https" { + return nil, fmt.Errorf("invalid URL scheme") + } else if u.Host == "" { + return nil, fmt.Errorf("URL host required") + } + + // Strip off everything but the scheme & host. + *u = url.URL{ + Scheme: u.Scheme, + Host: u.Host, + Path: "/stream", + RawQuery: (url.Values{ + "path": []string{c.Path}, + }).Encode(), + } + + req, err := http.NewRequest("GET", u.String(), nil) + if err != nil { + return nil, err + } + req = req.WithContext(ctx) + + resp, err := c.HTTPClient.Do(req) + if err != nil { + return nil, err + } else if resp.StatusCode != http.StatusOK { + resp.Body.Close() + return nil, fmt.Errorf("invalid response: code=%d", resp.StatusCode) + } + + pageSize, _ := strconv.Atoi(resp.Header.Get("Litestream-page-size")) + if pageSize <= 0 { + resp.Body.Close() + return nil, fmt.Errorf("stream page size unavailable") + } + + return &StreamReader{ + pageSize: pageSize, + rc: resp.Body, + lr: io.LimitedReader{R: resp.Body}, + }, nil +} + +// StreamReader represents an optional snapshot followed by a continuous stream +// of WAL updates. It is used to implement live read replication from a single +// primary Litestream server to one or more remote Litestream replicas. +type StreamReader struct { + pageSize int + rc io.ReadCloser + lr io.LimitedReader +} + +// Close closes the underlying reader. +func (r *StreamReader) Close() (err error) { + if e := r.rc.Close(); err == nil { + err = e + } + return err +} + +// PageSize returns the page size on the remote database. +func (r *StreamReader) PageSize() int { return r.pageSize } + +// Read reads bytes of the current payload into p. Only valid after a successful +// call to Next(). On io.EOF, call Next() again to begin reading next record. +func (r *StreamReader) Read(p []byte) (n int, err error) { + return r.lr.Read(p) +} + +// Next returns the next available record. This call will block until a record +// is available. After calling Next(), read the payload from the reader using +// Read() until io.EOF is reached. +func (r *StreamReader) Next() (*litestream.StreamRecordHeader, error) { + // If bytes remain on the current file, discard. + if r.lr.N > 0 { + if _, err := io.Copy(io.Discard, &r.lr); err != nil { + return nil, err + } + } + + // Read record header. + buf := make([]byte, litestream.StreamRecordHeaderSize) + if _, err := io.ReadFull(r.rc, buf); err != nil { + return nil, fmt.Errorf("http.StreamReader.Next(): %w", err) + } + + var hdr litestream.StreamRecordHeader + if err := hdr.UnmarshalBinary(buf); err != nil { + return nil, err + } + + // Update remaining bytes on file reader. + r.lr.N = hdr.Size + + return &hdr, nil +} diff --git a/http/http.go b/http/server.go similarity index 71% rename from http/http.go rename to http/server.go index fc35f0f..9763c73 100644 --- a/http/http.go +++ b/http/server.go @@ -1,15 +1,14 @@ package http import ( - "context" "fmt" "io" "log" "net" "net/http" httppprof "net/http/pprof" - "net/url" "os" + "strconv" "strings" "github.com/benbjohnson/litestream" @@ -142,6 +141,9 @@ func (s *Server) handleGetStream(w http.ResponseWriter, r *http.Request) { return } + // Set the page size in the header. + w.Header().Set("Litestream-page-size", strconv.Itoa(db.PageSize())) + // TODO: Restart stream from a previous position, if specified. // Determine starting position. @@ -260,114 +262,3 @@ func (s *Server) handleGetStream(w http.ResponseWriter, r *http.Request) { } } } - -type Client struct { - // Upstream endpoint - URL string - - // Path of database on upstream server. - Path string - - // Underlying HTTP client - HTTPClient *http.Client -} - -func NewClient(rawurl, path string) *Client { - return &Client{ - URL: rawurl, - Path: path, - HTTPClient: http.DefaultClient, - } -} - -func (c *Client) Stream(ctx context.Context) (litestream.StreamReader, error) { - u, err := url.Parse(c.URL) - if err != nil { - return nil, fmt.Errorf("invalid client URL: %w", err) - } else if u.Scheme != "http" && u.Scheme != "https" { - return nil, fmt.Errorf("invalid URL scheme") - } else if u.Host == "" { - return nil, fmt.Errorf("URL host required") - } - - // Strip off everything but the scheme & host. - *u = url.URL{ - Scheme: u.Scheme, - Host: u.Host, - Path: "/stream", - RawQuery: (url.Values{ - "path": []string{c.Path}, - }).Encode(), - } - - req, err := http.NewRequest("GET", u.String(), nil) - if err != nil { - return nil, err - } - req = req.WithContext(ctx) - - resp, err := c.HTTPClient.Do(req) - if err != nil { - return nil, err - } else if resp.StatusCode != http.StatusOK { - resp.Body.Close() - return nil, fmt.Errorf("invalid response: code=%d", resp.StatusCode) - } - - return &StreamReader{ - body: resp.Body, - file: io.LimitedReader{R: resp.Body}, - }, nil -} - -type StreamReader struct { - body io.ReadCloser - file io.LimitedReader - err error -} - -func (r *StreamReader) Close() error { - if e := r.body.Close(); e != nil && r.err == nil { - r.err = e - } - return r.err -} - -func (r *StreamReader) Read(p []byte) (int, error) { - if r.err != nil { - return 0, r.err - } else if r.file.R == nil { - return 0, io.EOF - } - return r.file.Read(p) -} - -func (r *StreamReader) Next() (*litestream.StreamRecordHeader, error) { - if r.err != nil { - return nil, r.err - } - - // If bytes remain on the current file, discard. - if r.file.N > 0 { - if _, r.err = io.Copy(io.Discard, &r.file); r.err != nil { - return nil, r.err - } - } - - // Read record header. - buf := make([]byte, litestream.StreamRecordHeaderSize) - if _, err := io.ReadFull(r.body, buf); err != nil { - r.err = fmt.Errorf("http.StreamReader.Next(): %w", err) - return nil, r.err - } - - var hdr litestream.StreamRecordHeader - if r.err = hdr.UnmarshalBinary(buf); r.err != nil { - return nil, r.err - } - - // Update remaining bytes on file reader. - r.file.N = hdr.Size - - return &hdr, nil -} diff --git a/litestream.go b/litestream.go index 9bb945e..a367b33 100644 --- a/litestream.go +++ b/litestream.go @@ -530,6 +530,7 @@ type StreamClient interface { // StreamReader represents a reader that streams snapshot and WAL records. type StreamReader interface { io.ReadCloser + PageSize() int Next() (*StreamRecordHeader, error) }