Use database page size in read replication
This commit is contained in:
2
db.go
2
db.go
@@ -677,8 +677,6 @@ func (db *DB) initReplica(pageSize int) (err error) {
|
|||||||
return fmt.Errorf("enable wal failed, mode=%q", mode)
|
return fmt.Errorf("enable wal failed, mode=%q", mode)
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: Set page size.
|
|
||||||
|
|
||||||
if _, err := db.db.ExecContext(db.ctx, `CREATE TABLE IF NOT EXISTS _litestream (id INTEGER)`); err != nil {
|
if _, err := db.db.ExecContext(db.ctx, `CREATE TABLE IF NOT EXISTS _litestream (id INTEGER)`); err != nil {
|
||||||
return fmt.Errorf("create _litestream table: %w", err)
|
return fmt.Errorf("create _litestream table: %w", err)
|
||||||
} else if _, err := db.db.ExecContext(db.ctx, `PRAGMA wal_checkpoint(TRUNCATE)`); err != nil {
|
} else if _, err := db.db.ExecContext(db.ctx, `PRAGMA wal_checkpoint(TRUNCATE)`); err != nil {
|
||||||
|
|||||||
135
http/client.go
Normal file
135
http/client.go
Normal file
@@ -0,0 +1,135 @@
|
|||||||
|
package http
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"io"
|
||||||
|
"net/http"
|
||||||
|
"net/url"
|
||||||
|
"strconv"
|
||||||
|
|
||||||
|
"github.com/benbjohnson/litestream"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Client represents an client for a streaming Litestream HTTP server.
|
||||||
|
type Client struct {
|
||||||
|
// Upstream endpoint
|
||||||
|
URL string
|
||||||
|
|
||||||
|
// Path of database on upstream server.
|
||||||
|
Path string
|
||||||
|
|
||||||
|
// Underlying HTTP client
|
||||||
|
HTTPClient *http.Client
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewClient returns an instance of Client.
|
||||||
|
func NewClient(rawurl, path string) *Client {
|
||||||
|
return &Client{
|
||||||
|
URL: rawurl,
|
||||||
|
Path: path,
|
||||||
|
HTTPClient: http.DefaultClient,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Stream returns a snapshot and continuous stream of WAL updates.
|
||||||
|
func (c *Client) Stream(ctx context.Context) (litestream.StreamReader, error) {
|
||||||
|
u, err := url.Parse(c.URL)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("invalid client URL: %w", err)
|
||||||
|
} else if u.Scheme != "http" && u.Scheme != "https" {
|
||||||
|
return nil, fmt.Errorf("invalid URL scheme")
|
||||||
|
} else if u.Host == "" {
|
||||||
|
return nil, fmt.Errorf("URL host required")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Strip off everything but the scheme & host.
|
||||||
|
*u = url.URL{
|
||||||
|
Scheme: u.Scheme,
|
||||||
|
Host: u.Host,
|
||||||
|
Path: "/stream",
|
||||||
|
RawQuery: (url.Values{
|
||||||
|
"path": []string{c.Path},
|
||||||
|
}).Encode(),
|
||||||
|
}
|
||||||
|
|
||||||
|
req, err := http.NewRequest("GET", u.String(), nil)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
req = req.WithContext(ctx)
|
||||||
|
|
||||||
|
resp, err := c.HTTPClient.Do(req)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
} else if resp.StatusCode != http.StatusOK {
|
||||||
|
resp.Body.Close()
|
||||||
|
return nil, fmt.Errorf("invalid response: code=%d", resp.StatusCode)
|
||||||
|
}
|
||||||
|
|
||||||
|
pageSize, _ := strconv.Atoi(resp.Header.Get("Litestream-page-size"))
|
||||||
|
if pageSize <= 0 {
|
||||||
|
resp.Body.Close()
|
||||||
|
return nil, fmt.Errorf("stream page size unavailable")
|
||||||
|
}
|
||||||
|
|
||||||
|
return &StreamReader{
|
||||||
|
pageSize: pageSize,
|
||||||
|
rc: resp.Body,
|
||||||
|
lr: io.LimitedReader{R: resp.Body},
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// StreamReader represents an optional snapshot followed by a continuous stream
|
||||||
|
// of WAL updates. It is used to implement live read replication from a single
|
||||||
|
// primary Litestream server to one or more remote Litestream replicas.
|
||||||
|
type StreamReader struct {
|
||||||
|
pageSize int
|
||||||
|
rc io.ReadCloser
|
||||||
|
lr io.LimitedReader
|
||||||
|
}
|
||||||
|
|
||||||
|
// Close closes the underlying reader.
|
||||||
|
func (r *StreamReader) Close() (err error) {
|
||||||
|
if e := r.rc.Close(); err == nil {
|
||||||
|
err = e
|
||||||
|
}
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// PageSize returns the page size on the remote database.
|
||||||
|
func (r *StreamReader) PageSize() int { return r.pageSize }
|
||||||
|
|
||||||
|
// Read reads bytes of the current payload into p. Only valid after a successful
|
||||||
|
// call to Next(). On io.EOF, call Next() again to begin reading next record.
|
||||||
|
func (r *StreamReader) Read(p []byte) (n int, err error) {
|
||||||
|
return r.lr.Read(p)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Next returns the next available record. This call will block until a record
|
||||||
|
// is available. After calling Next(), read the payload from the reader using
|
||||||
|
// Read() until io.EOF is reached.
|
||||||
|
func (r *StreamReader) Next() (*litestream.StreamRecordHeader, error) {
|
||||||
|
// If bytes remain on the current file, discard.
|
||||||
|
if r.lr.N > 0 {
|
||||||
|
if _, err := io.Copy(io.Discard, &r.lr); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Read record header.
|
||||||
|
buf := make([]byte, litestream.StreamRecordHeaderSize)
|
||||||
|
if _, err := io.ReadFull(r.rc, buf); err != nil {
|
||||||
|
return nil, fmt.Errorf("http.StreamReader.Next(): %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
var hdr litestream.StreamRecordHeader
|
||||||
|
if err := hdr.UnmarshalBinary(buf); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Update remaining bytes on file reader.
|
||||||
|
r.lr.N = hdr.Size
|
||||||
|
|
||||||
|
return &hdr, nil
|
||||||
|
}
|
||||||
@@ -1,15 +1,14 @@
|
|||||||
package http
|
package http
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
"log"
|
"log"
|
||||||
"net"
|
"net"
|
||||||
"net/http"
|
"net/http"
|
||||||
httppprof "net/http/pprof"
|
httppprof "net/http/pprof"
|
||||||
"net/url"
|
|
||||||
"os"
|
"os"
|
||||||
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
|
|
||||||
"github.com/benbjohnson/litestream"
|
"github.com/benbjohnson/litestream"
|
||||||
@@ -142,6 +141,9 @@ func (s *Server) handleGetStream(w http.ResponseWriter, r *http.Request) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Set the page size in the header.
|
||||||
|
w.Header().Set("Litestream-page-size", strconv.Itoa(db.PageSize()))
|
||||||
|
|
||||||
// TODO: Restart stream from a previous position, if specified.
|
// TODO: Restart stream from a previous position, if specified.
|
||||||
|
|
||||||
// Determine starting position.
|
// Determine starting position.
|
||||||
@@ -260,114 +262,3 @@ func (s *Server) handleGetStream(w http.ResponseWriter, r *http.Request) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
type Client struct {
|
|
||||||
// Upstream endpoint
|
|
||||||
URL string
|
|
||||||
|
|
||||||
// Path of database on upstream server.
|
|
||||||
Path string
|
|
||||||
|
|
||||||
// Underlying HTTP client
|
|
||||||
HTTPClient *http.Client
|
|
||||||
}
|
|
||||||
|
|
||||||
func NewClient(rawurl, path string) *Client {
|
|
||||||
return &Client{
|
|
||||||
URL: rawurl,
|
|
||||||
Path: path,
|
|
||||||
HTTPClient: http.DefaultClient,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *Client) Stream(ctx context.Context) (litestream.StreamReader, error) {
|
|
||||||
u, err := url.Parse(c.URL)
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("invalid client URL: %w", err)
|
|
||||||
} else if u.Scheme != "http" && u.Scheme != "https" {
|
|
||||||
return nil, fmt.Errorf("invalid URL scheme")
|
|
||||||
} else if u.Host == "" {
|
|
||||||
return nil, fmt.Errorf("URL host required")
|
|
||||||
}
|
|
||||||
|
|
||||||
// Strip off everything but the scheme & host.
|
|
||||||
*u = url.URL{
|
|
||||||
Scheme: u.Scheme,
|
|
||||||
Host: u.Host,
|
|
||||||
Path: "/stream",
|
|
||||||
RawQuery: (url.Values{
|
|
||||||
"path": []string{c.Path},
|
|
||||||
}).Encode(),
|
|
||||||
}
|
|
||||||
|
|
||||||
req, err := http.NewRequest("GET", u.String(), nil)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
req = req.WithContext(ctx)
|
|
||||||
|
|
||||||
resp, err := c.HTTPClient.Do(req)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
} else if resp.StatusCode != http.StatusOK {
|
|
||||||
resp.Body.Close()
|
|
||||||
return nil, fmt.Errorf("invalid response: code=%d", resp.StatusCode)
|
|
||||||
}
|
|
||||||
|
|
||||||
return &StreamReader{
|
|
||||||
body: resp.Body,
|
|
||||||
file: io.LimitedReader{R: resp.Body},
|
|
||||||
}, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
type StreamReader struct {
|
|
||||||
body io.ReadCloser
|
|
||||||
file io.LimitedReader
|
|
||||||
err error
|
|
||||||
}
|
|
||||||
|
|
||||||
func (r *StreamReader) Close() error {
|
|
||||||
if e := r.body.Close(); e != nil && r.err == nil {
|
|
||||||
r.err = e
|
|
||||||
}
|
|
||||||
return r.err
|
|
||||||
}
|
|
||||||
|
|
||||||
func (r *StreamReader) Read(p []byte) (int, error) {
|
|
||||||
if r.err != nil {
|
|
||||||
return 0, r.err
|
|
||||||
} else if r.file.R == nil {
|
|
||||||
return 0, io.EOF
|
|
||||||
}
|
|
||||||
return r.file.Read(p)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (r *StreamReader) Next() (*litestream.StreamRecordHeader, error) {
|
|
||||||
if r.err != nil {
|
|
||||||
return nil, r.err
|
|
||||||
}
|
|
||||||
|
|
||||||
// If bytes remain on the current file, discard.
|
|
||||||
if r.file.N > 0 {
|
|
||||||
if _, r.err = io.Copy(io.Discard, &r.file); r.err != nil {
|
|
||||||
return nil, r.err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Read record header.
|
|
||||||
buf := make([]byte, litestream.StreamRecordHeaderSize)
|
|
||||||
if _, err := io.ReadFull(r.body, buf); err != nil {
|
|
||||||
r.err = fmt.Errorf("http.StreamReader.Next(): %w", err)
|
|
||||||
return nil, r.err
|
|
||||||
}
|
|
||||||
|
|
||||||
var hdr litestream.StreamRecordHeader
|
|
||||||
if r.err = hdr.UnmarshalBinary(buf); r.err != nil {
|
|
||||||
return nil, r.err
|
|
||||||
}
|
|
||||||
|
|
||||||
// Update remaining bytes on file reader.
|
|
||||||
r.file.N = hdr.Size
|
|
||||||
|
|
||||||
return &hdr, nil
|
|
||||||
}
|
|
||||||
@@ -530,6 +530,7 @@ type StreamClient interface {
|
|||||||
// StreamReader represents a reader that streams snapshot and WAL records.
|
// StreamReader represents a reader that streams snapshot and WAL records.
|
||||||
type StreamReader interface {
|
type StreamReader interface {
|
||||||
io.ReadCloser
|
io.ReadCloser
|
||||||
|
PageSize() int
|
||||||
Next() (*StreamRecordHeader, error)
|
Next() (*StreamRecordHeader, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user