Allow read replication recovery from last position

This commit is contained in:
Ben Johnson
2022-04-03 09:18:54 -06:00
parent 2c3e28c786
commit 44662022fa
8 changed files with 361 additions and 56 deletions

View File

@@ -128,13 +128,25 @@ func (s *Server) serveHTTP(w http.ResponseWriter, r *http.Request) {
func (s *Server) handleGetStream(w http.ResponseWriter, r *http.Request) {
q := r.URL.Query()
// TODO: Listen for all databases matching query criteria.
path := q.Get("path")
if path == "" {
s.writeError(w, r, "Database name required", http.StatusBadRequest)
return
}
// Parse current client position, if available.
var pos litestream.Pos
if generation, index := q.Get("generation"), q.Get("index"); generation != "" && index != "" {
pos.Generation = generation
var err error
if pos.Index, err = litestream.ParseIndex(index); err != nil {
s.writeError(w, r, "Invalid index query parameter", http.StatusBadRequest)
return
}
}
// Fetch database instance from the primary server.
db := s.server.DB(path)
if db == nil {
s.writeError(w, r, "Database not found", http.StatusNotFound)
@@ -144,70 +156,91 @@ func (s *Server) handleGetStream(w http.ResponseWriter, r *http.Request) {
// Set the page size in the header.
w.Header().Set("Litestream-page-size", strconv.Itoa(db.PageSize()))
// TODO: Restart stream from a previous position, if specified.
// Determine starting position.
pos := db.Pos()
if pos.Generation == "" {
dbPos := db.Pos()
if dbPos.Generation == "" {
s.writeError(w, r, "No generation available", http.StatusServiceUnavailable)
return
}
pos.Offset = 0
dbPos.Offset = 0
s.Logger.Printf("stream connected @ %s", pos)
defer s.Logger.Printf("stream disconnected")
// Use database position if generation has changed.
var snapshotRequired bool
if pos.Generation != dbPos.Generation {
s.Logger.Printf("stream generation mismatch, using primary position: client.pos=%s", pos)
pos, snapshotRequired = dbPos, true
}
// Obtain iterator before snapshot so we don't miss any WAL segments.
itr, err := db.WALSegments(r.Context(), pos.Generation)
fitr, err := db.WALSegments(r.Context(), pos.Generation)
if err != nil {
s.writeError(w, r, fmt.Sprintf("Cannot obtain WAL iterator: %s", err), http.StatusInternalServerError)
return
}
defer itr.Close()
defer fitr.Close()
// Write snapshot to response body.
if err := db.WithFile(func(f *os.File) error {
fi, err := f.Stat()
if err != nil {
return err
}
bitr := litestream.NewBufferedWALSegmentIterator(fitr)
// Write snapshot header with current position & size.
hdr := litestream.StreamRecordHeader{
Type: litestream.StreamRecordTypeSnapshot,
Generation: pos.Generation,
Index: pos.Index,
Size: fi.Size(),
}
if buf, err := hdr.MarshalBinary(); err != nil {
return fmt.Errorf("marshal snapshot stream record header: %w", err)
} else if _, err := w.Write(buf); err != nil {
return fmt.Errorf("write snapshot stream record header: %w", err)
}
if _, err := io.CopyN(w, f, fi.Size()); err != nil {
return fmt.Errorf("copy snapshot: %w", err)
}
return nil
}); err != nil {
s.writeError(w, r, err.Error(), http.StatusInternalServerError)
// Peek at first position to see if client is too old.
if info, ok := bitr.Peek(); !ok {
s.writeError(w, r, "cannot peek WAL iterator, no segments available", http.StatusInternalServerError)
return
} else if cmp, err := litestream.ComparePos(pos, info.Pos()); err != nil {
s.writeError(w, r, fmt.Sprintf("cannot compare pos: %s", err), http.StatusInternalServerError)
return
} else if cmp == -1 {
s.Logger.Printf("stream position no longer available, using using primary position: client.pos=%s", pos)
pos, snapshotRequired = dbPos, true
}
// Flush after snapshot has been written.
w.(http.Flusher).Flush()
s.Logger.Printf("stream connected: pos=%s snapshot=%v", pos, snapshotRequired)
defer s.Logger.Printf("stream disconnected")
// Write snapshot to response body.
if snapshotRequired {
if err := db.WithFile(func(f *os.File) error {
fi, err := f.Stat()
if err != nil {
return err
}
// Write snapshot header with current position & size.
hdr := litestream.StreamRecordHeader{
Type: litestream.StreamRecordTypeSnapshot,
Generation: pos.Generation,
Index: pos.Index,
Size: fi.Size(),
}
if buf, err := hdr.MarshalBinary(); err != nil {
return fmt.Errorf("marshal snapshot stream record header: %w", err)
} else if _, err := w.Write(buf); err != nil {
return fmt.Errorf("write snapshot stream record header: %w", err)
}
if _, err := io.CopyN(w, f, fi.Size()); err != nil {
return fmt.Errorf("copy snapshot: %w", err)
}
return nil
}); err != nil {
s.writeError(w, r, err.Error(), http.StatusInternalServerError)
return
}
// Flush after snapshot has been written.
w.(http.Flusher).Flush()
}
for {
// Wait for notification of new entries.
select {
case <-r.Context().Done():
return
case <-itr.NotifyCh():
case <-fitr.NotifyCh():
}
for itr.Next() {
info := itr.WALSegment()
for bitr.Next() {
info := bitr.WALSegment()
// Skip any segments before our initial position.
if cmp, err := litestream.ComparePos(info.Pos(), pos); err != nil {
@@ -256,7 +289,7 @@ func (s *Server) handleGetStream(w http.ResponseWriter, r *http.Request) {
// Flush after WAL segment has been written.
w.(http.Flusher).Flush()
}
if itr.Err() != nil {
if bitr.Err() != nil {
s.Logger.Printf("wal iterator error: %s", err)
return
}