Compare commits

22 Commits

Author SHA1 Message Date
Ben Johnson
37442babfb Revert validation mismatch temp file persistence
This commit reverts 4e469f8, which was used for debugging the validation
stall corruption issue. However, it can cause the disk to fill with
temporary files, so it is being reverted.
2021-02-09 06:44:42 -07:00
Ben Johnson
962a2a894b Fix tabwriter 2021-02-08 15:55:15 -07:00
Ben Johnson
0c61c9f7fe Merge pull request #39 from benbjohnson/replicate-s3-sync-interval
Reduce s3 sync interval when using replica URL
2021-02-08 14:14:20 -07:00
Ben Johnson
267b140fab Reduce s3 sync interval when using replica URL
This commit changes the default sync interval from 10s to 1s
when replicating using the inline replica URL. This approach is
used when users are first testing the software, so a faster
replication interval makes it easier to see results.
2021-02-08 14:09:01 -07:00
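For context, the inline replica URL is the quick-start form shown in the README, where the database path and replica URL are passed directly on the command line; a replica configured this way now syncs every second. A hedged example (bucket and paths are illustrative):

```sh
$ litestream replicate /path/to/db s3://mybkt/db
```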
Ben Johnson
1b194535e6 Merge pull request #38 from benbjohnson/trace-flag
Add trace file to replicate command
2021-02-06 07:34:02 -07:00
Ben Johnson
58a6c765fe Add trace file to replicate command
This commit removes the verbose flag (`-v`) and replaces it with
the trace flag (`-trace PATH`). This moves tracing to a separate
file instead of writing to STDOUT.
2021-02-06 07:31:19 -07:00
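Based on the flag described above, tracing a replication session would look something like this (trace path illustrative):

```sh
$ litestream replicate -trace /tmp/litestream.trace /path/to/db s3://mybkt/db
```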
Ben Johnson
2604052a9f Merge pull request #37 from benbjohnson/fix-shadow-write
Fix shadow wal corruption on stalled validation
2021-02-06 07:31:05 -07:00
Ben Johnson
7f81890bae Fix shadow wal corruption on stalled validation
This commit fixes a timing bug that occurs in a specific scenario
where the shadow WAL sync stalls because of an S3 validation and
the catch-up write to the shadow WAL is large enough to allow a
window between WAL reads and the final copy.

The file copy has been replaced by direct writes of the frame
buffer to the shadow WAL to ensure that every validated byte is
exactly what is written to the shadow WAL. The one downside to this
change is that the frame buffer grows with the transaction size,
so it uses additional heap. This could be replaced by a
spill-to-disk implementation, but buffering should work well in the
short term.
2021-02-06 07:28:15 -07:00
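A minimal sketch of the buffer-and-flush pattern this commit describes, assuming the standard SQLite WAL layout (each frame is a 24-byte header plus one page, and bytes 4-8 of the header hold the post-commit database size in pages, which is nonzero only on a commit frame). The names here are illustrative, not Litestream's actual API:

```go
package walcopy

import (
	"bytes"
	"encoding/binary"
	"io"
)

const walFrameHeaderSize = 24 // frame header size in the SQLite WAL format

// copyCommittedFrames buffers WAL frames in memory and writes them to the
// shadow WAL only on a commit frame, so the shadow WAL never contains a
// partially written transaction.
func copyCommittedFrames(w io.Writer, r io.Reader, pageSize int) error {
	var buf bytes.Buffer
	frame := make([]byte, walFrameHeaderSize+pageSize)
	for {
		if _, err := io.ReadFull(r, frame); err == io.EOF || err == io.ErrUnexpectedEOF {
			return nil // end of WAL or torn frame: stop at the last commit
		} else if err != nil {
			return err
		}
		buf.Write(frame)
		// Bytes 4-8 of a frame header hold the database size in pages after
		// commit; a nonzero value marks the commit frame of a transaction.
		if binary.BigEndian.Uint32(frame[4:]) != 0 {
			if _, err := buf.WriteTo(w); err != nil {
				return err
			}
			buf.Reset()
		}
	}
}
```

As the commit notes, the buffer grows with the largest uncommitted transaction, trading heap for the guarantee that validated bytes and written bytes are identical.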
Ben Johnson
2ff073c735 Merge pull request #36 from benbjohnson/max-index
Enforce max WAL index
2021-02-02 15:16:42 -07:00
Ben Johnson
6fd11ccab5 Enforce max WAL index.
This commit sets a hard upper limit for the WAL index of (1<<31)-1.
The index is hex-encoded in file names as a 4-byte unsigned integer,
so this limit ensures all index values fit in that encoding and are
unaffected by any signed integer limit.

A WAL file is typically at least 4MB, so you would need to write
8 petabytes to reach this upper limit.
2021-02-02 15:11:50 -07:00
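The arithmetic behind that bound, as a back-of-the-envelope check (not code from the repo):

```go
package main

import "fmt"

const MaxIndex = 0x7FFFFFFF // (1<<31)-1, the limit set by this commit

func main() {
	// ~4 MB per WAL file times 2^31 index values is about 2^53 bytes.
	totalBytes := int64(4<<20) * (MaxIndex + 1)
	fmt.Println(totalBytes>>50, "PiB") // 8 PiB of WAL before the index runs out
}
```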
Ben Johnson
6c49fba592 Check checkpoint result during restore 2021-02-02 15:04:20 -07:00
Ben Johnson
922fa0798e Merge pull request #34 from benbjohnson/windows 2021-01-31 10:09:51 -07:00
Ben Johnson
976df182c0 Fix Windows build 2021-01-31 10:08:28 -07:00
Ben Johnson
0e28a650e6 Merge pull request #33 from benbjohnson/log-wal-checksum-mismatch
Log WAL frame checksum mismatch
2021-01-31 08:57:51 -07:00
Ben Johnson
f17768e830 Log WAL frame checksum mismatch
Currently, the WAL copy function can encounter a checksum mismatch
in a WAL frame, in which case it returns an error. This can occur for
partial writes and is recovered from moments later. This commit changes
the error to a log write instead.
2021-01-31 08:52:12 -07:00
Ben Johnson
2c142d3a0c Merge pull request #32 from benbjohnson/persist-mismatch-validation-data
Persist primary/replica copies after validation mismatch
2021-01-31 08:50:15 -07:00
Ben Johnson
4e469f8b02 Persist primary/replica copies after validation mismatch
This commit changes `ValidateReplica()` to persist copies of the
primary & replica databases for inspection if a validation mismatch
occurs.
2021-01-31 08:47:06 -07:00
Ben Johnson
3f268b70f8 Merge pull request #31 from benbjohnson/adjust-logging
Reduce logging output
2021-01-31 08:14:57 -07:00
Ben Johnson
ad7bf7f974 Reduce logging output
Previously, there were excessive log messages for checkpoints and
retention. These have been removed or combined into a single log
message where appropriate.
2021-01-31 08:12:18 -07:00
Ben Johnson
778451f09f CONTRIBUTING 2021-01-28 13:31:28 -07:00
Ben Johnson
8e9a15933b README 2021-01-27 08:01:48 -07:00
Ben Johnson
da1d7c3183 README 2021-01-27 07:59:12 -07:00
13 changed files with 256 additions and 107 deletions

.github/CONTRIBUTING.md (new file, 17 additions)

@@ -0,0 +1,17 @@
## Open-source, not open-contribution
[Similar to SQLite](https://www.sqlite.org/copyright.html), Litestream is open
source but closed to contributions. This keeps the code base free of proprietary
or licensed code, and it also helps me continue to maintain and build Litestream.
As the author of [BoltDB](https://github.com/boltdb/bolt), I found that
accepting and maintaining third-party patches contributed to my burnout and
I eventually archived the project. Writing databases & low-level replication
tools involves nuance, and simple one-line changes can have profound and
unexpected effects on correctness and performance. Small contributions
typically required hours of my time to properly test and validate.
I am grateful for community involvement, bug reports, & feature requests. I do
not wish to come off as anything but welcoming; however, I've
made the decision to keep this project closed to contributions for my own
mental health and the long-term viability of the project.

@@ -22,7 +22,7 @@ GitHub.
### Mac OS (Homebrew)
-To install from homebrew, first add the Litestream tap and then install:
+To install from homebrew, run the following command:
```sh
$ brew install benbjohnson/litestream/litestream
@@ -67,7 +67,7 @@ The `litestream` binary should be in your `$GOPATH/bin` folder.
Litestream provides a configuration file that can be used for production
deployments but you can also specify a single database and replica on the
-command line for testing.
+command line when trying it out.
First, you'll need to create an S3 bucket that we'll call `"mybkt"` in this
example. You'll also need to set your AWS credentials:
@@ -77,7 +77,7 @@ $ export AWS_ACCESS_KEY_ID=AKIAxxxxxxxxxxxxxxxx
$ export AWS_SECRET_ACCESS_KEY=xxxxxxxxxxxxxxxxxxxxxxxxxxxxxx/xxxxxxxxx
```
-Next you can run the `litestream replicate` command with the path to the
+Next, you can run the `litestream replicate` command with the path to the
database you want to backup and the URL of your replica destination:
```sh
@@ -89,7 +89,7 @@ to S3 every 10 seconds. From another terminal window, you can restore your
database from your S3 replica:
```
-$ litestream restore -v -o /path/to/restored/db s3://mybkt/db
+$ litestream restore -o /path/to/restored/db s3://mybkt/db
```
Voila! 🎉
@@ -240,7 +240,7 @@ $ litestream restore -timestamp 2020-01-01T00:00:00Z /path/to/db
```
Point-in-time restores only have the resolution of the timestamp of the WAL file
-itself. By default, litestream will start a new WAL file every minute so
+itself. By default, Litestream will start a new WAL file every minute so
point-in-time restores are only accurate to the minute.
@@ -276,9 +276,9 @@ another process is allowed to checkpoint the WAL.
## Open-source, not open-contribution
-[Similar to SQLite](https://www.sqlite.org/copyright.html), litestream is open
+[Similar to SQLite](https://www.sqlite.org/copyright.html), Litestream is open
source but closed to contributions. This keeps the code base free of proprietary
-or licensed code but it also helps me continue to maintain and build litestream.
+or licensed code but it also helps me continue to maintain and build Litestream.
As the author of [BoltDB](https://github.com/boltdb/bolt), I found that
accepting and maintaining third party patches contributed to my burn out and

@@ -35,7 +35,9 @@ func (c *DatabasesCommand) Run(ctx context.Context, args []string) (err error) {
}
// List all databases.
-w := tabwriter.NewWriter(os.Stdout, 0, 8, 1, '\t', 0)
+w := tabwriter.NewWriter(os.Stdout, 0, 8, 2, ' ', 0)
+defer w.Flush()
fmt.Fprintln(w, "path\treplicas")
for _, dbConfig := range config.DBs {
db, err := newDBFromConfig(&config, dbConfig)
@@ -53,7 +55,6 @@ func (c *DatabasesCommand) Run(ctx context.Context, args []string) (err error) {
strings.Join(replicaNames, ","),
)
}
-w.Flush()
return nil
}
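For readers unfamiliar with `text/tabwriter`: the fix above swaps a tab pad character for two spaces, so columns are aligned with spaces rather than literal tabs (which render differently across terminals), and `defer w.Flush()` guarantees the writer is flushed on every return path. A standalone sketch, not code from the repo:

```go
package main

import (
	"fmt"
	"os"
	"text/tabwriter"
)

func main() {
	// minwidth=0, tabwidth=8, padding=2, padchar=' ': cells in the same
	// column are padded with at least two spaces to the widest cell.
	w := tabwriter.NewWriter(os.Stdout, 0, 8, 2, ' ', 0)
	defer w.Flush()
	fmt.Fprintln(w, "path\treplicas")
	fmt.Fprintln(w, "/var/lib/app/db\ts3")
}
```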

@@ -77,7 +77,9 @@ func (c *GenerationsCommand) Run(ctx context.Context, args []string) (err error)
}
// List each generation.
-w := tabwriter.NewWriter(os.Stdout, 0, 8, 1, '\t', 0)
+w := tabwriter.NewWriter(os.Stdout, 0, 8, 2, ' ', 0)
+defer w.Flush()
fmt.Fprintln(w, "name\tgeneration\tlag\tstart\tend")
for _, r := range replicas {
generations, err := r.Generations(ctx)
@@ -101,10 +103,8 @@ func (c *GenerationsCommand) Run(ctx context.Context, args []string) (err error)
stats.CreatedAt.Format(time.RFC3339),
stats.UpdatedAt.Format(time.RFC3339),
)
-w.Flush()
}
}
-w.Flush()
return nil
}

@@ -11,6 +11,7 @@ import (
_ "net/http/pprof"
"os"
"os/signal"
"time"
"github.com/benbjohnson/litestream"
"github.com/benbjohnson/litestream/s3"
@@ -29,7 +30,7 @@ type ReplicateCommand struct {
// Run loads all databases specified in the configuration.
func (c *ReplicateCommand) Run(ctx context.Context, args []string) (err error) {
fs := flag.NewFlagSet("litestream-replicate", flag.ContinueOnError)
-verbose := fs.Bool("v", false, "verbose logging")
+tracePath := fs.String("trace", "", "trace path")
registerConfigFlag(fs, &c.ConfigPath)
fs.Usage = c.Usage
if err := fs.Parse(args); err != nil {
@@ -43,7 +44,10 @@ func (c *ReplicateCommand) Run(ctx context.Context, args []string) (err error) {
} else if fs.NArg() > 1 {
dbConfig := &DBConfig{Path: fs.Arg(0)}
for _, u := range fs.Args()[1:] {
-dbConfig.Replicas = append(dbConfig.Replicas, &ReplicaConfig{URL: u})
+dbConfig.Replicas = append(dbConfig.Replicas, &ReplicaConfig{
+URL: u,
+SyncInterval: 1 * time.Second,
+})
}
config.DBs = []*DBConfig{dbConfig}
} else if c.ConfigPath != "" {
@@ -56,8 +60,13 @@ func (c *ReplicateCommand) Run(ctx context.Context, args []string) (err error) {
}
// Enable trace logging.
-if *verbose {
-litestream.Tracef = log.Printf
+if *tracePath != "" {
+f, err := os.Create(*tracePath)
+if err != nil {
+return err
+}
+defer f.Close()
+litestream.Tracef = log.New(f, "", log.LstdFlags|log.LUTC|log.Lshortfile).Printf
}
// Setup signal handler.
@@ -159,8 +168,8 @@ Arguments:
Specifies the configuration file.
Defaults to %s
-  -v
-  Enable verbose logging output.
+  -trace PATH
+  Write verbose trace logging to PATH.
`[1:], DefaultConfigPath())
}

@@ -19,6 +19,8 @@ type RestoreCommand struct{}
func (c *RestoreCommand) Run(ctx context.Context, args []string) (err error) {
var configPath string
opt := litestream.NewRestoreOptions()
+opt.Verbose = true
fs := flag.NewFlagSet("litestream-restore", flag.ContinueOnError)
registerConfigFlag(fs, &configPath)
fs.StringVar(&opt.OutputPath, "o", "", "output path")

@@ -75,7 +75,9 @@ func (c *SnapshotsCommand) Run(ctx context.Context, args []string) (err error) {
}
// List all snapshots.
-w := tabwriter.NewWriter(os.Stdout, 0, 8, 1, '\t', 0)
+w := tabwriter.NewWriter(os.Stdout, 0, 8, 2, ' ', 0)
+defer w.Flush()
fmt.Fprintln(w, "replica\tgeneration\tindex\tsize\tcreated")
for _, info := range infos {
fmt.Fprintf(w, "%s\t%s\t%d\t%d\t%s\n",
@@ -86,7 +88,6 @@ func (c *SnapshotsCommand) Run(ctx context.Context, args []string) (err error) {
info.CreatedAt.Format(time.RFC3339),
)
}
-w.Flush()
return nil
}

@@ -76,7 +76,9 @@ func (c *WALCommand) Run(ctx context.Context, args []string) (err error) {
}
// List all WAL files.
-w := tabwriter.NewWriter(os.Stdout, 0, 8, 1, '\t', 0)
+w := tabwriter.NewWriter(os.Stdout, 0, 8, 2, ' ', 0)
+defer w.Flush()
fmt.Fprintln(w, "replica\tgeneration\tindex\toffset\tsize\tcreated")
for _, info := range infos {
if *generation != "" && info.Generation != *generation {
@@ -92,7 +94,6 @@ func (c *WALCommand) Run(ctx context.Context, args []string) (err error) {
info.CreatedAt.Format(time.RFC3339),
)
}
-w.Flush()
return nil
}

db.go (130 changed lines)

@@ -33,6 +33,10 @@ const (
DefaultMaxCheckpointPageN = 10000
)
+// MaxIndex is the maximum possible WAL index.
+// If this index is reached then a new generation will be started.
+const MaxIndex = 0x7FFFFFFF
// BusyTimeout is the timeout to wait for EBUSY from SQLite.
const BusyTimeout = 1 * time.Second
@@ -421,7 +425,7 @@ func (db *DB) init() (err error) {
// If we have an existing shadow WAL, ensure the headers match.
if err := db.verifyHeadersMatch(); err != nil {
log.Printf("%s: cannot determine last wal position, clearing generation (%s)", db.path, err)
log.Printf("%s: init: cannot determine last wal position, clearing generation (%s)", db.path, err)
if err := os.Remove(db.GenerationNamePath()); err != nil && !os.IsNotExist(err) {
return fmt.Errorf("remove generation name: %w", err)
}
@@ -736,7 +740,7 @@ func (db *DB) Sync() (err error) {
if info.generation, err = db.createGeneration(); err != nil {
return fmt.Errorf("create generation: %w", err)
}
log.Printf("%s: new generation %q, %s", db.path, info.generation, info.reason)
log.Printf("%s: sync: new generation %q, %s", db.path, info.generation, info.reason)
// Clear shadow wal info.
info.shadowWALPath = db.ShadowWALPath(info.generation, 0)
@@ -844,10 +848,14 @@ func (db *DB) verify() (info syncInfo, err error) {
db.walSizeGauge.Set(float64(fi.Size()))
// Open shadow WAL to copy append to.
-info.shadowWALPath, err = db.CurrentShadowWALPath(info.generation)
+index, _, err := db.CurrentShadowWALIndex(info.generation)
if err != nil {
-return info, fmt.Errorf("cannot determine shadow WAL: %w", err)
+return info, fmt.Errorf("cannot determine shadow WAL index: %w", err)
+} else if index >= MaxIndex {
+info.reason = "max index exceeded"
+return info, nil
}
+info.shadowWALPath = db.ShadowWALPath(generation, index)
// Determine shadow WAL current size.
fi, err = os.Stat(info.shadowWALPath)
@@ -1017,65 +1025,65 @@ func (db *DB) copyToShadowWAL(filename string) (newSize int64, err error) {
return 0, fmt.Errorf("last checksum: %w", err)
}
-// Seek to correct position on both files.
+// Seek to correct position on real wal.
if _, err := r.Seek(origSize, io.SeekStart); err != nil {
-return 0, fmt.Errorf("wal seek: %w", err)
+return 0, fmt.Errorf("real wal seek: %w", err)
} else if _, err := w.Seek(origSize, io.SeekStart); err != nil {
return 0, fmt.Errorf("shadow wal seek: %w", err)
}
// Read through WAL from last position to find the page of the last
// committed transaction.
-tmpSz := origSize
+frame := make([]byte, db.pageSize+WALFrameHeaderSize)
+var buf bytes.Buffer
+offset := origSize
lastCommitSize := origSize
-buf := make([]byte, db.pageSize+WALFrameHeaderSize)
for {
-Tracef("%s: copy-shadow: %s @ %d", db.path, filename, tmpSz)
// Read next page from WAL file.
-if _, err := io.ReadFull(r, buf); err == io.EOF || err == io.ErrUnexpectedEOF {
-Tracef("%s: copy-shadow: break %s", db.path, err)
+if _, err := io.ReadFull(r, frame); err == io.EOF || err == io.ErrUnexpectedEOF {
+Tracef("%s: copy-shadow: break %s @ %d; err=%s", db.path, filename, offset, err)
break // end of file or partial page
} else if err != nil {
return 0, fmt.Errorf("read wal: %w", err)
}
// Read frame salt & compare to header salt. Stop reading on mismatch.
-salt0 := binary.BigEndian.Uint32(buf[8:])
-salt1 := binary.BigEndian.Uint32(buf[12:])
+salt0 := binary.BigEndian.Uint32(frame[8:])
+salt1 := binary.BigEndian.Uint32(frame[12:])
if salt0 != hsalt0 || salt1 != hsalt1 {
Tracef("%s: copy-shadow: break: salt mismatch", db.path)
break
}
// Verify checksum of page is valid.
-fchksum0 := binary.BigEndian.Uint32(buf[16:])
-fchksum1 := binary.BigEndian.Uint32(buf[20:])
-chksum0, chksum1 = Checksum(bo, chksum0, chksum1, buf[:8]) // frame header
-chksum0, chksum1 = Checksum(bo, chksum0, chksum1, buf[24:]) // frame data
+fchksum0 := binary.BigEndian.Uint32(frame[16:])
+fchksum1 := binary.BigEndian.Uint32(frame[20:])
+chksum0, chksum1 = Checksum(bo, chksum0, chksum1, frame[:8]) // frame header
+chksum0, chksum1 = Checksum(bo, chksum0, chksum1, frame[24:]) // frame data
if chksum0 != fchksum0 || chksum1 != fchksum1 {
-return 0, fmt.Errorf("checksum mismatch: offset=%d (%x,%x) != (%x,%x)", tmpSz, chksum0, chksum1, fchksum0, fchksum1)
+log.Printf("copy shadow: checksum mismatch, skipping: offset=%d (%x,%x) != (%x,%x)", offset, chksum0, chksum1, fchksum0, fchksum1)
+break
}
-// Add page to the new size of the shadow WAL.
-tmpSz += int64(len(buf))
-// Mark commit record.
-newDBSize := binary.BigEndian.Uint32(buf[4:])
+buf.Write(frame)
+Tracef("%s: copy-shadow: ok %s offset=%d salt=%x %x", db.path, filename, offset, salt0, salt1)
+offset += int64(len(frame))
+// Flush to shadow WAL if commit record.
+newDBSize := binary.BigEndian.Uint32(frame[4:])
if newDBSize != 0 {
-lastCommitSize = tmpSz
+if _, err := buf.WriteTo(w); err != nil {
+return 0, fmt.Errorf("write shadow wal: %w", err)
+}
+buf.Reset()
+lastCommitSize = offset
}
}
-// Seek to correct position on both files.
-if _, err := r.Seek(origSize, io.SeekStart); err != nil {
-return 0, fmt.Errorf("wal seek: %w", err)
-} else if _, err := w.Seek(origSize, io.SeekStart); err != nil {
-return 0, fmt.Errorf("shadow wal seek: %w", err)
-}
-// Copy bytes, sync & close.
-if _, err := io.CopyN(w, r, lastCommitSize-origSize); err != nil {
-return 0, err
-} else if err := w.Sync(); err != nil {
+// Sync & close.
+if err := w.Sync(); err != nil {
return 0, err
} else if err := w.Close(); err != nil {
return 0, err
@@ -1098,6 +1106,8 @@ func (db *DB) ShadowWALReader(pos Pos) (r *ShadowWALReader, err error) {
return nil, err
} else if r.N() > 0 {
return r, nil
+} else if err := r.Close(); err != nil { // no data, close, try next
+return nil, err
}
// Otherwise attempt to read the start of the next WAL file.
@@ -1171,6 +1181,9 @@ type ShadowWALReader struct {
pos Pos
}
+// Name returns the filename of the underlying file.
+func (r *ShadowWALReader) Name() string { return r.f.Name() }
// Close closes the underlying WAL file handle.
func (r *ShadowWALReader) Close() error { return r.f.Close() }
@@ -1264,7 +1277,7 @@ func (db *DB) checkpoint(mode string) (err error) {
if err := db.db.QueryRow(rawsql).Scan(&row[0], &row[1], &row[2]); err != nil {
return err
}
log.Printf("%s: checkpoint: mode=%v (%d,%d,%d)", db.path, mode, row[0], row[1], row[2])
Tracef("%s: checkpoint: mode=%v (%d,%d,%d)", db.path, mode, row[0], row[1], row[2])
// Reacquire the read lock immediately after the checkpoint.
if err := db.acquireReadLock(); err != nil {
@@ -1288,6 +1301,11 @@ func (db *DB) checkpointAndInit(generation, mode string) error {
return err
}
+// Copy shadow WAL before checkpoint to copy as much as possible.
+if _, err := db.copyToShadowWAL(shadowWALPath); err != nil {
+return fmt.Errorf("cannot copy to end of shadow wal before checkpoint: %w", err)
+}
// Execute checkpoint and immediately issue a write to the WAL to ensure
// a new page is written.
if err := db.checkpoint(mode); err != nil {
@@ -1414,7 +1432,10 @@ func RestoreReplica(ctx context.Context, r Replica, opt RestoreOptions) error {
return fmt.Errorf("cannot restore wal: %w", err)
}
}
logger.Printf("%s: restored wal %s/%08x", logPrefix, opt.Generation, index)
if opt.Verbose {
logger.Printf("%s: restored wal %s/%08x", logPrefix, opt.Generation, index)
}
}
// Copy file to final location.
@@ -1587,13 +1608,14 @@ func restoreWAL(ctx context.Context, r Replica, generation string, index int, db
}
defer d.Close()
-if _, err := d.Exec(`PRAGMA wal_checkpoint(TRUNCATE);`); err != nil {
-return err
-} else if err := d.Close(); err != nil {
+var row [3]int
+if err := d.QueryRow(`PRAGMA wal_checkpoint(TRUNCATE);`).Scan(&row[0], &row[1], &row[2]); err != nil {
return err
+} else if row[0] != 0 {
+return fmt.Errorf("truncation checkpoint failed during restore (%d,%d,%d)", row[0], row[1], row[2])
}
-return nil
+return d.Close()
}
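For context on the check added above: `PRAGMA wal_checkpoint(TRUNCATE)` returns a row of three integers (busy, frames in the WAL, frames checkpointed), and the first is nonzero if the checkpoint was blocked, so scanning the row rather than just executing the pragma catches a silently failed truncation. A standalone sketch of the same idea (not the repo's code):

```go
package restorecheck

import (
	"database/sql"
	"fmt"
)

// truncateWAL runs a TRUNCATE checkpoint and fails loudly if it was
// blocked, instead of assuming the pragma always truncated the WAL.
func truncateWAL(db *sql.DB) error {
	var busy, logFrames, checkpointed int
	if err := db.QueryRow(`PRAGMA wal_checkpoint(TRUNCATE);`).Scan(&busy, &logFrames, &checkpointed); err != nil {
		return err
	}
	if busy != 0 {
		return fmt.Errorf("checkpoint blocked (busy=%d, log=%d, checkpointed=%d)", busy, logFrames, checkpointed)
	}
	return nil
}
```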
// CRC64 returns a CRC-64 ISO checksum of the database and its current position.
@@ -1601,6 +1623,8 @@ func restoreWAL(ctx context.Context, r Replica, generation string, index int, db
// This function obtains a read lock so it prevents syncs from occurring until
// the operation is complete. The database will still be usable but it will be
// unable to checkpoint during this time.
+//
+// If dst is set, the database file is copied to that location before checksum.
func (db *DB) CRC64() (uint64, Pos, error) {
db.mu.Lock()
defer db.mu.Unlock()
@@ -1664,8 +1688,9 @@ type RestoreOptions struct {
// Only equivalent log output for a regular restore.
DryRun bool
-// Logger used to print status to.
-Logger *log.Logger
+// Logging settings.
+Logger *log.Logger
+Verbose bool
}
// NewRestoreOptions returns a new instance of RestoreOptions with defaults.
@@ -1766,3 +1791,24 @@ func headerByteOrder(hdr []byte) (binary.ByteOrder, error) {
return nil, fmt.Errorf("invalid wal header magic: %x", magic)
}
}
+func copyFile(dst, src string) error {
+r, err := os.Open(src)
+if err != nil {
+return err
+}
+defer r.Close()
+w, err := os.Create(dst)
+if err != nil {
+return err
+}
+defer w.Close()
+if _, err := io.Copy(w, r); err != nil {
+return err
+} else if err := w.Sync(); err != nil {
+return err
+}
+return nil
+}

@@ -95,9 +95,9 @@ type Pos struct {
// String returns a string representation.
func (p Pos) String() string {
if p.IsZero() {
return "<>"
return ""
}
return fmt.Sprintf("<%s,%08x,%d>", p.Generation, p.Index, p.Offset)
return fmt.Sprintf("%s/%08x:%d", p.Generation, p.Index, p.Offset)
}
// IsZero returns true if p is the zero value.
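The effect on log output, reproduced standalone with illustrative values (Litestream generations are hex strings; this one is made up), matching the generation/index notation used elsewhere in the logs:

```go
package main

import "fmt"

func main() {
	gen, idx, off := "89faf91f", 0x1f9, 4152
	fmt.Printf("<%s,%08x,%d>\n", gen, idx, off) // old format: <89faf91f,000001f9,4152>
	fmt.Printf("%s/%08x:%d\n", gen, idx, off)   // new format: 89faf91f/000001f9:4152
}
```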

@@ -4,7 +4,6 @@ package litestream
import (
"os"
"syscall"
)
// fileinfo returns syscall fields from a FileInfo object.
@@ -15,7 +14,7 @@ func fileinfo(fi os.FileInfo) (uid, gid int) {
// fixRootDirectory is copied from the standard library for use with mkdirAll()
func fixRootDirectory(p string) string {
if len(p) == len(`\\?\c:`) {
-if IsPathSeparator(p[0]) && IsPathSeparator(p[1]) && p[2] == '?' && IsPathSeparator(p[3]) && p[5] == ':' {
+if os.IsPathSeparator(p[0]) && os.IsPathSeparator(p[1]) && p[2] == '?' && os.IsPathSeparator(p[3]) && p[5] == ':' {
return p + `\`
}
}

@@ -2,6 +2,7 @@ package litestream
import (
"context"
"encoding/binary"
"fmt"
"io"
"io/ioutil"
@@ -417,7 +418,7 @@ func (r *FileReplica) Stop() {
func (r *FileReplica) monitor(ctx context.Context) {
// Clear old temporary files that may have been left from a crash.
if err := removeTmpFiles(r.dst); err != nil {
-log.Printf("%s(%s): cannot remove tmp files: %s", r.db.Path(), r.Name(), err)
+log.Printf("%s(%s): monitor: cannot remove tmp files: %s", r.db.Path(), r.Name(), err)
}
// Continuously check for new data to replicate.
@@ -437,7 +438,7 @@ func (r *FileReplica) monitor(ctx context.Context) {
// Synchronize the shadow wal into the replication directory.
if err := r.Sync(ctx); err != nil {
log.Printf("%s(%s): sync error: %s", r.db.Path(), r.Name(), err)
log.Printf("%s(%s): monitor error: %s", r.db.Path(), r.Name(), err)
continue
}
}
@@ -454,7 +455,7 @@ func (r *FileReplica) retainer(ctx context.Context) {
return
case <-ticker.C:
if err := r.EnforceRetention(ctx); err != nil {
log.Printf("%s(%s): retain error: %s", r.db.Path(), r.Name(), err)
log.Printf("%s(%s): retainer error: %s", r.db.Path(), r.Name(), err)
continue
}
}
@@ -548,11 +549,16 @@ func (r *FileReplica) snapshot(ctx context.Context, generation string, index int
return nil
}
+startTime := time.Now()
if err := mkdirAll(filepath.Dir(snapshotPath), r.db.dirmode, r.db.diruid, r.db.dirgid); err != nil {
return err
-}
-return compressFile(r.db.Path(), snapshotPath, r.db.uid, r.db.gid)
+} else if err := compressFile(r.db.Path(), snapshotPath, r.db.uid, r.db.gid); err != nil {
+return err
+}
+log.Printf("%s(%s): snapshot: creating %s/%08x t=%s", r.db.Path(), r.Name(), generation, index, time.Since(startTime))
+return nil
}
// snapshotN returns the number of snapshots for a generation.
@@ -593,6 +599,8 @@ func (r *FileReplica) Sync(ctx context.Context) (err error) {
}
generation := dpos.Generation
Tracef("%s(%s): replica sync: db.pos=%s", r.db.Path(), r.Name(), dpos)
// Create snapshot if no snapshots exist for generation.
if n, err := r.snapshotN(generation); err != nil {
return err
@@ -612,6 +620,7 @@ func (r *FileReplica) Sync(ctx context.Context) (err error) {
return fmt.Errorf("cannot determine replica position: %s", err)
}
Tracef("%s(%s): replica sync: calc new pos: %s", r.db.Path(), r.Name(), pos)
r.mu.Lock()
r.pos = pos
r.mu.Unlock()
@@ -664,10 +673,47 @@ func (r *FileReplica) syncWAL(ctx context.Context) (err error) {
return err
}
-n, err := io.Copy(w, rd)
-r.walBytesCounter.Add(float64(n))
-if err != nil {
-return err
-}
+// Copy header if at offset zero.
+var psalt uint64 // previous salt value
+if pos := rd.Pos(); pos.Offset == 0 {
+buf := make([]byte, WALHeaderSize)
+if _, err := io.ReadFull(rd, buf); err != nil {
+return err
+}
+psalt = binary.BigEndian.Uint64(buf[16:24])
+n, err := w.Write(buf)
+if err != nil {
+return err
+}
+r.walBytesCounter.Add(float64(n))
+}
+// Copy frames.
+for {
+pos := rd.Pos()
+assert(pos.Offset == frameAlign(pos.Offset, r.db.pageSize), "shadow wal reader not frame aligned")
+buf := make([]byte, WALFrameHeaderSize+r.db.pageSize)
+if _, err := io.ReadFull(rd, buf); err == io.EOF {
+break
+} else if err != nil {
+return err
+}
+// Verify salt matches the previous frame/header read.
+salt := binary.BigEndian.Uint64(buf[8:16])
+if psalt != 0 && psalt != salt {
+return fmt.Errorf("replica salt mismatch: %s", filepath.Base(filename))
+}
+psalt = salt
+n, err := w.Write(buf)
+if err != nil {
+return err
+}
+r.walBytesCounter.Add(float64(n))
+}
if err := w.Sync(); err != nil {
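The `assert` above relies on `frameAlign`, which is not shown in this diff. A plausible sketch of what frame alignment means, assuming a shadow WAL offset is always the 32-byte WAL file header plus a whole number of (header + page) frames, per the SQLite WAL format; this is an illustration, not the repo's code:

```go
package walutil

const (
	walHeaderSize      = 32 // SQLite WAL file header size
	walFrameHeaderSize = 24 // per-frame header size
)

// frameAlign rounds offset down to the nearest frame boundary, where a
// valid offset is the WAL header plus N whole frames of (header + page).
func frameAlign(offset int64, pageSize int) int64 {
	if offset < walHeaderSize {
		return 0
	}
	frameSize := int64(walFrameHeaderSize + pageSize)
	frameN := (offset - walHeaderSize) / frameSize
	return frameN*frameSize + walHeaderSize
}
```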
@@ -796,7 +842,6 @@ func (r *FileReplica) EnforceRetention(ctx context.Context) (err error) {
// If no retained snapshots exist, create a new snapshot.
if len(snapshots) == 0 {
log.Printf("%s(%s): snapshots exceeds retention, creating new snapshot", r.db.Path(), r.Name())
if err := r.snapshot(ctx, pos.Generation, pos.Index); err != nil {
return fmt.Errorf("cannot snapshot: %w", err)
}
@@ -814,7 +859,7 @@ func (r *FileReplica) EnforceRetention(ctx context.Context) (err error) {
// Delete generations if it has no snapshots being retained.
if snapshot == nil {
log.Printf("%s(%s): generation %q has no retained snapshots, deleting", r.db.Path(), r.Name(), generation)
log.Printf("%s(%s): retainer: deleting generation %q has no retained snapshots, deleting", r.db.Path(), r.Name(), generation)
if err := os.RemoveAll(r.GenerationDir(generation)); err != nil {
return fmt.Errorf("cannot delete generation %q dir: %w", generation, err)
}
@@ -843,6 +888,7 @@ func (r *FileReplica) deleteGenerationSnapshotsBefore(ctx context.Context, gener
return err
}
+var n int
for _, fi := range fis {
idx, _, err := ParseSnapshotPath(fi.Name())
if err != nil {
@@ -851,10 +897,13 @@ func (r *FileReplica) deleteGenerationSnapshotsBefore(ctx context.Context, gener
continue
}
log.Printf("%s(%s): retention exceeded, deleting from generation %q: %s", r.db.Path(), r.Name(), generation, fi.Name())
if err := os.Remove(filepath.Join(dir, fi.Name())); err != nil {
return err
}
+n++
}
+if n > 0 {
+log.Printf("%s(%s): retainer: deleting snapshots before %s/%08x; n=%d", r.db.Path(), r.Name(), generation, index, n)
+}
return nil
@@ -871,6 +920,7 @@ func (r *FileReplica) deleteGenerationWALBefore(ctx context.Context, generation
return err
}
+var n int
for _, fi := range fis {
idx, _, _, err := ParseWALPath(fi.Name())
if err != nil {
@@ -879,10 +929,13 @@ func (r *FileReplica) deleteGenerationWALBefore(ctx context.Context, generation
continue
}
log.Printf("%s(%s): generation %q wal no longer retained, deleting %s", r.db.Path(), r.Name(), generation, fi.Name())
if err := os.Remove(filepath.Join(dir, fi.Name())); err != nil {
return err
}
+n++
}
+if n > 0 {
+log.Printf("%s(%s): retainer: deleting wal files before %s/%08x n=%d", r.db.Path(), r.Name(), generation, index, n)
+}
return nil
@@ -991,20 +1044,6 @@ func compressFile(src, dst string, uid, gid int) error {
func ValidateReplica(ctx context.Context, r Replica) error {
db := r.DB()
-// Compute checksum of primary database under lock. This prevents a
-// sync from occurring and the database will not be written.
-chksum0, pos, err := db.CRC64()
-if err != nil {
-return fmt.Errorf("cannot compute checksum: %w", err)
-}
-log.Printf("%s(%s): primary checksum computed: %016x @ %s", db.Path(), r.Name(), chksum0, pos)
-// Wait until replica catches up to position.
-log.Printf("%s(%s): waiting for replica", db.Path(), r.Name())
-if err := waitForReplica(ctx, r, pos); err != nil {
-return fmt.Errorf("cannot wait for replica: %w", err)
-}
// Restore replica to a temporary directory.
tmpdir, err := ioutil.TempDir("", "*-litestream")
if err != nil {
@@ -1012,7 +1051,20 @@ func ValidateReplica(ctx context.Context, r Replica) error {
}
defer os.RemoveAll(tmpdir)
restorePath := filepath.Join(tmpdir, "db")
// Compute checksum of primary database under lock. This prevents a
// sync from occurring and the database will not be written.
primaryPath := filepath.Join(tmpdir, "primary")
chksum0, pos, err := db.CRC64()
if err != nil {
return fmt.Errorf("cannot compute checksum: %w", err)
}
// Wait until replica catches up to position.
if err := waitForReplica(ctx, r, pos); err != nil {
return fmt.Errorf("cannot wait for replica: %w", err)
}
restorePath := filepath.Join(tmpdir, "replica")
if err := RestoreReplica(ctx, r, RestoreOptions{
OutputPath: restorePath,
ReplicaName: r.Name(),
@@ -1029,17 +1081,33 @@ func ValidateReplica(ctx context.Context, r Replica) error {
return err
}
log.Printf("%s(%s): restore complete, replica checksum=%016x", db.Path(), r.Name(), chksum1)
status := "ok"
mismatch := chksum0 != chksum1
if mismatch {
status = "mismatch"
}
log.Printf("%s(%s): validator: status=%s db=%016x replica=%016x pos=%s", db.Path(), r.Name(), status, chksum0, chksum1, pos)
// Validate checksums match.
-if chksum0 != chksum1 {
+if mismatch {
internal.ReplicaValidationTotalCounterVec.WithLabelValues(db.Path(), r.Name(), "error").Inc()
+// Compress mismatched databases and report temporary path for investigation.
+if err := compressFile(primaryPath, primaryPath+".lz4", db.uid, db.gid); err != nil {
+return fmt.Errorf("cannot compress primary db: %w", err)
+} else if err := compressFile(restorePath, restorePath+".lz4", db.uid, db.gid); err != nil {
+return fmt.Errorf("cannot compress replica db: %w", err)
+}
+log.Printf("%s(%s): validator: mismatch files @ %s", db.Path(), r.Name(), tmpdir)
return ErrChecksumMismatch
}
internal.ReplicaValidationTotalCounterVec.WithLabelValues(db.Path(), r.Name(), "ok").Inc()
log.Printf("%s(%s): replica ok", db.Path(), r.Name())
if err := os.RemoveAll(tmpdir); err != nil {
return fmt.Errorf("cannot remove temporary validation directory: %w", err)
}
return nil
}
@@ -1050,6 +1118,9 @@ func waitForReplica(ctx context.Context, r Replica, pos Pos) error {
ticker := time.NewTicker(500 * time.Millisecond)
defer ticker.Stop()
+timer := time.NewTicker(10 * time.Second)
+defer timer.Stop()
once := make(chan struct{}, 1)
once <- struct{}{}
@@ -1057,6 +1128,8 @@ func waitForReplica(ctx context.Context, r Replica, pos Pos) error {
select {
case <-ctx.Done():
return ctx.Err()
+case <-timer.C:
+return fmt.Errorf("replica wait exceeded timeout")
case <-ticker.C:
case <-once: // immediate on first check
}
@@ -1064,7 +1137,7 @@ func waitForReplica(ctx context.Context, r Replica, pos Pos) error {
// Obtain current position of replica, check if past target position.
curr, err := r.CalcPos(ctx, pos.Generation)
if err != nil {
log.Printf("%s(%s): cannot obtain replica position: %s", db.Path(), r.Name(), err)
log.Printf("%s(%s): validator: cannot obtain replica position: %s", db.Path(), r.Name(), err)
continue
}
@@ -1083,7 +1156,6 @@ func waitForReplica(ctx context.Context, r Replica, pos Pos) error {
// If not ready, restart loop.
if !ready {
log.Printf("%s(%s): replica at %s, waiting for %s", db.Path(), r.Name(), curr, pos)
continue
}

@@ -467,7 +467,7 @@ func (r *Replica) monitor(ctx context.Context) {
// Synchronize the shadow wal into the replication directory.
if err := r.Sync(ctx); err != nil {
log.Printf("%s(%s): sync error: %s", r.db.Path(), r.Name(), err)
log.Printf("%s(%s): monitor error: %s", r.db.Path(), r.Name(), err)
continue
}
}
@@ -603,6 +603,7 @@ func (r *Replica) snapshot(ctx context.Context, generation string, index int) er
}()
snapshotPath := r.SnapshotPath(generation, index)
+startTime := time.Now()
if _, err := r.uploader.UploadWithContext(ctx, &s3manager.UploadInput{
Bucket: aws.String(r.Bucket),
@@ -615,6 +616,8 @@ func (r *Replica) snapshot(ctx context.Context, generation string, index int) er
r.putOperationTotalCounter.Inc()
r.putOperationBytesCounter.Add(float64(fi.Size()))
log.Printf("%s(%s): snapshot: creating %s/%08x t=%s", r.db.Path(), r.Name(), generation, index, time.Since(startTime))
return nil
}
@@ -935,7 +938,6 @@ func (r *Replica) EnforceRetention(ctx context.Context) (err error) {
// If no retained snapshots exist, create a new snapshot.
if len(snapshots) == 0 {
log.Printf("%s(%s): snapshots exceeds retention, creating new snapshot", r.db.Path(), r.Name())
if err := r.snapshot(ctx, pos.Generation, pos.Index); err != nil {
return fmt.Errorf("cannot snapshot: %w", err)
}
@@ -958,7 +960,6 @@ func (r *Replica) EnforceRetention(ctx context.Context) (err error) {
// Delete generations if it has no snapshots being retained.
if snapshot == nil {
log.Printf("%s(%s): generation %q has no retained snapshots, deleting", r.db.Path(), r.Name(), generation)
if err := r.deleteGenerationBefore(ctx, generation, -1); err != nil {
return fmt.Errorf("cannot delete generation %q dir: %w", generation, err)
}
@@ -1001,16 +1002,13 @@ func (r *Replica) deleteGenerationBefore(ctx context.Context, generation string,
}
// Delete all files in batches.
+var n int
for i := 0; i < len(objIDs); i += MaxKeys {
j := i + MaxKeys
if j > len(objIDs) {
j = len(objIDs)
}
-for _, objID := range objIDs[i:j] {
-log.Printf("%s(%s): retention exceeded, deleting from generation %q: %s", r.db.Path(), r.Name(), generation, path.Base(*objID.Key))
-}
if _, err := r.s3.DeleteObjectsWithContext(ctx, &s3.DeleteObjectsInput{
Bucket: aws.String(r.Bucket),
Delete: &s3.Delete{
@@ -1020,9 +1018,12 @@ func (r *Replica) deleteGenerationBefore(ctx context.Context, generation string,
}); err != nil {
return err
}
+n += len(objIDs[i:j])
r.deleteOperationTotalCounter.Inc()
}
log.Printf("%s(%s): retainer: deleting wal files before %s/%08x n=%d", r.db.Path(), r.Name(), generation, index, n)
return nil
}
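A note on the batching above: S3's DeleteObjects API accepts at most 1,000 keys per request, which is presumably why deletions are chunked by `MaxKeys`. The chunking pattern in isolation, as a standalone sketch (illustrative, not the repo's code):

```go
package s3util

// deleteInBatches invokes fn with at most batchSize keys at a time,
// mirroring the loop above that chunks DeleteObjects requests.
func deleteInBatches(keys []string, batchSize int, fn func([]string) error) error {
	for i := 0; i < len(keys); i += batchSize {
		j := i + batchSize
		if j > len(keys) {
			j = len(keys)
		}
		if err := fn(keys[i:j]); err != nil {
			return err
		}
	}
	return nil
}
```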