From 7fb98df240295a1e2f892d60737bd216f251dd60 Mon Sep 17 00:00:00 2001 From: Ben Johnson Date: Mon, 18 Jan 2021 15:46:52 -0700 Subject: [PATCH] cleanup --- cmd/litestream/databases.go | 3 ++ cmd/litestream/generations.go | 3 ++ cmd/litestream/main.go | 11 +++++ cmd/litestream/replicate.go | 6 ++- cmd/litestream/restore.go | 6 ++- cmd/litestream/snapshots.go | 3 ++ cmd/litestream/version.go | 3 ++ cmd/litestream/wal.go | 3 ++ db.go | 43 ++++++----------- db_test.go | 2 +- doc/DESIGN.md | 88 ----------------------------------- litestream.go | 7 +-- litestream_test.go | 1 + replica.go | 8 ++-- s3/s3.go | 39 ++-------------- 15 files changed, 60 insertions(+), 166 deletions(-) delete mode 100644 doc/DESIGN.md diff --git a/cmd/litestream/databases.go b/cmd/litestream/databases.go index b5b8952..eac0dff 100644 --- a/cmd/litestream/databases.go +++ b/cmd/litestream/databases.go @@ -10,8 +10,10 @@ import ( "text/tabwriter" ) +// DatabasesCommand is a command for listing managed databases. type DatabasesCommand struct{} +// Run executes the command. func (c *DatabasesCommand) Run(ctx context.Context, args []string) (err error) { var configPath string fs := flag.NewFlagSet("litestream-databases", flag.ContinueOnError) @@ -56,6 +58,7 @@ func (c *DatabasesCommand) Run(ctx context.Context, args []string) (err error) { return nil } +// Usage prints the help screen to STDOUT. func (c *DatabasesCommand) Usage() { fmt.Printf(` The databases command lists all databases in the configuration file. diff --git a/cmd/litestream/generations.go b/cmd/litestream/generations.go index ba28e7f..b1c2d84 100644 --- a/cmd/litestream/generations.go +++ b/cmd/litestream/generations.go @@ -12,8 +12,10 @@ import ( "time" ) +// GenerationsCommand represents a command to list all generations for a database. type GenerationsCommand struct{} +// Run executes the command. func (c *GenerationsCommand) Run(ctx context.Context, args []string) (err error) { var configPath string fs := flag.NewFlagSet("litestream-generations", flag.ContinueOnError) @@ -96,6 +98,7 @@ func (c *GenerationsCommand) Run(ctx context.Context, args []string) (err error) return nil } +// Usage prints the help message to STDOUT. func (c *GenerationsCommand) Usage() { fmt.Printf(` The generations command lists all generations for a database. It also lists diff --git a/cmd/litestream/main.go b/cmd/litestream/main.go index 5213f71..469ba9e 100644 --- a/cmd/litestream/main.go +++ b/cmd/litestream/main.go @@ -16,6 +16,7 @@ import ( "github.com/benbjohnson/litestream" "github.com/benbjohnson/litestream/s3" + _ "github.com/mattn/go-sqlite3" "gopkg.in/yaml.v2" ) @@ -36,12 +37,15 @@ func main() { } } +// Main represents the main program execution. type Main struct{} +// NewMain returns a new instance of Main. func NewMain() *Main { return &Main{} } +// Run executes the program. func (m *Main) Run(ctx context.Context, args []string) (err error) { var cmd string if len(args) > 0 { @@ -72,6 +76,7 @@ func (m *Main) Run(ctx context.Context, args []string) (err error) { } } +// Usage prints the help screen to STDOUT. func (m *Main) Usage() { fmt.Println(` litestream is a tool for replicating SQLite databases. @@ -112,6 +117,7 @@ type Config struct { Bucket string `yaml:"bucket"` } +// Normalize expands paths and parses URL-specified replicas. func (c *Config) Normalize() error { for i := range c.DBs { if err := c.DBs[i].Normalize(); err != nil { @@ -128,6 +134,7 @@ func DefaultConfig() Config { } } +// DBConfig returns database configuration by path. func (c *Config) DBConfig(path string) *DBConfig { for _, dbConfig := range c.DBs { if dbConfig.Path == path { @@ -167,11 +174,13 @@ func ReadConfigFile(filename string) (Config, error) { return config, nil } +// DBConfig represents the configuration for a single database. type DBConfig struct { Path string `yaml:"path"` Replicas []*ReplicaConfig `yaml:"replicas"` } +// Normalize expands paths and parses URL-specified replicas. func (c *DBConfig) Normalize() error { for i := range c.Replicas { if err := c.Replicas[i].Normalize(); err != nil { @@ -181,6 +190,7 @@ func (c *DBConfig) Normalize() error { return nil } +// ReplicaConfig represents the configuration for a single replica in a database. type ReplicaConfig struct { Type string `yaml:"type"` // "file", "s3" Name string `yaml:"name"` // name of replica, optional. @@ -197,6 +207,7 @@ type ReplicaConfig struct { Bucket string `yaml:"bucket"` } +// Normalize expands paths and parses URL-specified replicas. func (c *ReplicaConfig) Normalize() error { // Expand path filename, if necessary. if prefix := "~" + string(os.PathSeparator); strings.HasPrefix(c.Path, prefix) { diff --git a/cmd/litestream/replicate.go b/cmd/litestream/replicate.go index aec4fcb..cfd57d0 100644 --- a/cmd/litestream/replicate.go +++ b/cmd/litestream/replicate.go @@ -17,6 +17,7 @@ import ( "github.com/prometheus/client_golang/prometheus/promhttp" ) +// ReplicateCommand represents a command that continuously replicates SQLite databases. type ReplicateCommand struct { ConfigPath string Config Config @@ -96,7 +97,9 @@ func (c *ReplicateCommand) Run(ctx context.Context, args []string) (err error) { fmt.Printf("serving metrics on http://localhost:%s/metrics\n", port) go func() { http.Handle("/metrics", promhttp.Handler()) - http.ListenAndServe(config.Addr, nil) + if err := http.ListenAndServe(config.Addr, nil); err != nil { + log.Printf("cannot start metrics server: %s", err) + } }() } @@ -126,6 +129,7 @@ func (c *ReplicateCommand) Close() (err error) { return err } +// Usage prints the help screen to STDOUT. func (c *ReplicateCommand) Usage() { fmt.Printf(` The replicate command starts a server to monitor & replicate databases diff --git a/cmd/litestream/restore.go b/cmd/litestream/restore.go index 968e29c..7c2365c 100644 --- a/cmd/litestream/restore.go +++ b/cmd/litestream/restore.go @@ -13,9 +13,10 @@ import ( "github.com/benbjohnson/litestream" ) -type RestoreCommand struct { -} +// RestoreCommand represents a command to restore a database from a backup. +type RestoreCommand struct{} +// Run executes the command. func (c *RestoreCommand) Run(ctx context.Context, args []string) (err error) { var configPath string opt := litestream.NewRestoreOptions() @@ -82,6 +83,7 @@ func (c *RestoreCommand) Run(ctx context.Context, args []string) (err error) { return db.Restore(ctx, opt) } +// Usage prints the help screen to STDOUT. func (c *RestoreCommand) Usage() { fmt.Printf(` The restore command recovers a database from a previous snapshot and WAL. diff --git a/cmd/litestream/snapshots.go b/cmd/litestream/snapshots.go index 42ab161..69700e0 100644 --- a/cmd/litestream/snapshots.go +++ b/cmd/litestream/snapshots.go @@ -13,8 +13,10 @@ import ( "github.com/benbjohnson/litestream" ) +// SnapshotsCommand represents a command to list snapshots for a command. type SnapshotsCommand struct{} +// Run executes the command. func (c *SnapshotsCommand) Run(ctx context.Context, args []string) (err error) { var configPath string fs := flag.NewFlagSet("litestream-snapshots", flag.ContinueOnError) @@ -85,6 +87,7 @@ func (c *SnapshotsCommand) Run(ctx context.Context, args []string) (err error) { return nil } +// Usage prints the help screen to STDOUT. func (c *SnapshotsCommand) Usage() { fmt.Printf(` The snapshots command lists all snapshots available for a database. diff --git a/cmd/litestream/version.go b/cmd/litestream/version.go index c0daaba..4669861 100644 --- a/cmd/litestream/version.go +++ b/cmd/litestream/version.go @@ -6,8 +6,10 @@ import ( "fmt" ) +// VersionCommand represents a command to print the current version. type VersionCommand struct{} +// Run executes the command. func (c *VersionCommand) Run(ctx context.Context, args []string) (err error) { fs := flag.NewFlagSet("litestream-version", flag.ContinueOnError) fs.Usage = c.Usage @@ -20,6 +22,7 @@ func (c *VersionCommand) Run(ctx context.Context, args []string) (err error) { return nil } +// Usage prints the help screen to STDOUT. func (c *VersionCommand) Usage() { fmt.Println(` Prints the version. diff --git a/cmd/litestream/wal.go b/cmd/litestream/wal.go index 17535fa..d2182a7 100644 --- a/cmd/litestream/wal.go +++ b/cmd/litestream/wal.go @@ -13,8 +13,10 @@ import ( "github.com/benbjohnson/litestream" ) +// WALCommand represents a command to list WAL files for a database. type WALCommand struct{} +// Run executes the command. func (c *WALCommand) Run(ctx context.Context, args []string) (err error) { var configPath string fs := flag.NewFlagSet("litestream-wal", flag.ContinueOnError) @@ -91,6 +93,7 @@ func (c *WALCommand) Run(ctx context.Context, args []string) (err error) { return nil } +// Usage prints the help screen to STDOUT. func (c *WALCommand) Usage() { fmt.Printf(` The wal command lists all wal files available for a database. diff --git a/db.go b/db.go index 1817d21..01a6ff0 100644 --- a/db.go +++ b/db.go @@ -253,6 +253,7 @@ func (db *DB) PageSize() int { return db.pageSize } +// Open initializes the background monitoring goroutine. func (db *DB) Open() (err error) { // Validate that all replica names are unique. m := make(map[string]struct{}) @@ -545,7 +546,7 @@ func (db *DB) acquireReadLock() error { // Execute read query to obtain read lock. if _, err := tx.ExecContext(db.ctx, `SELECT COUNT(1) FROM _litestream_seq;`); err != nil { - tx.Rollback() + _ = tx.Rollback() return err } @@ -782,15 +783,15 @@ func (db *DB) verify() (info syncInfo, err error) { info.generation = generation // Determine total bytes of real DB for metrics. - if fi, err := os.Stat(db.Path()); err != nil { + fi, err := os.Stat(db.Path()) + if err != nil { return info, err - } else { - info.dbModTime = fi.ModTime() - db.dbSizeGauge.Set(float64(fi.Size())) } + info.dbModTime = fi.ModTime() + db.dbSizeGauge.Set(float64(fi.Size())) // Determine total bytes of real WAL. - fi, err := os.Stat(db.WALPath()) + fi, err = os.Stat(db.WALPath()) if err != nil { return info, err } @@ -1140,8 +1141,11 @@ func (r *ShadowWALReader) Read(p []byte) (n int, err error) { return n, err } -const WALHeaderChecksumOffset = 24 -const WALFrameHeaderChecksumOffset = 16 +// SQLite WAL constants +const ( + WALHeaderChecksumOffset = 24 + WALFrameHeaderChecksumOffset = 16 +) func readLastChecksumFrom(f *os.File, pageSize int) (uint32, uint32, error) { // Determine the byte offset of the checksum for the header (if no pages @@ -1192,7 +1196,7 @@ func (db *DB) checkpoint(mode string) (err error) { if err := db.releaseReadLock(); err != nil { return fmt.Errorf("release read lock: %w", err) } - defer db.acquireReadLock() + defer func() { _ = db.acquireReadLock() }() // A non-forced checkpoint is issued as "PASSIVE". This will only checkpoint // if there are not pending transactions. A forced checkpoint ("RESTART") @@ -1388,24 +1392,6 @@ func checksumFile(filename string) (uint64, error) { return h.Sum64(), nil } -func checksumFileAt(filename string, offset int64) (uint64, error) { - f, err := os.Open(filename) - if err != nil { - return 0, err - } - defer f.Close() - - if _, err := f.Seek(offset, io.SeekStart); err != nil { - return 0, err - } - - h := crc64.New(crc64.MakeTable(crc64.ISO)) - if _, err := io.Copy(h, f); err != nil { - return 0, err - } - return h.Sum64(), nil -} - func (db *DB) restoreTarget(ctx context.Context, opt RestoreOptions, logger *log.Logger) (Replica, string, error) { var target struct { replica Replica @@ -1532,7 +1518,7 @@ func (db *DB) restoreWAL(ctx context.Context, r Replica, generation string, inde // CRC64 returns a CRC-64 ISO checksum of the database and its current position. // -// This function obtains a read lock so it prevents syncs from occuring until +// This function obtains a read lock so it prevents syncs from occurring until // the operation is complete. The database will still be usable but it will be // unable to checkpoint during this time. func (db *DB) CRC64() (uint64, Pos, error) { @@ -1602,6 +1588,7 @@ type RestoreOptions struct { Logger *log.Logger } +// NewRestoreOptions returns a new instance of RestoreOptions with defaults. func NewRestoreOptions() RestoreOptions { return RestoreOptions{ Index: math.MaxInt64, diff --git a/db_test.go b/db_test.go index 4be672f..7227cfb 100644 --- a/db_test.go +++ b/db_test.go @@ -371,7 +371,7 @@ func TestDB_Sync(t *testing.T) { shadowWALPath := db.ShadowWALPath(pos0.Generation, pos0.Index) if buf, err := ioutil.ReadFile(shadowWALPath); err != nil { t.Fatal(err) - } else if ioutil.WriteFile(shadowWALPath, append(buf[:litestream.WALHeaderSize-8], 0, 0, 0, 0, 0, 0, 0, 0), 0600); err != nil { + } else if err := ioutil.WriteFile(shadowWALPath, append(buf[:litestream.WALHeaderSize-8], 0, 0, 0, 0, 0, 0, 0, 0), 0600); err != nil { t.Fatal(err) } diff --git a/doc/DESIGN.md b/doc/DESIGN.md deleted file mode 100644 index 67a40e2..0000000 --- a/doc/DESIGN.md +++ /dev/null @@ -1,88 +0,0 @@ -DESIGN -====== - -Litestream is a sidecar process that replicates the write ahead log (WAL) for -a SQLite database. To ensure that it can replicate every page, litestream takes -control over the checkpointing process by issuing a long running read -transaction against the database to prevent checkpointing. It then releases -this transaction once it obtains a write lock and issues the checkpoint itself. - -The daemon polls the database on an interval to breifly obtain a write -transaction lock and copy over new WAL pages. Once the WAL has reached a -threshold size, litestream will issue a checkpoint and a single page write -to a table called `_litestream` to start the new WAL. - - -## Workflow - -When litestream first loads a database, it checks if there is an existing -sidecar directory which is named `.-litestream`. If not, it initializes -the directory and starts a new generation. - -A generation is a snapshot of the database followed by a continuous stream of -WAL files. A new generation is started on initialization & whenever litestream -cannot verify that it has a continuous record of WAL files. This could happen -if litestream is stopped and another process checkpoints the WAL. In this case, -a new generation ID is randomly created and a snapshot is replicated to the -appropriate destinations. - -Generations also prevent two servers from replicating to the same destination -and corrupting each other's data. In this case, each server would replicate -to a different generation directory. On recovery, there will be duplicate -databases and the end user can choose which generation to recover but each -database will be uncorrupted. - - -## File Layout - -Litestream maintains a shadow WAL which is a historical record of all previous -WAL files. These files can be deleted after a time or size threshold but should -be replicated before being deleted. - -### Local - -Given a database file named `db`, SQLite will create a WAL file called `db-wal`. -Litestream will then create a hidden directory called `.db-litestream` that -contains the historical record of all WAL files for the current generation. - -``` -db # SQLite database -db-wal # SQLite WAL -.db-litestream/ - generation # current generation number - generations/ - xxxxxxxx/ - wal/ # WAL files - 000000000000001.wal - 000000000000002.wal - 000000000000003.wal # active WAL -``` - -### Remote (S3) - -``` -bkt/ - db/ # database path - generations/ - xxxxxxxx/ - snapshots/ # snapshots w/ timestamp+offset - 20000101T000000Z-000000000000023.snapshot - wal/ # compressed WAL files - 000000000000001-0.wal.lz4 - 000000000000001-.wal.lz4 - 000000000000002-0.wal.lz4 - 00000002/ - snapshot/ - 000000000000000.snapshot - scheduled/ - daily/ - 20000101T000000Z-000000000000023.snapshot - 20000102T000000Z-000000000000036.snapshot - monthly/ - 20000101T000000Z-000000000000023.snapshot - - wal/ - 000000000000001.wal.lz4 -``` - - diff --git a/litestream.go b/litestream.go index 98c89ed..cf1b094 100644 --- a/litestream.go +++ b/litestream.go @@ -13,10 +13,9 @@ import ( "strings" "syscall" "time" - - _ "github.com/mattn/go-sqlite3" ) +// Naming constants. const ( MetaDirSuffix = "-litestream" @@ -152,10 +151,6 @@ func readWALHeader(filename string) ([]byte, error) { return buf[:n], err } -func readCheckpointSeqNo(hdr []byte) uint32 { - return binary.BigEndian.Uint32(hdr[12:]) -} - // readFileAt reads a slice from a file. func readFileAt(filename string, offset, n int64) ([]byte, error) { f, err := os.Open(filename) diff --git a/litestream_test.go b/litestream_test.go index 48304da..a03a748 100644 --- a/litestream_test.go +++ b/litestream_test.go @@ -6,6 +6,7 @@ import ( "testing" "github.com/benbjohnson/litestream" + _ "github.com/mattn/go-sqlite3" ) func TestChecksum(t *testing.T) { diff --git a/replica.go b/replica.go index 465136d..cfdbbd3 100644 --- a/replica.go +++ b/replica.go @@ -90,7 +90,6 @@ type FileReplica struct { pos Pos // last position wg sync.WaitGroup - ctx context.Context cancel func() snapshotTotalGauge prometheus.Gauge @@ -534,10 +533,10 @@ func (r *FileReplica) snapshot(ctx context.Context, generation string, index int if err != nil { return err } else if _, err := tx.ExecContext(ctx, `SELECT COUNT(1) FROM _litestream_seq;`); err != nil { - tx.Rollback() + _ = tx.Rollback() return err } - defer tx.Rollback() + defer func() { _ = tx.Rollback() }() // Ignore if we already have a snapshot for the given WAL index. snapshotPath := r.SnapshotPath(generation, index) @@ -570,6 +569,7 @@ func (r *FileReplica) snapshotN(generation string) (int, error) { return n, nil } +// Sync replays data from the shadow WAL into the file replica. func (r *FileReplica) Sync(ctx context.Context) (err error) { // Clear last position if if an error occurs during sync. defer func() { @@ -884,7 +884,7 @@ func (r *FileReplica) deleteGenerationWALBefore(ctx context.Context, generation return nil } -// SnapsotIndexAt returns the highest index for a snapshot within a generation +// SnapshotIndexAt returns the highest index for a snapshot within a generation // that occurs before timestamp. If timestamp is zero, returns the latest snapshot. func SnapshotIndexAt(ctx context.Context, r Replica, generation string, timestamp time.Time) (int, error) { snapshots, err := r.Snapshots(ctx) diff --git a/s3/s3.go b/s3/s3.go index cbb507c..1b94fd9 100644 --- a/s3/s3.go +++ b/s3/s3.go @@ -570,10 +570,10 @@ func (r *Replica) snapshot(ctx context.Context, generation string, index int) er if err != nil { return err } else if _, err := tx.ExecContext(ctx, `SELECT COUNT(1) FROM _litestream_seq;`); err != nil { - tx.Rollback() + _ = tx.Rollback() return err } - defer tx.Rollback() + defer func() { _ = tx.Rollback() }() // Open database file handle. f, err := os.Open(r.db.Path()) @@ -681,6 +681,7 @@ func (r *Replica) findBucketRegion(ctx context.Context, bucket string) (string, return "us-east-1", nil } +// Sync replays data from the shadow WAL and uploads it to S3. func (r *Replica) Sync(ctx context.Context) (err error) { // Clear last position if if an error occurs during sync. defer func() { @@ -1012,40 +1013,6 @@ func (r *Replica) deleteGenerationBefore(ctx context.Context, generation string, return nil } -type multiReadCloser struct { - readers []io.ReadCloser -} - -func (mr *multiReadCloser) Read(p []byte) (n int, err error) { - for len(mr.readers) > 0 { - n, err = mr.readers[0].Read(p) - if err == io.EOF { - if e := mr.readers[0].Close(); e != nil { - return n, e - } - mr.readers[0] = nil - mr.readers = mr.readers[1:] - } - - if n > 0 || err != io.EOF { - if err == io.EOF && len(mr.readers) > 0 { - err = nil - } - return - } - } - return 0, io.EOF -} - -func (mr *multiReadCloser) Close() (err error) { - for _, r := range mr.readers { - if e := r.Close(); e != nil && err == nil { - err = e - } - } - return err -} - // S3 metrics. var ( operationTotalCounterVec = promauto.NewCounterVec(prometheus.CounterOpts{