Compare commits
8 Commits
v0.4.0-bet
...
fix-stream
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
c53a09c124 | ||
|
|
46597ab22f | ||
|
|
e6f7c6052d | ||
|
|
7d8b8c6ec0 | ||
|
|
88737d7164 | ||
|
|
6763e9218c | ||
|
|
301e1172fd | ||
|
|
ca07137d32 |
7
.github/workflows/build_and_test.yml
vendored
7
.github/workflows/build_and_test.yml
vendored
@@ -22,4 +22,9 @@ jobs:
|
|||||||
run: go install ./cmd/litestream
|
run: go install ./cmd/litestream
|
||||||
|
|
||||||
- name: Run unit tests
|
- name: Run unit tests
|
||||||
run: make testdata && go test -v ./...
|
run: make testdata && go test -v --coverprofile=.coverage.out ./... && go tool cover -html .coverage.out -o .coverage.html
|
||||||
|
|
||||||
|
- uses: actions/upload-artifact@v3
|
||||||
|
with:
|
||||||
|
name: code-coverage
|
||||||
|
path: .coverage.html
|
||||||
|
|||||||
1
.gitignore
vendored
1
.gitignore
vendored
@@ -1,2 +1,3 @@
|
|||||||
|
.coverage.*
|
||||||
.DS_Store
|
.DS_Store
|
||||||
/dist
|
/dist
|
||||||
|
|||||||
@@ -9,6 +9,7 @@ import (
|
|||||||
"os"
|
"os"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
"strconv"
|
"strconv"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/benbjohnson/litestream"
|
"github.com/benbjohnson/litestream"
|
||||||
)
|
)
|
||||||
@@ -22,14 +23,15 @@ type RestoreCommand struct {
|
|||||||
snapshotIndex int // index of snapshot to start from
|
snapshotIndex int // index of snapshot to start from
|
||||||
|
|
||||||
// CLI options
|
// CLI options
|
||||||
configPath string // path to config file
|
configPath string // path to config file
|
||||||
noExpandEnv bool // if true, do not expand env variables in config
|
noExpandEnv bool // if true, do not expand env variables in config
|
||||||
outputPath string // path to restore database to
|
outputPath string // path to restore database to
|
||||||
replicaName string // optional, name of replica to restore from
|
replicaName string // optional, name of replica to restore from
|
||||||
generation string // optional, generation to restore
|
generation string // optional, generation to restore
|
||||||
targetIndex int // optional, last WAL index to replay
|
targetIndex int // optional, last WAL index to replay
|
||||||
ifDBNotExists bool // if true, skips restore if output path already exists
|
timestamp time.Time // optional, restore to point-in-time (ISO 8601)
|
||||||
ifReplicaExists bool // if true, skips if no backups exist
|
ifDBNotExists bool // if true, skips restore if output path already exists
|
||||||
|
ifReplicaExists bool // if true, skips if no backups exist
|
||||||
opt litestream.RestoreOptions
|
opt litestream.RestoreOptions
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -53,6 +55,7 @@ func (c *RestoreCommand) Run(ctx context.Context, args []string) (err error) {
|
|||||||
fs.StringVar(&c.replicaName, "replica", "", "replica name")
|
fs.StringVar(&c.replicaName, "replica", "", "replica name")
|
||||||
fs.StringVar(&c.generation, "generation", "", "generation name")
|
fs.StringVar(&c.generation, "generation", "", "generation name")
|
||||||
fs.Var((*indexVar)(&c.targetIndex), "index", "wal index")
|
fs.Var((*indexVar)(&c.targetIndex), "index", "wal index")
|
||||||
|
timestampStr := fs.String("timestamp", "", "point-in-time restore (ISO 8601)")
|
||||||
fs.IntVar(&c.opt.Parallelism, "parallelism", c.opt.Parallelism, "parallelism")
|
fs.IntVar(&c.opt.Parallelism, "parallelism", c.opt.Parallelism, "parallelism")
|
||||||
fs.BoolVar(&c.ifDBNotExists, "if-db-not-exists", false, "")
|
fs.BoolVar(&c.ifDBNotExists, "if-db-not-exists", false, "")
|
||||||
fs.BoolVar(&c.ifReplicaExists, "if-replica-exists", false, "")
|
fs.BoolVar(&c.ifReplicaExists, "if-replica-exists", false, "")
|
||||||
@@ -66,9 +69,20 @@ func (c *RestoreCommand) Run(ctx context.Context, args []string) (err error) {
|
|||||||
}
|
}
|
||||||
pathOrURL := fs.Arg(0)
|
pathOrURL := fs.Arg(0)
|
||||||
|
|
||||||
|
// Parse timestamp.
|
||||||
|
if *timestampStr != "" {
|
||||||
|
if c.timestamp, err = time.Parse(time.RFC3339Nano, *timestampStr); err != nil {
|
||||||
|
return fmt.Errorf("invalid -timestamp, expected ISO 8601: %w", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Ensure a generation is specified if target index is specified.
|
// Ensure a generation is specified if target index is specified.
|
||||||
if c.targetIndex != -1 && c.generation == "" {
|
if c.targetIndex != -1 && !c.timestamp.IsZero() {
|
||||||
|
return fmt.Errorf("cannot specify both -index flag and -timestamp flag")
|
||||||
|
} else if c.targetIndex != -1 && c.generation == "" {
|
||||||
return fmt.Errorf("must specify -generation flag when using -index flag")
|
return fmt.Errorf("must specify -generation flag when using -index flag")
|
||||||
|
} else if !c.timestamp.IsZero() && c.generation == "" {
|
||||||
|
return fmt.Errorf("must specify -generation flag when using -timestamp flag")
|
||||||
}
|
}
|
||||||
|
|
||||||
// Default to original database path if output path not specified.
|
// Default to original database path if output path not specified.
|
||||||
@@ -117,7 +131,11 @@ func (c *RestoreCommand) Run(ctx context.Context, args []string) (err error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Determine the maximum available index for the generation if one is not specified.
|
// Determine the maximum available index for the generation if one is not specified.
|
||||||
if c.targetIndex == -1 {
|
if !c.timestamp.IsZero() {
|
||||||
|
if c.targetIndex, err = litestream.FindIndexByTimestamp(ctx, r.Client(), c.generation, c.timestamp); err != nil {
|
||||||
|
return fmt.Errorf("cannot find index for timestamp in generation %q: %w", c.generation, err)
|
||||||
|
}
|
||||||
|
} else if c.targetIndex == -1 {
|
||||||
if c.targetIndex, err = litestream.FindMaxIndexByGeneration(ctx, r.Client(), c.generation); err != nil {
|
if c.targetIndex, err = litestream.FindMaxIndexByGeneration(ctx, r.Client(), c.generation); err != nil {
|
||||||
return fmt.Errorf("cannot determine latest index in generation %q: %w", c.generation, err)
|
return fmt.Errorf("cannot determine latest index in generation %q: %w", c.generation, err)
|
||||||
}
|
}
|
||||||
@@ -239,6 +257,10 @@ Arguments:
|
|||||||
Restore up to a specific hex-encoded WAL index (inclusive).
|
Restore up to a specific hex-encoded WAL index (inclusive).
|
||||||
Defaults to use the highest available index.
|
Defaults to use the highest available index.
|
||||||
|
|
||||||
|
-timestamp DATETIME
|
||||||
|
Restore up to a specific point-in-time. Must be ISO 8601.
|
||||||
|
Cannot be specified with -index flag.
|
||||||
|
|
||||||
-o PATH
|
-o PATH
|
||||||
Output path of the restored database.
|
Output path of the restored database.
|
||||||
Defaults to original DB path.
|
Defaults to original DB path.
|
||||||
@@ -253,9 +275,6 @@ Arguments:
|
|||||||
Determines the number of WAL files downloaded in parallel.
|
Determines the number of WAL files downloaded in parallel.
|
||||||
Defaults to `+strconv.Itoa(litestream.DefaultRestoreParallelism)+`.
|
Defaults to `+strconv.Itoa(litestream.DefaultRestoreParallelism)+`.
|
||||||
|
|
||||||
-v
|
|
||||||
Verbose output.
|
|
||||||
|
|
||||||
|
|
||||||
Examples:
|
Examples:
|
||||||
|
|
||||||
@@ -271,6 +290,9 @@ Examples:
|
|||||||
# Restore database from specific generation on S3.
|
# Restore database from specific generation on S3.
|
||||||
$ litestream restore -replica s3 -generation xxxxxxxx /path/to/db
|
$ litestream restore -replica s3 -generation xxxxxxxx /path/to/db
|
||||||
|
|
||||||
|
# Restore database to a specific point in time.
|
||||||
|
$ litestream restore -generation xxxxxxxx -timestamp 2000-01-01T00:00:00Z /path/to/db
|
||||||
|
|
||||||
`[1:],
|
`[1:],
|
||||||
DefaultConfigPath(),
|
DefaultConfigPath(),
|
||||||
)
|
)
|
||||||
|
|||||||
179
db.go
179
db.go
@@ -17,6 +17,7 @@ import (
|
|||||||
"sort"
|
"sort"
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
|
"syscall"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/benbjohnson/litestream/internal"
|
"github.com/benbjohnson/litestream/internal"
|
||||||
@@ -1747,23 +1748,34 @@ func (db *DB) stream(ctx context.Context) error {
|
|||||||
|
|
||||||
// streamSnapshot reads the snapshot into the WAL and applies it to the main database.
|
// streamSnapshot reads the snapshot into the WAL and applies it to the main database.
|
||||||
func (db *DB) streamSnapshot(ctx context.Context, hdr *StreamRecordHeader, r io.Reader) error {
|
func (db *DB) streamSnapshot(ctx context.Context, hdr *StreamRecordHeader, r io.Reader) error {
|
||||||
// Truncate WAL file.
|
|
||||||
if _, err := db.db.ExecContext(ctx, `PRAGMA wal_checkpoint(TRUNCATE)`); err != nil {
|
|
||||||
return fmt.Errorf("truncate: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Determine total page count.
|
// Determine total page count.
|
||||||
pageN := int(hdr.Size / int64(db.pageSize))
|
pageN := int(hdr.Size / int64(db.pageSize))
|
||||||
|
|
||||||
ww := NewWALWriter(db.WALPath(), db.fileMode, db.pageSize)
|
// Open database file.
|
||||||
if err := ww.Open(); err != nil {
|
f, err := os.OpenFile(db.path, os.O_RDWR, 0666)
|
||||||
return fmt.Errorf("open wal writer: %w", err)
|
if err != nil {
|
||||||
|
return fmt.Errorf("open db file: %w", err)
|
||||||
}
|
}
|
||||||
defer func() { _ = ww.Close() }()
|
defer f.Close()
|
||||||
|
|
||||||
if err := ww.WriteHeader(); err != nil {
|
// Open shm file for locking.
|
||||||
return fmt.Errorf("write wal header: %w", err)
|
shmFile, err := os.OpenFile(db.SHMPath(), os.O_RDWR, 0666)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("open shm file: %w", err)
|
||||||
}
|
}
|
||||||
|
defer shmFile.Close()
|
||||||
|
|
||||||
|
// Obtain WAL checkpoint lock.
|
||||||
|
if err := setLkw(shmFile, syscall.F_WRLCK, WAL_CKPT_LOCK_OFFSET, 1); err != nil {
|
||||||
|
return fmt.Errorf("cannot obtain wal checkpoint lock: %w", err)
|
||||||
|
}
|
||||||
|
defer func() { _ = setLkw(shmFile, syscall.F_UNLCK, WAL_CKPT_LOCK_OFFSET, 1) }()
|
||||||
|
|
||||||
|
// Obtain WAL write lock.
|
||||||
|
if err := setLkw(shmFile, syscall.F_WRLCK, WAL_WRITE_LOCK_OFFSET, 1); err != nil {
|
||||||
|
return fmt.Errorf("cannot obtain wal write lock: %w", err)
|
||||||
|
}
|
||||||
|
defer func() { _ = setLkw(shmFile, syscall.F_UNLCK, WAL_WRITE_LOCK_OFFSET, 1) }()
|
||||||
|
|
||||||
// Iterate over pages
|
// Iterate over pages
|
||||||
buf := make([]byte, db.pageSize)
|
buf := make([]byte, db.pageSize)
|
||||||
@@ -1775,26 +1787,18 @@ func (db *DB) streamSnapshot(ctx context.Context, hdr *StreamRecordHeader, r io.
|
|||||||
return fmt.Errorf("read snapshot page %d: %w", pgno, err)
|
return fmt.Errorf("read snapshot page %d: %w", pgno, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Issue a commit flag when the last page is reached.
|
// Copy page to database file.
|
||||||
var commit uint32
|
offset := int64(pgno-1) * int64(db.pageSize)
|
||||||
|
if _, err := f.WriteAt(buf, offset); err != nil {
|
||||||
|
return fmt.Errorf("copy to db: pgno=%d err=%w", pgno, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Truncate database to final size.
|
||||||
if pgno == uint32(pageN) {
|
if pgno == uint32(pageN) {
|
||||||
commit = uint32(pageN)
|
if err := f.Truncate(int64(pageN) * int64(db.pageSize)); err != nil {
|
||||||
|
return fmt.Errorf("truncate db: commit=%d err=%w", pageN, err)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Write page into WAL frame.
|
|
||||||
if err := ww.WriteFrame(pgno, commit, buf); err != nil {
|
|
||||||
return fmt.Errorf("write wal frame: %w", err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Close WAL file writer.
|
|
||||||
if err := ww.Close(); err != nil {
|
|
||||||
return fmt.Errorf("close wal writer: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Invalidate WAL index.
|
|
||||||
if err := invalidateSHMFile(db.path); err != nil {
|
|
||||||
return fmt.Errorf("invalidate shm file: %w", err)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Write position to file so other processes can read it.
|
// Write position to file so other processes can read it.
|
||||||
@@ -1819,44 +1823,63 @@ func (db *DB) streamWALSegment(ctx context.Context, hdr *StreamRecordHeader, r i
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
ww := NewWALWriter(db.WALPath(), db.fileMode, db.pageSize)
|
// Open database file.
|
||||||
if err := ww.Open(); err != nil {
|
f, err := os.OpenFile(db.path, os.O_RDWR, 0666)
|
||||||
return fmt.Errorf("open wal writer: %w", err)
|
if err != nil {
|
||||||
|
return fmt.Errorf("open db file: %w", err)
|
||||||
}
|
}
|
||||||
defer func() { _ = ww.Close() }()
|
defer f.Close()
|
||||||
|
|
||||||
if err := ww.WriteHeader(); err != nil {
|
// Open shm file for locking.
|
||||||
return fmt.Errorf("write wal header: %w", err)
|
shmFile, err := os.OpenFile(db.SHMPath(), os.O_RDWR, 0666)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("open shm file: %w", err)
|
||||||
}
|
}
|
||||||
|
defer shmFile.Close()
|
||||||
|
|
||||||
|
// Obtain WAL checkpoint lock.
|
||||||
|
if err := setLkw(shmFile, syscall.F_WRLCK, WAL_CKPT_LOCK_OFFSET, 1); err != nil {
|
||||||
|
return fmt.Errorf("cannot obtain wal checkpoint lock: %w", err)
|
||||||
|
}
|
||||||
|
defer func() { _ = setLkw(shmFile, syscall.F_UNLCK, WAL_CKPT_LOCK_OFFSET, 1) }()
|
||||||
|
|
||||||
|
// Obtain WAL write lock.
|
||||||
|
if err := setLkw(shmFile, syscall.F_WRLCK, WAL_WRITE_LOCK_OFFSET, 1); err != nil {
|
||||||
|
return fmt.Errorf("cannot obtain wal write lock: %w", err)
|
||||||
|
}
|
||||||
|
defer func() { _ = setLkw(shmFile, syscall.F_UNLCK, WAL_WRITE_LOCK_OFFSET, 1) }()
|
||||||
|
|
||||||
// Iterate over incoming WAL pages.
|
// Iterate over incoming WAL pages.
|
||||||
buf := make([]byte, WALFrameHeaderSize+db.pageSize)
|
buf := make([]byte, WALFrameHeaderSize+db.pageSize)
|
||||||
for i := 0; ; i++ {
|
for i := 0; ; i++ {
|
||||||
// Read snapshot page into a buffer.
|
// Read snapshot page into a buffer.
|
||||||
if _, err := io.ReadFull(zr, buf); err == io.EOF {
|
if n, err := io.ReadFull(zr, buf); err == io.EOF {
|
||||||
break
|
break
|
||||||
} else if err != nil {
|
} else if err != nil {
|
||||||
return fmt.Errorf("read wal frame %d: %w", i, err)
|
return fmt.Errorf("read wal frame: i=%d n=%d err=%w", i, n, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Read page number & commit field.
|
// Read page number & commit field.
|
||||||
pgno := binary.BigEndian.Uint32(buf[0:])
|
pgno := binary.BigEndian.Uint32(buf[0:])
|
||||||
commit := binary.BigEndian.Uint32(buf[4:])
|
commit := binary.BigEndian.Uint32(buf[4:])
|
||||||
|
|
||||||
// Write page into WAL frame.
|
// Copy page to database file.
|
||||||
if err := ww.WriteFrame(pgno, commit, buf[WALFrameHeaderSize:]); err != nil {
|
offset := int64(pgno-1) * int64(db.pageSize)
|
||||||
return fmt.Errorf("write wal frame: %w", err)
|
if _, err := f.WriteAt(buf[WALFrameHeaderSize:], offset); err != nil {
|
||||||
|
return fmt.Errorf("copy to db: pgno=%d err=%w", pgno, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Truncate database, if commit specified.
|
||||||
|
if commit != 0 {
|
||||||
|
if err := f.Truncate(int64(commit) * int64(db.pageSize)); err != nil {
|
||||||
|
return fmt.Errorf("truncate db: commit=%d err=%w", commit, err)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Close WAL file writer.
|
// Close database file writer.
|
||||||
if err := ww.Close(); err != nil {
|
if err := f.Close(); err != nil {
|
||||||
return fmt.Errorf("close wal writer: %w", err)
|
return fmt.Errorf("close db writer: %w", err)
|
||||||
}
|
|
||||||
|
|
||||||
// Invalidate WAL index.
|
|
||||||
if err := invalidateSHMFile(db.path); err != nil {
|
|
||||||
return fmt.Errorf("invalidate shm file: %w", err)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Write position to file so other processes can read it.
|
// Write position to file so other processes can read it.
|
||||||
@@ -2016,51 +2039,21 @@ func logPrefixPath(path string) string {
|
|||||||
return path
|
return path
|
||||||
}
|
}
|
||||||
|
|
||||||
// invalidateSHMFile clears the iVersion field of the -shm file in order that
|
|
||||||
// the next transaction will rebuild it.
|
|
||||||
func invalidateSHMFile(dbPath string) error {
|
|
||||||
db, err := sql.Open("sqlite3", dbPath)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("reopen db: %w", err)
|
|
||||||
}
|
|
||||||
defer func() { _ = db.Close() }()
|
|
||||||
|
|
||||||
if _, err := db.Exec(`PRAGMA wal_checkpoint(PASSIVE)`); err != nil {
|
|
||||||
return fmt.Errorf("passive checkpoint: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
f, err := os.OpenFile(dbPath+"-shm", os.O_RDWR, 0666)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("open shm index: %w", err)
|
|
||||||
}
|
|
||||||
defer f.Close()
|
|
||||||
|
|
||||||
buf := make([]byte, WALIndexHeaderSize)
|
|
||||||
if _, err := io.ReadFull(f, buf); err != nil {
|
|
||||||
return fmt.Errorf("read shm index: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Invalidate "isInit" fields.
|
|
||||||
buf[12], buf[60] = 0, 0
|
|
||||||
|
|
||||||
// Rewrite header.
|
|
||||||
if _, err := f.Seek(0, io.SeekStart); err != nil {
|
|
||||||
return fmt.Errorf("seek shm index: %w", err)
|
|
||||||
} else if _, err := f.Write(buf); err != nil {
|
|
||||||
return fmt.Errorf("overwrite shm index: %w", err)
|
|
||||||
} else if err := f.Close(); err != nil {
|
|
||||||
return fmt.Errorf("close shm index: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Truncate WAL file again.
|
|
||||||
var row [3]int
|
|
||||||
if err := db.QueryRow(`PRAGMA wal_checkpoint(TRUNCATE)`).Scan(&row[0], &row[1], &row[2]); err != nil {
|
|
||||||
return fmt.Errorf("truncate: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// A marker error to indicate that a restart checkpoint could not verify
|
// A marker error to indicate that a restart checkpoint could not verify
|
||||||
// continuity between WAL indices and a new generation should be started.
|
// continuity between WAL indices and a new generation should be started.
|
||||||
var errRestartGeneration = errors.New("restart generation")
|
var errRestartGeneration = errors.New("restart generation")
|
||||||
|
|
||||||
|
const (
|
||||||
|
WAL_WRITE_LOCK_OFFSET = 120
|
||||||
|
WAL_CKPT_LOCK_OFFSET = 121
|
||||||
|
)
|
||||||
|
|
||||||
|
// setLkw is a helper function for calling fcntl for file locking.
|
||||||
|
func setLkw(f *os.File, typ int16, start, len int64) error {
|
||||||
|
return syscall.FcntlFlock(f.Fd(), syscall.F_SETLKW, &syscall.Flock_t{
|
||||||
|
Start: start,
|
||||||
|
Len: len,
|
||||||
|
Type: typ,
|
||||||
|
Whence: io.SeekStart,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|||||||
@@ -48,6 +48,7 @@ func (c *Client) Stream(ctx context.Context, pos litestream.Pos) (litestream.Str
|
|||||||
if !pos.IsZero() {
|
if !pos.IsZero() {
|
||||||
q.Set("generation", pos.Generation)
|
q.Set("generation", pos.Generation)
|
||||||
q.Set("index", litestream.FormatIndex(pos.Index))
|
q.Set("index", litestream.FormatIndex(pos.Index))
|
||||||
|
q.Set("offset", litestream.FormatOffset(pos.Offset))
|
||||||
}
|
}
|
||||||
|
|
||||||
// Strip off everything but the scheme & host.
|
// Strip off everything but the scheme & host.
|
||||||
|
|||||||
@@ -134,16 +134,31 @@ func (s *Server) handleGetStream(w http.ResponseWriter, r *http.Request) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
generationStr := q.Get("generation")
|
||||||
|
indexStr := q.Get("index")
|
||||||
|
offsetStr := q.Get("offset")
|
||||||
|
|
||||||
// Parse current client position, if available.
|
// Parse current client position, if available.
|
||||||
var pos litestream.Pos
|
var pos litestream.Pos
|
||||||
if generation, index := q.Get("generation"), q.Get("index"); generation != "" && index != "" {
|
if generationStr != "" && indexStr != "" && offsetStr != "" {
|
||||||
pos.Generation = generation
|
|
||||||
|
|
||||||
var err error
|
index, err := litestream.ParseIndex(indexStr)
|
||||||
if pos.Index, err = litestream.ParseIndex(index); err != nil {
|
if err != nil {
|
||||||
s.writeError(w, r, "Invalid index query parameter", http.StatusBadRequest)
|
s.writeError(w, r, "Invalid index query parameter", http.StatusBadRequest)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
offset, err := litestream.ParseOffset(offsetStr)
|
||||||
|
if err != nil {
|
||||||
|
s.writeError(w, r, "Invalid offset query parameter", http.StatusBadRequest)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
pos = litestream.Pos{
|
||||||
|
Generation: generationStr,
|
||||||
|
Index: index,
|
||||||
|
Offset: offset,
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Fetch database instance from the primary server.
|
// Fetch database instance from the primary server.
|
||||||
@@ -162,7 +177,6 @@ func (s *Server) handleGetStream(w http.ResponseWriter, r *http.Request) {
|
|||||||
s.writeError(w, r, "No generation available", http.StatusServiceUnavailable)
|
s.writeError(w, r, "No generation available", http.StatusServiceUnavailable)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
dbPos.Offset = 0
|
|
||||||
|
|
||||||
// Use database position if generation has changed.
|
// Use database position if generation has changed.
|
||||||
var snapshotRequired bool
|
var snapshotRequired bool
|
||||||
@@ -170,6 +184,7 @@ func (s *Server) handleGetStream(w http.ResponseWriter, r *http.Request) {
|
|||||||
s.Logger.Printf("stream generation mismatch, using primary position: client.pos=%s", pos)
|
s.Logger.Printf("stream generation mismatch, using primary position: client.pos=%s", pos)
|
||||||
pos, snapshotRequired = dbPos, true
|
pos, snapshotRequired = dbPos, true
|
||||||
}
|
}
|
||||||
|
pos.Offset = 0
|
||||||
|
|
||||||
// Obtain iterator before snapshot so we don't miss any WAL segments.
|
// Obtain iterator before snapshot so we don't miss any WAL segments.
|
||||||
fitr, err := db.WALSegments(r.Context(), pos.Generation)
|
fitr, err := db.WALSegments(r.Context(), pos.Generation)
|
||||||
@@ -289,7 +304,7 @@ func (s *Server) handleGetStream(w http.ResponseWriter, r *http.Request) {
|
|||||||
// Flush after WAL segment has been written.
|
// Flush after WAL segment has been written.
|
||||||
w.(http.Flusher).Flush()
|
w.(http.Flusher).Flush()
|
||||||
}
|
}
|
||||||
if bitr.Err() != nil {
|
if err := bitr.Err(); err != nil {
|
||||||
s.Logger.Printf("wal iterator error: %s", err)
|
s.Logger.Printf("wal iterator error: %s", err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -481,7 +481,7 @@ func TestCmd_Replicate_HTTP_PartialRecovery(t *testing.T) {
|
|||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
} else if _, err := db0.ExecContext(ctx, `PRAGMA journal_mode = wal`); err != nil {
|
} else if _, err := db0.ExecContext(ctx, `PRAGMA journal_mode = wal`); err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
} else if _, err := db0.ExecContext(ctx, `CREATE TABLE t (id INTEGER PRIMARY KEY)`); err != nil {
|
} else if _, err := db0.ExecContext(ctx, `CREATE TABLE t (id INTEGER PRIMARY KEY, DATA TEXT)`); err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
defer db0.Close()
|
defer db0.Close()
|
||||||
@@ -489,8 +489,8 @@ func TestCmd_Replicate_HTTP_PartialRecovery(t *testing.T) {
|
|||||||
var index int
|
var index int
|
||||||
insertAndWait := func() {
|
insertAndWait := func() {
|
||||||
index++
|
index++
|
||||||
t.Logf("[exec] INSERT INTO t (id) VALUES (%d)", index)
|
t.Logf("[exec] INSERT INTO t (id, data) VALUES (%d, '...')", index)
|
||||||
if _, err := db0.ExecContext(ctx, `INSERT INTO t (id) VALUES (?)`, index); err != nil {
|
if _, err := db0.ExecContext(ctx, `INSERT INTO t (id, data) VALUES (?, ?)`, index, strings.Repeat("x", 512)); err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
time.Sleep(100 * time.Millisecond)
|
time.Sleep(100 * time.Millisecond)
|
||||||
|
|||||||
@@ -102,6 +102,24 @@ func TestTruncateDuration(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestMD5Hash(t *testing.T) {
|
||||||
|
for _, tt := range []struct {
|
||||||
|
input []byte
|
||||||
|
output string
|
||||||
|
}{
|
||||||
|
{[]byte{}, "d41d8cd98f00b204e9800998ecf8427e"},
|
||||||
|
{[]byte{0x0}, "93b885adfe0da089cdf634904fd59f71"},
|
||||||
|
{[]byte{0x0, 0x1, 0x2, 0x3}, "37b59afd592725f9305e484a5d7f5168"},
|
||||||
|
{[]byte("Hello, world!"), "6cd3556deb0da54bca060b4c39479839"},
|
||||||
|
} {
|
||||||
|
t.Run(fmt.Sprintf("%v", tt.input), func(t *testing.T) {
|
||||||
|
if got, want := internal.MD5Hash(tt.input), tt.output; got != want {
|
||||||
|
t.Fatalf("hash=%s, want %s", got, want)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestOnceCloser(t *testing.T) {
|
func TestOnceCloser(t *testing.T) {
|
||||||
var closed bool
|
var closed bool
|
||||||
var rc = &mock.ReadCloser{
|
var rc = &mock.ReadCloser{
|
||||||
|
|||||||
@@ -234,6 +234,88 @@ func ReplicaClientTimeBounds(ctx context.Context, client ReplicaClient) (min, ma
|
|||||||
return min, max, nil
|
return min, max, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// FindIndexByTimestamp returns the highest index before a given point-in-time
|
||||||
|
// within a generation. Returns ErrNoSnapshots if no index exists on the replica
|
||||||
|
// for the generation.
|
||||||
|
func FindIndexByTimestamp(ctx context.Context, client ReplicaClient, generation string, timestamp time.Time) (index int, err error) {
|
||||||
|
snapshotIndex, err := FindSnapshotIndexByTimestamp(ctx, client, generation, timestamp)
|
||||||
|
if err == ErrNoSnapshots {
|
||||||
|
return 0, err
|
||||||
|
} else if err != nil {
|
||||||
|
return 0, fmt.Errorf("max snapshot index: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Determine the highest available WAL index.
|
||||||
|
walIndex, err := FindWALIndexByTimestamp(ctx, client, generation, timestamp)
|
||||||
|
if err != nil && err != ErrNoWALSegments {
|
||||||
|
return 0, fmt.Errorf("max wal index: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Use snapshot index if it's after the last WAL index.
|
||||||
|
if snapshotIndex > walIndex {
|
||||||
|
return snapshotIndex, nil
|
||||||
|
}
|
||||||
|
return walIndex, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// FindSnapshotIndexByTimestamp returns the highest snapshot index before timestamp.
|
||||||
|
// Returns ErrNoSnapshots if no snapshots exist for the generation on the replica.
|
||||||
|
func FindSnapshotIndexByTimestamp(ctx context.Context, client ReplicaClient, generation string, timestamp time.Time) (index int, err error) {
|
||||||
|
itr, err := client.Snapshots(ctx, generation)
|
||||||
|
if err != nil {
|
||||||
|
return 0, fmt.Errorf("snapshots: %w", err)
|
||||||
|
}
|
||||||
|
defer func() { _ = itr.Close() }()
|
||||||
|
|
||||||
|
// Iterate over snapshots to find the highest index.
|
||||||
|
var n int
|
||||||
|
for ; itr.Next(); n++ {
|
||||||
|
if info := itr.Snapshot(); info.CreatedAt.After(timestamp) {
|
||||||
|
continue
|
||||||
|
} else if info.Index > index {
|
||||||
|
index = info.Index
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if err := itr.Close(); err != nil {
|
||||||
|
return 0, fmt.Errorf("snapshot iteration: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Return an error if no snapshots were found.
|
||||||
|
if n == 0 {
|
||||||
|
return 0, ErrNoSnapshots
|
||||||
|
}
|
||||||
|
return index, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// FindWALIndexByTimestamp returns the highest WAL index before timestamp.
|
||||||
|
// Returns ErrNoWALSegments if no segments exist for the generation on the replica.
|
||||||
|
func FindWALIndexByTimestamp(ctx context.Context, client ReplicaClient, generation string, timestamp time.Time) (index int, err error) {
|
||||||
|
itr, err := client.WALSegments(ctx, generation)
|
||||||
|
if err != nil {
|
||||||
|
return 0, fmt.Errorf("wal segments: %w", err)
|
||||||
|
}
|
||||||
|
defer func() { _ = itr.Close() }()
|
||||||
|
|
||||||
|
// Iterate over WAL segments to find the highest index.
|
||||||
|
var n int
|
||||||
|
for ; itr.Next(); n++ {
|
||||||
|
if info := itr.WALSegment(); info.CreatedAt.After(timestamp) {
|
||||||
|
continue
|
||||||
|
} else if info.Index > index {
|
||||||
|
index = info.Index
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if err := itr.Close(); err != nil {
|
||||||
|
return 0, fmt.Errorf("wal segment iteration: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Return an error if no WAL segments were found.
|
||||||
|
if n == 0 {
|
||||||
|
return 0, ErrNoWALSegments
|
||||||
|
}
|
||||||
|
return index, nil
|
||||||
|
}
|
||||||
|
|
||||||
// FindMaxIndexByGeneration returns the last index within a generation.
|
// FindMaxIndexByGeneration returns the last index within a generation.
|
||||||
// Returns ErrNoSnapshots if no index exists on the replica for the generation.
|
// Returns ErrNoSnapshots if no index exists on the replica for the generation.
|
||||||
func FindMaxIndexByGeneration(ctx context.Context, client ReplicaClient, generation string) (index int, err error) {
|
func FindMaxIndexByGeneration(ctx context.Context, client ReplicaClient, generation string) (index int, err error) {
|
||||||
|
|||||||
@@ -489,6 +489,167 @@ func TestFindMaxIndexByGeneration(t *testing.T) {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestFindSnapshotIndexByTimestamp(t *testing.T) {
|
||||||
|
t.Run("OK", func(t *testing.T) {
|
||||||
|
client := litestream.NewFileReplicaClient(filepath.Join("testdata", "snapshot-index-by-timestamp", "ok"))
|
||||||
|
if index, err := litestream.FindSnapshotIndexByTimestamp(context.Background(), client, "0000000000000000", time.Date(2000, 1, 3, 0, 0, 0, 0, time.UTC)); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
} else if got, want := index, 0x000007d0; got != want {
|
||||||
|
t.Fatalf("index=%d, want %d", got, want)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("ErrNoSnapshots", func(t *testing.T) {
|
||||||
|
client := litestream.NewFileReplicaClient(filepath.Join("testdata", "snapshot-index-by-timestamp", "no-snapshots"))
|
||||||
|
|
||||||
|
_, err := litestream.FindSnapshotIndexByTimestamp(context.Background(), client, "0000000000000000", time.Date(2000, 1, 1, 0, 0, 0, 0, time.UTC))
|
||||||
|
if err != litestream.ErrNoSnapshots {
|
||||||
|
t.Fatalf("unexpected error: %s", err)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("ErrSnapshots", func(t *testing.T) {
|
||||||
|
var client mock.ReplicaClient
|
||||||
|
client.SnapshotsFunc = func(ctx context.Context, generation string) (litestream.SnapshotIterator, error) {
|
||||||
|
return nil, fmt.Errorf("marker")
|
||||||
|
}
|
||||||
|
|
||||||
|
_, err := litestream.FindSnapshotIndexByTimestamp(context.Background(), &client, "0000000000000000", time.Date(2000, 1, 1, 0, 0, 0, 0, time.UTC))
|
||||||
|
if err == nil || err.Error() != `snapshots: marker` {
|
||||||
|
t.Fatalf("unexpected error: %s", err)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("ErrSnapshotIteration", func(t *testing.T) {
|
||||||
|
var itr mock.SnapshotIterator
|
||||||
|
itr.NextFunc = func() bool { return false }
|
||||||
|
itr.CloseFunc = func() error { return fmt.Errorf("marker") }
|
||||||
|
|
||||||
|
var client mock.ReplicaClient
|
||||||
|
client.SnapshotsFunc = func(ctx context.Context, generation string) (litestream.SnapshotIterator, error) {
|
||||||
|
return &itr, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
_, err := litestream.FindSnapshotIndexByTimestamp(context.Background(), &client, "0000000000000000", time.Date(2000, 1, 1, 0, 0, 0, 0, time.UTC))
|
||||||
|
if err == nil || err.Error() != `snapshot iteration: marker` {
|
||||||
|
t.Fatalf("unexpected error: %s", err)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestFindWALIndexByTimestamp(t *testing.T) {
|
||||||
|
t.Run("OK", func(t *testing.T) {
|
||||||
|
client := litestream.NewFileReplicaClient(filepath.Join("testdata", "wal-index-by-timestamp", "ok"))
|
||||||
|
if index, err := litestream.FindWALIndexByTimestamp(context.Background(), client, "0000000000000000", time.Date(2000, 1, 3, 0, 0, 0, 0, time.UTC)); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
} else if got, want := index, 1; got != want {
|
||||||
|
t.Fatalf("index=%d, want %d", got, want)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("ErrNoWALSegments", func(t *testing.T) {
|
||||||
|
client := litestream.NewFileReplicaClient(filepath.Join("testdata", "wal-index-by-timestamp", "no-wal"))
|
||||||
|
|
||||||
|
_, err := litestream.FindWALIndexByTimestamp(context.Background(), client, "0000000000000000", time.Date(2000, 1, 1, 0, 0, 0, 0, time.UTC))
|
||||||
|
if err != litestream.ErrNoWALSegments {
|
||||||
|
t.Fatalf("unexpected error: %s", err)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("ErrWALSegments", func(t *testing.T) {
|
||||||
|
var client mock.ReplicaClient
|
||||||
|
client.WALSegmentsFunc = func(ctx context.Context, generation string) (litestream.WALSegmentIterator, error) {
|
||||||
|
return nil, fmt.Errorf("marker")
|
||||||
|
}
|
||||||
|
|
||||||
|
_, err := litestream.FindWALIndexByTimestamp(context.Background(), &client, "0000000000000000", time.Date(2000, 1, 1, 0, 0, 0, 0, time.UTC))
|
||||||
|
if err == nil || err.Error() != `wal segments: marker` {
|
||||||
|
t.Fatalf("unexpected error: %s", err)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("ErrWALSegmentIteration", func(t *testing.T) {
|
||||||
|
var itr mock.WALSegmentIterator
|
||||||
|
itr.NextFunc = func() bool { return false }
|
||||||
|
itr.CloseFunc = func() error { return fmt.Errorf("marker") }
|
||||||
|
|
||||||
|
var client mock.ReplicaClient
|
||||||
|
client.WALSegmentsFunc = func(ctx context.Context, generation string) (litestream.WALSegmentIterator, error) {
|
||||||
|
return &itr, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
_, err := litestream.FindWALIndexByTimestamp(context.Background(), &client, "0000000000000000", time.Date(2000, 1, 1, 0, 0, 0, 0, time.UTC))
|
||||||
|
if err == nil || err.Error() != `wal segment iteration: marker` {
|
||||||
|
t.Fatalf("unexpected error: %s", err)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestFindIndexByTimestamp(t *testing.T) {
|
||||||
|
t.Run("OK", func(t *testing.T) {
|
||||||
|
client := litestream.NewFileReplicaClient(filepath.Join("testdata", "index-by-timestamp", "ok"))
|
||||||
|
if index, err := litestream.FindIndexByTimestamp(context.Background(), client, "0000000000000000", time.Date(2000, 1, 4, 0, 0, 0, 0, time.UTC)); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
} else if got, want := index, 0x00000002; got != want {
|
||||||
|
t.Fatalf("index=%d, want %d", got, want)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("NoWAL", func(t *testing.T) {
|
||||||
|
client := litestream.NewFileReplicaClient(filepath.Join("testdata", "index-by-timestamp", "no-wal"))
|
||||||
|
if index, err := litestream.FindIndexByTimestamp(context.Background(), client, "0000000000000000", time.Date(2000, 1, 2, 0, 0, 0, 0, time.UTC)); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
} else if got, want := index, 0x00000001; got != want {
|
||||||
|
t.Fatalf("index=%d, want %d", got, want)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("SnapshotLaterThanWAL", func(t *testing.T) {
|
||||||
|
client := litestream.NewFileReplicaClient(filepath.Join("testdata", "index-by-timestamp", "snapshot-later-than-wal"))
|
||||||
|
if index, err := litestream.FindIndexByTimestamp(context.Background(), client, "0000000000000000", time.Date(2000, 1, 3, 0, 0, 0, 0, time.UTC)); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
} else if got, want := index, 0x00000001; got != want {
|
||||||
|
t.Fatalf("index=%d, want %d", got, want)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("ErrNoSnapshots", func(t *testing.T) {
|
||||||
|
client := litestream.NewFileReplicaClient(filepath.Join("testdata", "index-by-timestamp", "no-snapshots"))
|
||||||
|
|
||||||
|
_, err := litestream.FindIndexByTimestamp(context.Background(), client, "0000000000000000", time.Date(2000, 1, 1, 0, 0, 0, 0, time.UTC))
|
||||||
|
if err != litestream.ErrNoSnapshots {
|
||||||
|
t.Fatalf("unexpected error: %s", err)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("ErrSnapshots", func(t *testing.T) {
|
||||||
|
var client mock.ReplicaClient
|
||||||
|
client.SnapshotsFunc = func(ctx context.Context, generation string) (litestream.SnapshotIterator, error) {
|
||||||
|
return nil, fmt.Errorf("marker")
|
||||||
|
}
|
||||||
|
|
||||||
|
_, err := litestream.FindIndexByTimestamp(context.Background(), &client, "0000000000000000", time.Date(2000, 1, 1, 0, 0, 0, 0, time.UTC))
|
||||||
|
if err == nil || err.Error() != `max snapshot index: snapshots: marker` {
|
||||||
|
t.Fatalf("unexpected error: %s", err)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("ErrWALSegments", func(t *testing.T) {
|
||||||
|
var client mock.ReplicaClient
|
||||||
|
client.SnapshotsFunc = func(ctx context.Context, generation string) (litestream.SnapshotIterator, error) {
|
||||||
|
return litestream.NewSnapshotInfoSliceIterator([]litestream.SnapshotInfo{{Index: 0x00000001}}), nil
|
||||||
|
}
|
||||||
|
client.WALSegmentsFunc = func(ctx context.Context, generation string) (litestream.WALSegmentIterator, error) {
|
||||||
|
return nil, fmt.Errorf("marker")
|
||||||
|
}
|
||||||
|
|
||||||
|
_, err := litestream.FindIndexByTimestamp(context.Background(), &client, "0000000000000000", time.Date(2000, 1, 1, 0, 0, 0, 0, time.UTC))
|
||||||
|
if err == nil || err.Error() != `max wal index: wal segments: marker` {
|
||||||
|
t.Fatalf("unexpected error: %s", err)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
func TestRestore(t *testing.T) {
|
func TestRestore(t *testing.T) {
|
||||||
t.Run("OK", func(t *testing.T) {
|
t.Run("OK", func(t *testing.T) {
|
||||||
testDir := filepath.Join("testdata", "restore", "ok")
|
testDir := filepath.Join("testdata", "restore", "ok")
|
||||||
|
|||||||
@@ -703,6 +703,13 @@ func ParseHost(s string) (bucket, region, endpoint string, forcePathStyle bool)
|
|||||||
endpoint = net.JoinHostPort(endpoint, port)
|
endpoint = net.JoinHostPort(endpoint, port)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if e := os.Getenv("LITESTREAM_ENDPOINT"); e != "" {
|
||||||
|
endpoint = e
|
||||||
|
}
|
||||||
|
if r := os.Getenv("LITESTREAM_REGION"); r != "" {
|
||||||
|
region = r
|
||||||
|
}
|
||||||
|
|
||||||
// Prepend scheme to endpoint.
|
// Prepend scheme to endpoint.
|
||||||
if endpoint != "" {
|
if endpoint != "" {
|
||||||
endpoint = scheme + "://" + endpoint
|
endpoint = scheme + "://" + endpoint
|
||||||
|
|||||||
5
testdata/Makefile
vendored
5
testdata/Makefile
vendored
@@ -1,8 +1,13 @@
|
|||||||
.PHONY: default
|
.PHONY: default
|
||||||
default:
|
default:
|
||||||
make -C find-latest-generation/ok
|
make -C find-latest-generation/ok
|
||||||
|
make -C index-by-timestamp/no-wal
|
||||||
|
make -C index-by-timestamp/ok
|
||||||
|
make -C index-by-timestamp/snapshot-later-than-wal
|
||||||
make -C generation-time-bounds/ok
|
make -C generation-time-bounds/ok
|
||||||
make -C generation-time-bounds/snapshots-only
|
make -C generation-time-bounds/snapshots-only
|
||||||
make -C replica-client-time-bounds/ok
|
make -C replica-client-time-bounds/ok
|
||||||
make -C snapshot-time-bounds/ok
|
make -C snapshot-time-bounds/ok
|
||||||
|
make -C snapshot-index-by-timestamp/ok
|
||||||
make -C wal-time-bounds/ok
|
make -C wal-time-bounds/ok
|
||||||
|
make -C wal-index-by-timestamp/ok
|
||||||
|
|||||||
0
testdata/index-by-timestamp/no-snapshots/generations/0000000000000000/.gitignore
vendored
Normal file
0
testdata/index-by-timestamp/no-snapshots/generations/0000000000000000/.gitignore
vendored
Normal file
6
testdata/index-by-timestamp/no-wal/Makefile
vendored
Normal file
6
testdata/index-by-timestamp/no-wal/Makefile
vendored
Normal file
@@ -0,0 +1,6 @@
|
|||||||
|
.PHONY: default
|
||||||
|
default:
|
||||||
|
TZ=UTC touch -ct 200001010000 generations/0000000000000000/snapshots/0000000000000000.snapshot.lz4
|
||||||
|
TZ=UTC touch -ct 200001020000 generations/0000000000000000/snapshots/0000000000000001.snapshot.lz4
|
||||||
|
TZ=UTC touch -ct 200001030000 generations/0000000000000000/snapshots/0000000000000002.snapshot.lz4
|
||||||
|
|
||||||
Binary file not shown.
Binary file not shown.
Binary file not shown.
11
testdata/index-by-timestamp/ok/Makefile
vendored
Normal file
11
testdata/index-by-timestamp/ok/Makefile
vendored
Normal file
@@ -0,0 +1,11 @@
|
|||||||
|
.PHONY: default
|
||||||
|
default:
|
||||||
|
TZ=UTC touch -ct 200001010000 generations/0000000000000000/snapshots/0000000000000000.snapshot.lz4
|
||||||
|
TZ=UTC touch -ct 200001030000 generations/0000000000000000/snapshots/0000000000000001.snapshot.lz4
|
||||||
|
|
||||||
|
TZ=UTC touch -ct 200001010000 generations/0000000000000000/wal/0000000000000000/0000000000000000.wal.lz4
|
||||||
|
TZ=UTC touch -ct 200001020000 generations/0000000000000000/wal/0000000000000000/0000000000001234.wal.lz4
|
||||||
|
TZ=UTC touch -ct 200001030000 generations/0000000000000000/wal/0000000000000001/0000000000000000.wal.lz4
|
||||||
|
TZ=UTC touch -ct 200001040000 generations/0000000000000000/wal/0000000000000002/0000000000000000.wal.lz4
|
||||||
|
TZ=UTC touch -ct 200001050000 generations/0000000000000000/wal/0000000000000003/0000000000000000.wal.lz4
|
||||||
|
|
||||||
BIN
testdata/index-by-timestamp/ok/generations/0000000000000000/snapshots/0000000000000000.snapshot.lz4
vendored
Normal file
BIN
testdata/index-by-timestamp/ok/generations/0000000000000000/snapshots/0000000000000000.snapshot.lz4
vendored
Normal file
Binary file not shown.
BIN
testdata/index-by-timestamp/ok/generations/0000000000000000/snapshots/0000000000000001.snapshot.lz4
vendored
Normal file
BIN
testdata/index-by-timestamp/ok/generations/0000000000000000/snapshots/0000000000000001.snapshot.lz4
vendored
Normal file
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
7
testdata/index-by-timestamp/snapshot-later-than-wal/Makefile
vendored
Normal file
7
testdata/index-by-timestamp/snapshot-later-than-wal/Makefile
vendored
Normal file
@@ -0,0 +1,7 @@
|
|||||||
|
.PHONY: default
|
||||||
|
default:
|
||||||
|
TZ=UTC touch -ct 200001010000 generations/0000000000000000/snapshots/0000000000000000.snapshot.lz4
|
||||||
|
TZ=UTC touch -ct 200001030000 generations/0000000000000000/snapshots/0000000000000001.snapshot.lz4
|
||||||
|
|
||||||
|
TZ=UTC touch -ct 200001020000 generations/0000000000000000/wal/0000000000000000/0000000000000000.wal.lz4
|
||||||
|
TZ=UTC touch -ct 200001020000 generations/0000000000000000/wal/0000000000000000/0000000000001234.wal.lz4
|
||||||
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
0
testdata/snapshot-index-by-timestamp/no-snapshots/generations/0000000000000000/.gitignore
vendored
Normal file
0
testdata/snapshot-index-by-timestamp/no-snapshots/generations/0000000000000000/.gitignore
vendored
Normal file
5
testdata/snapshot-index-by-timestamp/ok/Makefile
vendored
Normal file
5
testdata/snapshot-index-by-timestamp/ok/Makefile
vendored
Normal file
@@ -0,0 +1,5 @@
|
|||||||
|
.PHONY: default
|
||||||
|
default:
|
||||||
|
TZ=UTC touch -ct 200001010000 generations/0000000000000000/snapshots/0000000000000000.snapshot.lz4
|
||||||
|
TZ=UTC touch -ct 200001020000 generations/0000000000000000/snapshots/00000000000003e8.snapshot.lz4
|
||||||
|
TZ=UTC touch -ct 200001030000 generations/0000000000000000/snapshots/00000000000007d0.snapshot.lz4
|
||||||
Binary file not shown.
Binary file not shown.
Binary file not shown.
6
testdata/wal-index-by-timestamp/ok/Makefile
vendored
Normal file
6
testdata/wal-index-by-timestamp/ok/Makefile
vendored
Normal file
@@ -0,0 +1,6 @@
|
|||||||
|
.PHONY: default
|
||||||
|
default:
|
||||||
|
TZ=UTC touch -ct 200001010000 generations/0000000000000000/wal/0000000000000000/0000000000000000.wal.lz4
|
||||||
|
TZ=UTC touch -ct 200001020000 generations/0000000000000000/wal/0000000000000000/0000000000001234.wal.lz4
|
||||||
|
TZ=UTC touch -ct 200001030000 generations/0000000000000000/wal/0000000000000001/0000000000000000.wal.lz4
|
||||||
|
|
||||||
Binary file not shown.
Binary file not shown.
Binary file not shown.
Reference in New Issue
Block a user