cleanup
This commit is contained in:
@@ -10,8 +10,10 @@ import (
|
|||||||
"text/tabwriter"
|
"text/tabwriter"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// DatabasesCommand is a command for listing managed databases.
|
||||||
type DatabasesCommand struct{}
|
type DatabasesCommand struct{}
|
||||||
|
|
||||||
|
// Run executes the command.
|
||||||
func (c *DatabasesCommand) Run(ctx context.Context, args []string) (err error) {
|
func (c *DatabasesCommand) Run(ctx context.Context, args []string) (err error) {
|
||||||
var configPath string
|
var configPath string
|
||||||
fs := flag.NewFlagSet("litestream-databases", flag.ContinueOnError)
|
fs := flag.NewFlagSet("litestream-databases", flag.ContinueOnError)
|
||||||
@@ -56,6 +58,7 @@ func (c *DatabasesCommand) Run(ctx context.Context, args []string) (err error) {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Usage prints the help screen to STDOUT.
|
||||||
func (c *DatabasesCommand) Usage() {
|
func (c *DatabasesCommand) Usage() {
|
||||||
fmt.Printf(`
|
fmt.Printf(`
|
||||||
The databases command lists all databases in the configuration file.
|
The databases command lists all databases in the configuration file.
|
||||||
|
|||||||
@@ -12,8 +12,10 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// GenerationsCommand represents a command to list all generations for a database.
|
||||||
type GenerationsCommand struct{}
|
type GenerationsCommand struct{}
|
||||||
|
|
||||||
|
// Run executes the command.
|
||||||
func (c *GenerationsCommand) Run(ctx context.Context, args []string) (err error) {
|
func (c *GenerationsCommand) Run(ctx context.Context, args []string) (err error) {
|
||||||
var configPath string
|
var configPath string
|
||||||
fs := flag.NewFlagSet("litestream-generations", flag.ContinueOnError)
|
fs := flag.NewFlagSet("litestream-generations", flag.ContinueOnError)
|
||||||
@@ -96,6 +98,7 @@ func (c *GenerationsCommand) Run(ctx context.Context, args []string) (err error)
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Usage prints the help message to STDOUT.
|
||||||
func (c *GenerationsCommand) Usage() {
|
func (c *GenerationsCommand) Usage() {
|
||||||
fmt.Printf(`
|
fmt.Printf(`
|
||||||
The generations command lists all generations for a database. It also lists
|
The generations command lists all generations for a database. It also lists
|
||||||
|
|||||||
@@ -16,6 +16,7 @@ import (
|
|||||||
|
|
||||||
"github.com/benbjohnson/litestream"
|
"github.com/benbjohnson/litestream"
|
||||||
"github.com/benbjohnson/litestream/s3"
|
"github.com/benbjohnson/litestream/s3"
|
||||||
|
_ "github.com/mattn/go-sqlite3"
|
||||||
"gopkg.in/yaml.v2"
|
"gopkg.in/yaml.v2"
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -36,12 +37,15 @@ func main() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Main represents the main program execution.
|
||||||
type Main struct{}
|
type Main struct{}
|
||||||
|
|
||||||
|
// NewMain returns a new instance of Main.
|
||||||
func NewMain() *Main {
|
func NewMain() *Main {
|
||||||
return &Main{}
|
return &Main{}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Run executes the program.
|
||||||
func (m *Main) Run(ctx context.Context, args []string) (err error) {
|
func (m *Main) Run(ctx context.Context, args []string) (err error) {
|
||||||
var cmd string
|
var cmd string
|
||||||
if len(args) > 0 {
|
if len(args) > 0 {
|
||||||
@@ -72,6 +76,7 @@ func (m *Main) Run(ctx context.Context, args []string) (err error) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Usage prints the help screen to STDOUT.
|
||||||
func (m *Main) Usage() {
|
func (m *Main) Usage() {
|
||||||
fmt.Println(`
|
fmt.Println(`
|
||||||
litestream is a tool for replicating SQLite databases.
|
litestream is a tool for replicating SQLite databases.
|
||||||
@@ -112,6 +117,7 @@ type Config struct {
|
|||||||
Bucket string `yaml:"bucket"`
|
Bucket string `yaml:"bucket"`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Normalize expands paths and parses URL-specified replicas.
|
||||||
func (c *Config) Normalize() error {
|
func (c *Config) Normalize() error {
|
||||||
for i := range c.DBs {
|
for i := range c.DBs {
|
||||||
if err := c.DBs[i].Normalize(); err != nil {
|
if err := c.DBs[i].Normalize(); err != nil {
|
||||||
@@ -128,6 +134,7 @@ func DefaultConfig() Config {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// DBConfig returns database configuration by path.
|
||||||
func (c *Config) DBConfig(path string) *DBConfig {
|
func (c *Config) DBConfig(path string) *DBConfig {
|
||||||
for _, dbConfig := range c.DBs {
|
for _, dbConfig := range c.DBs {
|
||||||
if dbConfig.Path == path {
|
if dbConfig.Path == path {
|
||||||
@@ -167,11 +174,13 @@ func ReadConfigFile(filename string) (Config, error) {
|
|||||||
return config, nil
|
return config, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// DBConfig represents the configuration for a single database.
|
||||||
type DBConfig struct {
|
type DBConfig struct {
|
||||||
Path string `yaml:"path"`
|
Path string `yaml:"path"`
|
||||||
Replicas []*ReplicaConfig `yaml:"replicas"`
|
Replicas []*ReplicaConfig `yaml:"replicas"`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Normalize expands paths and parses URL-specified replicas.
|
||||||
func (c *DBConfig) Normalize() error {
|
func (c *DBConfig) Normalize() error {
|
||||||
for i := range c.Replicas {
|
for i := range c.Replicas {
|
||||||
if err := c.Replicas[i].Normalize(); err != nil {
|
if err := c.Replicas[i].Normalize(); err != nil {
|
||||||
@@ -181,6 +190,7 @@ func (c *DBConfig) Normalize() error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ReplicaConfig represents the configuration for a single replica in a database.
|
||||||
type ReplicaConfig struct {
|
type ReplicaConfig struct {
|
||||||
Type string `yaml:"type"` // "file", "s3"
|
Type string `yaml:"type"` // "file", "s3"
|
||||||
Name string `yaml:"name"` // name of replica, optional.
|
Name string `yaml:"name"` // name of replica, optional.
|
||||||
@@ -197,6 +207,7 @@ type ReplicaConfig struct {
|
|||||||
Bucket string `yaml:"bucket"`
|
Bucket string `yaml:"bucket"`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Normalize expands paths and parses URL-specified replicas.
|
||||||
func (c *ReplicaConfig) Normalize() error {
|
func (c *ReplicaConfig) Normalize() error {
|
||||||
// Expand path filename, if necessary.
|
// Expand path filename, if necessary.
|
||||||
if prefix := "~" + string(os.PathSeparator); strings.HasPrefix(c.Path, prefix) {
|
if prefix := "~" + string(os.PathSeparator); strings.HasPrefix(c.Path, prefix) {
|
||||||
|
|||||||
@@ -17,6 +17,7 @@ import (
|
|||||||
"github.com/prometheus/client_golang/prometheus/promhttp"
|
"github.com/prometheus/client_golang/prometheus/promhttp"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// ReplicateCommand represents a command that continuously replicates SQLite databases.
|
||||||
type ReplicateCommand struct {
|
type ReplicateCommand struct {
|
||||||
ConfigPath string
|
ConfigPath string
|
||||||
Config Config
|
Config Config
|
||||||
@@ -96,7 +97,9 @@ func (c *ReplicateCommand) Run(ctx context.Context, args []string) (err error) {
|
|||||||
fmt.Printf("serving metrics on http://localhost:%s/metrics\n", port)
|
fmt.Printf("serving metrics on http://localhost:%s/metrics\n", port)
|
||||||
go func() {
|
go func() {
|
||||||
http.Handle("/metrics", promhttp.Handler())
|
http.Handle("/metrics", promhttp.Handler())
|
||||||
http.ListenAndServe(config.Addr, nil)
|
if err := http.ListenAndServe(config.Addr, nil); err != nil {
|
||||||
|
log.Printf("cannot start metrics server: %s", err)
|
||||||
|
}
|
||||||
}()
|
}()
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -126,6 +129,7 @@ func (c *ReplicateCommand) Close() (err error) {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Usage prints the help screen to STDOUT.
|
||||||
func (c *ReplicateCommand) Usage() {
|
func (c *ReplicateCommand) Usage() {
|
||||||
fmt.Printf(`
|
fmt.Printf(`
|
||||||
The replicate command starts a server to monitor & replicate databases
|
The replicate command starts a server to monitor & replicate databases
|
||||||
|
|||||||
@@ -13,9 +13,10 @@ import (
|
|||||||
"github.com/benbjohnson/litestream"
|
"github.com/benbjohnson/litestream"
|
||||||
)
|
)
|
||||||
|
|
||||||
type RestoreCommand struct {
|
// RestoreCommand represents a command to restore a database from a backup.
|
||||||
}
|
type RestoreCommand struct{}
|
||||||
|
|
||||||
|
// Run executes the command.
|
||||||
func (c *RestoreCommand) Run(ctx context.Context, args []string) (err error) {
|
func (c *RestoreCommand) Run(ctx context.Context, args []string) (err error) {
|
||||||
var configPath string
|
var configPath string
|
||||||
opt := litestream.NewRestoreOptions()
|
opt := litestream.NewRestoreOptions()
|
||||||
@@ -82,6 +83,7 @@ func (c *RestoreCommand) Run(ctx context.Context, args []string) (err error) {
|
|||||||
return db.Restore(ctx, opt)
|
return db.Restore(ctx, opt)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Usage prints the help screen to STDOUT.
|
||||||
func (c *RestoreCommand) Usage() {
|
func (c *RestoreCommand) Usage() {
|
||||||
fmt.Printf(`
|
fmt.Printf(`
|
||||||
The restore command recovers a database from a previous snapshot and WAL.
|
The restore command recovers a database from a previous snapshot and WAL.
|
||||||
|
|||||||
@@ -13,8 +13,10 @@ import (
|
|||||||
"github.com/benbjohnson/litestream"
|
"github.com/benbjohnson/litestream"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// SnapshotsCommand represents a command to list snapshots for a command.
|
||||||
type SnapshotsCommand struct{}
|
type SnapshotsCommand struct{}
|
||||||
|
|
||||||
|
// Run executes the command.
|
||||||
func (c *SnapshotsCommand) Run(ctx context.Context, args []string) (err error) {
|
func (c *SnapshotsCommand) Run(ctx context.Context, args []string) (err error) {
|
||||||
var configPath string
|
var configPath string
|
||||||
fs := flag.NewFlagSet("litestream-snapshots", flag.ContinueOnError)
|
fs := flag.NewFlagSet("litestream-snapshots", flag.ContinueOnError)
|
||||||
@@ -85,6 +87,7 @@ func (c *SnapshotsCommand) Run(ctx context.Context, args []string) (err error) {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Usage prints the help screen to STDOUT.
|
||||||
func (c *SnapshotsCommand) Usage() {
|
func (c *SnapshotsCommand) Usage() {
|
||||||
fmt.Printf(`
|
fmt.Printf(`
|
||||||
The snapshots command lists all snapshots available for a database.
|
The snapshots command lists all snapshots available for a database.
|
||||||
|
|||||||
@@ -6,8 +6,10 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// VersionCommand represents a command to print the current version.
|
||||||
type VersionCommand struct{}
|
type VersionCommand struct{}
|
||||||
|
|
||||||
|
// Run executes the command.
|
||||||
func (c *VersionCommand) Run(ctx context.Context, args []string) (err error) {
|
func (c *VersionCommand) Run(ctx context.Context, args []string) (err error) {
|
||||||
fs := flag.NewFlagSet("litestream-version", flag.ContinueOnError)
|
fs := flag.NewFlagSet("litestream-version", flag.ContinueOnError)
|
||||||
fs.Usage = c.Usage
|
fs.Usage = c.Usage
|
||||||
@@ -20,6 +22,7 @@ func (c *VersionCommand) Run(ctx context.Context, args []string) (err error) {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Usage prints the help screen to STDOUT.
|
||||||
func (c *VersionCommand) Usage() {
|
func (c *VersionCommand) Usage() {
|
||||||
fmt.Println(`
|
fmt.Println(`
|
||||||
Prints the version.
|
Prints the version.
|
||||||
|
|||||||
@@ -13,8 +13,10 @@ import (
|
|||||||
"github.com/benbjohnson/litestream"
|
"github.com/benbjohnson/litestream"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// WALCommand represents a command to list WAL files for a database.
|
||||||
type WALCommand struct{}
|
type WALCommand struct{}
|
||||||
|
|
||||||
|
// Run executes the command.
|
||||||
func (c *WALCommand) Run(ctx context.Context, args []string) (err error) {
|
func (c *WALCommand) Run(ctx context.Context, args []string) (err error) {
|
||||||
var configPath string
|
var configPath string
|
||||||
fs := flag.NewFlagSet("litestream-wal", flag.ContinueOnError)
|
fs := flag.NewFlagSet("litestream-wal", flag.ContinueOnError)
|
||||||
@@ -91,6 +93,7 @@ func (c *WALCommand) Run(ctx context.Context, args []string) (err error) {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Usage prints the help screen to STDOUT.
|
||||||
func (c *WALCommand) Usage() {
|
func (c *WALCommand) Usage() {
|
||||||
fmt.Printf(`
|
fmt.Printf(`
|
||||||
The wal command lists all wal files available for a database.
|
The wal command lists all wal files available for a database.
|
||||||
|
|||||||
43
db.go
43
db.go
@@ -253,6 +253,7 @@ func (db *DB) PageSize() int {
|
|||||||
return db.pageSize
|
return db.pageSize
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Open initializes the background monitoring goroutine.
|
||||||
func (db *DB) Open() (err error) {
|
func (db *DB) Open() (err error) {
|
||||||
// Validate that all replica names are unique.
|
// Validate that all replica names are unique.
|
||||||
m := make(map[string]struct{})
|
m := make(map[string]struct{})
|
||||||
@@ -545,7 +546,7 @@ func (db *DB) acquireReadLock() error {
|
|||||||
|
|
||||||
// Execute read query to obtain read lock.
|
// Execute read query to obtain read lock.
|
||||||
if _, err := tx.ExecContext(db.ctx, `SELECT COUNT(1) FROM _litestream_seq;`); err != nil {
|
if _, err := tx.ExecContext(db.ctx, `SELECT COUNT(1) FROM _litestream_seq;`); err != nil {
|
||||||
tx.Rollback()
|
_ = tx.Rollback()
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -782,15 +783,15 @@ func (db *DB) verify() (info syncInfo, err error) {
|
|||||||
info.generation = generation
|
info.generation = generation
|
||||||
|
|
||||||
// Determine total bytes of real DB for metrics.
|
// Determine total bytes of real DB for metrics.
|
||||||
if fi, err := os.Stat(db.Path()); err != nil {
|
fi, err := os.Stat(db.Path())
|
||||||
|
if err != nil {
|
||||||
return info, err
|
return info, err
|
||||||
} else {
|
|
||||||
info.dbModTime = fi.ModTime()
|
|
||||||
db.dbSizeGauge.Set(float64(fi.Size()))
|
|
||||||
}
|
}
|
||||||
|
info.dbModTime = fi.ModTime()
|
||||||
|
db.dbSizeGauge.Set(float64(fi.Size()))
|
||||||
|
|
||||||
// Determine total bytes of real WAL.
|
// Determine total bytes of real WAL.
|
||||||
fi, err := os.Stat(db.WALPath())
|
fi, err = os.Stat(db.WALPath())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return info, err
|
return info, err
|
||||||
}
|
}
|
||||||
@@ -1140,8 +1141,11 @@ func (r *ShadowWALReader) Read(p []byte) (n int, err error) {
|
|||||||
return n, err
|
return n, err
|
||||||
}
|
}
|
||||||
|
|
||||||
const WALHeaderChecksumOffset = 24
|
// SQLite WAL constants
|
||||||
const WALFrameHeaderChecksumOffset = 16
|
const (
|
||||||
|
WALHeaderChecksumOffset = 24
|
||||||
|
WALFrameHeaderChecksumOffset = 16
|
||||||
|
)
|
||||||
|
|
||||||
func readLastChecksumFrom(f *os.File, pageSize int) (uint32, uint32, error) {
|
func readLastChecksumFrom(f *os.File, pageSize int) (uint32, uint32, error) {
|
||||||
// Determine the byte offset of the checksum for the header (if no pages
|
// Determine the byte offset of the checksum for the header (if no pages
|
||||||
@@ -1192,7 +1196,7 @@ func (db *DB) checkpoint(mode string) (err error) {
|
|||||||
if err := db.releaseReadLock(); err != nil {
|
if err := db.releaseReadLock(); err != nil {
|
||||||
return fmt.Errorf("release read lock: %w", err)
|
return fmt.Errorf("release read lock: %w", err)
|
||||||
}
|
}
|
||||||
defer db.acquireReadLock()
|
defer func() { _ = db.acquireReadLock() }()
|
||||||
|
|
||||||
// A non-forced checkpoint is issued as "PASSIVE". This will only checkpoint
|
// A non-forced checkpoint is issued as "PASSIVE". This will only checkpoint
|
||||||
// if there are not pending transactions. A forced checkpoint ("RESTART")
|
// if there are not pending transactions. A forced checkpoint ("RESTART")
|
||||||
@@ -1388,24 +1392,6 @@ func checksumFile(filename string) (uint64, error) {
|
|||||||
return h.Sum64(), nil
|
return h.Sum64(), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func checksumFileAt(filename string, offset int64) (uint64, error) {
|
|
||||||
f, err := os.Open(filename)
|
|
||||||
if err != nil {
|
|
||||||
return 0, err
|
|
||||||
}
|
|
||||||
defer f.Close()
|
|
||||||
|
|
||||||
if _, err := f.Seek(offset, io.SeekStart); err != nil {
|
|
||||||
return 0, err
|
|
||||||
}
|
|
||||||
|
|
||||||
h := crc64.New(crc64.MakeTable(crc64.ISO))
|
|
||||||
if _, err := io.Copy(h, f); err != nil {
|
|
||||||
return 0, err
|
|
||||||
}
|
|
||||||
return h.Sum64(), nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (db *DB) restoreTarget(ctx context.Context, opt RestoreOptions, logger *log.Logger) (Replica, string, error) {
|
func (db *DB) restoreTarget(ctx context.Context, opt RestoreOptions, logger *log.Logger) (Replica, string, error) {
|
||||||
var target struct {
|
var target struct {
|
||||||
replica Replica
|
replica Replica
|
||||||
@@ -1532,7 +1518,7 @@ func (db *DB) restoreWAL(ctx context.Context, r Replica, generation string, inde
|
|||||||
|
|
||||||
// CRC64 returns a CRC-64 ISO checksum of the database and its current position.
|
// CRC64 returns a CRC-64 ISO checksum of the database and its current position.
|
||||||
//
|
//
|
||||||
// This function obtains a read lock so it prevents syncs from occuring until
|
// This function obtains a read lock so it prevents syncs from occurring until
|
||||||
// the operation is complete. The database will still be usable but it will be
|
// the operation is complete. The database will still be usable but it will be
|
||||||
// unable to checkpoint during this time.
|
// unable to checkpoint during this time.
|
||||||
func (db *DB) CRC64() (uint64, Pos, error) {
|
func (db *DB) CRC64() (uint64, Pos, error) {
|
||||||
@@ -1602,6 +1588,7 @@ type RestoreOptions struct {
|
|||||||
Logger *log.Logger
|
Logger *log.Logger
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// NewRestoreOptions returns a new instance of RestoreOptions with defaults.
|
||||||
func NewRestoreOptions() RestoreOptions {
|
func NewRestoreOptions() RestoreOptions {
|
||||||
return RestoreOptions{
|
return RestoreOptions{
|
||||||
Index: math.MaxInt64,
|
Index: math.MaxInt64,
|
||||||
|
|||||||
@@ -371,7 +371,7 @@ func TestDB_Sync(t *testing.T) {
|
|||||||
shadowWALPath := db.ShadowWALPath(pos0.Generation, pos0.Index)
|
shadowWALPath := db.ShadowWALPath(pos0.Generation, pos0.Index)
|
||||||
if buf, err := ioutil.ReadFile(shadowWALPath); err != nil {
|
if buf, err := ioutil.ReadFile(shadowWALPath); err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
} else if ioutil.WriteFile(shadowWALPath, append(buf[:litestream.WALHeaderSize-8], 0, 0, 0, 0, 0, 0, 0, 0), 0600); err != nil {
|
} else if err := ioutil.WriteFile(shadowWALPath, append(buf[:litestream.WALHeaderSize-8], 0, 0, 0, 0, 0, 0, 0, 0), 0600); err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -1,88 +0,0 @@
|
|||||||
DESIGN
|
|
||||||
======
|
|
||||||
|
|
||||||
Litestream is a sidecar process that replicates the write ahead log (WAL) for
|
|
||||||
a SQLite database. To ensure that it can replicate every page, litestream takes
|
|
||||||
control over the checkpointing process by issuing a long running read
|
|
||||||
transaction against the database to prevent checkpointing. It then releases
|
|
||||||
this transaction once it obtains a write lock and issues the checkpoint itself.
|
|
||||||
|
|
||||||
The daemon polls the database on an interval to breifly obtain a write
|
|
||||||
transaction lock and copy over new WAL pages. Once the WAL has reached a
|
|
||||||
threshold size, litestream will issue a checkpoint and a single page write
|
|
||||||
to a table called `_litestream` to start the new WAL.
|
|
||||||
|
|
||||||
|
|
||||||
## Workflow
|
|
||||||
|
|
||||||
When litestream first loads a database, it checks if there is an existing
|
|
||||||
sidecar directory which is named `.<DB>-litestream`. If not, it initializes
|
|
||||||
the directory and starts a new generation.
|
|
||||||
|
|
||||||
A generation is a snapshot of the database followed by a continuous stream of
|
|
||||||
WAL files. A new generation is started on initialization & whenever litestream
|
|
||||||
cannot verify that it has a continuous record of WAL files. This could happen
|
|
||||||
if litestream is stopped and another process checkpoints the WAL. In this case,
|
|
||||||
a new generation ID is randomly created and a snapshot is replicated to the
|
|
||||||
appropriate destinations.
|
|
||||||
|
|
||||||
Generations also prevent two servers from replicating to the same destination
|
|
||||||
and corrupting each other's data. In this case, each server would replicate
|
|
||||||
to a different generation directory. On recovery, there will be duplicate
|
|
||||||
databases and the end user can choose which generation to recover but each
|
|
||||||
database will be uncorrupted.
|
|
||||||
|
|
||||||
|
|
||||||
## File Layout
|
|
||||||
|
|
||||||
Litestream maintains a shadow WAL which is a historical record of all previous
|
|
||||||
WAL files. These files can be deleted after a time or size threshold but should
|
|
||||||
be replicated before being deleted.
|
|
||||||
|
|
||||||
### Local
|
|
||||||
|
|
||||||
Given a database file named `db`, SQLite will create a WAL file called `db-wal`.
|
|
||||||
Litestream will then create a hidden directory called `.db-litestream` that
|
|
||||||
contains the historical record of all WAL files for the current generation.
|
|
||||||
|
|
||||||
```
|
|
||||||
db # SQLite database
|
|
||||||
db-wal # SQLite WAL
|
|
||||||
.db-litestream/
|
|
||||||
generation # current generation number
|
|
||||||
generations/
|
|
||||||
xxxxxxxx/
|
|
||||||
wal/ # WAL files
|
|
||||||
000000000000001.wal
|
|
||||||
000000000000002.wal
|
|
||||||
000000000000003.wal # active WAL
|
|
||||||
```
|
|
||||||
|
|
||||||
### Remote (S3)
|
|
||||||
|
|
||||||
```
|
|
||||||
bkt/
|
|
||||||
db/ # database path
|
|
||||||
generations/
|
|
||||||
xxxxxxxx/
|
|
||||||
snapshots/ # snapshots w/ timestamp+offset
|
|
||||||
20000101T000000Z-000000000000023.snapshot
|
|
||||||
wal/ # compressed WAL files
|
|
||||||
000000000000001-0.wal.lz4
|
|
||||||
000000000000001-<offset>.wal.lz4
|
|
||||||
000000000000002-0.wal.lz4
|
|
||||||
00000002/
|
|
||||||
snapshot/
|
|
||||||
000000000000000.snapshot
|
|
||||||
scheduled/
|
|
||||||
daily/
|
|
||||||
20000101T000000Z-000000000000023.snapshot
|
|
||||||
20000102T000000Z-000000000000036.snapshot
|
|
||||||
monthly/
|
|
||||||
20000101T000000Z-000000000000023.snapshot
|
|
||||||
|
|
||||||
wal/
|
|
||||||
000000000000001.wal.lz4
|
|
||||||
```
|
|
||||||
|
|
||||||
|
|
||||||
@@ -13,10 +13,9 @@ import (
|
|||||||
"strings"
|
"strings"
|
||||||
"syscall"
|
"syscall"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
_ "github.com/mattn/go-sqlite3"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// Naming constants.
|
||||||
const (
|
const (
|
||||||
MetaDirSuffix = "-litestream"
|
MetaDirSuffix = "-litestream"
|
||||||
|
|
||||||
@@ -152,10 +151,6 @@ func readWALHeader(filename string) ([]byte, error) {
|
|||||||
return buf[:n], err
|
return buf[:n], err
|
||||||
}
|
}
|
||||||
|
|
||||||
func readCheckpointSeqNo(hdr []byte) uint32 {
|
|
||||||
return binary.BigEndian.Uint32(hdr[12:])
|
|
||||||
}
|
|
||||||
|
|
||||||
// readFileAt reads a slice from a file.
|
// readFileAt reads a slice from a file.
|
||||||
func readFileAt(filename string, offset, n int64) ([]byte, error) {
|
func readFileAt(filename string, offset, n int64) ([]byte, error) {
|
||||||
f, err := os.Open(filename)
|
f, err := os.Open(filename)
|
||||||
|
|||||||
@@ -6,6 +6,7 @@ import (
|
|||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"github.com/benbjohnson/litestream"
|
"github.com/benbjohnson/litestream"
|
||||||
|
_ "github.com/mattn/go-sqlite3"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestChecksum(t *testing.T) {
|
func TestChecksum(t *testing.T) {
|
||||||
|
|||||||
@@ -90,7 +90,6 @@ type FileReplica struct {
|
|||||||
pos Pos // last position
|
pos Pos // last position
|
||||||
|
|
||||||
wg sync.WaitGroup
|
wg sync.WaitGroup
|
||||||
ctx context.Context
|
|
||||||
cancel func()
|
cancel func()
|
||||||
|
|
||||||
snapshotTotalGauge prometheus.Gauge
|
snapshotTotalGauge prometheus.Gauge
|
||||||
@@ -534,10 +533,10 @@ func (r *FileReplica) snapshot(ctx context.Context, generation string, index int
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
} else if _, err := tx.ExecContext(ctx, `SELECT COUNT(1) FROM _litestream_seq;`); err != nil {
|
} else if _, err := tx.ExecContext(ctx, `SELECT COUNT(1) FROM _litestream_seq;`); err != nil {
|
||||||
tx.Rollback()
|
_ = tx.Rollback()
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
defer tx.Rollback()
|
defer func() { _ = tx.Rollback() }()
|
||||||
|
|
||||||
// Ignore if we already have a snapshot for the given WAL index.
|
// Ignore if we already have a snapshot for the given WAL index.
|
||||||
snapshotPath := r.SnapshotPath(generation, index)
|
snapshotPath := r.SnapshotPath(generation, index)
|
||||||
@@ -570,6 +569,7 @@ func (r *FileReplica) snapshotN(generation string) (int, error) {
|
|||||||
return n, nil
|
return n, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Sync replays data from the shadow WAL into the file replica.
|
||||||
func (r *FileReplica) Sync(ctx context.Context) (err error) {
|
func (r *FileReplica) Sync(ctx context.Context) (err error) {
|
||||||
// Clear last position if if an error occurs during sync.
|
// Clear last position if if an error occurs during sync.
|
||||||
defer func() {
|
defer func() {
|
||||||
@@ -884,7 +884,7 @@ func (r *FileReplica) deleteGenerationWALBefore(ctx context.Context, generation
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// SnapsotIndexAt returns the highest index for a snapshot within a generation
|
// SnapshotIndexAt returns the highest index for a snapshot within a generation
|
||||||
// that occurs before timestamp. If timestamp is zero, returns the latest snapshot.
|
// that occurs before timestamp. If timestamp is zero, returns the latest snapshot.
|
||||||
func SnapshotIndexAt(ctx context.Context, r Replica, generation string, timestamp time.Time) (int, error) {
|
func SnapshotIndexAt(ctx context.Context, r Replica, generation string, timestamp time.Time) (int, error) {
|
||||||
snapshots, err := r.Snapshots(ctx)
|
snapshots, err := r.Snapshots(ctx)
|
||||||
|
|||||||
39
s3/s3.go
39
s3/s3.go
@@ -570,10 +570,10 @@ func (r *Replica) snapshot(ctx context.Context, generation string, index int) er
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
} else if _, err := tx.ExecContext(ctx, `SELECT COUNT(1) FROM _litestream_seq;`); err != nil {
|
} else if _, err := tx.ExecContext(ctx, `SELECT COUNT(1) FROM _litestream_seq;`); err != nil {
|
||||||
tx.Rollback()
|
_ = tx.Rollback()
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
defer tx.Rollback()
|
defer func() { _ = tx.Rollback() }()
|
||||||
|
|
||||||
// Open database file handle.
|
// Open database file handle.
|
||||||
f, err := os.Open(r.db.Path())
|
f, err := os.Open(r.db.Path())
|
||||||
@@ -681,6 +681,7 @@ func (r *Replica) findBucketRegion(ctx context.Context, bucket string) (string,
|
|||||||
return "us-east-1", nil
|
return "us-east-1", nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Sync replays data from the shadow WAL and uploads it to S3.
|
||||||
func (r *Replica) Sync(ctx context.Context) (err error) {
|
func (r *Replica) Sync(ctx context.Context) (err error) {
|
||||||
// Clear last position if if an error occurs during sync.
|
// Clear last position if if an error occurs during sync.
|
||||||
defer func() {
|
defer func() {
|
||||||
@@ -1012,40 +1013,6 @@ func (r *Replica) deleteGenerationBefore(ctx context.Context, generation string,
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
type multiReadCloser struct {
|
|
||||||
readers []io.ReadCloser
|
|
||||||
}
|
|
||||||
|
|
||||||
func (mr *multiReadCloser) Read(p []byte) (n int, err error) {
|
|
||||||
for len(mr.readers) > 0 {
|
|
||||||
n, err = mr.readers[0].Read(p)
|
|
||||||
if err == io.EOF {
|
|
||||||
if e := mr.readers[0].Close(); e != nil {
|
|
||||||
return n, e
|
|
||||||
}
|
|
||||||
mr.readers[0] = nil
|
|
||||||
mr.readers = mr.readers[1:]
|
|
||||||
}
|
|
||||||
|
|
||||||
if n > 0 || err != io.EOF {
|
|
||||||
if err == io.EOF && len(mr.readers) > 0 {
|
|
||||||
err = nil
|
|
||||||
}
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return 0, io.EOF
|
|
||||||
}
|
|
||||||
|
|
||||||
func (mr *multiReadCloser) Close() (err error) {
|
|
||||||
for _, r := range mr.readers {
|
|
||||||
if e := r.Close(); e != nil && err == nil {
|
|
||||||
err = e
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
// S3 metrics.
|
// S3 metrics.
|
||||||
var (
|
var (
|
||||||
operationTotalCounterVec = promauto.NewCounterVec(prometheus.CounterOpts{
|
operationTotalCounterVec = promauto.NewCounterVec(prometheus.CounterOpts{
|
||||||
|
|||||||
Reference in New Issue
Block a user