Compare commits
9 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| c53a09c124 | |||
| 46597ab22f | |||
| e6f7c6052d | |||
| 7d8b8c6ec0 | |||
| 88737d7164 | |||
| 6763e9218c | |||
| 301e1172fd | |||
| ca07137d32 | |||
| 80f8de4d9e |
@@ -22,4 +22,9 @@ jobs:
|
||||
run: go install ./cmd/litestream
|
||||
|
||||
- name: Run unit tests
|
||||
run: make testdata && go test -v ./...
|
||||
run: make testdata && go test -v --coverprofile=.coverage.out ./... && go tool cover -html .coverage.out -o .coverage.html
|
||||
|
||||
- uses: actions/upload-artifact@v3
|
||||
with:
|
||||
name: code-coverage
|
||||
path: .coverage.html
|
||||
|
||||
@@ -23,7 +23,6 @@ jobs:
|
||||
- arch: amd64
|
||||
cc: gcc
|
||||
static: true
|
||||
deploy_test_runner: true
|
||||
|
||||
- arch: arm64
|
||||
cc: aarch64-linux-gnu-gcc
|
||||
@@ -105,6 +104,13 @@ jobs:
|
||||
path: dist/litestream-${{ env.VERSION }}-${{ env.GOOS }}-${{ env.GOARCH }}${{ env.GOARM }}${{ env.SUFFIX }}.deb
|
||||
if-no-files-found: error
|
||||
|
||||
- name: Get release
|
||||
id: release
|
||||
uses: bruceadams/get-release@v1.2.3
|
||||
if: github.event_name == 'release'
|
||||
env:
|
||||
GITHUB_TOKEN: ${{ github.token }}
|
||||
|
||||
- name: Upload release tarball
|
||||
uses: actions/upload-release-asset@v1.0.2
|
||||
if: github.event_name == 'release'
|
||||
|
||||
@@ -1,2 +1,3 @@
|
||||
.coverage.*
|
||||
.DS_Store
|
||||
/dist
|
||||
|
||||
+35
-13
@@ -9,6 +9,7 @@ import (
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"github.com/benbjohnson/litestream"
|
||||
)
|
||||
@@ -22,14 +23,15 @@ type RestoreCommand struct {
|
||||
snapshotIndex int // index of snapshot to start from
|
||||
|
||||
// CLI options
|
||||
configPath string // path to config file
|
||||
noExpandEnv bool // if true, do not expand env variables in config
|
||||
outputPath string // path to restore database to
|
||||
replicaName string // optional, name of replica to restore from
|
||||
generation string // optional, generation to restore
|
||||
targetIndex int // optional, last WAL index to replay
|
||||
ifDBNotExists bool // if true, skips restore if output path already exists
|
||||
ifReplicaExists bool // if true, skips if no backups exist
|
||||
configPath string // path to config file
|
||||
noExpandEnv bool // if true, do not expand env variables in config
|
||||
outputPath string // path to restore database to
|
||||
replicaName string // optional, name of replica to restore from
|
||||
generation string // optional, generation to restore
|
||||
targetIndex int // optional, last WAL index to replay
|
||||
timestamp time.Time // optional, restore to point-in-time (ISO 8601)
|
||||
ifDBNotExists bool // if true, skips restore if output path already exists
|
||||
ifReplicaExists bool // if true, skips if no backups exist
|
||||
opt litestream.RestoreOptions
|
||||
}
|
||||
|
||||
@@ -53,6 +55,7 @@ func (c *RestoreCommand) Run(ctx context.Context, args []string) (err error) {
|
||||
fs.StringVar(&c.replicaName, "replica", "", "replica name")
|
||||
fs.StringVar(&c.generation, "generation", "", "generation name")
|
||||
fs.Var((*indexVar)(&c.targetIndex), "index", "wal index")
|
||||
timestampStr := fs.String("timestamp", "", "point-in-time restore (ISO 8601)")
|
||||
fs.IntVar(&c.opt.Parallelism, "parallelism", c.opt.Parallelism, "parallelism")
|
||||
fs.BoolVar(&c.ifDBNotExists, "if-db-not-exists", false, "")
|
||||
fs.BoolVar(&c.ifReplicaExists, "if-replica-exists", false, "")
|
||||
@@ -66,9 +69,20 @@ func (c *RestoreCommand) Run(ctx context.Context, args []string) (err error) {
|
||||
}
|
||||
pathOrURL := fs.Arg(0)
|
||||
|
||||
// Parse timestamp.
|
||||
if *timestampStr != "" {
|
||||
if c.timestamp, err = time.Parse(time.RFC3339Nano, *timestampStr); err != nil {
|
||||
return fmt.Errorf("invalid -timestamp, expected ISO 8601: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
// Ensure a generation is specified if target index is specified.
|
||||
if c.targetIndex != -1 && c.generation == "" {
|
||||
if c.targetIndex != -1 && !c.timestamp.IsZero() {
|
||||
return fmt.Errorf("cannot specify both -index flag and -timestamp flag")
|
||||
} else if c.targetIndex != -1 && c.generation == "" {
|
||||
return fmt.Errorf("must specify -generation flag when using -index flag")
|
||||
} else if !c.timestamp.IsZero() && c.generation == "" {
|
||||
return fmt.Errorf("must specify -generation flag when using -timestamp flag")
|
||||
}
|
||||
|
||||
// Default to original database path if output path not specified.
|
||||
@@ -117,7 +131,11 @@ func (c *RestoreCommand) Run(ctx context.Context, args []string) (err error) {
|
||||
}
|
||||
|
||||
// Determine the maximum available index for the generation if one is not specified.
|
||||
if c.targetIndex == -1 {
|
||||
if !c.timestamp.IsZero() {
|
||||
if c.targetIndex, err = litestream.FindIndexByTimestamp(ctx, r.Client(), c.generation, c.timestamp); err != nil {
|
||||
return fmt.Errorf("cannot find index for timestamp in generation %q: %w", c.generation, err)
|
||||
}
|
||||
} else if c.targetIndex == -1 {
|
||||
if c.targetIndex, err = litestream.FindMaxIndexByGeneration(ctx, r.Client(), c.generation); err != nil {
|
||||
return fmt.Errorf("cannot determine latest index in generation %q: %w", c.generation, err)
|
||||
}
|
||||
@@ -239,6 +257,10 @@ Arguments:
|
||||
Restore up to a specific hex-encoded WAL index (inclusive).
|
||||
Defaults to use the highest available index.
|
||||
|
||||
-timestamp DATETIME
|
||||
Restore up to a specific point-in-time. Must be ISO 8601.
|
||||
Cannot be specified with -index flag.
|
||||
|
||||
-o PATH
|
||||
Output path of the restored database.
|
||||
Defaults to original DB path.
|
||||
@@ -253,9 +275,6 @@ Arguments:
|
||||
Determines the number of WAL files downloaded in parallel.
|
||||
Defaults to `+strconv.Itoa(litestream.DefaultRestoreParallelism)+`.
|
||||
|
||||
-v
|
||||
Verbose output.
|
||||
|
||||
|
||||
Examples:
|
||||
|
||||
@@ -271,6 +290,9 @@ Examples:
|
||||
# Restore database from specific generation on S3.
|
||||
$ litestream restore -replica s3 -generation xxxxxxxx /path/to/db
|
||||
|
||||
# Restore database to a specific point in time.
|
||||
$ litestream restore -generation xxxxxxxx -timestamp 2000-01-01T00:00:00Z /path/to/db
|
||||
|
||||
`[1:],
|
||||
DefaultConfigPath(),
|
||||
)
|
||||
|
||||
@@ -17,6 +17,7 @@ import (
|
||||
"sort"
|
||||
"strings"
|
||||
"sync"
|
||||
"syscall"
|
||||
"time"
|
||||
|
||||
"github.com/benbjohnson/litestream/internal"
|
||||
@@ -1747,23 +1748,34 @@ func (db *DB) stream(ctx context.Context) error {
|
||||
|
||||
// streamSnapshot reads the snapshot into the WAL and applies it to the main database.
|
||||
func (db *DB) streamSnapshot(ctx context.Context, hdr *StreamRecordHeader, r io.Reader) error {
|
||||
// Truncate WAL file.
|
||||
if _, err := db.db.ExecContext(ctx, `PRAGMA wal_checkpoint(TRUNCATE)`); err != nil {
|
||||
return fmt.Errorf("truncate: %w", err)
|
||||
}
|
||||
|
||||
// Determine total page count.
|
||||
pageN := int(hdr.Size / int64(db.pageSize))
|
||||
|
||||
ww := NewWALWriter(db.WALPath(), db.fileMode, db.pageSize)
|
||||
if err := ww.Open(); err != nil {
|
||||
return fmt.Errorf("open wal writer: %w", err)
|
||||
// Open database file.
|
||||
f, err := os.OpenFile(db.path, os.O_RDWR, 0666)
|
||||
if err != nil {
|
||||
return fmt.Errorf("open db file: %w", err)
|
||||
}
|
||||
defer func() { _ = ww.Close() }()
|
||||
defer f.Close()
|
||||
|
||||
if err := ww.WriteHeader(); err != nil {
|
||||
return fmt.Errorf("write wal header: %w", err)
|
||||
// Open shm file for locking.
|
||||
shmFile, err := os.OpenFile(db.SHMPath(), os.O_RDWR, 0666)
|
||||
if err != nil {
|
||||
return fmt.Errorf("open shm file: %w", err)
|
||||
}
|
||||
defer shmFile.Close()
|
||||
|
||||
// Obtain WAL checkpoint lock.
|
||||
if err := setLkw(shmFile, syscall.F_WRLCK, WAL_CKPT_LOCK_OFFSET, 1); err != nil {
|
||||
return fmt.Errorf("cannot obtain wal checkpoint lock: %w", err)
|
||||
}
|
||||
defer func() { _ = setLkw(shmFile, syscall.F_UNLCK, WAL_CKPT_LOCK_OFFSET, 1) }()
|
||||
|
||||
// Obtain WAL write lock.
|
||||
if err := setLkw(shmFile, syscall.F_WRLCK, WAL_WRITE_LOCK_OFFSET, 1); err != nil {
|
||||
return fmt.Errorf("cannot obtain wal write lock: %w", err)
|
||||
}
|
||||
defer func() { _ = setLkw(shmFile, syscall.F_UNLCK, WAL_WRITE_LOCK_OFFSET, 1) }()
|
||||
|
||||
// Iterate over pages
|
||||
buf := make([]byte, db.pageSize)
|
||||
@@ -1775,26 +1787,18 @@ func (db *DB) streamSnapshot(ctx context.Context, hdr *StreamRecordHeader, r io.
|
||||
return fmt.Errorf("read snapshot page %d: %w", pgno, err)
|
||||
}
|
||||
|
||||
// Issue a commit flag when the last page is reached.
|
||||
var commit uint32
|
||||
// Copy page to database file.
|
||||
offset := int64(pgno-1) * int64(db.pageSize)
|
||||
if _, err := f.WriteAt(buf, offset); err != nil {
|
||||
return fmt.Errorf("copy to db: pgno=%d err=%w", pgno, err)
|
||||
}
|
||||
|
||||
// Truncate database to final size.
|
||||
if pgno == uint32(pageN) {
|
||||
commit = uint32(pageN)
|
||||
if err := f.Truncate(int64(pageN) * int64(db.pageSize)); err != nil {
|
||||
return fmt.Errorf("truncate db: commit=%d err=%w", pageN, err)
|
||||
}
|
||||
}
|
||||
|
||||
// Write page into WAL frame.
|
||||
if err := ww.WriteFrame(pgno, commit, buf); err != nil {
|
||||
return fmt.Errorf("write wal frame: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
// Close WAL file writer.
|
||||
if err := ww.Close(); err != nil {
|
||||
return fmt.Errorf("close wal writer: %w", err)
|
||||
}
|
||||
|
||||
// Invalidate WAL index.
|
||||
if err := invalidateSHMFile(db.path); err != nil {
|
||||
return fmt.Errorf("invalidate shm file: %w", err)
|
||||
}
|
||||
|
||||
// Write position to file so other processes can read it.
|
||||
@@ -1819,44 +1823,63 @@ func (db *DB) streamWALSegment(ctx context.Context, hdr *StreamRecordHeader, r i
|
||||
}
|
||||
}
|
||||
|
||||
ww := NewWALWriter(db.WALPath(), db.fileMode, db.pageSize)
|
||||
if err := ww.Open(); err != nil {
|
||||
return fmt.Errorf("open wal writer: %w", err)
|
||||
// Open database file.
|
||||
f, err := os.OpenFile(db.path, os.O_RDWR, 0666)
|
||||
if err != nil {
|
||||
return fmt.Errorf("open db file: %w", err)
|
||||
}
|
||||
defer func() { _ = ww.Close() }()
|
||||
defer f.Close()
|
||||
|
||||
if err := ww.WriteHeader(); err != nil {
|
||||
return fmt.Errorf("write wal header: %w", err)
|
||||
// Open shm file for locking.
|
||||
shmFile, err := os.OpenFile(db.SHMPath(), os.O_RDWR, 0666)
|
||||
if err != nil {
|
||||
return fmt.Errorf("open shm file: %w", err)
|
||||
}
|
||||
defer shmFile.Close()
|
||||
|
||||
// Obtain WAL checkpoint lock.
|
||||
if err := setLkw(shmFile, syscall.F_WRLCK, WAL_CKPT_LOCK_OFFSET, 1); err != nil {
|
||||
return fmt.Errorf("cannot obtain wal checkpoint lock: %w", err)
|
||||
}
|
||||
defer func() { _ = setLkw(shmFile, syscall.F_UNLCK, WAL_CKPT_LOCK_OFFSET, 1) }()
|
||||
|
||||
// Obtain WAL write lock.
|
||||
if err := setLkw(shmFile, syscall.F_WRLCK, WAL_WRITE_LOCK_OFFSET, 1); err != nil {
|
||||
return fmt.Errorf("cannot obtain wal write lock: %w", err)
|
||||
}
|
||||
defer func() { _ = setLkw(shmFile, syscall.F_UNLCK, WAL_WRITE_LOCK_OFFSET, 1) }()
|
||||
|
||||
// Iterate over incoming WAL pages.
|
||||
buf := make([]byte, WALFrameHeaderSize+db.pageSize)
|
||||
for i := 0; ; i++ {
|
||||
// Read snapshot page into a buffer.
|
||||
if _, err := io.ReadFull(zr, buf); err == io.EOF {
|
||||
if n, err := io.ReadFull(zr, buf); err == io.EOF {
|
||||
break
|
||||
} else if err != nil {
|
||||
return fmt.Errorf("read wal frame %d: %w", i, err)
|
||||
return fmt.Errorf("read wal frame: i=%d n=%d err=%w", i, n, err)
|
||||
}
|
||||
|
||||
// Read page number & commit field.
|
||||
pgno := binary.BigEndian.Uint32(buf[0:])
|
||||
commit := binary.BigEndian.Uint32(buf[4:])
|
||||
|
||||
// Write page into WAL frame.
|
||||
if err := ww.WriteFrame(pgno, commit, buf[WALFrameHeaderSize:]); err != nil {
|
||||
return fmt.Errorf("write wal frame: %w", err)
|
||||
// Copy page to database file.
|
||||
offset := int64(pgno-1) * int64(db.pageSize)
|
||||
if _, err := f.WriteAt(buf[WALFrameHeaderSize:], offset); err != nil {
|
||||
return fmt.Errorf("copy to db: pgno=%d err=%w", pgno, err)
|
||||
}
|
||||
|
||||
// Truncate database, if commit specified.
|
||||
if commit != 0 {
|
||||
if err := f.Truncate(int64(commit) * int64(db.pageSize)); err != nil {
|
||||
return fmt.Errorf("truncate db: commit=%d err=%w", commit, err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Close WAL file writer.
|
||||
if err := ww.Close(); err != nil {
|
||||
return fmt.Errorf("close wal writer: %w", err)
|
||||
}
|
||||
|
||||
// Invalidate WAL index.
|
||||
if err := invalidateSHMFile(db.path); err != nil {
|
||||
return fmt.Errorf("invalidate shm file: %w", err)
|
||||
// Close database file writer.
|
||||
if err := f.Close(); err != nil {
|
||||
return fmt.Errorf("close db writer: %w", err)
|
||||
}
|
||||
|
||||
// Write position to file so other processes can read it.
|
||||
@@ -2016,51 +2039,21 @@ func logPrefixPath(path string) string {
|
||||
return path
|
||||
}
|
||||
|
||||
// invalidateSHMFile clears the iVersion field of the -shm file in order that
|
||||
// the next transaction will rebuild it.
|
||||
func invalidateSHMFile(dbPath string) error {
|
||||
db, err := sql.Open("sqlite3", dbPath)
|
||||
if err != nil {
|
||||
return fmt.Errorf("reopen db: %w", err)
|
||||
}
|
||||
defer func() { _ = db.Close() }()
|
||||
|
||||
if _, err := db.Exec(`PRAGMA wal_checkpoint(PASSIVE)`); err != nil {
|
||||
return fmt.Errorf("passive checkpoint: %w", err)
|
||||
}
|
||||
|
||||
f, err := os.OpenFile(dbPath+"-shm", os.O_RDWR, 0666)
|
||||
if err != nil {
|
||||
return fmt.Errorf("open shm index: %w", err)
|
||||
}
|
||||
defer f.Close()
|
||||
|
||||
buf := make([]byte, WALIndexHeaderSize)
|
||||
if _, err := io.ReadFull(f, buf); err != nil {
|
||||
return fmt.Errorf("read shm index: %w", err)
|
||||
}
|
||||
|
||||
// Invalidate "isInit" fields.
|
||||
buf[12], buf[60] = 0, 0
|
||||
|
||||
// Rewrite header.
|
||||
if _, err := f.Seek(0, io.SeekStart); err != nil {
|
||||
return fmt.Errorf("seek shm index: %w", err)
|
||||
} else if _, err := f.Write(buf); err != nil {
|
||||
return fmt.Errorf("overwrite shm index: %w", err)
|
||||
} else if err := f.Close(); err != nil {
|
||||
return fmt.Errorf("close shm index: %w", err)
|
||||
}
|
||||
|
||||
// Truncate WAL file again.
|
||||
var row [3]int
|
||||
if err := db.QueryRow(`PRAGMA wal_checkpoint(TRUNCATE)`).Scan(&row[0], &row[1], &row[2]); err != nil {
|
||||
return fmt.Errorf("truncate: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// A marker error to indicate that a restart checkpoint could not verify
|
||||
// continuity between WAL indices and a new generation should be started.
|
||||
var errRestartGeneration = errors.New("restart generation")
|
||||
|
||||
const (
|
||||
WAL_WRITE_LOCK_OFFSET = 120
|
||||
WAL_CKPT_LOCK_OFFSET = 121
|
||||
)
|
||||
|
||||
// setLkw is a helper function for calling fcntl for file locking.
|
||||
func setLkw(f *os.File, typ int16, start, len int64) error {
|
||||
return syscall.FcntlFlock(f.Fd(), syscall.F_SETLKW, &syscall.Flock_t{
|
||||
Start: start,
|
||||
Len: len,
|
||||
Type: typ,
|
||||
Whence: io.SeekStart,
|
||||
})
|
||||
}
|
||||
|
||||
@@ -48,6 +48,7 @@ func (c *Client) Stream(ctx context.Context, pos litestream.Pos) (litestream.Str
|
||||
if !pos.IsZero() {
|
||||
q.Set("generation", pos.Generation)
|
||||
q.Set("index", litestream.FormatIndex(pos.Index))
|
||||
q.Set("offset", litestream.FormatOffset(pos.Offset))
|
||||
}
|
||||
|
||||
// Strip off everything but the scheme & host.
|
||||
|
||||
+21
-6
@@ -134,16 +134,31 @@ func (s *Server) handleGetStream(w http.ResponseWriter, r *http.Request) {
|
||||
return
|
||||
}
|
||||
|
||||
generationStr := q.Get("generation")
|
||||
indexStr := q.Get("index")
|
||||
offsetStr := q.Get("offset")
|
||||
|
||||
// Parse current client position, if available.
|
||||
var pos litestream.Pos
|
||||
if generation, index := q.Get("generation"), q.Get("index"); generation != "" && index != "" {
|
||||
pos.Generation = generation
|
||||
if generationStr != "" && indexStr != "" && offsetStr != "" {
|
||||
|
||||
var err error
|
||||
if pos.Index, err = litestream.ParseIndex(index); err != nil {
|
||||
index, err := litestream.ParseIndex(indexStr)
|
||||
if err != nil {
|
||||
s.writeError(w, r, "Invalid index query parameter", http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
offset, err := litestream.ParseOffset(offsetStr)
|
||||
if err != nil {
|
||||
s.writeError(w, r, "Invalid offset query parameter", http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
pos = litestream.Pos{
|
||||
Generation: generationStr,
|
||||
Index: index,
|
||||
Offset: offset,
|
||||
}
|
||||
}
|
||||
|
||||
// Fetch database instance from the primary server.
|
||||
@@ -162,7 +177,6 @@ func (s *Server) handleGetStream(w http.ResponseWriter, r *http.Request) {
|
||||
s.writeError(w, r, "No generation available", http.StatusServiceUnavailable)
|
||||
return
|
||||
}
|
||||
dbPos.Offset = 0
|
||||
|
||||
// Use database position if generation has changed.
|
||||
var snapshotRequired bool
|
||||
@@ -170,6 +184,7 @@ func (s *Server) handleGetStream(w http.ResponseWriter, r *http.Request) {
|
||||
s.Logger.Printf("stream generation mismatch, using primary position: client.pos=%s", pos)
|
||||
pos, snapshotRequired = dbPos, true
|
||||
}
|
||||
pos.Offset = 0
|
||||
|
||||
// Obtain iterator before snapshot so we don't miss any WAL segments.
|
||||
fitr, err := db.WALSegments(r.Context(), pos.Generation)
|
||||
@@ -289,7 +304,7 @@ func (s *Server) handleGetStream(w http.ResponseWriter, r *http.Request) {
|
||||
// Flush after WAL segment has been written.
|
||||
w.(http.Flusher).Flush()
|
||||
}
|
||||
if bitr.Err() != nil {
|
||||
if err := bitr.Err(); err != nil {
|
||||
s.Logger.Printf("wal iterator error: %s", err)
|
||||
return
|
||||
}
|
||||
|
||||
@@ -481,7 +481,7 @@ func TestCmd_Replicate_HTTP_PartialRecovery(t *testing.T) {
|
||||
t.Fatal(err)
|
||||
} else if _, err := db0.ExecContext(ctx, `PRAGMA journal_mode = wal`); err != nil {
|
||||
t.Fatal(err)
|
||||
} else if _, err := db0.ExecContext(ctx, `CREATE TABLE t (id INTEGER PRIMARY KEY)`); err != nil {
|
||||
} else if _, err := db0.ExecContext(ctx, `CREATE TABLE t (id INTEGER PRIMARY KEY, DATA TEXT)`); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer db0.Close()
|
||||
@@ -489,8 +489,8 @@ func TestCmd_Replicate_HTTP_PartialRecovery(t *testing.T) {
|
||||
var index int
|
||||
insertAndWait := func() {
|
||||
index++
|
||||
t.Logf("[exec] INSERT INTO t (id) VALUES (%d)", index)
|
||||
if _, err := db0.ExecContext(ctx, `INSERT INTO t (id) VALUES (?)`, index); err != nil {
|
||||
t.Logf("[exec] INSERT INTO t (id, data) VALUES (%d, '...')", index)
|
||||
if _, err := db0.ExecContext(ctx, `INSERT INTO t (id, data) VALUES (?, ?)`, index, strings.Repeat("x", 512)); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
|
||||
@@ -102,6 +102,24 @@ func TestTruncateDuration(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestMD5Hash(t *testing.T) {
|
||||
for _, tt := range []struct {
|
||||
input []byte
|
||||
output string
|
||||
}{
|
||||
{[]byte{}, "d41d8cd98f00b204e9800998ecf8427e"},
|
||||
{[]byte{0x0}, "93b885adfe0da089cdf634904fd59f71"},
|
||||
{[]byte{0x0, 0x1, 0x2, 0x3}, "37b59afd592725f9305e484a5d7f5168"},
|
||||
{[]byte("Hello, world!"), "6cd3556deb0da54bca060b4c39479839"},
|
||||
} {
|
||||
t.Run(fmt.Sprintf("%v", tt.input), func(t *testing.T) {
|
||||
if got, want := internal.MD5Hash(tt.input), tt.output; got != want {
|
||||
t.Fatalf("hash=%s, want %s", got, want)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestOnceCloser(t *testing.T) {
|
||||
var closed bool
|
||||
var rc = &mock.ReadCloser{
|
||||
|
||||
@@ -234,6 +234,88 @@ func ReplicaClientTimeBounds(ctx context.Context, client ReplicaClient) (min, ma
|
||||
return min, max, nil
|
||||
}
|
||||
|
||||
// FindIndexByTimestamp returns the highest index before a given point-in-time
|
||||
// within a generation. Returns ErrNoSnapshots if no index exists on the replica
|
||||
// for the generation.
|
||||
func FindIndexByTimestamp(ctx context.Context, client ReplicaClient, generation string, timestamp time.Time) (index int, err error) {
|
||||
snapshotIndex, err := FindSnapshotIndexByTimestamp(ctx, client, generation, timestamp)
|
||||
if err == ErrNoSnapshots {
|
||||
return 0, err
|
||||
} else if err != nil {
|
||||
return 0, fmt.Errorf("max snapshot index: %w", err)
|
||||
}
|
||||
|
||||
// Determine the highest available WAL index.
|
||||
walIndex, err := FindWALIndexByTimestamp(ctx, client, generation, timestamp)
|
||||
if err != nil && err != ErrNoWALSegments {
|
||||
return 0, fmt.Errorf("max wal index: %w", err)
|
||||
}
|
||||
|
||||
// Use snapshot index if it's after the last WAL index.
|
||||
if snapshotIndex > walIndex {
|
||||
return snapshotIndex, nil
|
||||
}
|
||||
return walIndex, nil
|
||||
}
|
||||
|
||||
// FindSnapshotIndexByTimestamp returns the highest snapshot index before timestamp.
|
||||
// Returns ErrNoSnapshots if no snapshots exist for the generation on the replica.
|
||||
func FindSnapshotIndexByTimestamp(ctx context.Context, client ReplicaClient, generation string, timestamp time.Time) (index int, err error) {
|
||||
itr, err := client.Snapshots(ctx, generation)
|
||||
if err != nil {
|
||||
return 0, fmt.Errorf("snapshots: %w", err)
|
||||
}
|
||||
defer func() { _ = itr.Close() }()
|
||||
|
||||
// Iterate over snapshots to find the highest index.
|
||||
var n int
|
||||
for ; itr.Next(); n++ {
|
||||
if info := itr.Snapshot(); info.CreatedAt.After(timestamp) {
|
||||
continue
|
||||
} else if info.Index > index {
|
||||
index = info.Index
|
||||
}
|
||||
}
|
||||
if err := itr.Close(); err != nil {
|
||||
return 0, fmt.Errorf("snapshot iteration: %w", err)
|
||||
}
|
||||
|
||||
// Return an error if no snapshots were found.
|
||||
if n == 0 {
|
||||
return 0, ErrNoSnapshots
|
||||
}
|
||||
return index, nil
|
||||
}
|
||||
|
||||
// FindWALIndexByTimestamp returns the highest WAL index before timestamp.
|
||||
// Returns ErrNoWALSegments if no segments exist for the generation on the replica.
|
||||
func FindWALIndexByTimestamp(ctx context.Context, client ReplicaClient, generation string, timestamp time.Time) (index int, err error) {
|
||||
itr, err := client.WALSegments(ctx, generation)
|
||||
if err != nil {
|
||||
return 0, fmt.Errorf("wal segments: %w", err)
|
||||
}
|
||||
defer func() { _ = itr.Close() }()
|
||||
|
||||
// Iterate over WAL segments to find the highest index.
|
||||
var n int
|
||||
for ; itr.Next(); n++ {
|
||||
if info := itr.WALSegment(); info.CreatedAt.After(timestamp) {
|
||||
continue
|
||||
} else if info.Index > index {
|
||||
index = info.Index
|
||||
}
|
||||
}
|
||||
if err := itr.Close(); err != nil {
|
||||
return 0, fmt.Errorf("wal segment iteration: %w", err)
|
||||
}
|
||||
|
||||
// Return an error if no WAL segments were found.
|
||||
if n == 0 {
|
||||
return 0, ErrNoWALSegments
|
||||
}
|
||||
return index, nil
|
||||
}
|
||||
|
||||
// FindMaxIndexByGeneration returns the last index within a generation.
|
||||
// Returns ErrNoSnapshots if no index exists on the replica for the generation.
|
||||
func FindMaxIndexByGeneration(ctx context.Context, client ReplicaClient, generation string) (index int, err error) {
|
||||
|
||||
@@ -489,6 +489,167 @@ func TestFindMaxIndexByGeneration(t *testing.T) {
|
||||
})
|
||||
}
|
||||
|
||||
func TestFindSnapshotIndexByTimestamp(t *testing.T) {
|
||||
t.Run("OK", func(t *testing.T) {
|
||||
client := litestream.NewFileReplicaClient(filepath.Join("testdata", "snapshot-index-by-timestamp", "ok"))
|
||||
if index, err := litestream.FindSnapshotIndexByTimestamp(context.Background(), client, "0000000000000000", time.Date(2000, 1, 3, 0, 0, 0, 0, time.UTC)); err != nil {
|
||||
t.Fatal(err)
|
||||
} else if got, want := index, 0x000007d0; got != want {
|
||||
t.Fatalf("index=%d, want %d", got, want)
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("ErrNoSnapshots", func(t *testing.T) {
|
||||
client := litestream.NewFileReplicaClient(filepath.Join("testdata", "snapshot-index-by-timestamp", "no-snapshots"))
|
||||
|
||||
_, err := litestream.FindSnapshotIndexByTimestamp(context.Background(), client, "0000000000000000", time.Date(2000, 1, 1, 0, 0, 0, 0, time.UTC))
|
||||
if err != litestream.ErrNoSnapshots {
|
||||
t.Fatalf("unexpected error: %s", err)
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("ErrSnapshots", func(t *testing.T) {
|
||||
var client mock.ReplicaClient
|
||||
client.SnapshotsFunc = func(ctx context.Context, generation string) (litestream.SnapshotIterator, error) {
|
||||
return nil, fmt.Errorf("marker")
|
||||
}
|
||||
|
||||
_, err := litestream.FindSnapshotIndexByTimestamp(context.Background(), &client, "0000000000000000", time.Date(2000, 1, 1, 0, 0, 0, 0, time.UTC))
|
||||
if err == nil || err.Error() != `snapshots: marker` {
|
||||
t.Fatalf("unexpected error: %s", err)
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("ErrSnapshotIteration", func(t *testing.T) {
|
||||
var itr mock.SnapshotIterator
|
||||
itr.NextFunc = func() bool { return false }
|
||||
itr.CloseFunc = func() error { return fmt.Errorf("marker") }
|
||||
|
||||
var client mock.ReplicaClient
|
||||
client.SnapshotsFunc = func(ctx context.Context, generation string) (litestream.SnapshotIterator, error) {
|
||||
return &itr, nil
|
||||
}
|
||||
|
||||
_, err := litestream.FindSnapshotIndexByTimestamp(context.Background(), &client, "0000000000000000", time.Date(2000, 1, 1, 0, 0, 0, 0, time.UTC))
|
||||
if err == nil || err.Error() != `snapshot iteration: marker` {
|
||||
t.Fatalf("unexpected error: %s", err)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func TestFindWALIndexByTimestamp(t *testing.T) {
|
||||
t.Run("OK", func(t *testing.T) {
|
||||
client := litestream.NewFileReplicaClient(filepath.Join("testdata", "wal-index-by-timestamp", "ok"))
|
||||
if index, err := litestream.FindWALIndexByTimestamp(context.Background(), client, "0000000000000000", time.Date(2000, 1, 3, 0, 0, 0, 0, time.UTC)); err != nil {
|
||||
t.Fatal(err)
|
||||
} else if got, want := index, 1; got != want {
|
||||
t.Fatalf("index=%d, want %d", got, want)
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("ErrNoWALSegments", func(t *testing.T) {
|
||||
client := litestream.NewFileReplicaClient(filepath.Join("testdata", "wal-index-by-timestamp", "no-wal"))
|
||||
|
||||
_, err := litestream.FindWALIndexByTimestamp(context.Background(), client, "0000000000000000", time.Date(2000, 1, 1, 0, 0, 0, 0, time.UTC))
|
||||
if err != litestream.ErrNoWALSegments {
|
||||
t.Fatalf("unexpected error: %s", err)
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("ErrWALSegments", func(t *testing.T) {
|
||||
var client mock.ReplicaClient
|
||||
client.WALSegmentsFunc = func(ctx context.Context, generation string) (litestream.WALSegmentIterator, error) {
|
||||
return nil, fmt.Errorf("marker")
|
||||
}
|
||||
|
||||
_, err := litestream.FindWALIndexByTimestamp(context.Background(), &client, "0000000000000000", time.Date(2000, 1, 1, 0, 0, 0, 0, time.UTC))
|
||||
if err == nil || err.Error() != `wal segments: marker` {
|
||||
t.Fatalf("unexpected error: %s", err)
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("ErrWALSegmentIteration", func(t *testing.T) {
|
||||
var itr mock.WALSegmentIterator
|
||||
itr.NextFunc = func() bool { return false }
|
||||
itr.CloseFunc = func() error { return fmt.Errorf("marker") }
|
||||
|
||||
var client mock.ReplicaClient
|
||||
client.WALSegmentsFunc = func(ctx context.Context, generation string) (litestream.WALSegmentIterator, error) {
|
||||
return &itr, nil
|
||||
}
|
||||
|
||||
_, err := litestream.FindWALIndexByTimestamp(context.Background(), &client, "0000000000000000", time.Date(2000, 1, 1, 0, 0, 0, 0, time.UTC))
|
||||
if err == nil || err.Error() != `wal segment iteration: marker` {
|
||||
t.Fatalf("unexpected error: %s", err)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func TestFindIndexByTimestamp(t *testing.T) {
|
||||
t.Run("OK", func(t *testing.T) {
|
||||
client := litestream.NewFileReplicaClient(filepath.Join("testdata", "index-by-timestamp", "ok"))
|
||||
if index, err := litestream.FindIndexByTimestamp(context.Background(), client, "0000000000000000", time.Date(2000, 1, 4, 0, 0, 0, 0, time.UTC)); err != nil {
|
||||
t.Fatal(err)
|
||||
} else if got, want := index, 0x00000002; got != want {
|
||||
t.Fatalf("index=%d, want %d", got, want)
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("NoWAL", func(t *testing.T) {
|
||||
client := litestream.NewFileReplicaClient(filepath.Join("testdata", "index-by-timestamp", "no-wal"))
|
||||
if index, err := litestream.FindIndexByTimestamp(context.Background(), client, "0000000000000000", time.Date(2000, 1, 2, 0, 0, 0, 0, time.UTC)); err != nil {
|
||||
t.Fatal(err)
|
||||
} else if got, want := index, 0x00000001; got != want {
|
||||
t.Fatalf("index=%d, want %d", got, want)
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("SnapshotLaterThanWAL", func(t *testing.T) {
|
||||
client := litestream.NewFileReplicaClient(filepath.Join("testdata", "index-by-timestamp", "snapshot-later-than-wal"))
|
||||
if index, err := litestream.FindIndexByTimestamp(context.Background(), client, "0000000000000000", time.Date(2000, 1, 3, 0, 0, 0, 0, time.UTC)); err != nil {
|
||||
t.Fatal(err)
|
||||
} else if got, want := index, 0x00000001; got != want {
|
||||
t.Fatalf("index=%d, want %d", got, want)
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("ErrNoSnapshots", func(t *testing.T) {
|
||||
client := litestream.NewFileReplicaClient(filepath.Join("testdata", "index-by-timestamp", "no-snapshots"))
|
||||
|
||||
_, err := litestream.FindIndexByTimestamp(context.Background(), client, "0000000000000000", time.Date(2000, 1, 1, 0, 0, 0, 0, time.UTC))
|
||||
if err != litestream.ErrNoSnapshots {
|
||||
t.Fatalf("unexpected error: %s", err)
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("ErrSnapshots", func(t *testing.T) {
|
||||
var client mock.ReplicaClient
|
||||
client.SnapshotsFunc = func(ctx context.Context, generation string) (litestream.SnapshotIterator, error) {
|
||||
return nil, fmt.Errorf("marker")
|
||||
}
|
||||
|
||||
_, err := litestream.FindIndexByTimestamp(context.Background(), &client, "0000000000000000", time.Date(2000, 1, 1, 0, 0, 0, 0, time.UTC))
|
||||
if err == nil || err.Error() != `max snapshot index: snapshots: marker` {
|
||||
t.Fatalf("unexpected error: %s", err)
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("ErrWALSegments", func(t *testing.T) {
|
||||
var client mock.ReplicaClient
|
||||
client.SnapshotsFunc = func(ctx context.Context, generation string) (litestream.SnapshotIterator, error) {
|
||||
return litestream.NewSnapshotInfoSliceIterator([]litestream.SnapshotInfo{{Index: 0x00000001}}), nil
|
||||
}
|
||||
client.WALSegmentsFunc = func(ctx context.Context, generation string) (litestream.WALSegmentIterator, error) {
|
||||
return nil, fmt.Errorf("marker")
|
||||
}
|
||||
|
||||
_, err := litestream.FindIndexByTimestamp(context.Background(), &client, "0000000000000000", time.Date(2000, 1, 1, 0, 0, 0, 0, time.UTC))
|
||||
if err == nil || err.Error() != `max wal index: wal segments: marker` {
|
||||
t.Fatalf("unexpected error: %s", err)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func TestRestore(t *testing.T) {
|
||||
t.Run("OK", func(t *testing.T) {
|
||||
testDir := filepath.Join("testdata", "restore", "ok")
|
||||
|
||||
@@ -703,6 +703,13 @@ func ParseHost(s string) (bucket, region, endpoint string, forcePathStyle bool)
|
||||
endpoint = net.JoinHostPort(endpoint, port)
|
||||
}
|
||||
|
||||
if e := os.Getenv("LITESTREAM_ENDPOINT"); e != "" {
|
||||
endpoint = e
|
||||
}
|
||||
if r := os.Getenv("LITESTREAM_REGION"); r != "" {
|
||||
region = r
|
||||
}
|
||||
|
||||
// Prepend scheme to endpoint.
|
||||
if endpoint != "" {
|
||||
endpoint = scheme + "://" + endpoint
|
||||
|
||||
Vendored
+5
@@ -1,8 +1,13 @@
|
||||
.PHONY: default
|
||||
default:
|
||||
make -C find-latest-generation/ok
|
||||
make -C index-by-timestamp/no-wal
|
||||
make -C index-by-timestamp/ok
|
||||
make -C index-by-timestamp/snapshot-later-than-wal
|
||||
make -C generation-time-bounds/ok
|
||||
make -C generation-time-bounds/snapshots-only
|
||||
make -C replica-client-time-bounds/ok
|
||||
make -C snapshot-time-bounds/ok
|
||||
make -C snapshot-index-by-timestamp/ok
|
||||
make -C wal-time-bounds/ok
|
||||
make -C wal-index-by-timestamp/ok
|
||||
|
||||
+6
@@ -0,0 +1,6 @@
|
||||
.PHONY: default
|
||||
default:
|
||||
TZ=UTC touch -ct 200001010000 generations/0000000000000000/snapshots/0000000000000000.snapshot.lz4
|
||||
TZ=UTC touch -ct 200001020000 generations/0000000000000000/snapshots/0000000000000001.snapshot.lz4
|
||||
TZ=UTC touch -ct 200001030000 generations/0000000000000000/snapshots/0000000000000002.snapshot.lz4
|
||||
|
||||
BIN
Binary file not shown.
BIN
Binary file not shown.
BIN
Binary file not shown.
+11
@@ -0,0 +1,11 @@
|
||||
.PHONY: default
|
||||
default:
|
||||
TZ=UTC touch -ct 200001010000 generations/0000000000000000/snapshots/0000000000000000.snapshot.lz4
|
||||
TZ=UTC touch -ct 200001030000 generations/0000000000000000/snapshots/0000000000000001.snapshot.lz4
|
||||
|
||||
TZ=UTC touch -ct 200001010000 generations/0000000000000000/wal/0000000000000000/0000000000000000.wal.lz4
|
||||
TZ=UTC touch -ct 200001020000 generations/0000000000000000/wal/0000000000000000/0000000000001234.wal.lz4
|
||||
TZ=UTC touch -ct 200001030000 generations/0000000000000000/wal/0000000000000001/0000000000000000.wal.lz4
|
||||
TZ=UTC touch -ct 200001040000 generations/0000000000000000/wal/0000000000000002/0000000000000000.wal.lz4
|
||||
TZ=UTC touch -ct 200001050000 generations/0000000000000000/wal/0000000000000003/0000000000000000.wal.lz4
|
||||
|
||||
Vendored
BIN
Binary file not shown.
Vendored
BIN
Binary file not shown.
BIN
Binary file not shown.
BIN
Binary file not shown.
BIN
Binary file not shown.
BIN
Binary file not shown.
BIN
Binary file not shown.
@@ -0,0 +1,7 @@
|
||||
.PHONY: default
|
||||
default:
|
||||
TZ=UTC touch -ct 200001010000 generations/0000000000000000/snapshots/0000000000000000.snapshot.lz4
|
||||
TZ=UTC touch -ct 200001030000 generations/0000000000000000/snapshots/0000000000000001.snapshot.lz4
|
||||
|
||||
TZ=UTC touch -ct 200001020000 generations/0000000000000000/wal/0000000000000000/0000000000000000.wal.lz4
|
||||
TZ=UTC touch -ct 200001020000 generations/0000000000000000/wal/0000000000000000/0000000000001234.wal.lz4
|
||||
BIN
Binary file not shown.
BIN
Binary file not shown.
BIN
Binary file not shown.
BIN
Binary file not shown.
@@ -0,0 +1,5 @@
|
||||
.PHONY: default
|
||||
default:
|
||||
TZ=UTC touch -ct 200001010000 generations/0000000000000000/snapshots/0000000000000000.snapshot.lz4
|
||||
TZ=UTC touch -ct 200001020000 generations/0000000000000000/snapshots/00000000000003e8.snapshot.lz4
|
||||
TZ=UTC touch -ct 200001030000 generations/0000000000000000/snapshots/00000000000007d0.snapshot.lz4
|
||||
BIN
Binary file not shown.
BIN
Binary file not shown.
BIN
Binary file not shown.
+6
@@ -0,0 +1,6 @@
|
||||
.PHONY: default
|
||||
default:
|
||||
TZ=UTC touch -ct 200001010000 generations/0000000000000000/wal/0000000000000000/0000000000000000.wal.lz4
|
||||
TZ=UTC touch -ct 200001020000 generations/0000000000000000/wal/0000000000000000/0000000000001234.wal.lz4
|
||||
TZ=UTC touch -ct 200001030000 generations/0000000000000000/wal/0000000000000001/0000000000000000.wal.lz4
|
||||
|
||||
BIN
Binary file not shown.
BIN
Binary file not shown.
BIN
Binary file not shown.
Reference in New Issue
Block a user