Compare commits
9 Commits
v0.4.0-bet
...
fix-stream
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
c53a09c124 | ||
|
|
46597ab22f | ||
|
|
e6f7c6052d | ||
|
|
7d8b8c6ec0 | ||
|
|
88737d7164 | ||
|
|
6763e9218c | ||
|
|
301e1172fd | ||
|
|
ca07137d32 | ||
|
|
80f8de4d9e |
7
.github/workflows/build_and_test.yml
vendored
7
.github/workflows/build_and_test.yml
vendored
@@ -22,4 +22,9 @@ jobs:
|
||||
run: go install ./cmd/litestream
|
||||
|
||||
- name: Run unit tests
|
||||
run: make testdata && go test -v ./...
|
||||
run: make testdata && go test -v --coverprofile=.coverage.out ./... && go tool cover -html .coverage.out -o .coverage.html
|
||||
|
||||
- uses: actions/upload-artifact@v3
|
||||
with:
|
||||
name: code-coverage
|
||||
path: .coverage.html
|
||||
|
||||
8
.github/workflows/release.linux.yml
vendored
8
.github/workflows/release.linux.yml
vendored
@@ -23,7 +23,6 @@ jobs:
|
||||
- arch: amd64
|
||||
cc: gcc
|
||||
static: true
|
||||
deploy_test_runner: true
|
||||
|
||||
- arch: arm64
|
||||
cc: aarch64-linux-gnu-gcc
|
||||
@@ -105,6 +104,13 @@ jobs:
|
||||
path: dist/litestream-${{ env.VERSION }}-${{ env.GOOS }}-${{ env.GOARCH }}${{ env.GOARM }}${{ env.SUFFIX }}.deb
|
||||
if-no-files-found: error
|
||||
|
||||
- name: Get release
|
||||
id: release
|
||||
uses: bruceadams/get-release@v1.2.3
|
||||
if: github.event_name == 'release'
|
||||
env:
|
||||
GITHUB_TOKEN: ${{ github.token }}
|
||||
|
||||
- name: Upload release tarball
|
||||
uses: actions/upload-release-asset@v1.0.2
|
||||
if: github.event_name == 'release'
|
||||
|
||||
1
.gitignore
vendored
1
.gitignore
vendored
@@ -1,2 +1,3 @@
|
||||
.coverage.*
|
||||
.DS_Store
|
||||
/dist
|
||||
|
||||
@@ -9,6 +9,7 @@ import (
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"github.com/benbjohnson/litestream"
|
||||
)
|
||||
@@ -22,14 +23,15 @@ type RestoreCommand struct {
|
||||
snapshotIndex int // index of snapshot to start from
|
||||
|
||||
// CLI options
|
||||
configPath string // path to config file
|
||||
noExpandEnv bool // if true, do not expand env variables in config
|
||||
outputPath string // path to restore database to
|
||||
replicaName string // optional, name of replica to restore from
|
||||
generation string // optional, generation to restore
|
||||
targetIndex int // optional, last WAL index to replay
|
||||
ifDBNotExists bool // if true, skips restore if output path already exists
|
||||
ifReplicaExists bool // if true, skips if no backups exist
|
||||
configPath string // path to config file
|
||||
noExpandEnv bool // if true, do not expand env variables in config
|
||||
outputPath string // path to restore database to
|
||||
replicaName string // optional, name of replica to restore from
|
||||
generation string // optional, generation to restore
|
||||
targetIndex int // optional, last WAL index to replay
|
||||
timestamp time.Time // optional, restore to point-in-time (ISO 8601)
|
||||
ifDBNotExists bool // if true, skips restore if output path already exists
|
||||
ifReplicaExists bool // if true, skips if no backups exist
|
||||
opt litestream.RestoreOptions
|
||||
}
|
||||
|
||||
@@ -53,6 +55,7 @@ func (c *RestoreCommand) Run(ctx context.Context, args []string) (err error) {
|
||||
fs.StringVar(&c.replicaName, "replica", "", "replica name")
|
||||
fs.StringVar(&c.generation, "generation", "", "generation name")
|
||||
fs.Var((*indexVar)(&c.targetIndex), "index", "wal index")
|
||||
timestampStr := fs.String("timestamp", "", "point-in-time restore (ISO 8601)")
|
||||
fs.IntVar(&c.opt.Parallelism, "parallelism", c.opt.Parallelism, "parallelism")
|
||||
fs.BoolVar(&c.ifDBNotExists, "if-db-not-exists", false, "")
|
||||
fs.BoolVar(&c.ifReplicaExists, "if-replica-exists", false, "")
|
||||
@@ -66,9 +69,20 @@ func (c *RestoreCommand) Run(ctx context.Context, args []string) (err error) {
|
||||
}
|
||||
pathOrURL := fs.Arg(0)
|
||||
|
||||
// Parse timestamp.
|
||||
if *timestampStr != "" {
|
||||
if c.timestamp, err = time.Parse(time.RFC3339Nano, *timestampStr); err != nil {
|
||||
return fmt.Errorf("invalid -timestamp, expected ISO 8601: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
// Ensure a generation is specified if target index is specified.
|
||||
if c.targetIndex != -1 && c.generation == "" {
|
||||
if c.targetIndex != -1 && !c.timestamp.IsZero() {
|
||||
return fmt.Errorf("cannot specify both -index flag and -timestamp flag")
|
||||
} else if c.targetIndex != -1 && c.generation == "" {
|
||||
return fmt.Errorf("must specify -generation flag when using -index flag")
|
||||
} else if !c.timestamp.IsZero() && c.generation == "" {
|
||||
return fmt.Errorf("must specify -generation flag when using -timestamp flag")
|
||||
}
|
||||
|
||||
// Default to original database path if output path not specified.
|
||||
@@ -117,7 +131,11 @@ func (c *RestoreCommand) Run(ctx context.Context, args []string) (err error) {
|
||||
}
|
||||
|
||||
// Determine the maximum available index for the generation if one is not specified.
|
||||
if c.targetIndex == -1 {
|
||||
if !c.timestamp.IsZero() {
|
||||
if c.targetIndex, err = litestream.FindIndexByTimestamp(ctx, r.Client(), c.generation, c.timestamp); err != nil {
|
||||
return fmt.Errorf("cannot find index for timestamp in generation %q: %w", c.generation, err)
|
||||
}
|
||||
} else if c.targetIndex == -1 {
|
||||
if c.targetIndex, err = litestream.FindMaxIndexByGeneration(ctx, r.Client(), c.generation); err != nil {
|
||||
return fmt.Errorf("cannot determine latest index in generation %q: %w", c.generation, err)
|
||||
}
|
||||
@@ -239,6 +257,10 @@ Arguments:
|
||||
Restore up to a specific hex-encoded WAL index (inclusive).
|
||||
Defaults to use the highest available index.
|
||||
|
||||
-timestamp DATETIME
|
||||
Restore up to a specific point-in-time. Must be ISO 8601.
|
||||
Cannot be specified with -index flag.
|
||||
|
||||
-o PATH
|
||||
Output path of the restored database.
|
||||
Defaults to original DB path.
|
||||
@@ -253,9 +275,6 @@ Arguments:
|
||||
Determines the number of WAL files downloaded in parallel.
|
||||
Defaults to `+strconv.Itoa(litestream.DefaultRestoreParallelism)+`.
|
||||
|
||||
-v
|
||||
Verbose output.
|
||||
|
||||
|
||||
Examples:
|
||||
|
||||
@@ -271,6 +290,9 @@ Examples:
|
||||
# Restore database from specific generation on S3.
|
||||
$ litestream restore -replica s3 -generation xxxxxxxx /path/to/db
|
||||
|
||||
# Restore database to a specific point in time.
|
||||
$ litestream restore -generation xxxxxxxx -timestamp 2000-01-01T00:00:00Z /path/to/db
|
||||
|
||||
`[1:],
|
||||
DefaultConfigPath(),
|
||||
)
|
||||
|
||||
179
db.go
179
db.go
@@ -17,6 +17,7 @@ import (
|
||||
"sort"
|
||||
"strings"
|
||||
"sync"
|
||||
"syscall"
|
||||
"time"
|
||||
|
||||
"github.com/benbjohnson/litestream/internal"
|
||||
@@ -1747,23 +1748,34 @@ func (db *DB) stream(ctx context.Context) error {
|
||||
|
||||
// streamSnapshot reads the snapshot into the WAL and applies it to the main database.
|
||||
func (db *DB) streamSnapshot(ctx context.Context, hdr *StreamRecordHeader, r io.Reader) error {
|
||||
// Truncate WAL file.
|
||||
if _, err := db.db.ExecContext(ctx, `PRAGMA wal_checkpoint(TRUNCATE)`); err != nil {
|
||||
return fmt.Errorf("truncate: %w", err)
|
||||
}
|
||||
|
||||
// Determine total page count.
|
||||
pageN := int(hdr.Size / int64(db.pageSize))
|
||||
|
||||
ww := NewWALWriter(db.WALPath(), db.fileMode, db.pageSize)
|
||||
if err := ww.Open(); err != nil {
|
||||
return fmt.Errorf("open wal writer: %w", err)
|
||||
// Open database file.
|
||||
f, err := os.OpenFile(db.path, os.O_RDWR, 0666)
|
||||
if err != nil {
|
||||
return fmt.Errorf("open db file: %w", err)
|
||||
}
|
||||
defer func() { _ = ww.Close() }()
|
||||
defer f.Close()
|
||||
|
||||
if err := ww.WriteHeader(); err != nil {
|
||||
return fmt.Errorf("write wal header: %w", err)
|
||||
// Open shm file for locking.
|
||||
shmFile, err := os.OpenFile(db.SHMPath(), os.O_RDWR, 0666)
|
||||
if err != nil {
|
||||
return fmt.Errorf("open shm file: %w", err)
|
||||
}
|
||||
defer shmFile.Close()
|
||||
|
||||
// Obtain WAL checkpoint lock.
|
||||
if err := setLkw(shmFile, syscall.F_WRLCK, WAL_CKPT_LOCK_OFFSET, 1); err != nil {
|
||||
return fmt.Errorf("cannot obtain wal checkpoint lock: %w", err)
|
||||
}
|
||||
defer func() { _ = setLkw(shmFile, syscall.F_UNLCK, WAL_CKPT_LOCK_OFFSET, 1) }()
|
||||
|
||||
// Obtain WAL write lock.
|
||||
if err := setLkw(shmFile, syscall.F_WRLCK, WAL_WRITE_LOCK_OFFSET, 1); err != nil {
|
||||
return fmt.Errorf("cannot obtain wal write lock: %w", err)
|
||||
}
|
||||
defer func() { _ = setLkw(shmFile, syscall.F_UNLCK, WAL_WRITE_LOCK_OFFSET, 1) }()
|
||||
|
||||
// Iterate over pages
|
||||
buf := make([]byte, db.pageSize)
|
||||
@@ -1775,26 +1787,18 @@ func (db *DB) streamSnapshot(ctx context.Context, hdr *StreamRecordHeader, r io.
|
||||
return fmt.Errorf("read snapshot page %d: %w", pgno, err)
|
||||
}
|
||||
|
||||
// Issue a commit flag when the last page is reached.
|
||||
var commit uint32
|
||||
// Copy page to database file.
|
||||
offset := int64(pgno-1) * int64(db.pageSize)
|
||||
if _, err := f.WriteAt(buf, offset); err != nil {
|
||||
return fmt.Errorf("copy to db: pgno=%d err=%w", pgno, err)
|
||||
}
|
||||
|
||||
// Truncate database to final size.
|
||||
if pgno == uint32(pageN) {
|
||||
commit = uint32(pageN)
|
||||
if err := f.Truncate(int64(pageN) * int64(db.pageSize)); err != nil {
|
||||
return fmt.Errorf("truncate db: commit=%d err=%w", pageN, err)
|
||||
}
|
||||
}
|
||||
|
||||
// Write page into WAL frame.
|
||||
if err := ww.WriteFrame(pgno, commit, buf); err != nil {
|
||||
return fmt.Errorf("write wal frame: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
// Close WAL file writer.
|
||||
if err := ww.Close(); err != nil {
|
||||
return fmt.Errorf("close wal writer: %w", err)
|
||||
}
|
||||
|
||||
// Invalidate WAL index.
|
||||
if err := invalidateSHMFile(db.path); err != nil {
|
||||
return fmt.Errorf("invalidate shm file: %w", err)
|
||||
}
|
||||
|
||||
// Write position to file so other processes can read it.
|
||||
@@ -1819,44 +1823,63 @@ func (db *DB) streamWALSegment(ctx context.Context, hdr *StreamRecordHeader, r i
|
||||
}
|
||||
}
|
||||
|
||||
ww := NewWALWriter(db.WALPath(), db.fileMode, db.pageSize)
|
||||
if err := ww.Open(); err != nil {
|
||||
return fmt.Errorf("open wal writer: %w", err)
|
||||
// Open database file.
|
||||
f, err := os.OpenFile(db.path, os.O_RDWR, 0666)
|
||||
if err != nil {
|
||||
return fmt.Errorf("open db file: %w", err)
|
||||
}
|
||||
defer func() { _ = ww.Close() }()
|
||||
defer f.Close()
|
||||
|
||||
if err := ww.WriteHeader(); err != nil {
|
||||
return fmt.Errorf("write wal header: %w", err)
|
||||
// Open shm file for locking.
|
||||
shmFile, err := os.OpenFile(db.SHMPath(), os.O_RDWR, 0666)
|
||||
if err != nil {
|
||||
return fmt.Errorf("open shm file: %w", err)
|
||||
}
|
||||
defer shmFile.Close()
|
||||
|
||||
// Obtain WAL checkpoint lock.
|
||||
if err := setLkw(shmFile, syscall.F_WRLCK, WAL_CKPT_LOCK_OFFSET, 1); err != nil {
|
||||
return fmt.Errorf("cannot obtain wal checkpoint lock: %w", err)
|
||||
}
|
||||
defer func() { _ = setLkw(shmFile, syscall.F_UNLCK, WAL_CKPT_LOCK_OFFSET, 1) }()
|
||||
|
||||
// Obtain WAL write lock.
|
||||
if err := setLkw(shmFile, syscall.F_WRLCK, WAL_WRITE_LOCK_OFFSET, 1); err != nil {
|
||||
return fmt.Errorf("cannot obtain wal write lock: %w", err)
|
||||
}
|
||||
defer func() { _ = setLkw(shmFile, syscall.F_UNLCK, WAL_WRITE_LOCK_OFFSET, 1) }()
|
||||
|
||||
// Iterate over incoming WAL pages.
|
||||
buf := make([]byte, WALFrameHeaderSize+db.pageSize)
|
||||
for i := 0; ; i++ {
|
||||
// Read snapshot page into a buffer.
|
||||
if _, err := io.ReadFull(zr, buf); err == io.EOF {
|
||||
if n, err := io.ReadFull(zr, buf); err == io.EOF {
|
||||
break
|
||||
} else if err != nil {
|
||||
return fmt.Errorf("read wal frame %d: %w", i, err)
|
||||
return fmt.Errorf("read wal frame: i=%d n=%d err=%w", i, n, err)
|
||||
}
|
||||
|
||||
// Read page number & commit field.
|
||||
pgno := binary.BigEndian.Uint32(buf[0:])
|
||||
commit := binary.BigEndian.Uint32(buf[4:])
|
||||
|
||||
// Write page into WAL frame.
|
||||
if err := ww.WriteFrame(pgno, commit, buf[WALFrameHeaderSize:]); err != nil {
|
||||
return fmt.Errorf("write wal frame: %w", err)
|
||||
// Copy page to database file.
|
||||
offset := int64(pgno-1) * int64(db.pageSize)
|
||||
if _, err := f.WriteAt(buf[WALFrameHeaderSize:], offset); err != nil {
|
||||
return fmt.Errorf("copy to db: pgno=%d err=%w", pgno, err)
|
||||
}
|
||||
|
||||
// Truncate database, if commit specified.
|
||||
if commit != 0 {
|
||||
if err := f.Truncate(int64(commit) * int64(db.pageSize)); err != nil {
|
||||
return fmt.Errorf("truncate db: commit=%d err=%w", commit, err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Close WAL file writer.
|
||||
if err := ww.Close(); err != nil {
|
||||
return fmt.Errorf("close wal writer: %w", err)
|
||||
}
|
||||
|
||||
// Invalidate WAL index.
|
||||
if err := invalidateSHMFile(db.path); err != nil {
|
||||
return fmt.Errorf("invalidate shm file: %w", err)
|
||||
// Close database file writer.
|
||||
if err := f.Close(); err != nil {
|
||||
return fmt.Errorf("close db writer: %w", err)
|
||||
}
|
||||
|
||||
// Write position to file so other processes can read it.
|
||||
@@ -2016,51 +2039,21 @@ func logPrefixPath(path string) string {
|
||||
return path
|
||||
}
|
||||
|
||||
// invalidateSHMFile clears the iVersion field of the -shm file in order that
|
||||
// the next transaction will rebuild it.
|
||||
func invalidateSHMFile(dbPath string) error {
|
||||
db, err := sql.Open("sqlite3", dbPath)
|
||||
if err != nil {
|
||||
return fmt.Errorf("reopen db: %w", err)
|
||||
}
|
||||
defer func() { _ = db.Close() }()
|
||||
|
||||
if _, err := db.Exec(`PRAGMA wal_checkpoint(PASSIVE)`); err != nil {
|
||||
return fmt.Errorf("passive checkpoint: %w", err)
|
||||
}
|
||||
|
||||
f, err := os.OpenFile(dbPath+"-shm", os.O_RDWR, 0666)
|
||||
if err != nil {
|
||||
return fmt.Errorf("open shm index: %w", err)
|
||||
}
|
||||
defer f.Close()
|
||||
|
||||
buf := make([]byte, WALIndexHeaderSize)
|
||||
if _, err := io.ReadFull(f, buf); err != nil {
|
||||
return fmt.Errorf("read shm index: %w", err)
|
||||
}
|
||||
|
||||
// Invalidate "isInit" fields.
|
||||
buf[12], buf[60] = 0, 0
|
||||
|
||||
// Rewrite header.
|
||||
if _, err := f.Seek(0, io.SeekStart); err != nil {
|
||||
return fmt.Errorf("seek shm index: %w", err)
|
||||
} else if _, err := f.Write(buf); err != nil {
|
||||
return fmt.Errorf("overwrite shm index: %w", err)
|
||||
} else if err := f.Close(); err != nil {
|
||||
return fmt.Errorf("close shm index: %w", err)
|
||||
}
|
||||
|
||||
// Truncate WAL file again.
|
||||
var row [3]int
|
||||
if err := db.QueryRow(`PRAGMA wal_checkpoint(TRUNCATE)`).Scan(&row[0], &row[1], &row[2]); err != nil {
|
||||
return fmt.Errorf("truncate: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// A marker error to indicate that a restart checkpoint could not verify
|
||||
// continuity between WAL indices and a new generation should be started.
|
||||
var errRestartGeneration = errors.New("restart generation")
|
||||
|
||||
const (
|
||||
WAL_WRITE_LOCK_OFFSET = 120
|
||||
WAL_CKPT_LOCK_OFFSET = 121
|
||||
)
|
||||
|
||||
// setLkw is a helper function for calling fcntl for file locking.
|
||||
func setLkw(f *os.File, typ int16, start, len int64) error {
|
||||
return syscall.FcntlFlock(f.Fd(), syscall.F_SETLKW, &syscall.Flock_t{
|
||||
Start: start,
|
||||
Len: len,
|
||||
Type: typ,
|
||||
Whence: io.SeekStart,
|
||||
})
|
||||
}
|
||||
|
||||
@@ -48,6 +48,7 @@ func (c *Client) Stream(ctx context.Context, pos litestream.Pos) (litestream.Str
|
||||
if !pos.IsZero() {
|
||||
q.Set("generation", pos.Generation)
|
||||
q.Set("index", litestream.FormatIndex(pos.Index))
|
||||
q.Set("offset", litestream.FormatOffset(pos.Offset))
|
||||
}
|
||||
|
||||
// Strip off everything but the scheme & host.
|
||||
|
||||
@@ -134,16 +134,31 @@ func (s *Server) handleGetStream(w http.ResponseWriter, r *http.Request) {
|
||||
return
|
||||
}
|
||||
|
||||
generationStr := q.Get("generation")
|
||||
indexStr := q.Get("index")
|
||||
offsetStr := q.Get("offset")
|
||||
|
||||
// Parse current client position, if available.
|
||||
var pos litestream.Pos
|
||||
if generation, index := q.Get("generation"), q.Get("index"); generation != "" && index != "" {
|
||||
pos.Generation = generation
|
||||
if generationStr != "" && indexStr != "" && offsetStr != "" {
|
||||
|
||||
var err error
|
||||
if pos.Index, err = litestream.ParseIndex(index); err != nil {
|
||||
index, err := litestream.ParseIndex(indexStr)
|
||||
if err != nil {
|
||||
s.writeError(w, r, "Invalid index query parameter", http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
offset, err := litestream.ParseOffset(offsetStr)
|
||||
if err != nil {
|
||||
s.writeError(w, r, "Invalid offset query parameter", http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
pos = litestream.Pos{
|
||||
Generation: generationStr,
|
||||
Index: index,
|
||||
Offset: offset,
|
||||
}
|
||||
}
|
||||
|
||||
// Fetch database instance from the primary server.
|
||||
@@ -162,7 +177,6 @@ func (s *Server) handleGetStream(w http.ResponseWriter, r *http.Request) {
|
||||
s.writeError(w, r, "No generation available", http.StatusServiceUnavailable)
|
||||
return
|
||||
}
|
||||
dbPos.Offset = 0
|
||||
|
||||
// Use database position if generation has changed.
|
||||
var snapshotRequired bool
|
||||
@@ -170,6 +184,7 @@ func (s *Server) handleGetStream(w http.ResponseWriter, r *http.Request) {
|
||||
s.Logger.Printf("stream generation mismatch, using primary position: client.pos=%s", pos)
|
||||
pos, snapshotRequired = dbPos, true
|
||||
}
|
||||
pos.Offset = 0
|
||||
|
||||
// Obtain iterator before snapshot so we don't miss any WAL segments.
|
||||
fitr, err := db.WALSegments(r.Context(), pos.Generation)
|
||||
@@ -289,7 +304,7 @@ func (s *Server) handleGetStream(w http.ResponseWriter, r *http.Request) {
|
||||
// Flush after WAL segment has been written.
|
||||
w.(http.Flusher).Flush()
|
||||
}
|
||||
if bitr.Err() != nil {
|
||||
if err := bitr.Err(); err != nil {
|
||||
s.Logger.Printf("wal iterator error: %s", err)
|
||||
return
|
||||
}
|
||||
|
||||
@@ -481,7 +481,7 @@ func TestCmd_Replicate_HTTP_PartialRecovery(t *testing.T) {
|
||||
t.Fatal(err)
|
||||
} else if _, err := db0.ExecContext(ctx, `PRAGMA journal_mode = wal`); err != nil {
|
||||
t.Fatal(err)
|
||||
} else if _, err := db0.ExecContext(ctx, `CREATE TABLE t (id INTEGER PRIMARY KEY)`); err != nil {
|
||||
} else if _, err := db0.ExecContext(ctx, `CREATE TABLE t (id INTEGER PRIMARY KEY, DATA TEXT)`); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer db0.Close()
|
||||
@@ -489,8 +489,8 @@ func TestCmd_Replicate_HTTP_PartialRecovery(t *testing.T) {
|
||||
var index int
|
||||
insertAndWait := func() {
|
||||
index++
|
||||
t.Logf("[exec] INSERT INTO t (id) VALUES (%d)", index)
|
||||
if _, err := db0.ExecContext(ctx, `INSERT INTO t (id) VALUES (?)`, index); err != nil {
|
||||
t.Logf("[exec] INSERT INTO t (id, data) VALUES (%d, '...')", index)
|
||||
if _, err := db0.ExecContext(ctx, `INSERT INTO t (id, data) VALUES (?, ?)`, index, strings.Repeat("x", 512)); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
|
||||
@@ -102,6 +102,24 @@ func TestTruncateDuration(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestMD5Hash(t *testing.T) {
|
||||
for _, tt := range []struct {
|
||||
input []byte
|
||||
output string
|
||||
}{
|
||||
{[]byte{}, "d41d8cd98f00b204e9800998ecf8427e"},
|
||||
{[]byte{0x0}, "93b885adfe0da089cdf634904fd59f71"},
|
||||
{[]byte{0x0, 0x1, 0x2, 0x3}, "37b59afd592725f9305e484a5d7f5168"},
|
||||
{[]byte("Hello, world!"), "6cd3556deb0da54bca060b4c39479839"},
|
||||
} {
|
||||
t.Run(fmt.Sprintf("%v", tt.input), func(t *testing.T) {
|
||||
if got, want := internal.MD5Hash(tt.input), tt.output; got != want {
|
||||
t.Fatalf("hash=%s, want %s", got, want)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestOnceCloser(t *testing.T) {
|
||||
var closed bool
|
||||
var rc = &mock.ReadCloser{
|
||||
|
||||
@@ -234,6 +234,88 @@ func ReplicaClientTimeBounds(ctx context.Context, client ReplicaClient) (min, ma
|
||||
return min, max, nil
|
||||
}
|
||||
|
||||
// FindIndexByTimestamp returns the highest index before a given point-in-time
|
||||
// within a generation. Returns ErrNoSnapshots if no index exists on the replica
|
||||
// for the generation.
|
||||
func FindIndexByTimestamp(ctx context.Context, client ReplicaClient, generation string, timestamp time.Time) (index int, err error) {
|
||||
snapshotIndex, err := FindSnapshotIndexByTimestamp(ctx, client, generation, timestamp)
|
||||
if err == ErrNoSnapshots {
|
||||
return 0, err
|
||||
} else if err != nil {
|
||||
return 0, fmt.Errorf("max snapshot index: %w", err)
|
||||
}
|
||||
|
||||
// Determine the highest available WAL index.
|
||||
walIndex, err := FindWALIndexByTimestamp(ctx, client, generation, timestamp)
|
||||
if err != nil && err != ErrNoWALSegments {
|
||||
return 0, fmt.Errorf("max wal index: %w", err)
|
||||
}
|
||||
|
||||
// Use snapshot index if it's after the last WAL index.
|
||||
if snapshotIndex > walIndex {
|
||||
return snapshotIndex, nil
|
||||
}
|
||||
return walIndex, nil
|
||||
}
|
||||
|
||||
// FindSnapshotIndexByTimestamp returns the highest snapshot index before timestamp.
|
||||
// Returns ErrNoSnapshots if no snapshots exist for the generation on the replica.
|
||||
func FindSnapshotIndexByTimestamp(ctx context.Context, client ReplicaClient, generation string, timestamp time.Time) (index int, err error) {
|
||||
itr, err := client.Snapshots(ctx, generation)
|
||||
if err != nil {
|
||||
return 0, fmt.Errorf("snapshots: %w", err)
|
||||
}
|
||||
defer func() { _ = itr.Close() }()
|
||||
|
||||
// Iterate over snapshots to find the highest index.
|
||||
var n int
|
||||
for ; itr.Next(); n++ {
|
||||
if info := itr.Snapshot(); info.CreatedAt.After(timestamp) {
|
||||
continue
|
||||
} else if info.Index > index {
|
||||
index = info.Index
|
||||
}
|
||||
}
|
||||
if err := itr.Close(); err != nil {
|
||||
return 0, fmt.Errorf("snapshot iteration: %w", err)
|
||||
}
|
||||
|
||||
// Return an error if no snapshots were found.
|
||||
if n == 0 {
|
||||
return 0, ErrNoSnapshots
|
||||
}
|
||||
return index, nil
|
||||
}
|
||||
|
||||
// FindWALIndexByTimestamp returns the highest WAL index before timestamp.
|
||||
// Returns ErrNoWALSegments if no segments exist for the generation on the replica.
|
||||
func FindWALIndexByTimestamp(ctx context.Context, client ReplicaClient, generation string, timestamp time.Time) (index int, err error) {
|
||||
itr, err := client.WALSegments(ctx, generation)
|
||||
if err != nil {
|
||||
return 0, fmt.Errorf("wal segments: %w", err)
|
||||
}
|
||||
defer func() { _ = itr.Close() }()
|
||||
|
||||
// Iterate over WAL segments to find the highest index.
|
||||
var n int
|
||||
for ; itr.Next(); n++ {
|
||||
if info := itr.WALSegment(); info.CreatedAt.After(timestamp) {
|
||||
continue
|
||||
} else if info.Index > index {
|
||||
index = info.Index
|
||||
}
|
||||
}
|
||||
if err := itr.Close(); err != nil {
|
||||
return 0, fmt.Errorf("wal segment iteration: %w", err)
|
||||
}
|
||||
|
||||
// Return an error if no WAL segments were found.
|
||||
if n == 0 {
|
||||
return 0, ErrNoWALSegments
|
||||
}
|
||||
return index, nil
|
||||
}
|
||||
|
||||
// FindMaxIndexByGeneration returns the last index within a generation.
|
||||
// Returns ErrNoSnapshots if no index exists on the replica for the generation.
|
||||
func FindMaxIndexByGeneration(ctx context.Context, client ReplicaClient, generation string) (index int, err error) {
|
||||
|
||||
@@ -489,6 +489,167 @@ func TestFindMaxIndexByGeneration(t *testing.T) {
|
||||
})
|
||||
}
|
||||
|
||||
func TestFindSnapshotIndexByTimestamp(t *testing.T) {
|
||||
t.Run("OK", func(t *testing.T) {
|
||||
client := litestream.NewFileReplicaClient(filepath.Join("testdata", "snapshot-index-by-timestamp", "ok"))
|
||||
if index, err := litestream.FindSnapshotIndexByTimestamp(context.Background(), client, "0000000000000000", time.Date(2000, 1, 3, 0, 0, 0, 0, time.UTC)); err != nil {
|
||||
t.Fatal(err)
|
||||
} else if got, want := index, 0x000007d0; got != want {
|
||||
t.Fatalf("index=%d, want %d", got, want)
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("ErrNoSnapshots", func(t *testing.T) {
|
||||
client := litestream.NewFileReplicaClient(filepath.Join("testdata", "snapshot-index-by-timestamp", "no-snapshots"))
|
||||
|
||||
_, err := litestream.FindSnapshotIndexByTimestamp(context.Background(), client, "0000000000000000", time.Date(2000, 1, 1, 0, 0, 0, 0, time.UTC))
|
||||
if err != litestream.ErrNoSnapshots {
|
||||
t.Fatalf("unexpected error: %s", err)
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("ErrSnapshots", func(t *testing.T) {
|
||||
var client mock.ReplicaClient
|
||||
client.SnapshotsFunc = func(ctx context.Context, generation string) (litestream.SnapshotIterator, error) {
|
||||
return nil, fmt.Errorf("marker")
|
||||
}
|
||||
|
||||
_, err := litestream.FindSnapshotIndexByTimestamp(context.Background(), &client, "0000000000000000", time.Date(2000, 1, 1, 0, 0, 0, 0, time.UTC))
|
||||
if err == nil || err.Error() != `snapshots: marker` {
|
||||
t.Fatalf("unexpected error: %s", err)
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("ErrSnapshotIteration", func(t *testing.T) {
|
||||
var itr mock.SnapshotIterator
|
||||
itr.NextFunc = func() bool { return false }
|
||||
itr.CloseFunc = func() error { return fmt.Errorf("marker") }
|
||||
|
||||
var client mock.ReplicaClient
|
||||
client.SnapshotsFunc = func(ctx context.Context, generation string) (litestream.SnapshotIterator, error) {
|
||||
return &itr, nil
|
||||
}
|
||||
|
||||
_, err := litestream.FindSnapshotIndexByTimestamp(context.Background(), &client, "0000000000000000", time.Date(2000, 1, 1, 0, 0, 0, 0, time.UTC))
|
||||
if err == nil || err.Error() != `snapshot iteration: marker` {
|
||||
t.Fatalf("unexpected error: %s", err)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func TestFindWALIndexByTimestamp(t *testing.T) {
|
||||
t.Run("OK", func(t *testing.T) {
|
||||
client := litestream.NewFileReplicaClient(filepath.Join("testdata", "wal-index-by-timestamp", "ok"))
|
||||
if index, err := litestream.FindWALIndexByTimestamp(context.Background(), client, "0000000000000000", time.Date(2000, 1, 3, 0, 0, 0, 0, time.UTC)); err != nil {
|
||||
t.Fatal(err)
|
||||
} else if got, want := index, 1; got != want {
|
||||
t.Fatalf("index=%d, want %d", got, want)
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("ErrNoWALSegments", func(t *testing.T) {
|
||||
client := litestream.NewFileReplicaClient(filepath.Join("testdata", "wal-index-by-timestamp", "no-wal"))
|
||||
|
||||
_, err := litestream.FindWALIndexByTimestamp(context.Background(), client, "0000000000000000", time.Date(2000, 1, 1, 0, 0, 0, 0, time.UTC))
|
||||
if err != litestream.ErrNoWALSegments {
|
||||
t.Fatalf("unexpected error: %s", err)
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("ErrWALSegments", func(t *testing.T) {
|
||||
var client mock.ReplicaClient
|
||||
client.WALSegmentsFunc = func(ctx context.Context, generation string) (litestream.WALSegmentIterator, error) {
|
||||
return nil, fmt.Errorf("marker")
|
||||
}
|
||||
|
||||
_, err := litestream.FindWALIndexByTimestamp(context.Background(), &client, "0000000000000000", time.Date(2000, 1, 1, 0, 0, 0, 0, time.UTC))
|
||||
if err == nil || err.Error() != `wal segments: marker` {
|
||||
t.Fatalf("unexpected error: %s", err)
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("ErrWALSegmentIteration", func(t *testing.T) {
|
||||
var itr mock.WALSegmentIterator
|
||||
itr.NextFunc = func() bool { return false }
|
||||
itr.CloseFunc = func() error { return fmt.Errorf("marker") }
|
||||
|
||||
var client mock.ReplicaClient
|
||||
client.WALSegmentsFunc = func(ctx context.Context, generation string) (litestream.WALSegmentIterator, error) {
|
||||
return &itr, nil
|
||||
}
|
||||
|
||||
_, err := litestream.FindWALIndexByTimestamp(context.Background(), &client, "0000000000000000", time.Date(2000, 1, 1, 0, 0, 0, 0, time.UTC))
|
||||
if err == nil || err.Error() != `wal segment iteration: marker` {
|
||||
t.Fatalf("unexpected error: %s", err)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func TestFindIndexByTimestamp(t *testing.T) {
|
||||
t.Run("OK", func(t *testing.T) {
|
||||
client := litestream.NewFileReplicaClient(filepath.Join("testdata", "index-by-timestamp", "ok"))
|
||||
if index, err := litestream.FindIndexByTimestamp(context.Background(), client, "0000000000000000", time.Date(2000, 1, 4, 0, 0, 0, 0, time.UTC)); err != nil {
|
||||
t.Fatal(err)
|
||||
} else if got, want := index, 0x00000002; got != want {
|
||||
t.Fatalf("index=%d, want %d", got, want)
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("NoWAL", func(t *testing.T) {
|
||||
client := litestream.NewFileReplicaClient(filepath.Join("testdata", "index-by-timestamp", "no-wal"))
|
||||
if index, err := litestream.FindIndexByTimestamp(context.Background(), client, "0000000000000000", time.Date(2000, 1, 2, 0, 0, 0, 0, time.UTC)); err != nil {
|
||||
t.Fatal(err)
|
||||
} else if got, want := index, 0x00000001; got != want {
|
||||
t.Fatalf("index=%d, want %d", got, want)
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("SnapshotLaterThanWAL", func(t *testing.T) {
|
||||
client := litestream.NewFileReplicaClient(filepath.Join("testdata", "index-by-timestamp", "snapshot-later-than-wal"))
|
||||
if index, err := litestream.FindIndexByTimestamp(context.Background(), client, "0000000000000000", time.Date(2000, 1, 3, 0, 0, 0, 0, time.UTC)); err != nil {
|
||||
t.Fatal(err)
|
||||
} else if got, want := index, 0x00000001; got != want {
|
||||
t.Fatalf("index=%d, want %d", got, want)
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("ErrNoSnapshots", func(t *testing.T) {
|
||||
client := litestream.NewFileReplicaClient(filepath.Join("testdata", "index-by-timestamp", "no-snapshots"))
|
||||
|
||||
_, err := litestream.FindIndexByTimestamp(context.Background(), client, "0000000000000000", time.Date(2000, 1, 1, 0, 0, 0, 0, time.UTC))
|
||||
if err != litestream.ErrNoSnapshots {
|
||||
t.Fatalf("unexpected error: %s", err)
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("ErrSnapshots", func(t *testing.T) {
|
||||
var client mock.ReplicaClient
|
||||
client.SnapshotsFunc = func(ctx context.Context, generation string) (litestream.SnapshotIterator, error) {
|
||||
return nil, fmt.Errorf("marker")
|
||||
}
|
||||
|
||||
_, err := litestream.FindIndexByTimestamp(context.Background(), &client, "0000000000000000", time.Date(2000, 1, 1, 0, 0, 0, 0, time.UTC))
|
||||
if err == nil || err.Error() != `max snapshot index: snapshots: marker` {
|
||||
t.Fatalf("unexpected error: %s", err)
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("ErrWALSegments", func(t *testing.T) {
|
||||
var client mock.ReplicaClient
|
||||
client.SnapshotsFunc = func(ctx context.Context, generation string) (litestream.SnapshotIterator, error) {
|
||||
return litestream.NewSnapshotInfoSliceIterator([]litestream.SnapshotInfo{{Index: 0x00000001}}), nil
|
||||
}
|
||||
client.WALSegmentsFunc = func(ctx context.Context, generation string) (litestream.WALSegmentIterator, error) {
|
||||
return nil, fmt.Errorf("marker")
|
||||
}
|
||||
|
||||
_, err := litestream.FindIndexByTimestamp(context.Background(), &client, "0000000000000000", time.Date(2000, 1, 1, 0, 0, 0, 0, time.UTC))
|
||||
if err == nil || err.Error() != `max wal index: wal segments: marker` {
|
||||
t.Fatalf("unexpected error: %s", err)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func TestRestore(t *testing.T) {
|
||||
t.Run("OK", func(t *testing.T) {
|
||||
testDir := filepath.Join("testdata", "restore", "ok")
|
||||
|
||||
@@ -703,6 +703,13 @@ func ParseHost(s string) (bucket, region, endpoint string, forcePathStyle bool)
|
||||
endpoint = net.JoinHostPort(endpoint, port)
|
||||
}
|
||||
|
||||
if e := os.Getenv("LITESTREAM_ENDPOINT"); e != "" {
|
||||
endpoint = e
|
||||
}
|
||||
if r := os.Getenv("LITESTREAM_REGION"); r != "" {
|
||||
region = r
|
||||
}
|
||||
|
||||
// Prepend scheme to endpoint.
|
||||
if endpoint != "" {
|
||||
endpoint = scheme + "://" + endpoint
|
||||
|
||||
5
testdata/Makefile
vendored
5
testdata/Makefile
vendored
@@ -1,8 +1,13 @@
|
||||
.PHONY: default
|
||||
default:
|
||||
make -C find-latest-generation/ok
|
||||
make -C index-by-timestamp/no-wal
|
||||
make -C index-by-timestamp/ok
|
||||
make -C index-by-timestamp/snapshot-later-than-wal
|
||||
make -C generation-time-bounds/ok
|
||||
make -C generation-time-bounds/snapshots-only
|
||||
make -C replica-client-time-bounds/ok
|
||||
make -C snapshot-time-bounds/ok
|
||||
make -C snapshot-index-by-timestamp/ok
|
||||
make -C wal-time-bounds/ok
|
||||
make -C wal-index-by-timestamp/ok
|
||||
|
||||
0
testdata/index-by-timestamp/no-snapshots/generations/0000000000000000/.gitignore
vendored
Normal file
0
testdata/index-by-timestamp/no-snapshots/generations/0000000000000000/.gitignore
vendored
Normal file
6
testdata/index-by-timestamp/no-wal/Makefile
vendored
Normal file
6
testdata/index-by-timestamp/no-wal/Makefile
vendored
Normal file
@@ -0,0 +1,6 @@
|
||||
.PHONY: default
|
||||
default:
|
||||
TZ=UTC touch -ct 200001010000 generations/0000000000000000/snapshots/0000000000000000.snapshot.lz4
|
||||
TZ=UTC touch -ct 200001020000 generations/0000000000000000/snapshots/0000000000000001.snapshot.lz4
|
||||
TZ=UTC touch -ct 200001030000 generations/0000000000000000/snapshots/0000000000000002.snapshot.lz4
|
||||
|
||||
Binary file not shown.
Binary file not shown.
Binary file not shown.
11
testdata/index-by-timestamp/ok/Makefile
vendored
Normal file
11
testdata/index-by-timestamp/ok/Makefile
vendored
Normal file
@@ -0,0 +1,11 @@
|
||||
.PHONY: default
|
||||
default:
|
||||
TZ=UTC touch -ct 200001010000 generations/0000000000000000/snapshots/0000000000000000.snapshot.lz4
|
||||
TZ=UTC touch -ct 200001030000 generations/0000000000000000/snapshots/0000000000000001.snapshot.lz4
|
||||
|
||||
TZ=UTC touch -ct 200001010000 generations/0000000000000000/wal/0000000000000000/0000000000000000.wal.lz4
|
||||
TZ=UTC touch -ct 200001020000 generations/0000000000000000/wal/0000000000000000/0000000000001234.wal.lz4
|
||||
TZ=UTC touch -ct 200001030000 generations/0000000000000000/wal/0000000000000001/0000000000000000.wal.lz4
|
||||
TZ=UTC touch -ct 200001040000 generations/0000000000000000/wal/0000000000000002/0000000000000000.wal.lz4
|
||||
TZ=UTC touch -ct 200001050000 generations/0000000000000000/wal/0000000000000003/0000000000000000.wal.lz4
|
||||
|
||||
BIN
testdata/index-by-timestamp/ok/generations/0000000000000000/snapshots/0000000000000000.snapshot.lz4
vendored
Normal file
BIN
testdata/index-by-timestamp/ok/generations/0000000000000000/snapshots/0000000000000000.snapshot.lz4
vendored
Normal file
Binary file not shown.
BIN
testdata/index-by-timestamp/ok/generations/0000000000000000/snapshots/0000000000000001.snapshot.lz4
vendored
Normal file
BIN
testdata/index-by-timestamp/ok/generations/0000000000000000/snapshots/0000000000000001.snapshot.lz4
vendored
Normal file
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
7
testdata/index-by-timestamp/snapshot-later-than-wal/Makefile
vendored
Normal file
7
testdata/index-by-timestamp/snapshot-later-than-wal/Makefile
vendored
Normal file
@@ -0,0 +1,7 @@
|
||||
.PHONY: default
|
||||
default:
|
||||
TZ=UTC touch -ct 200001010000 generations/0000000000000000/snapshots/0000000000000000.snapshot.lz4
|
||||
TZ=UTC touch -ct 200001030000 generations/0000000000000000/snapshots/0000000000000001.snapshot.lz4
|
||||
|
||||
TZ=UTC touch -ct 200001020000 generations/0000000000000000/wal/0000000000000000/0000000000000000.wal.lz4
|
||||
TZ=UTC touch -ct 200001020000 generations/0000000000000000/wal/0000000000000000/0000000000001234.wal.lz4
|
||||
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
0
testdata/snapshot-index-by-timestamp/no-snapshots/generations/0000000000000000/.gitignore
vendored
Normal file
0
testdata/snapshot-index-by-timestamp/no-snapshots/generations/0000000000000000/.gitignore
vendored
Normal file
5
testdata/snapshot-index-by-timestamp/ok/Makefile
vendored
Normal file
5
testdata/snapshot-index-by-timestamp/ok/Makefile
vendored
Normal file
@@ -0,0 +1,5 @@
|
||||
.PHONY: default
|
||||
default:
|
||||
TZ=UTC touch -ct 200001010000 generations/0000000000000000/snapshots/0000000000000000.snapshot.lz4
|
||||
TZ=UTC touch -ct 200001020000 generations/0000000000000000/snapshots/00000000000003e8.snapshot.lz4
|
||||
TZ=UTC touch -ct 200001030000 generations/0000000000000000/snapshots/00000000000007d0.snapshot.lz4
|
||||
Binary file not shown.
Binary file not shown.
Binary file not shown.
6
testdata/wal-index-by-timestamp/ok/Makefile
vendored
Normal file
6
testdata/wal-index-by-timestamp/ok/Makefile
vendored
Normal file
@@ -0,0 +1,6 @@
|
||||
.PHONY: default
|
||||
default:
|
||||
TZ=UTC touch -ct 200001010000 generations/0000000000000000/wal/0000000000000000/0000000000000000.wal.lz4
|
||||
TZ=UTC touch -ct 200001020000 generations/0000000000000000/wal/0000000000000000/0000000000001234.wal.lz4
|
||||
TZ=UTC touch -ct 200001030000 generations/0000000000000000/wal/0000000000000001/0000000000000000.wal.lz4
|
||||
|
||||
Binary file not shown.
Binary file not shown.
Binary file not shown.
Reference in New Issue
Block a user