Compare commits
22 Commits
| Author | SHA1 | Date |
|---|---|---|
|  | 37442babfb |  |
|  | 962a2a894b |  |
|  | 0c61c9f7fe |  |
|  | 267b140fab |  |
|  | 1b194535e6 |  |
|  | 58a6c765fe |  |
|  | 2604052a9f |  |
|  | 7f81890bae |  |
|  | 2ff073c735 |  |
|  | 6fd11ccab5 |  |
|  | 6c49fba592 |  |
|  | 922fa0798e |  |
|  | 976df182c0 |  |
|  | 0e28a650e6 |  |
|  | f17768e830 |  |
|  | 2c142d3a0c |  |
|  | 4e469f8b02 |  |
|  | 3f268b70f8 |  |
|  | ad7bf7f974 |  |
|  | 778451f09f |  |
|  | 8e9a15933b |  |
|  | da1d7c3183 |  |
17 .github/CONTRIBUTING.md (vendored, new file)
@@ -0,0 +1,17 @@
+## Open-source, not open-contribution
+
+[Similar to SQLite](https://www.sqlite.org/copyright.html), Litestream is open
+source but closed to contributions. This keeps the code base free of proprietary
+or licensed code but it also helps me continue to maintain and build Litestream.
+
+As the author of [BoltDB](https://github.com/boltdb/bolt), I found that
+accepting and maintaining third party patches contributed to my burn out and
+I eventually archived the project. Writing databases & low-level replication
+tools involves nuance and simple one line changes can have profound and
+unexpected changes in correctness and performance. Small contributions
+typically required hours of my time to properly test and validate them.
+
+I am grateful for community involvement, bug reports, & feature requests. I do
+not wish to come off as anything but welcoming, however, I've
+made the decision to keep this project closed to contributions for my own
+mental health and long term viability of the project.
14 README.md
@@ -22,7 +22,7 @@ GitHub.

### Mac OS (Homebrew)

-To install from homebrew, first add the Litestream tap and then install:
+To install from homebrew, run the following command:

```sh
$ brew install benbjohnson/litestream/litestream
@@ -67,7 +67,7 @@ The `litestream` binary should be in your `$GOPATH/bin` folder.

Litestream provides a configuration file that can be used for production
deployments but you can also specify a single database and replica on the
-command line for testing.
+command line when trying it out.

First, you'll need to create an S3 bucket that we'll call `"mybkt"` in this
example. You'll also need to set your AWS credentials:
@@ -77,7 +77,7 @@ $ export AWS_ACCESS_KEY_ID=AKIAxxxxxxxxxxxxxxxx
$ export AWS_SECRET_ACCESS_KEY=xxxxxxxxxxxxxxxxxxxxxxxxxxxxxx/xxxxxxxxx
```

-Next you can run the `litestream replicate` command with the path to the
+Next, you can run the `litestream replicate` command with the path to the
database you want to backup and the URL of your replica destination:

```sh
@@ -89,7 +89,7 @@ to S3 every 10 seconds. From another terminal window, you can restore your
database from your S3 replica:

```
-$ litestream restore -v -o /path/to/restored/db s3://mybkt/db
+$ litestream restore -o /path/to/restored/db s3://mybkt/db
```

Voila! 🎉
@@ -240,7 +240,7 @@ $ litestream restore -timestamp 2020-01-01T00:00:00Z /path/to/db
```

Point-in-time restores only have the resolution of the timestamp of the WAL file
-itself. By default, litestream will start a new WAL file every minute so
+itself. By default, Litestream will start a new WAL file every minute so
point-in-time restores are only accurate to the minute.

@@ -276,9 +276,9 @@ another process is allowed to checkpoint the WAL.

## Open-source, not open-contribution

-[Similar to SQLite](https://www.sqlite.org/copyright.html), litestream is open
+[Similar to SQLite](https://www.sqlite.org/copyright.html), Litestream is open
source but closed to contributions. This keeps the code base free of proprietary
-or licensed code but it also helps me continue to maintain and build litestream.
+or licensed code but it also helps me continue to maintain and build Litestream.

As the author of [BoltDB](https://github.com/boltdb/bolt), I found that
accepting and maintaining third party patches contributed to my burn out and
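The README's quick start drives everything from the command line; the configuration file it mentions for production deployments is not reproduced in this diff. As a rough illustration of what a single-database, single-replica setup amounts to, the sketch below builds the same thing programmatically using trimmed-down mirrors of the `DBConfig`/`ReplicaConfig` structures that appear in the `cmd/litestream` hunks further down. Only the `Path`, `Replicas`, `URL`, and `SyncInterval` fields are taken from the diff; everything else here is illustrative.

```go
package main

import (
	"fmt"
	"time"
)

// Trimmed mirrors of the config structs referenced in the diff; the real
// types in cmd/litestream carry more fields than shown here.
type ReplicaConfig struct {
	URL          string
	SyncInterval time.Duration
}

type DBConfig struct {
	Path     string
	Replicas []*ReplicaConfig
}

func main() {
	// Roughly what `litestream replicate /path/to/db s3://mybkt/db` builds:
	// one database with one replica, synced every second.
	db := &DBConfig{
		Path: "/path/to/db",
		Replicas: []*ReplicaConfig{
			{URL: "s3://mybkt/db", SyncInterval: 1 * time.Second},
		},
	}
	fmt.Println(db.Path, "->", db.Replicas[0].URL)
}
```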
@@ -35,7 +35,9 @@ func (c *DatabasesCommand) Run(ctx context.Context, args []string) (err error) {
}

// List all databases.
-w := tabwriter.NewWriter(os.Stdout, 0, 8, 1, '\t', 0)
+w := tabwriter.NewWriter(os.Stdout, 0, 8, 2, ' ', 0)
+defer w.Flush()

fmt.Fprintln(w, "path\treplicas")
for _, dbConfig := range config.DBs {
db, err := newDBFromConfig(&config, dbConfig)
@@ -53,7 +55,6 @@ func (c *DatabasesCommand) Run(ctx context.Context, args []string) (err error) {
strings.Join(replicaNames, ","),
)
}
-w.Flush()

return nil
}

@@ -77,7 +77,9 @@ func (c *GenerationsCommand) Run(ctx context.Context, args []string) (err error)
}

// List each generation.
-w := tabwriter.NewWriter(os.Stdout, 0, 8, 1, '\t', 0)
+w := tabwriter.NewWriter(os.Stdout, 0, 8, 2, ' ', 0)
+defer w.Flush()

fmt.Fprintln(w, "name\tgeneration\tlag\tstart\tend")
for _, r := range replicas {
generations, err := r.Generations(ctx)
@@ -101,10 +103,8 @@ func (c *GenerationsCommand) Run(ctx context.Context, args []string) (err error)
stats.CreatedAt.Format(time.RFC3339),
stats.UpdatedAt.Format(time.RFC3339),
)
-w.Flush()
}
}
-w.Flush()

return nil
}
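The command hunks above and below all make the same change to their table output: the `tabwriter` is configured with two-space padding and a space pad character instead of a single tab, and the per-loop `w.Flush()` calls are replaced by one deferred flush. A minimal standalone sketch of the new configuration (the rows are illustrative):

```go
package main

import (
	"fmt"
	"os"
	"text/tabwriter"
)

func main() {
	// Settings from the diff: minwidth=0, tabwidth=8, padding=2, padchar=' '.
	// Columns are padded with spaces rather than raw tab characters, and the
	// writer is flushed once via defer instead of after each loop.
	w := tabwriter.NewWriter(os.Stdout, 0, 8, 2, ' ', 0)
	defer w.Flush()

	fmt.Fprintln(w, "path\treplicas")
	fmt.Fprintln(w, "/var/lib/app/db\ts3") // illustrative row
}
```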
@@ -11,6 +11,7 @@ import (
_ "net/http/pprof"
"os"
"os/signal"
+"time"

"github.com/benbjohnson/litestream"
"github.com/benbjohnson/litestream/s3"
@@ -29,7 +30,7 @@ type ReplicateCommand struct {

// Run loads all databases specified in the configuration.
func (c *ReplicateCommand) Run(ctx context.Context, args []string) (err error) {
fs := flag.NewFlagSet("litestream-replicate", flag.ContinueOnError)
-verbose := fs.Bool("v", false, "verbose logging")
+tracePath := fs.String("trace", "", "trace path")
registerConfigFlag(fs, &c.ConfigPath)
fs.Usage = c.Usage
if err := fs.Parse(args); err != nil {
@@ -43,7 +44,10 @@ func (c *ReplicateCommand) Run(ctx context.Context, args []string) (err error) {
} else if fs.NArg() > 1 {
dbConfig := &DBConfig{Path: fs.Arg(0)}
for _, u := range fs.Args()[1:] {
-dbConfig.Replicas = append(dbConfig.Replicas, &ReplicaConfig{URL: u})
+dbConfig.Replicas = append(dbConfig.Replicas, &ReplicaConfig{
+URL: u,
+SyncInterval: 1 * time.Second,
+})
}
config.DBs = []*DBConfig{dbConfig}
} else if c.ConfigPath != "" {
@@ -56,8 +60,13 @@ func (c *ReplicateCommand) Run(ctx context.Context, args []string) (err error) {
}

// Enable trace logging.
-if *verbose {
-litestream.Tracef = log.Printf
+if *tracePath != "" {
+f, err := os.Create(*tracePath)
+if err != nil {
+return err
+}
+defer f.Close()
+litestream.Tracef = log.New(f, "", log.LstdFlags|log.LUTC|log.Lshortfile).Printf
}

// Setup signal handler.
@@ -159,8 +168,8 @@ Arguments:
Specifies the configuration file.
Defaults to %s

--v
-Enable verbose logging output.
+-trace PATH
+Write verbose trace logging to PATH.

`[1:], DefaultConfigPath())
}
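The replicate command above swaps the `-v` flag for `-trace PATH`, pointing `litestream.Tracef` at a logger that writes to a file with UTC timestamps and caller information, and gives command-line replicas a one-second `SyncInterval`. A small sketch of the trace-hook pattern, assuming `Tracef` is a plain Printf-style package variable that defaults to a no-op (only the assignment is visible in the diff):

```go
package main

import (
	"log"
	"os"
)

// Tracef mimics the package-level hook the diff assigns to; defaulting it to
// a no-op is an assumption, since only the assignment appears in the hunk.
var Tracef = func(format string, v ...interface{}) {}

func main() {
	f, err := os.Create("trace.log") // the path would come from the -trace flag
	if err != nil {
		log.Fatal(err)
	}
	defer f.Close()

	// Same logger construction as the hunk: standard flags plus UTC
	// timestamps and the calling file/line for each trace entry.
	Tracef = log.New(f, "", log.LstdFlags|log.LUTC|log.Lshortfile).Printf

	Tracef("replication started: %s", "/path/to/db")
}
```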
@@ -19,6 +19,8 @@ type RestoreCommand struct{}
func (c *RestoreCommand) Run(ctx context.Context, args []string) (err error) {
var configPath string
opt := litestream.NewRestoreOptions()
+opt.Verbose = true

fs := flag.NewFlagSet("litestream-restore", flag.ContinueOnError)
registerConfigFlag(fs, &configPath)
fs.StringVar(&opt.OutputPath, "o", "", "output path")
@@ -75,7 +75,9 @@ func (c *SnapshotsCommand) Run(ctx context.Context, args []string) (err error) {
}

// List all snapshots.
-w := tabwriter.NewWriter(os.Stdout, 0, 8, 1, '\t', 0)
+w := tabwriter.NewWriter(os.Stdout, 0, 8, 2, ' ', 0)
+defer w.Flush()

fmt.Fprintln(w, "replica\tgeneration\tindex\tsize\tcreated")
for _, info := range infos {
fmt.Fprintf(w, "%s\t%s\t%d\t%d\t%s\n",
@@ -86,7 +88,6 @@ func (c *SnapshotsCommand) Run(ctx context.Context, args []string) (err error) {
info.CreatedAt.Format(time.RFC3339),
)
}
-w.Flush()

return nil
}
@@ -76,7 +76,9 @@ func (c *WALCommand) Run(ctx context.Context, args []string) (err error) {
}

// List all WAL files.
-w := tabwriter.NewWriter(os.Stdout, 0, 8, 1, '\t', 0)
+w := tabwriter.NewWriter(os.Stdout, 0, 8, 2, ' ', 0)
+defer w.Flush()

fmt.Fprintln(w, "replica\tgeneration\tindex\toffset\tsize\tcreated")
for _, info := range infos {
if *generation != "" && info.Generation != *generation {
@@ -92,7 +94,6 @@ func (c *WALCommand) Run(ctx context.Context, args []string) (err error) {
info.CreatedAt.Format(time.RFC3339),
)
}
-w.Flush()

return nil
}
126 db.go
@@ -33,6 +33,10 @@ const (
DefaultMaxCheckpointPageN = 10000
)

+// MaxIndex is the maximum possible WAL index.
+// If this index is reached then a new generation will be started.
+const MaxIndex = 0x7FFFFFFF
+
// BusyTimeout is the timeout to wait for EBUSY from SQLite.
const BusyTimeout = 1 * time.Second

@@ -421,7 +425,7 @@ func (db *DB) init() (err error) {

// If we have an existing shadow WAL, ensure the headers match.
if err := db.verifyHeadersMatch(); err != nil {
-log.Printf("%s: cannot determine last wal position, clearing generation (%s)", db.path, err)
+log.Printf("%s: init: cannot determine last wal position, clearing generation (%s)", db.path, err)
if err := os.Remove(db.GenerationNamePath()); err != nil && !os.IsNotExist(err) {
return fmt.Errorf("remove generation name: %w", err)
}
@@ -736,7 +740,7 @@ func (db *DB) Sync() (err error) {
if info.generation, err = db.createGeneration(); err != nil {
return fmt.Errorf("create generation: %w", err)
}
-log.Printf("%s: new generation %q, %s", db.path, info.generation, info.reason)
+log.Printf("%s: sync: new generation %q, %s", db.path, info.generation, info.reason)

// Clear shadow wal info.
info.shadowWALPath = db.ShadowWALPath(info.generation, 0)
@@ -844,10 +848,14 @@ func (db *DB) verify() (info syncInfo, err error) {
db.walSizeGauge.Set(float64(fi.Size()))

// Open shadow WAL to copy append to.
-info.shadowWALPath, err = db.CurrentShadowWALPath(info.generation)
+index, _, err := db.CurrentShadowWALIndex(info.generation)
if err != nil {
-return info, fmt.Errorf("cannot determine shadow WAL: %w", err)
+return info, fmt.Errorf("cannot determine shadow WAL index: %w", err)
+} else if index >= MaxIndex {
+info.reason = "max index exceeded"
+return info, nil
}
+info.shadowWALPath = db.ShadowWALPath(generation, index)

// Determine shadow WAL current size.
fi, err = os.Stat(info.shadowWALPath)
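The `verify()` hunk above switches from `CurrentShadowWALPath` to `CurrentShadowWALIndex` so a sync can notice when the WAL index reaches the new `MaxIndex` constant and roll over to a fresh generation. A condensed sketch of that decision; `nextAction` is an illustrative helper name, not Litestream code:

```go
package main

import "fmt"

// MaxIndex matches the constant added in db.go: the largest WAL index before
// a new generation must be started.
const MaxIndex = 0x7FFFFFFF

// nextAction reports whether a sync should append to the current shadow WAL
// or roll over to a new generation.
func nextAction(index int) string {
	if index >= MaxIndex {
		return "start new generation: max index exceeded"
	}
	return fmt.Sprintf("append to shadow WAL index %08x", index)
}

func main() {
	fmt.Println(nextAction(12))
	fmt.Println(nextAction(MaxIndex))
}
```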
@@ -1017,65 +1025,65 @@ func (db *DB) copyToShadowWAL(filename string) (newSize int64, err error) {
return 0, fmt.Errorf("last checksum: %w", err)
}

-// Seek to correct position on both files.
+// Seek to correct position on real wal.
if _, err := r.Seek(origSize, io.SeekStart); err != nil {
-return 0, fmt.Errorf("wal seek: %w", err)
+return 0, fmt.Errorf("real wal seek: %w", err)
} else if _, err := w.Seek(origSize, io.SeekStart); err != nil {
return 0, fmt.Errorf("shadow wal seek: %w", err)
}

// Read through WAL from last position to find the page of the last
// committed transaction.
-tmpSz := origSize
+frame := make([]byte, db.pageSize+WALFrameHeaderSize)
+var buf bytes.Buffer
+offset := origSize
lastCommitSize := origSize
-buf := make([]byte, db.pageSize+WALFrameHeaderSize)
for {
-Tracef("%s: copy-shadow: %s @ %d", db.path, filename, tmpSz)

// Read next page from WAL file.
-if _, err := io.ReadFull(r, buf); err == io.EOF || err == io.ErrUnexpectedEOF {
-Tracef("%s: copy-shadow: break %s", db.path, err)
+if _, err := io.ReadFull(r, frame); err == io.EOF || err == io.ErrUnexpectedEOF {
+Tracef("%s: copy-shadow: break %s @ %d; err=%s", db.path, filename, offset, err)
break // end of file or partial page
} else if err != nil {
return 0, fmt.Errorf("read wal: %w", err)
}

// Read frame salt & compare to header salt. Stop reading on mismatch.
-salt0 := binary.BigEndian.Uint32(buf[8:])
-salt1 := binary.BigEndian.Uint32(buf[12:])
+salt0 := binary.BigEndian.Uint32(frame[8:])
+salt1 := binary.BigEndian.Uint32(frame[12:])
if salt0 != hsalt0 || salt1 != hsalt1 {
Tracef("%s: copy-shadow: break: salt mismatch", db.path)
break
}

// Verify checksum of page is valid.
-fchksum0 := binary.BigEndian.Uint32(buf[16:])
-fchksum1 := binary.BigEndian.Uint32(buf[20:])
-chksum0, chksum1 = Checksum(bo, chksum0, chksum1, buf[:8]) // frame header
-chksum0, chksum1 = Checksum(bo, chksum0, chksum1, buf[24:]) // frame data
+fchksum0 := binary.BigEndian.Uint32(frame[16:])
+fchksum1 := binary.BigEndian.Uint32(frame[20:])
+chksum0, chksum1 = Checksum(bo, chksum0, chksum1, frame[:8]) // frame header
+chksum0, chksum1 = Checksum(bo, chksum0, chksum1, frame[24:]) // frame data
if chksum0 != fchksum0 || chksum1 != fchksum1 {
-return 0, fmt.Errorf("checksum mismatch: offset=%d (%x,%x) != (%x,%x)", tmpSz, chksum0, chksum1, fchksum0, fchksum1)
+log.Printf("copy shadow: checksum mismatch, skipping: offset=%d (%x,%x) != (%x,%x)", offset, chksum0, chksum1, fchksum0, fchksum1)
+break
}

// Add page to the new size of the shadow WAL.
-tmpSz += int64(len(buf))
+buf.Write(frame)

-// Mark commit record.
-newDBSize := binary.BigEndian.Uint32(buf[4:])
+Tracef("%s: copy-shadow: ok %s offset=%d salt=%x %x", db.path, filename, offset, salt0, salt1)
+offset += int64(len(frame))

+// Flush to shadow WAL if commit record.
+newDBSize := binary.BigEndian.Uint32(frame[4:])
if newDBSize != 0 {
-lastCommitSize = tmpSz
+if _, err := buf.WriteTo(w); err != nil {
+return 0, fmt.Errorf("write shadow wal: %w", err)
+}
+buf.Reset()
+lastCommitSize = offset
}
}

-// Seek to correct position on both files.
-if _, err := r.Seek(origSize, io.SeekStart); err != nil {
-return 0, fmt.Errorf("wal seek: %w", err)
-} else if _, err := w.Seek(origSize, io.SeekStart); err != nil {
-return 0, fmt.Errorf("shadow wal seek: %w", err)
-}

-// Copy bytes, sync & close.
-if _, err := io.CopyN(w, r, lastCommitSize-origSize); err != nil {
-return 0, err
-} else if err := w.Sync(); err != nil {
+// Sync & close.
+if err := w.Sync(); err != nil {
return 0, err
} else if err := w.Close(); err != nil {
return 0, err
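The rewritten `copyToShadowWAL` above no longer scans first and copies byte ranges afterward; it buffers frames in memory and flushes them to the shadow WAL only when a commit frame (non-zero database-size field) is reached, so a partially written transaction at the tail of the real WAL never lands in the shadow WAL. A simplified sketch of that loop, following the SQLite WAL frame layout (24-byte frame header followed by one page of data); `checksumFunc` stands in for Litestream's `Checksum` helper and is an assumption:

```go
// Package walcopy sketches the "buffer frames, flush on commit" loop used by
// the rewritten copyToShadowWAL. Illustrative only, not Litestream's code.
package walcopy

import (
	"bytes"
	"encoding/binary"
	"io"
)

// checksumFunc stands in for Litestream's cumulative WAL checksum helper.
type checksumFunc func(c0, c1 uint32, b []byte) (uint32, uint32)

// copyCommittedFrames reads WAL frames from r and writes them to w, but only
// in whole transactions: frames are buffered until a commit frame is seen.
func copyCommittedFrames(r io.Reader, w io.Writer, pageSize int, hsalt0, hsalt1, c0, c1 uint32, sum checksumFunc) (n int64, err error) {
	frame := make([]byte, 24+pageSize)
	var pending bytes.Buffer

	for {
		if _, err := io.ReadFull(r, frame); err == io.EOF || err == io.ErrUnexpectedEOF {
			return n, nil // end of WAL or a partial frame
		} else if err != nil {
			return n, err
		}

		// Frames whose salt differs from the WAL header belong to an older
		// WAL incarnation; stop copying there.
		if binary.BigEndian.Uint32(frame[8:]) != hsalt0 || binary.BigEndian.Uint32(frame[12:]) != hsalt1 {
			return n, nil
		}

		// Verify the cumulative checksum over the frame header and page data;
		// like the diff, an invalid frame ends the copy rather than failing it.
		c0, c1 = sum(c0, c1, frame[:8])
		c0, c1 = sum(c0, c1, frame[24:])
		if binary.BigEndian.Uint32(frame[16:]) != c0 || binary.BigEndian.Uint32(frame[20:]) != c1 {
			return n, nil
		}

		pending.Write(frame)

		// A non-zero database-size field marks the commit frame of a
		// transaction; only then is the buffered run written out.
		if binary.BigEndian.Uint32(frame[4:]) != 0 {
			m, werr := pending.WriteTo(w)
			n += m
			if werr != nil {
				return n, werr
			}
			pending.Reset()
		}
	}
}
```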
@@ -1098,6 +1106,8 @@ func (db *DB) ShadowWALReader(pos Pos) (r *ShadowWALReader, err error) {
return nil, err
} else if r.N() > 0 {
return r, nil
+} else if err := r.Close(); err != nil { // no data, close, try next
+return nil, err
}

// Otherwise attempt to read the start of the next WAL file.
@@ -1171,6 +1181,9 @@ type ShadowWALReader struct {
pos Pos
}

+// Name returns the filename of the underlying file.
+func (r *ShadowWALReader) Name() string { return r.f.Name() }
+
// Close closes the underlying WAL file handle.
func (r *ShadowWALReader) Close() error { return r.f.Close() }
@@ -1264,7 +1277,7 @@ func (db *DB) checkpoint(mode string) (err error) {
if err := db.db.QueryRow(rawsql).Scan(&row[0], &row[1], &row[2]); err != nil {
return err
}
-log.Printf("%s: checkpoint: mode=%v (%d,%d,%d)", db.path, mode, row[0], row[1], row[2])
+Tracef("%s: checkpoint: mode=%v (%d,%d,%d)", db.path, mode, row[0], row[1], row[2])

// Reacquire the read lock immediately after the checkpoint.
if err := db.acquireReadLock(); err != nil {
@@ -1288,6 +1301,11 @@ func (db *DB) checkpointAndInit(generation, mode string) error {
return err
}

+// Copy shadow WAL before checkpoint to copy as much as possible.
+if _, err := db.copyToShadowWAL(shadowWALPath); err != nil {
+return fmt.Errorf("cannot copy to end of shadow wal before checkpoint: %w", err)
+}
+
// Execute checkpoint and immediately issue a write to the WAL to ensure
// a new page is written.
if err := db.checkpoint(mode); err != nil {
@@ -1414,8 +1432,11 @@ func RestoreReplica(ctx context.Context, r Replica, opt RestoreOptions) error {
return fmt.Errorf("cannot restore wal: %w", err)
}
}

+if opt.Verbose {
+logger.Printf("%s: restored wal %s/%08x", logPrefix, opt.Generation, index)
+}
}

// Copy file to final location.
logger.Printf("%s: renaming database from temporary location", logPrefix)
@@ -1587,13 +1608,14 @@ func restoreWAL(ctx context.Context, r Replica, generation string, index int, db
}
defer d.Close()

-if _, err := d.Exec(`PRAGMA wal_checkpoint(TRUNCATE);`); err != nil {
-return err
-} else if err := d.Close(); err != nil {
+var row [3]int
+if err := d.QueryRow(`PRAGMA wal_checkpoint(TRUNCATE);`).Scan(&row[0], &row[1], &row[2]); err != nil {
+return err
+} else if row[0] != 0 {
+return fmt.Errorf("truncation checkpoint failed during restore (%d,%d,%d)", row[0], row[1], row[2])
}

-return nil
+return d.Close()
}

// CRC64 returns a CRC-64 ISO checksum of the database and its current position.
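The `restoreWAL` hunk above replaces `Exec` with `QueryRow` so the result of `PRAGMA wal_checkpoint(TRUNCATE)` can be inspected: the pragma returns a `(busy, log_frames, checkpointed_frames)` row, and a non-zero busy value means the checkpoint did not complete. A sketch of that check with `database/sql`; the SQLite driver shown is an assumption, and any SQLite driver behaves the same way:

```go
package main

import (
	"database/sql"
	"fmt"
	"log"

	_ "github.com/mattn/go-sqlite3" // assumed driver for this sketch
)

// truncateWAL mirrors the check the hunk adds: a non-zero busy column means
// the truncating checkpoint was blocked and the restore should fail loudly.
func truncateWAL(db *sql.DB) error {
	var row [3]int
	if err := db.QueryRow(`PRAGMA wal_checkpoint(TRUNCATE);`).Scan(&row[0], &row[1], &row[2]); err != nil {
		return err
	} else if row[0] != 0 {
		return fmt.Errorf("truncation checkpoint failed (%d,%d,%d)", row[0], row[1], row[2])
	}
	return nil
}

func main() {
	db, err := sql.Open("sqlite3", "/path/to/restored/db") // illustrative path
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	if err := truncateWAL(db); err != nil {
		log.Fatal(err)
	}
}
```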
@@ -1601,6 +1623,8 @@ func restoreWAL(ctx context.Context, r Replica, generation string, index int, db
// This function obtains a read lock so it prevents syncs from occurring until
// the operation is complete. The database will still be usable but it will be
// unable to checkpoint during this time.
+//
+// If dst is set, the database file is copied to that location before checksum.
func (db *DB) CRC64() (uint64, Pos, error) {
db.mu.Lock()
defer db.mu.Unlock()
@@ -1664,8 +1688,9 @@ type RestoreOptions struct {
// Only equivalent log output for a regular restore.
DryRun bool

-// Logger used to print status to.
+// Logging settings.
Logger *log.Logger
+Verbose bool
}

// NewRestoreOptions returns a new instance of RestoreOptions with defaults.
@@ -1766,3 +1791,24 @@ func headerByteOrder(hdr []byte) (binary.ByteOrder, error) {
return nil, fmt.Errorf("invalid wal header magic: %x", magic)
}
}
+
+func copyFile(dst, src string) error {
+r, err := os.Open(src)
+if err != nil {
+return err
+}
+defer r.Close()
+
+w, err := os.Create(dst)
+if err != nil {
+return err
+}
+defer w.Close()
+
+if _, err := io.Copy(w, r); err != nil {
+return err
+} else if err := w.Sync(); err != nil {
+return err
+}
+return nil
+}
@@ -95,9 +95,9 @@ type Pos struct {
// String returns a string representation.
func (p Pos) String() string {
if p.IsZero() {
-return "<>"
+return ""
}
-return fmt.Sprintf("<%s,%08x,%d>", p.Generation, p.Index, p.Offset)
+return fmt.Sprintf("%s/%08x:%d", p.Generation, p.Index, p.Offset)
}

// IsZero returns true if p is the zero value.
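`Pos.String()` above changes the rendering from the angle-bracket form to `generation/index:offset`, with the index padded to eight hex digits and the zero value rendered as an empty string. A tiny illustration of the two formats; the generation value is made up:

```go
package main

import "fmt"

func main() {
	generation, index, offset := "b16f1dd9", 3, 4152 // illustrative values
	fmt.Printf("old: <%s,%08x,%d>\n", generation, index, offset) // old: <b16f1dd9,00000003,4152>
	fmt.Printf("new: %s/%08x:%d\n", generation, index, offset)   // new: b16f1dd9/00000003:4152
}
```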
@@ -4,7 +4,6 @@ package litestream

import (
"os"
"syscall"
)

// fileinfo returns syscall fields from a FileInfo object.
@@ -15,7 +14,7 @@ func fileinfo(fi os.FileInfo) (uid, gid int) {
// fixRootDirectory is copied from the standard library for use with mkdirAll()
func fixRootDirectory(p string) string {
if len(p) == len(`\\?\c:`) {
-if IsPathSeparator(p[0]) && IsPathSeparator(p[1]) && p[2] == '?' && IsPathSeparator(p[3]) && p[5] == ':' {
+if os.IsPathSeparator(p[0]) && os.IsPathSeparator(p[1]) && p[2] == '?' && os.IsPathSeparator(p[3]) && p[5] == ':' {
return p + `\`
}
}
132 replica.go
@@ -2,6 +2,7 @@ package litestream

import (
"context"
+"encoding/binary"
"fmt"
"io"
"io/ioutil"
@@ -417,7 +418,7 @@ func (r *FileReplica) Stop() {
func (r *FileReplica) monitor(ctx context.Context) {
// Clear old temporary files that my have been left from a crash.
if err := removeTmpFiles(r.dst); err != nil {
-log.Printf("%s(%s): cannot remove tmp files: %s", r.db.Path(), r.Name(), err)
+log.Printf("%s(%s): monitor: cannot remove tmp files: %s", r.db.Path(), r.Name(), err)
}

// Continuously check for new data to replicate.
@@ -437,7 +438,7 @@ func (r *FileReplica) monitor(ctx context.Context) {

// Synchronize the shadow wal into the replication directory.
if err := r.Sync(ctx); err != nil {
-log.Printf("%s(%s): sync error: %s", r.db.Path(), r.Name(), err)
+log.Printf("%s(%s): monitor error: %s", r.db.Path(), r.Name(), err)
continue
}
}
@@ -454,7 +455,7 @@ func (r *FileReplica) retainer(ctx context.Context) {
return
case <-ticker.C:
if err := r.EnforceRetention(ctx); err != nil {
-log.Printf("%s(%s): retain error: %s", r.db.Path(), r.Name(), err)
+log.Printf("%s(%s): retainer error: %s", r.db.Path(), r.Name(), err)
continue
}
}
@@ -548,11 +549,16 @@ func (r *FileReplica) snapshot(ctx context.Context, generation string, index int
return nil
}

+startTime := time.Now()
+
if err := mkdirAll(filepath.Dir(snapshotPath), r.db.dirmode, r.db.diruid, r.db.dirgid); err != nil {
return err
+} else if err := compressFile(r.db.Path(), snapshotPath, r.db.uid, r.db.gid); err != nil {
+return err
}

-return compressFile(r.db.Path(), snapshotPath, r.db.uid, r.db.gid)
+log.Printf("%s(%s): snapshot: creating %s/%08x t=%s", r.db.Path(), r.Name(), generation, index, time.Since(startTime))
+return nil
}

// snapshotN returns the number of snapshots for a generation.
@@ -593,6 +599,8 @@ func (r *FileReplica) Sync(ctx context.Context) (err error) {
}
generation := dpos.Generation

+Tracef("%s(%s): replica sync: db.pos=%s", r.db.Path(), r.Name(), dpos)
+
// Create snapshot if no snapshots exist for generation.
if n, err := r.snapshotN(generation); err != nil {
return err
@@ -612,6 +620,7 @@ func (r *FileReplica) Sync(ctx context.Context) (err error) {
return fmt.Errorf("cannot determine replica position: %s", err)
}

+Tracef("%s(%s): replica sync: calc new pos: %s", r.db.Path(), r.Name(), pos)
r.mu.Lock()
r.pos = pos
r.mu.Unlock()
@@ -664,11 +673,48 @@ func (r *FileReplica) syncWAL(ctx context.Context) (err error) {
return err
}

-n, err := io.Copy(w, rd)
-r.walBytesCounter.Add(float64(n))
+// Copy header if at offset zero.
+var psalt uint64 // previous salt value
+if pos := rd.Pos(); pos.Offset == 0 {
+buf := make([]byte, WALHeaderSize)
+if _, err := io.ReadFull(rd, buf); err != nil {
+return err
+}
+
+psalt = binary.BigEndian.Uint64(buf[16:24])
+
+n, err := w.Write(buf)
+if err != nil {
+return err
+}
+r.walBytesCounter.Add(float64(n))
+}
+
+// Copy frames.
+for {
+pos := rd.Pos()
+assert(pos.Offset == frameAlign(pos.Offset, r.db.pageSize), "shadow wal reader not frame aligned")
+
+buf := make([]byte, WALFrameHeaderSize+r.db.pageSize)
+if _, err := io.ReadFull(rd, buf); err == io.EOF {
+break
+} else if err != nil {
+return err
+}
+
+// Verify salt matches the previous frame/header read.
+salt := binary.BigEndian.Uint64(buf[8:16])
+if psalt != 0 && psalt != salt {
+return fmt.Errorf("replica salt mismatch: %s", filepath.Base(filename))
+}
+psalt = salt
+
+n, err := w.Write(buf)
+if err != nil {
+return err
+}
+r.walBytesCounter.Add(float64(n))
+}

if err := w.Sync(); err != nil {
return err
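The rewritten `syncWAL` above copies the 32-byte WAL header when it starts at offset zero, remembers the header salt, and then copies one frame at a time, requiring every frame's salt to match the one before it. A small sketch of that continuity rule; the names here are illustrative, not Litestream's:

```go
// Package saltcheck sketches the salt-continuity rule the new syncWAL
// enforces while copying frames. Illustrative only.
package saltcheck

import (
	"encoding/binary"
	"fmt"
)

// verifySaltChain treats bytes 8..16 of each frame header (salt-1 and salt-2)
// as one 64-bit value and requires every frame to carry the same salt as the
// WAL header or the frame before it.
func verifySaltChain(headerSalt uint64, frames [][]byte) error {
	prev := headerSalt
	for i, f := range frames {
		salt := binary.BigEndian.Uint64(f[8:16])
		if prev != 0 && salt != prev {
			return fmt.Errorf("salt mismatch at frame %d", i)
		}
		prev = salt
	}
	return nil
}
```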
@@ -796,7 +842,6 @@ func (r *FileReplica) EnforceRetention(ctx context.Context) (err error) {

// If no retained snapshots exist, create a new snapshot.
if len(snapshots) == 0 {
-log.Printf("%s(%s): snapshots exceeds retention, creating new snapshot", r.db.Path(), r.Name())
if err := r.snapshot(ctx, pos.Generation, pos.Index); err != nil {
return fmt.Errorf("cannot snapshot: %w", err)
}
@@ -814,7 +859,7 @@ func (r *FileReplica) EnforceRetention(ctx context.Context) (err error) {

// Delete generations if it has no snapshots being retained.
if snapshot == nil {
-log.Printf("%s(%s): generation %q has no retained snapshots, deleting", r.db.Path(), r.Name(), generation)
+log.Printf("%s(%s): retainer: deleting generation %q has no retained snapshots, deleting", r.db.Path(), r.Name(), generation)
if err := os.RemoveAll(r.GenerationDir(generation)); err != nil {
return fmt.Errorf("cannot delete generation %q dir: %w", generation, err)
}
@@ -843,6 +888,7 @@ func (r *FileReplica) deleteGenerationSnapshotsBefore(ctx context.Context, gener
return err
}

+var n int
for _, fi := range fis {
idx, _, err := ParseSnapshotPath(fi.Name())
if err != nil {
@@ -851,10 +897,13 @@ func (r *FileReplica) deleteGenerationSnapshotsBefore(ctx context.Context, gener
continue
}

-log.Printf("%s(%s): retention exceeded, deleting from generation %q: %s", r.db.Path(), r.Name(), generation, fi.Name())
if err := os.Remove(filepath.Join(dir, fi.Name())); err != nil {
return err
}
+n++
}
+if n > 0 {
+log.Printf("%s(%s): retainer: deleting snapshots before %s/%08x; n=%d", r.db.Path(), r.Name(), generation, index, n)
+}

return nil
@@ -871,6 +920,7 @@ func (r *FileReplica) deleteGenerationWALBefore(ctx context.Context, generation
return err
}

+var n int
for _, fi := range fis {
idx, _, _, err := ParseWALPath(fi.Name())
if err != nil {
@@ -879,10 +929,13 @@ func (r *FileReplica) deleteGenerationWALBefore(ctx context.Context, generation
continue
}

-log.Printf("%s(%s): generation %q wal no longer retained, deleting %s", r.db.Path(), r.Name(), generation, fi.Name())
if err := os.Remove(filepath.Join(dir, fi.Name())); err != nil {
return err
}
+n++
}
+if n > 0 {
+log.Printf("%s(%s): retainer: deleting wal files before %s/%08x n=%d", r.db.Path(), r.Name(), generation, index, n)
+}

return nil
@@ -991,20 +1044,6 @@ func compressFile(src, dst string, uid, gid int) error {
func ValidateReplica(ctx context.Context, r Replica) error {
db := r.DB()

-// Compute checksum of primary database under lock. This prevents a
-// sync from occurring and the database will not be written.
-chksum0, pos, err := db.CRC64()
-if err != nil {
-return fmt.Errorf("cannot compute checksum: %w", err)
-}
-log.Printf("%s(%s): primary checksum computed: %016x @ %s", db.Path(), r.Name(), chksum0, pos)
-
-// Wait until replica catches up to position.
-log.Printf("%s(%s): waiting for replica", db.Path(), r.Name())
-if err := waitForReplica(ctx, r, pos); err != nil {
-return fmt.Errorf("cannot wait for replica: %w", err)
-}
-
// Restore replica to a temporary directory.
tmpdir, err := ioutil.TempDir("", "*-litestream")
if err != nil {
@@ -1012,7 +1051,20 @@ func ValidateReplica(ctx context.Context, r Replica) error {
}
defer os.RemoveAll(tmpdir)

-restorePath := filepath.Join(tmpdir, "db")
+// Compute checksum of primary database under lock. This prevents a
+// sync from occurring and the database will not be written.
+primaryPath := filepath.Join(tmpdir, "primary")
+chksum0, pos, err := db.CRC64()
+if err != nil {
+return fmt.Errorf("cannot compute checksum: %w", err)
+}
+
+// Wait until replica catches up to position.
+if err := waitForReplica(ctx, r, pos); err != nil {
+return fmt.Errorf("cannot wait for replica: %w", err)
+}
+
+restorePath := filepath.Join(tmpdir, "replica")
if err := RestoreReplica(ctx, r, RestoreOptions{
OutputPath: restorePath,
ReplicaName: r.Name(),
@@ -1029,17 +1081,33 @@ func ValidateReplica(ctx context.Context, r Replica) error {
return err
}

-log.Printf("%s(%s): restore complete, replica checksum=%016x", db.Path(), r.Name(), chksum1)
+status := "ok"
+mismatch := chksum0 != chksum1
+if mismatch {
+status = "mismatch"
+}
+log.Printf("%s(%s): validator: status=%s db=%016x replica=%016x pos=%s", db.Path(), r.Name(), status, chksum0, chksum1, pos)

// Validate checksums match.
-if chksum0 != chksum1 {
+if mismatch {
internal.ReplicaValidationTotalCounterVec.WithLabelValues(db.Path(), r.Name(), "error").Inc()
+
+// Compress mismatched databases and report temporary path for investigation.
+if err := compressFile(primaryPath, primaryPath+".lz4", db.uid, db.gid); err != nil {
+return fmt.Errorf("cannot compress primary db: %w", err)
+} else if err := compressFile(restorePath, restorePath+".lz4", db.uid, db.gid); err != nil {
+return fmt.Errorf("cannot compress replica db: %w", err)
+}
+log.Printf("%s(%s): validator: mismatch files @ %s", db.Path(), r.Name(), tmpdir)
+
return ErrChecksumMismatch
}

internal.ReplicaValidationTotalCounterVec.WithLabelValues(db.Path(), r.Name(), "ok").Inc()
-log.Printf("%s(%s): replica ok", db.Path(), r.Name())
+
+if err := os.RemoveAll(tmpdir); err != nil {
+return fmt.Errorf("cannot remove temporary validation directory: %w", err)
+}
return nil
}
@@ -1050,6 +1118,9 @@ func waitForReplica(ctx context.Context, r Replica, pos Pos) error {
ticker := time.NewTicker(500 * time.Millisecond)
defer ticker.Stop()

+timer := time.NewTicker(10 * time.Second)
+defer ticker.Stop()
+
once := make(chan struct{}, 1)
once <- struct{}{}

@@ -1057,6 +1128,8 @@ func waitForReplica(ctx context.Context, r Replica, pos Pos) error {
select {
case <-ctx.Done():
return ctx.Err()
+case <-timer.C:
+return fmt.Errorf("replica wait exceeded timeout")
case <-ticker.C:
case <-once: // immediate on first check
}
@@ -1064,7 +1137,7 @@ func waitForReplica(ctx context.Context, r Replica, pos Pos) error {
// Obtain current position of replica, check if past target position.
curr, err := r.CalcPos(ctx, pos.Generation)
if err != nil {
-log.Printf("%s(%s): cannot obtain replica position: %s", db.Path(), r.Name(), err)
+log.Printf("%s(%s): validator: cannot obtain replica position: %s", db.Path(), r.Name(), err)
continue
}

@@ -1083,7 +1156,6 @@ func waitForReplica(ctx context.Context, r Replica, pos Pos) error {

// If not ready, restart loop.
if !ready {
-log.Printf("%s(%s): replica at %s, waiting for %s", db.Path(), r.Name(), curr, pos)
continue
}
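`waitForReplica` above now pairs its 500 ms poll with a 10-second timeout and a pre-filled buffered channel so the first check runs immediately instead of waiting for the first tick. A sketch of the same polling pattern; it uses a `time.Timer` for the timeout where the hunk uses a second ticker:

```go
package main

import (
	"context"
	"fmt"
	"log"
	"time"
)

// poll runs check immediately, then every 500ms, and gives up after 10s.
func poll(ctx context.Context, check func() (bool, error)) error {
	ticker := time.NewTicker(500 * time.Millisecond)
	defer ticker.Stop()

	timeout := time.NewTimer(10 * time.Second)
	defer timeout.Stop()

	// A buffered channel holding one token makes the first iteration fire
	// without waiting for the ticker.
	once := make(chan struct{}, 1)
	once <- struct{}{}

	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-timeout.C:
			return fmt.Errorf("wait exceeded timeout")
		case <-ticker.C:
		case <-once:
		}

		if ok, err := check(); err != nil {
			log.Printf("check failed: %s", err)
		} else if ok {
			return nil
		}
	}
}

func main() {
	n := 0
	err := poll(context.Background(), func() (bool, error) {
		n++
		return n >= 3, nil // succeeds on the third poll (illustrative)
	})
	fmt.Println(err)
}
```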
15 s3/s3.go
@@ -467,7 +467,7 @@ func (r *Replica) monitor(ctx context.Context) {

// Synchronize the shadow wal into the replication directory.
if err := r.Sync(ctx); err != nil {
-log.Printf("%s(%s): sync error: %s", r.db.Path(), r.Name(), err)
+log.Printf("%s(%s): monitor error: %s", r.db.Path(), r.Name(), err)
continue
}
}
@@ -603,6 +603,7 @@ func (r *Replica) snapshot(ctx context.Context, generation string, index int) er
}()

snapshotPath := r.SnapshotPath(generation, index)
+startTime := time.Now()

if _, err := r.uploader.UploadWithContext(ctx, &s3manager.UploadInput{
Bucket: aws.String(r.Bucket),
@@ -615,6 +616,8 @@ func (r *Replica) snapshot(ctx context.Context, generation string, index int) er
r.putOperationTotalCounter.Inc()
r.putOperationBytesCounter.Add(float64(fi.Size()))

+log.Printf("%s(%s): snapshot: creating %s/%08x t=%s", r.db.Path(), r.Name(), generation, index, time.Since(startTime))
+
return nil
}

@@ -935,7 +938,6 @@ func (r *Replica) EnforceRetention(ctx context.Context) (err error) {

// If no retained snapshots exist, create a new snapshot.
if len(snapshots) == 0 {
-log.Printf("%s(%s): snapshots exceeds retention, creating new snapshot", r.db.Path(), r.Name())
if err := r.snapshot(ctx, pos.Generation, pos.Index); err != nil {
return fmt.Errorf("cannot snapshot: %w", err)
}
@@ -958,7 +960,6 @@ func (r *Replica) EnforceRetention(ctx context.Context) (err error) {

// Delete generations if it has no snapshots being retained.
if snapshot == nil {
-log.Printf("%s(%s): generation %q has no retained snapshots, deleting", r.db.Path(), r.Name(), generation)
if err := r.deleteGenerationBefore(ctx, generation, -1); err != nil {
return fmt.Errorf("cannot delete generation %q dir: %w", generation, err)
}
@@ -1001,16 +1002,13 @@ func (r *Replica) deleteGenerationBefore(ctx context.Context, generation string,
}

// Delete all files in batches.
+var n int
for i := 0; i < len(objIDs); i += MaxKeys {
j := i + MaxKeys
if j > len(objIDs) {
j = len(objIDs)
}

-for _, objID := range objIDs[i:j] {
-log.Printf("%s(%s): retention exceeded, deleting from generation %q: %s", r.db.Path(), r.Name(), generation, path.Base(*objID.Key))
-}
-
if _, err := r.s3.DeleteObjectsWithContext(ctx, &s3.DeleteObjectsInput{
Bucket: aws.String(r.Bucket),
Delete: &s3.Delete{
@@ -1020,9 +1018,12 @@ func (r *Replica) deleteGenerationBefore(ctx context.Context, generation string,
}); err != nil {
return err
}
+n += len(objIDs[i:j])
r.deleteOperationTotalCounter.Inc()
}

+log.Printf("%s(%s): retainer: deleting wal files before %s/%08x n=%d", r.db.Path(), r.Name(), generation, index, n)
+
return nil
}
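`deleteGenerationBefore` above drops the per-object log line, counts deletions, and issues `DeleteObjects` calls in chunks of at most `MaxKeys` keys (S3 accepts up to 1,000 keys per request). A sketch of the chunking with a stand-in for the SDK call:

```go
package main

import "fmt"

// MaxKeys mirrors the batch limit referenced in the hunk: S3 DeleteObjects
// accepts at most 1000 keys per request.
const MaxKeys = 1000

// deleteInBatches walks keys in MaxKeys-sized chunks and reports how many
// were deleted; deleteBatch stands in for the DeleteObjectsWithContext call.
func deleteInBatches(keys []string, deleteBatch func([]string) error) (int, error) {
	var n int
	for i := 0; i < len(keys); i += MaxKeys {
		j := i + MaxKeys
		if j > len(keys) {
			j = len(keys)
		}
		if err := deleteBatch(keys[i:j]); err != nil {
			return n, err
		}
		n += j - i
	}
	return n, nil
}

func main() {
	keys := make([]string, 2500)
	n, _ := deleteInBatches(keys, func(batch []string) error { return nil })
	fmt.Println("deleted", n, "objects") // deleted 2500 objects
}
```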