Compare commits

..

8 Commits

Author SHA1 Message Date
Ben Johnson
c53a09c124 Fix read replication stream restart position 2022-06-15 14:35:17 -06:00
Hiroaki Nakamura
46597ab22f Fix wal internal error log 2022-05-13 15:25:43 -06:00
Hiroaki Nakamura
e6f7c6052d Add two environment variables for overriding endpoint and region
export LITESTREAM_ACCESS_KEY_ID=your_key_id
export LITESTREAM_SECRET_ACCESS_KEY=your_access_key
export LITESTREAM_ENDPOINT=your_endpoint
export LITESTREAM_REGION=your_region
litestream replicate fruits.db s3://mybkt/fruits.db
2022-05-10 08:48:48 -06:00
Ben Johnson
7d8b8c6ec0 Remove verbose flag from restore docs 2022-05-03 06:33:53 -07:00
Michael Lynch
88737d7164 Add a unit test for internal.MD5Hash 2022-04-17 15:02:37 -06:00
Michael Lynch
6763e9218c Fix path to coverage file 2022-04-17 15:02:32 -06:00
Michael Lynch
301e1172fd Add Go code coverage to CI 2022-04-17 15:02:32 -06:00
Ben Johnson
ca07137d32 Re-add point-in-time restore 2022-04-14 20:03:52 -06:00
39 changed files with 461 additions and 116 deletions

View File

@@ -22,4 +22,9 @@ jobs:
run: go install ./cmd/litestream run: go install ./cmd/litestream
- name: Run unit tests - name: Run unit tests
run: make testdata && go test -v ./... run: make testdata && go test -v --coverprofile=.coverage.out ./... && go tool cover -html .coverage.out -o .coverage.html
- uses: actions/upload-artifact@v3
with:
name: code-coverage
path: .coverage.html

1
.gitignore vendored
View File

@@ -1,2 +1,3 @@
.coverage.*
.DS_Store .DS_Store
/dist /dist

View File

@@ -9,6 +9,7 @@ import (
"os" "os"
"path/filepath" "path/filepath"
"strconv" "strconv"
"time"
"github.com/benbjohnson/litestream" "github.com/benbjohnson/litestream"
) )
@@ -28,6 +29,7 @@ type RestoreCommand struct {
replicaName string // optional, name of replica to restore from replicaName string // optional, name of replica to restore from
generation string // optional, generation to restore generation string // optional, generation to restore
targetIndex int // optional, last WAL index to replay targetIndex int // optional, last WAL index to replay
timestamp time.Time // optional, restore to point-in-time (ISO 8601)
ifDBNotExists bool // if true, skips restore if output path already exists ifDBNotExists bool // if true, skips restore if output path already exists
ifReplicaExists bool // if true, skips if no backups exist ifReplicaExists bool // if true, skips if no backups exist
opt litestream.RestoreOptions opt litestream.RestoreOptions
@@ -53,6 +55,7 @@ func (c *RestoreCommand) Run(ctx context.Context, args []string) (err error) {
fs.StringVar(&c.replicaName, "replica", "", "replica name") fs.StringVar(&c.replicaName, "replica", "", "replica name")
fs.StringVar(&c.generation, "generation", "", "generation name") fs.StringVar(&c.generation, "generation", "", "generation name")
fs.Var((*indexVar)(&c.targetIndex), "index", "wal index") fs.Var((*indexVar)(&c.targetIndex), "index", "wal index")
timestampStr := fs.String("timestamp", "", "point-in-time restore (ISO 8601)")
fs.IntVar(&c.opt.Parallelism, "parallelism", c.opt.Parallelism, "parallelism") fs.IntVar(&c.opt.Parallelism, "parallelism", c.opt.Parallelism, "parallelism")
fs.BoolVar(&c.ifDBNotExists, "if-db-not-exists", false, "") fs.BoolVar(&c.ifDBNotExists, "if-db-not-exists", false, "")
fs.BoolVar(&c.ifReplicaExists, "if-replica-exists", false, "") fs.BoolVar(&c.ifReplicaExists, "if-replica-exists", false, "")
@@ -66,9 +69,20 @@ func (c *RestoreCommand) Run(ctx context.Context, args []string) (err error) {
} }
pathOrURL := fs.Arg(0) pathOrURL := fs.Arg(0)
// Parse timestamp.
if *timestampStr != "" {
if c.timestamp, err = time.Parse(time.RFC3339Nano, *timestampStr); err != nil {
return fmt.Errorf("invalid -timestamp, expected ISO 8601: %w", err)
}
}
// Ensure a generation is specified if target index is specified. // Ensure a generation is specified if target index is specified.
if c.targetIndex != -1 && c.generation == "" { if c.targetIndex != -1 && !c.timestamp.IsZero() {
return fmt.Errorf("cannot specify both -index flag and -timestamp flag")
} else if c.targetIndex != -1 && c.generation == "" {
return fmt.Errorf("must specify -generation flag when using -index flag") return fmt.Errorf("must specify -generation flag when using -index flag")
} else if !c.timestamp.IsZero() && c.generation == "" {
return fmt.Errorf("must specify -generation flag when using -timestamp flag")
} }
// Default to original database path if output path not specified. // Default to original database path if output path not specified.
@@ -117,7 +131,11 @@ func (c *RestoreCommand) Run(ctx context.Context, args []string) (err error) {
} }
// Determine the maximum available index for the generation if one is not specified. // Determine the maximum available index for the generation if one is not specified.
if c.targetIndex == -1 { if !c.timestamp.IsZero() {
if c.targetIndex, err = litestream.FindIndexByTimestamp(ctx, r.Client(), c.generation, c.timestamp); err != nil {
return fmt.Errorf("cannot find index for timestamp in generation %q: %w", c.generation, err)
}
} else if c.targetIndex == -1 {
if c.targetIndex, err = litestream.FindMaxIndexByGeneration(ctx, r.Client(), c.generation); err != nil { if c.targetIndex, err = litestream.FindMaxIndexByGeneration(ctx, r.Client(), c.generation); err != nil {
return fmt.Errorf("cannot determine latest index in generation %q: %w", c.generation, err) return fmt.Errorf("cannot determine latest index in generation %q: %w", c.generation, err)
} }
@@ -239,6 +257,10 @@ Arguments:
Restore up to a specific hex-encoded WAL index (inclusive). Restore up to a specific hex-encoded WAL index (inclusive).
Defaults to use the highest available index. Defaults to use the highest available index.
-timestamp DATETIME
Restore up to a specific point-in-time. Must be ISO 8601.
Cannot be specified with -index flag.
-o PATH -o PATH
Output path of the restored database. Output path of the restored database.
Defaults to original DB path. Defaults to original DB path.
@@ -253,9 +275,6 @@ Arguments:
Determines the number of WAL files downloaded in parallel. Determines the number of WAL files downloaded in parallel.
Defaults to `+strconv.Itoa(litestream.DefaultRestoreParallelism)+`. Defaults to `+strconv.Itoa(litestream.DefaultRestoreParallelism)+`.
-v
Verbose output.
Examples: Examples:
@@ -271,6 +290,9 @@ Examples:
# Restore database from specific generation on S3. # Restore database from specific generation on S3.
$ litestream restore -replica s3 -generation xxxxxxxx /path/to/db $ litestream restore -replica s3 -generation xxxxxxxx /path/to/db
# Restore database to a specific point in time.
$ litestream restore -generation xxxxxxxx -timestamp 2000-01-01T00:00:00Z /path/to/db
`[1:], `[1:],
DefaultConfigPath(), DefaultConfigPath(),
) )

177
db.go
View File

@@ -17,6 +17,7 @@ import (
"sort" "sort"
"strings" "strings"
"sync" "sync"
"syscall"
"time" "time"
"github.com/benbjohnson/litestream/internal" "github.com/benbjohnson/litestream/internal"
@@ -1747,23 +1748,34 @@ func (db *DB) stream(ctx context.Context) error {
// streamSnapshot reads the snapshot into the WAL and applies it to the main database. // streamSnapshot reads the snapshot into the WAL and applies it to the main database.
func (db *DB) streamSnapshot(ctx context.Context, hdr *StreamRecordHeader, r io.Reader) error { func (db *DB) streamSnapshot(ctx context.Context, hdr *StreamRecordHeader, r io.Reader) error {
// Truncate WAL file.
if _, err := db.db.ExecContext(ctx, `PRAGMA wal_checkpoint(TRUNCATE)`); err != nil {
return fmt.Errorf("truncate: %w", err)
}
// Determine total page count. // Determine total page count.
pageN := int(hdr.Size / int64(db.pageSize)) pageN := int(hdr.Size / int64(db.pageSize))
ww := NewWALWriter(db.WALPath(), db.fileMode, db.pageSize) // Open database file.
if err := ww.Open(); err != nil { f, err := os.OpenFile(db.path, os.O_RDWR, 0666)
return fmt.Errorf("open wal writer: %w", err) if err != nil {
return fmt.Errorf("open db file: %w", err)
} }
defer func() { _ = ww.Close() }() defer f.Close()
if err := ww.WriteHeader(); err != nil { // Open shm file for locking.
return fmt.Errorf("write wal header: %w", err) shmFile, err := os.OpenFile(db.SHMPath(), os.O_RDWR, 0666)
if err != nil {
return fmt.Errorf("open shm file: %w", err)
} }
defer shmFile.Close()
// Obtain WAL checkpoint lock.
if err := setLkw(shmFile, syscall.F_WRLCK, WAL_CKPT_LOCK_OFFSET, 1); err != nil {
return fmt.Errorf("cannot obtain wal checkpoint lock: %w", err)
}
defer func() { _ = setLkw(shmFile, syscall.F_UNLCK, WAL_CKPT_LOCK_OFFSET, 1) }()
// Obtain WAL write lock.
if err := setLkw(shmFile, syscall.F_WRLCK, WAL_WRITE_LOCK_OFFSET, 1); err != nil {
return fmt.Errorf("cannot obtain wal write lock: %w", err)
}
defer func() { _ = setLkw(shmFile, syscall.F_UNLCK, WAL_WRITE_LOCK_OFFSET, 1) }()
// Iterate over pages // Iterate over pages
buf := make([]byte, db.pageSize) buf := make([]byte, db.pageSize)
@@ -1775,26 +1787,18 @@ func (db *DB) streamSnapshot(ctx context.Context, hdr *StreamRecordHeader, r io.
return fmt.Errorf("read snapshot page %d: %w", pgno, err) return fmt.Errorf("read snapshot page %d: %w", pgno, err)
} }
// Issue a commit flag when the last page is reached. // Copy page to database file.
var commit uint32 offset := int64(pgno-1) * int64(db.pageSize)
if _, err := f.WriteAt(buf, offset); err != nil {
return fmt.Errorf("copy to db: pgno=%d err=%w", pgno, err)
}
// Truncate database to final size.
if pgno == uint32(pageN) { if pgno == uint32(pageN) {
commit = uint32(pageN) if err := f.Truncate(int64(pageN) * int64(db.pageSize)); err != nil {
} return fmt.Errorf("truncate db: commit=%d err=%w", pageN, err)
// Write page into WAL frame.
if err := ww.WriteFrame(pgno, commit, buf); err != nil {
return fmt.Errorf("write wal frame: %w", err)
} }
} }
// Close WAL file writer.
if err := ww.Close(); err != nil {
return fmt.Errorf("close wal writer: %w", err)
}
// Invalidate WAL index.
if err := invalidateSHMFile(db.path); err != nil {
return fmt.Errorf("invalidate shm file: %w", err)
} }
// Write position to file so other processes can read it. // Write position to file so other processes can read it.
@@ -1819,44 +1823,63 @@ func (db *DB) streamWALSegment(ctx context.Context, hdr *StreamRecordHeader, r i
} }
} }
ww := NewWALWriter(db.WALPath(), db.fileMode, db.pageSize) // Open database file.
if err := ww.Open(); err != nil { f, err := os.OpenFile(db.path, os.O_RDWR, 0666)
return fmt.Errorf("open wal writer: %w", err) if err != nil {
return fmt.Errorf("open db file: %w", err)
} }
defer func() { _ = ww.Close() }() defer f.Close()
if err := ww.WriteHeader(); err != nil { // Open shm file for locking.
return fmt.Errorf("write wal header: %w", err) shmFile, err := os.OpenFile(db.SHMPath(), os.O_RDWR, 0666)
if err != nil {
return fmt.Errorf("open shm file: %w", err)
} }
defer shmFile.Close()
// Obtain WAL checkpoint lock.
if err := setLkw(shmFile, syscall.F_WRLCK, WAL_CKPT_LOCK_OFFSET, 1); err != nil {
return fmt.Errorf("cannot obtain wal checkpoint lock: %w", err)
}
defer func() { _ = setLkw(shmFile, syscall.F_UNLCK, WAL_CKPT_LOCK_OFFSET, 1) }()
// Obtain WAL write lock.
if err := setLkw(shmFile, syscall.F_WRLCK, WAL_WRITE_LOCK_OFFSET, 1); err != nil {
return fmt.Errorf("cannot obtain wal write lock: %w", err)
}
defer func() { _ = setLkw(shmFile, syscall.F_UNLCK, WAL_WRITE_LOCK_OFFSET, 1) }()
// Iterate over incoming WAL pages. // Iterate over incoming WAL pages.
buf := make([]byte, WALFrameHeaderSize+db.pageSize) buf := make([]byte, WALFrameHeaderSize+db.pageSize)
for i := 0; ; i++ { for i := 0; ; i++ {
// Read snapshot page into a buffer. // Read snapshot page into a buffer.
if _, err := io.ReadFull(zr, buf); err == io.EOF { if n, err := io.ReadFull(zr, buf); err == io.EOF {
break break
} else if err != nil { } else if err != nil {
return fmt.Errorf("read wal frame %d: %w", i, err) return fmt.Errorf("read wal frame: i=%d n=%d err=%w", i, n, err)
} }
// Read page number & commit field. // Read page number & commit field.
pgno := binary.BigEndian.Uint32(buf[0:]) pgno := binary.BigEndian.Uint32(buf[0:])
commit := binary.BigEndian.Uint32(buf[4:]) commit := binary.BigEndian.Uint32(buf[4:])
// Write page into WAL frame. // Copy page to database file.
if err := ww.WriteFrame(pgno, commit, buf[WALFrameHeaderSize:]); err != nil { offset := int64(pgno-1) * int64(db.pageSize)
return fmt.Errorf("write wal frame: %w", err) if _, err := f.WriteAt(buf[WALFrameHeaderSize:], offset); err != nil {
return fmt.Errorf("copy to db: pgno=%d err=%w", pgno, err)
}
// Truncate database, if commit specified.
if commit != 0 {
if err := f.Truncate(int64(commit) * int64(db.pageSize)); err != nil {
return fmt.Errorf("truncate db: commit=%d err=%w", commit, err)
}
} }
} }
// Close WAL file writer. // Close database file writer.
if err := ww.Close(); err != nil { if err := f.Close(); err != nil {
return fmt.Errorf("close wal writer: %w", err) return fmt.Errorf("close db writer: %w", err)
}
// Invalidate WAL index.
if err := invalidateSHMFile(db.path); err != nil {
return fmt.Errorf("invalidate shm file: %w", err)
} }
// Write position to file so other processes can read it. // Write position to file so other processes can read it.
@@ -2016,51 +2039,21 @@ func logPrefixPath(path string) string {
return path return path
} }
// invalidateSHMFile clears the iVersion field of the -shm file in order that
// the next transaction will rebuild it.
func invalidateSHMFile(dbPath string) error {
db, err := sql.Open("sqlite3", dbPath)
if err != nil {
return fmt.Errorf("reopen db: %w", err)
}
defer func() { _ = db.Close() }()
if _, err := db.Exec(`PRAGMA wal_checkpoint(PASSIVE)`); err != nil {
return fmt.Errorf("passive checkpoint: %w", err)
}
f, err := os.OpenFile(dbPath+"-shm", os.O_RDWR, 0666)
if err != nil {
return fmt.Errorf("open shm index: %w", err)
}
defer f.Close()
buf := make([]byte, WALIndexHeaderSize)
if _, err := io.ReadFull(f, buf); err != nil {
return fmt.Errorf("read shm index: %w", err)
}
// Invalidate "isInit" fields.
buf[12], buf[60] = 0, 0
// Rewrite header.
if _, err := f.Seek(0, io.SeekStart); err != nil {
return fmt.Errorf("seek shm index: %w", err)
} else if _, err := f.Write(buf); err != nil {
return fmt.Errorf("overwrite shm index: %w", err)
} else if err := f.Close(); err != nil {
return fmt.Errorf("close shm index: %w", err)
}
// Truncate WAL file again.
var row [3]int
if err := db.QueryRow(`PRAGMA wal_checkpoint(TRUNCATE)`).Scan(&row[0], &row[1], &row[2]); err != nil {
return fmt.Errorf("truncate: %w", err)
}
return nil
}
// A marker error to indicate that a restart checkpoint could not verify // A marker error to indicate that a restart checkpoint could not verify
// continuity between WAL indices and a new generation should be started. // continuity between WAL indices and a new generation should be started.
var errRestartGeneration = errors.New("restart generation") var errRestartGeneration = errors.New("restart generation")
const (
WAL_WRITE_LOCK_OFFSET = 120
WAL_CKPT_LOCK_OFFSET = 121
)
// setLkw is a helper function for calling fcntl for file locking.
func setLkw(f *os.File, typ int16, start, len int64) error {
return syscall.FcntlFlock(f.Fd(), syscall.F_SETLKW, &syscall.Flock_t{
Start: start,
Len: len,
Type: typ,
Whence: io.SeekStart,
})
}

View File

@@ -48,6 +48,7 @@ func (c *Client) Stream(ctx context.Context, pos litestream.Pos) (litestream.Str
if !pos.IsZero() { if !pos.IsZero() {
q.Set("generation", pos.Generation) q.Set("generation", pos.Generation)
q.Set("index", litestream.FormatIndex(pos.Index)) q.Set("index", litestream.FormatIndex(pos.Index))
q.Set("offset", litestream.FormatOffset(pos.Offset))
} }
// Strip off everything but the scheme & host. // Strip off everything but the scheme & host.

View File

@@ -134,16 +134,31 @@ func (s *Server) handleGetStream(w http.ResponseWriter, r *http.Request) {
return return
} }
generationStr := q.Get("generation")
indexStr := q.Get("index")
offsetStr := q.Get("offset")
// Parse current client position, if available. // Parse current client position, if available.
var pos litestream.Pos var pos litestream.Pos
if generation, index := q.Get("generation"), q.Get("index"); generation != "" && index != "" { if generationStr != "" && indexStr != "" && offsetStr != "" {
pos.Generation = generation
var err error index, err := litestream.ParseIndex(indexStr)
if pos.Index, err = litestream.ParseIndex(index); err != nil { if err != nil {
s.writeError(w, r, "Invalid index query parameter", http.StatusBadRequest) s.writeError(w, r, "Invalid index query parameter", http.StatusBadRequest)
return return
} }
offset, err := litestream.ParseOffset(offsetStr)
if err != nil {
s.writeError(w, r, "Invalid offset query parameter", http.StatusBadRequest)
return
}
pos = litestream.Pos{
Generation: generationStr,
Index: index,
Offset: offset,
}
} }
// Fetch database instance from the primary server. // Fetch database instance from the primary server.
@@ -162,7 +177,6 @@ func (s *Server) handleGetStream(w http.ResponseWriter, r *http.Request) {
s.writeError(w, r, "No generation available", http.StatusServiceUnavailable) s.writeError(w, r, "No generation available", http.StatusServiceUnavailable)
return return
} }
dbPos.Offset = 0
// Use database position if generation has changed. // Use database position if generation has changed.
var snapshotRequired bool var snapshotRequired bool
@@ -170,6 +184,7 @@ func (s *Server) handleGetStream(w http.ResponseWriter, r *http.Request) {
s.Logger.Printf("stream generation mismatch, using primary position: client.pos=%s", pos) s.Logger.Printf("stream generation mismatch, using primary position: client.pos=%s", pos)
pos, snapshotRequired = dbPos, true pos, snapshotRequired = dbPos, true
} }
pos.Offset = 0
// Obtain iterator before snapshot so we don't miss any WAL segments. // Obtain iterator before snapshot so we don't miss any WAL segments.
fitr, err := db.WALSegments(r.Context(), pos.Generation) fitr, err := db.WALSegments(r.Context(), pos.Generation)
@@ -289,7 +304,7 @@ func (s *Server) handleGetStream(w http.ResponseWriter, r *http.Request) {
// Flush after WAL segment has been written. // Flush after WAL segment has been written.
w.(http.Flusher).Flush() w.(http.Flusher).Flush()
} }
if bitr.Err() != nil { if err := bitr.Err(); err != nil {
s.Logger.Printf("wal iterator error: %s", err) s.Logger.Printf("wal iterator error: %s", err)
return return
} }

View File

@@ -481,7 +481,7 @@ func TestCmd_Replicate_HTTP_PartialRecovery(t *testing.T) {
t.Fatal(err) t.Fatal(err)
} else if _, err := db0.ExecContext(ctx, `PRAGMA journal_mode = wal`); err != nil { } else if _, err := db0.ExecContext(ctx, `PRAGMA journal_mode = wal`); err != nil {
t.Fatal(err) t.Fatal(err)
} else if _, err := db0.ExecContext(ctx, `CREATE TABLE t (id INTEGER PRIMARY KEY)`); err != nil { } else if _, err := db0.ExecContext(ctx, `CREATE TABLE t (id INTEGER PRIMARY KEY, DATA TEXT)`); err != nil {
t.Fatal(err) t.Fatal(err)
} }
defer db0.Close() defer db0.Close()
@@ -489,8 +489,8 @@ func TestCmd_Replicate_HTTP_PartialRecovery(t *testing.T) {
var index int var index int
insertAndWait := func() { insertAndWait := func() {
index++ index++
t.Logf("[exec] INSERT INTO t (id) VALUES (%d)", index) t.Logf("[exec] INSERT INTO t (id, data) VALUES (%d, '...')", index)
if _, err := db0.ExecContext(ctx, `INSERT INTO t (id) VALUES (?)`, index); err != nil { if _, err := db0.ExecContext(ctx, `INSERT INTO t (id, data) VALUES (?, ?)`, index, strings.Repeat("x", 512)); err != nil {
t.Fatal(err) t.Fatal(err)
} }
time.Sleep(100 * time.Millisecond) time.Sleep(100 * time.Millisecond)

View File

@@ -102,6 +102,24 @@ func TestTruncateDuration(t *testing.T) {
} }
} }
func TestMD5Hash(t *testing.T) {
for _, tt := range []struct {
input []byte
output string
}{
{[]byte{}, "d41d8cd98f00b204e9800998ecf8427e"},
{[]byte{0x0}, "93b885adfe0da089cdf634904fd59f71"},
{[]byte{0x0, 0x1, 0x2, 0x3}, "37b59afd592725f9305e484a5d7f5168"},
{[]byte("Hello, world!"), "6cd3556deb0da54bca060b4c39479839"},
} {
t.Run(fmt.Sprintf("%v", tt.input), func(t *testing.T) {
if got, want := internal.MD5Hash(tt.input), tt.output; got != want {
t.Fatalf("hash=%s, want %s", got, want)
}
})
}
}
func TestOnceCloser(t *testing.T) { func TestOnceCloser(t *testing.T) {
var closed bool var closed bool
var rc = &mock.ReadCloser{ var rc = &mock.ReadCloser{

View File

@@ -234,6 +234,88 @@ func ReplicaClientTimeBounds(ctx context.Context, client ReplicaClient) (min, ma
return min, max, nil return min, max, nil
} }
// FindIndexByTimestamp returns the highest index before a given point-in-time
// within a generation. Returns ErrNoSnapshots if no index exists on the replica
// for the generation.
func FindIndexByTimestamp(ctx context.Context, client ReplicaClient, generation string, timestamp time.Time) (index int, err error) {
snapshotIndex, err := FindSnapshotIndexByTimestamp(ctx, client, generation, timestamp)
if err == ErrNoSnapshots {
return 0, err
} else if err != nil {
return 0, fmt.Errorf("max snapshot index: %w", err)
}
// Determine the highest available WAL index.
walIndex, err := FindWALIndexByTimestamp(ctx, client, generation, timestamp)
if err != nil && err != ErrNoWALSegments {
return 0, fmt.Errorf("max wal index: %w", err)
}
// Use snapshot index if it's after the last WAL index.
if snapshotIndex > walIndex {
return snapshotIndex, nil
}
return walIndex, nil
}
// FindSnapshotIndexByTimestamp returns the highest snapshot index before timestamp.
// Returns ErrNoSnapshots if no snapshots exist for the generation on the replica.
func FindSnapshotIndexByTimestamp(ctx context.Context, client ReplicaClient, generation string, timestamp time.Time) (index int, err error) {
itr, err := client.Snapshots(ctx, generation)
if err != nil {
return 0, fmt.Errorf("snapshots: %w", err)
}
defer func() { _ = itr.Close() }()
// Iterate over snapshots to find the highest index.
var n int
for ; itr.Next(); n++ {
if info := itr.Snapshot(); info.CreatedAt.After(timestamp) {
continue
} else if info.Index > index {
index = info.Index
}
}
if err := itr.Close(); err != nil {
return 0, fmt.Errorf("snapshot iteration: %w", err)
}
// Return an error if no snapshots were found.
if n == 0 {
return 0, ErrNoSnapshots
}
return index, nil
}
// FindWALIndexByTimestamp returns the highest WAL index before timestamp.
// Returns ErrNoWALSegments if no segments exist for the generation on the replica.
func FindWALIndexByTimestamp(ctx context.Context, client ReplicaClient, generation string, timestamp time.Time) (index int, err error) {
itr, err := client.WALSegments(ctx, generation)
if err != nil {
return 0, fmt.Errorf("wal segments: %w", err)
}
defer func() { _ = itr.Close() }()
// Iterate over WAL segments to find the highest index.
var n int
for ; itr.Next(); n++ {
if info := itr.WALSegment(); info.CreatedAt.After(timestamp) {
continue
} else if info.Index > index {
index = info.Index
}
}
if err := itr.Close(); err != nil {
return 0, fmt.Errorf("wal segment iteration: %w", err)
}
// Return an error if no WAL segments were found.
if n == 0 {
return 0, ErrNoWALSegments
}
return index, nil
}
// FindMaxIndexByGeneration returns the last index within a generation. // FindMaxIndexByGeneration returns the last index within a generation.
// Returns ErrNoSnapshots if no index exists on the replica for the generation. // Returns ErrNoSnapshots if no index exists on the replica for the generation.
func FindMaxIndexByGeneration(ctx context.Context, client ReplicaClient, generation string) (index int, err error) { func FindMaxIndexByGeneration(ctx context.Context, client ReplicaClient, generation string) (index int, err error) {

View File

@@ -489,6 +489,167 @@ func TestFindMaxIndexByGeneration(t *testing.T) {
}) })
} }
func TestFindSnapshotIndexByTimestamp(t *testing.T) {
t.Run("OK", func(t *testing.T) {
client := litestream.NewFileReplicaClient(filepath.Join("testdata", "snapshot-index-by-timestamp", "ok"))
if index, err := litestream.FindSnapshotIndexByTimestamp(context.Background(), client, "0000000000000000", time.Date(2000, 1, 3, 0, 0, 0, 0, time.UTC)); err != nil {
t.Fatal(err)
} else if got, want := index, 0x000007d0; got != want {
t.Fatalf("index=%d, want %d", got, want)
}
})
t.Run("ErrNoSnapshots", func(t *testing.T) {
client := litestream.NewFileReplicaClient(filepath.Join("testdata", "snapshot-index-by-timestamp", "no-snapshots"))
_, err := litestream.FindSnapshotIndexByTimestamp(context.Background(), client, "0000000000000000", time.Date(2000, 1, 1, 0, 0, 0, 0, time.UTC))
if err != litestream.ErrNoSnapshots {
t.Fatalf("unexpected error: %s", err)
}
})
t.Run("ErrSnapshots", func(t *testing.T) {
var client mock.ReplicaClient
client.SnapshotsFunc = func(ctx context.Context, generation string) (litestream.SnapshotIterator, error) {
return nil, fmt.Errorf("marker")
}
_, err := litestream.FindSnapshotIndexByTimestamp(context.Background(), &client, "0000000000000000", time.Date(2000, 1, 1, 0, 0, 0, 0, time.UTC))
if err == nil || err.Error() != `snapshots: marker` {
t.Fatalf("unexpected error: %s", err)
}
})
t.Run("ErrSnapshotIteration", func(t *testing.T) {
var itr mock.SnapshotIterator
itr.NextFunc = func() bool { return false }
itr.CloseFunc = func() error { return fmt.Errorf("marker") }
var client mock.ReplicaClient
client.SnapshotsFunc = func(ctx context.Context, generation string) (litestream.SnapshotIterator, error) {
return &itr, nil
}
_, err := litestream.FindSnapshotIndexByTimestamp(context.Background(), &client, "0000000000000000", time.Date(2000, 1, 1, 0, 0, 0, 0, time.UTC))
if err == nil || err.Error() != `snapshot iteration: marker` {
t.Fatalf("unexpected error: %s", err)
}
})
}
func TestFindWALIndexByTimestamp(t *testing.T) {
t.Run("OK", func(t *testing.T) {
client := litestream.NewFileReplicaClient(filepath.Join("testdata", "wal-index-by-timestamp", "ok"))
if index, err := litestream.FindWALIndexByTimestamp(context.Background(), client, "0000000000000000", time.Date(2000, 1, 3, 0, 0, 0, 0, time.UTC)); err != nil {
t.Fatal(err)
} else if got, want := index, 1; got != want {
t.Fatalf("index=%d, want %d", got, want)
}
})
t.Run("ErrNoWALSegments", func(t *testing.T) {
client := litestream.NewFileReplicaClient(filepath.Join("testdata", "wal-index-by-timestamp", "no-wal"))
_, err := litestream.FindWALIndexByTimestamp(context.Background(), client, "0000000000000000", time.Date(2000, 1, 1, 0, 0, 0, 0, time.UTC))
if err != litestream.ErrNoWALSegments {
t.Fatalf("unexpected error: %s", err)
}
})
t.Run("ErrWALSegments", func(t *testing.T) {
var client mock.ReplicaClient
client.WALSegmentsFunc = func(ctx context.Context, generation string) (litestream.WALSegmentIterator, error) {
return nil, fmt.Errorf("marker")
}
_, err := litestream.FindWALIndexByTimestamp(context.Background(), &client, "0000000000000000", time.Date(2000, 1, 1, 0, 0, 0, 0, time.UTC))
if err == nil || err.Error() != `wal segments: marker` {
t.Fatalf("unexpected error: %s", err)
}
})
t.Run("ErrWALSegmentIteration", func(t *testing.T) {
var itr mock.WALSegmentIterator
itr.NextFunc = func() bool { return false }
itr.CloseFunc = func() error { return fmt.Errorf("marker") }
var client mock.ReplicaClient
client.WALSegmentsFunc = func(ctx context.Context, generation string) (litestream.WALSegmentIterator, error) {
return &itr, nil
}
_, err := litestream.FindWALIndexByTimestamp(context.Background(), &client, "0000000000000000", time.Date(2000, 1, 1, 0, 0, 0, 0, time.UTC))
if err == nil || err.Error() != `wal segment iteration: marker` {
t.Fatalf("unexpected error: %s", err)
}
})
}
func TestFindIndexByTimestamp(t *testing.T) {
t.Run("OK", func(t *testing.T) {
client := litestream.NewFileReplicaClient(filepath.Join("testdata", "index-by-timestamp", "ok"))
if index, err := litestream.FindIndexByTimestamp(context.Background(), client, "0000000000000000", time.Date(2000, 1, 4, 0, 0, 0, 0, time.UTC)); err != nil {
t.Fatal(err)
} else if got, want := index, 0x00000002; got != want {
t.Fatalf("index=%d, want %d", got, want)
}
})
t.Run("NoWAL", func(t *testing.T) {
client := litestream.NewFileReplicaClient(filepath.Join("testdata", "index-by-timestamp", "no-wal"))
if index, err := litestream.FindIndexByTimestamp(context.Background(), client, "0000000000000000", time.Date(2000, 1, 2, 0, 0, 0, 0, time.UTC)); err != nil {
t.Fatal(err)
} else if got, want := index, 0x00000001; got != want {
t.Fatalf("index=%d, want %d", got, want)
}
})
t.Run("SnapshotLaterThanWAL", func(t *testing.T) {
client := litestream.NewFileReplicaClient(filepath.Join("testdata", "index-by-timestamp", "snapshot-later-than-wal"))
if index, err := litestream.FindIndexByTimestamp(context.Background(), client, "0000000000000000", time.Date(2000, 1, 3, 0, 0, 0, 0, time.UTC)); err != nil {
t.Fatal(err)
} else if got, want := index, 0x00000001; got != want {
t.Fatalf("index=%d, want %d", got, want)
}
})
t.Run("ErrNoSnapshots", func(t *testing.T) {
client := litestream.NewFileReplicaClient(filepath.Join("testdata", "index-by-timestamp", "no-snapshots"))
_, err := litestream.FindIndexByTimestamp(context.Background(), client, "0000000000000000", time.Date(2000, 1, 1, 0, 0, 0, 0, time.UTC))
if err != litestream.ErrNoSnapshots {
t.Fatalf("unexpected error: %s", err)
}
})
t.Run("ErrSnapshots", func(t *testing.T) {
var client mock.ReplicaClient
client.SnapshotsFunc = func(ctx context.Context, generation string) (litestream.SnapshotIterator, error) {
return nil, fmt.Errorf("marker")
}
_, err := litestream.FindIndexByTimestamp(context.Background(), &client, "0000000000000000", time.Date(2000, 1, 1, 0, 0, 0, 0, time.UTC))
if err == nil || err.Error() != `max snapshot index: snapshots: marker` {
t.Fatalf("unexpected error: %s", err)
}
})
t.Run("ErrWALSegments", func(t *testing.T) {
var client mock.ReplicaClient
client.SnapshotsFunc = func(ctx context.Context, generation string) (litestream.SnapshotIterator, error) {
return litestream.NewSnapshotInfoSliceIterator([]litestream.SnapshotInfo{{Index: 0x00000001}}), nil
}
client.WALSegmentsFunc = func(ctx context.Context, generation string) (litestream.WALSegmentIterator, error) {
return nil, fmt.Errorf("marker")
}
_, err := litestream.FindIndexByTimestamp(context.Background(), &client, "0000000000000000", time.Date(2000, 1, 1, 0, 0, 0, 0, time.UTC))
if err == nil || err.Error() != `max wal index: wal segments: marker` {
t.Fatalf("unexpected error: %s", err)
}
})
}
func TestRestore(t *testing.T) { func TestRestore(t *testing.T) {
t.Run("OK", func(t *testing.T) { t.Run("OK", func(t *testing.T) {
testDir := filepath.Join("testdata", "restore", "ok") testDir := filepath.Join("testdata", "restore", "ok")

View File

@@ -703,6 +703,13 @@ func ParseHost(s string) (bucket, region, endpoint string, forcePathStyle bool)
endpoint = net.JoinHostPort(endpoint, port) endpoint = net.JoinHostPort(endpoint, port)
} }
if e := os.Getenv("LITESTREAM_ENDPOINT"); e != "" {
endpoint = e
}
if r := os.Getenv("LITESTREAM_REGION"); r != "" {
region = r
}
// Prepend scheme to endpoint. // Prepend scheme to endpoint.
if endpoint != "" { if endpoint != "" {
endpoint = scheme + "://" + endpoint endpoint = scheme + "://" + endpoint

5
testdata/Makefile vendored
View File

@@ -1,8 +1,13 @@
.PHONY: default .PHONY: default
default: default:
make -C find-latest-generation/ok make -C find-latest-generation/ok
make -C index-by-timestamp/no-wal
make -C index-by-timestamp/ok
make -C index-by-timestamp/snapshot-later-than-wal
make -C generation-time-bounds/ok make -C generation-time-bounds/ok
make -C generation-time-bounds/snapshots-only make -C generation-time-bounds/snapshots-only
make -C replica-client-time-bounds/ok make -C replica-client-time-bounds/ok
make -C snapshot-time-bounds/ok make -C snapshot-time-bounds/ok
make -C snapshot-index-by-timestamp/ok
make -C wal-time-bounds/ok make -C wal-time-bounds/ok
make -C wal-index-by-timestamp/ok

View File

@@ -0,0 +1,6 @@
.PHONY: default
default:
TZ=UTC touch -ct 200001010000 generations/0000000000000000/snapshots/0000000000000000.snapshot.lz4
TZ=UTC touch -ct 200001020000 generations/0000000000000000/snapshots/0000000000000001.snapshot.lz4
TZ=UTC touch -ct 200001030000 generations/0000000000000000/snapshots/0000000000000002.snapshot.lz4

11
testdata/index-by-timestamp/ok/Makefile vendored Normal file
View File

@@ -0,0 +1,11 @@
.PHONY: default
default:
TZ=UTC touch -ct 200001010000 generations/0000000000000000/snapshots/0000000000000000.snapshot.lz4
TZ=UTC touch -ct 200001030000 generations/0000000000000000/snapshots/0000000000000001.snapshot.lz4
TZ=UTC touch -ct 200001010000 generations/0000000000000000/wal/0000000000000000/0000000000000000.wal.lz4
TZ=UTC touch -ct 200001020000 generations/0000000000000000/wal/0000000000000000/0000000000001234.wal.lz4
TZ=UTC touch -ct 200001030000 generations/0000000000000000/wal/0000000000000001/0000000000000000.wal.lz4
TZ=UTC touch -ct 200001040000 generations/0000000000000000/wal/0000000000000002/0000000000000000.wal.lz4
TZ=UTC touch -ct 200001050000 generations/0000000000000000/wal/0000000000000003/0000000000000000.wal.lz4

View File

@@ -0,0 +1,7 @@
.PHONY: default
default:
TZ=UTC touch -ct 200001010000 generations/0000000000000000/snapshots/0000000000000000.snapshot.lz4
TZ=UTC touch -ct 200001030000 generations/0000000000000000/snapshots/0000000000000001.snapshot.lz4
TZ=UTC touch -ct 200001020000 generations/0000000000000000/wal/0000000000000000/0000000000000000.wal.lz4
TZ=UTC touch -ct 200001020000 generations/0000000000000000/wal/0000000000000000/0000000000001234.wal.lz4

View File

@@ -0,0 +1,5 @@
.PHONY: default
default:
TZ=UTC touch -ct 200001010000 generations/0000000000000000/snapshots/0000000000000000.snapshot.lz4
TZ=UTC touch -ct 200001020000 generations/0000000000000000/snapshots/00000000000003e8.snapshot.lz4
TZ=UTC touch -ct 200001030000 generations/0000000000000000/snapshots/00000000000007d0.snapshot.lz4

View File

@@ -0,0 +1,6 @@
.PHONY: default
default:
TZ=UTC touch -ct 200001010000 generations/0000000000000000/wal/0000000000000000/0000000000000000.wal.lz4
TZ=UTC touch -ct 200001020000 generations/0000000000000000/wal/0000000000000000/0000000000001234.wal.lz4
TZ=UTC touch -ct 200001030000 generations/0000000000000000/wal/0000000000000001/0000000000000000.wal.lz4