Fix golangci-lint issues
This commit is contained in:
@@ -96,6 +96,7 @@ func (m *Main) Run(ctx context.Context, args []string) (err error) {
|
|||||||
|
|
||||||
// Setup signal handler.
|
// Setup signal handler.
|
||||||
ctx, cancel := context.WithCancel(ctx)
|
ctx, cancel := context.WithCancel(ctx)
|
||||||
|
defer cancel()
|
||||||
signalCh := make(chan os.Signal, 1)
|
signalCh := make(chan os.Signal, 1)
|
||||||
signal.Notify(signalCh, notifySignals...)
|
signal.Notify(signalCh, notifySignals...)
|
||||||
|
|
||||||
|
|||||||
@@ -2,7 +2,6 @@ package main
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"errors"
|
|
||||||
"flag"
|
"flag"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
@@ -276,5 +275,3 @@ Examples:
|
|||||||
DefaultConfigPath(),
|
DefaultConfigPath(),
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
var errSkipDBExists = errors.New("database already exists, skipping")
|
|
||||||
|
|||||||
73
db.go
73
db.go
@@ -12,13 +12,10 @@ import (
|
|||||||
"io"
|
"io"
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
"log"
|
"log"
|
||||||
"math"
|
|
||||||
"math/rand"
|
"math/rand"
|
||||||
"os"
|
"os"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
"regexp"
|
|
||||||
"sort"
|
"sort"
|
||||||
"strconv"
|
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
@@ -429,7 +426,9 @@ func (db *DB) Close() (err error) {
|
|||||||
err = e
|
err = e
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
r.Stop(true)
|
if e := r.Stop(true); e != nil && err == nil {
|
||||||
|
err = e
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Release the read lock to allow other applications to handle checkpointing.
|
// Release the read lock to allow other applications to handle checkpointing.
|
||||||
@@ -1142,11 +1141,11 @@ func (db *DB) copyToShadowWAL(ctx context.Context) error {
|
|||||||
go func() {
|
go func() {
|
||||||
zw := lz4.NewWriter(pw)
|
zw := lz4.NewWriter(pw)
|
||||||
if _, err := io.Copy(zw, &io.LimitedReader{R: f, N: walByteN}); err != nil {
|
if _, err := io.Copy(zw, &io.LimitedReader{R: f, N: walByteN}); err != nil {
|
||||||
pw.CloseWithError(err)
|
_ = pw.CloseWithError(err)
|
||||||
} else if err := zw.Close(); err != nil {
|
} else if err := zw.Close(); err != nil {
|
||||||
pw.CloseWithError(err)
|
_ = pw.CloseWithError(err)
|
||||||
}
|
}
|
||||||
pw.Close()
|
_ = pw.Close()
|
||||||
}()
|
}()
|
||||||
|
|
||||||
// Write a new, compressed segment via pipe.
|
// Write a new, compressed segment via pipe.
|
||||||
@@ -1336,47 +1335,12 @@ func (itr *shadowWALSegmentIterator) WALSegment() WALSegmentInfo {
|
|||||||
return itr.infos[0]
|
return itr.infos[0]
|
||||||
}
|
}
|
||||||
|
|
||||||
// frameAlign returns a frame-aligned offset.
|
|
||||||
// Returns zero if offset is less than the WAL header size.
|
|
||||||
func frameAlign(offset int64, pageSize int) int64 {
|
|
||||||
assert(offset >= 0, "frameAlign(): offset must be non-negative")
|
|
||||||
assert(pageSize >= 0, "frameAlign(): page size must be non-negative")
|
|
||||||
|
|
||||||
if offset < WALHeaderSize {
|
|
||||||
return 0
|
|
||||||
}
|
|
||||||
|
|
||||||
frameSize := WALFrameHeaderSize + int64(pageSize)
|
|
||||||
frameN := (offset - WALHeaderSize) / frameSize
|
|
||||||
return (frameN * frameSize) + WALHeaderSize
|
|
||||||
}
|
|
||||||
|
|
||||||
// SQLite WAL constants
|
// SQLite WAL constants
|
||||||
const (
|
const (
|
||||||
WALHeaderChecksumOffset = 24
|
WALHeaderChecksumOffset = 24
|
||||||
WALFrameHeaderChecksumOffset = 16
|
WALFrameHeaderChecksumOffset = 16
|
||||||
)
|
)
|
||||||
|
|
||||||
func readLastChecksumFrom(f *os.File, pageSize int) (uint32, uint32, error) {
|
|
||||||
// Determine the byte offset of the checksum for the header (if no pages
|
|
||||||
// exist) or for the last page (if at least one page exists).
|
|
||||||
offset := int64(WALHeaderChecksumOffset)
|
|
||||||
if fi, err := f.Stat(); err != nil {
|
|
||||||
return 0, 0, err
|
|
||||||
} else if sz := frameAlign(fi.Size(), pageSize); fi.Size() > WALHeaderSize {
|
|
||||||
offset = sz - int64(pageSize) - WALFrameHeaderSize + WALFrameHeaderChecksumOffset
|
|
||||||
}
|
|
||||||
|
|
||||||
// Read big endian checksum.
|
|
||||||
b := make([]byte, 8)
|
|
||||||
if n, err := f.ReadAt(b, offset); err != nil {
|
|
||||||
return 0, 0, err
|
|
||||||
} else if n != len(b) {
|
|
||||||
return 0, 0, io.ErrUnexpectedEOF
|
|
||||||
}
|
|
||||||
return binary.BigEndian.Uint32(b[0:]), binary.BigEndian.Uint32(b[4:]), nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Checkpoint performs a checkpoint on the WAL file.
|
// Checkpoint performs a checkpoint on the WAL file.
|
||||||
func (db *DB) Checkpoint(ctx context.Context, mode string) (err error) {
|
func (db *DB) Checkpoint(ctx context.Context, mode string) (err error) {
|
||||||
db.mu.Lock()
|
db.mu.Lock()
|
||||||
@@ -1584,31 +1548,6 @@ func (db *DB) CRC64(ctx context.Context) (uint64, Pos, error) {
|
|||||||
return h.Sum64(), pos, nil
|
return h.Sum64(), pos, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// parseWALPath returns the index for the WAL file.
|
|
||||||
// Returns an error if the path is not a valid WAL path.
|
|
||||||
func parseWALPath(s string) (index int, err error) {
|
|
||||||
s = filepath.Base(s)
|
|
||||||
|
|
||||||
a := walPathRegex.FindStringSubmatch(s)
|
|
||||||
if a == nil {
|
|
||||||
return 0, fmt.Errorf("invalid wal path: %s", s)
|
|
||||||
}
|
|
||||||
|
|
||||||
i32, _ := strconv.ParseUint(a[1], 16, 32)
|
|
||||||
if i32 > math.MaxInt32 {
|
|
||||||
return 0, fmt.Errorf("index too large in wal path: %s", s)
|
|
||||||
}
|
|
||||||
return int(i32), nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// formatWALPath formats a WAL filename with a given index.
|
|
||||||
func formatWALPath(index int) string {
|
|
||||||
assert(index >= 0, "wal index must be non-negative")
|
|
||||||
return FormatIndex(index) + ".wal"
|
|
||||||
}
|
|
||||||
|
|
||||||
var walPathRegex = regexp.MustCompile(`^([0-9a-f]{8})\.wal$`)
|
|
||||||
|
|
||||||
// ReadWALFields iterates over the header & frames in the WAL data in r.
|
// ReadWALFields iterates over the header & frames in the WAL data in r.
|
||||||
// Returns salt, checksum, byte order & the last frame. WAL data must start
|
// Returns salt, checksum, byte order & the last frame. WAL data must start
|
||||||
// from the beginning of the WAL header and must end on either the WAL header
|
// from the beginning of the WAL header and must end on either the WAL header
|
||||||
|
|||||||
@@ -1,6 +1,7 @@
|
|||||||
package internal
|
package internal
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"crypto/md5"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
"math"
|
"math"
|
||||||
@@ -180,7 +181,7 @@ func ParseWALSegmentPath(s string) (index int, offset int64, err error) {
|
|||||||
if i32 > math.MaxInt32 {
|
if i32 > math.MaxInt32 {
|
||||||
return 0, 0, fmt.Errorf("index too large in wal segment path %q", s)
|
return 0, 0, fmt.Errorf("index too large in wal segment path %q", s)
|
||||||
}
|
}
|
||||||
off64, _ := strconv.ParseInt(a[2], 16, 64)
|
off64, _ := strconv.ParseUint(a[2], 16, 64)
|
||||||
if off64 > math.MaxInt64 {
|
if off64 > math.MaxInt64 {
|
||||||
return 0, 0, fmt.Errorf("offset too large in wal segment path %q", s)
|
return 0, 0, fmt.Errorf("offset too large in wal segment path %q", s)
|
||||||
}
|
}
|
||||||
@@ -228,3 +229,8 @@ func TruncateDuration(d time.Duration) time.Duration {
|
|||||||
}
|
}
|
||||||
return d
|
return d
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// MD5hash returns a hex-encoded MD5 hash of b.
|
||||||
|
func MD5hash(b []byte) string {
|
||||||
|
return fmt.Sprintf("%x", md5.Sum(b))
|
||||||
|
}
|
||||||
|
|||||||
@@ -1,9 +1,12 @@
|
|||||||
package testingutil
|
package testingutil
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"bytes"
|
||||||
"io"
|
"io"
|
||||||
"os"
|
"os"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
|
"github.com/pierrec/lz4/v4"
|
||||||
)
|
)
|
||||||
|
|
||||||
// ReadFile reads all data from filename. Fail on error.
|
// ReadFile reads all data from filename. Fail on error.
|
||||||
@@ -62,3 +65,26 @@ func Setenv(tb testing.TB, key, value string) func() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func CompressLZ4(tb testing.TB, b []byte) []byte {
|
||||||
|
tb.Helper()
|
||||||
|
|
||||||
|
var buf bytes.Buffer
|
||||||
|
zw := lz4.NewWriter(&buf)
|
||||||
|
if _, err := zw.Write(b); err != nil {
|
||||||
|
tb.Fatal(err)
|
||||||
|
} else if err := zw.Close(); err != nil {
|
||||||
|
tb.Fatal(err)
|
||||||
|
}
|
||||||
|
return buf.Bytes()
|
||||||
|
}
|
||||||
|
|
||||||
|
func DecompressLZ4(tb testing.TB, b []byte) []byte {
|
||||||
|
tb.Helper()
|
||||||
|
|
||||||
|
buf, err := io.ReadAll(lz4.NewReader(bytes.NewReader(b)))
|
||||||
|
if err != nil {
|
||||||
|
tb.Fatal(err)
|
||||||
|
}
|
||||||
|
return buf
|
||||||
|
}
|
||||||
|
|||||||
@@ -1,10 +1,8 @@
|
|||||||
package litestream
|
package litestream
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"crypto/md5"
|
|
||||||
"database/sql"
|
"database/sql"
|
||||||
"encoding/binary"
|
"encoding/binary"
|
||||||
"encoding/hex"
|
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
@@ -475,12 +473,6 @@ func isHexChar(ch rune) bool {
|
|||||||
return (ch >= '0' && ch <= '9') || (ch >= 'a' && ch <= 'f')
|
return (ch >= '0' && ch <= '9') || (ch >= 'a' && ch <= 'f')
|
||||||
}
|
}
|
||||||
|
|
||||||
// md5hash returns a hex-encoded MD5 hash of b.
|
|
||||||
func md5hash(b []byte) string {
|
|
||||||
sum := md5.Sum(b)
|
|
||||||
return hex.EncodeToString(sum[:])
|
|
||||||
}
|
|
||||||
|
|
||||||
// Tracef is used for low-level tracing.
|
// Tracef is used for low-level tracing.
|
||||||
var Tracef = func(format string, a ...interface{}) {}
|
var Tracef = func(format string, a ...interface{}) {}
|
||||||
|
|
||||||
|
|||||||
@@ -4,13 +4,11 @@ import (
|
|||||||
"bytes"
|
"bytes"
|
||||||
"encoding/binary"
|
"encoding/binary"
|
||||||
"encoding/hex"
|
"encoding/hex"
|
||||||
"io"
|
|
||||||
"os"
|
"os"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"github.com/benbjohnson/litestream"
|
"github.com/benbjohnson/litestream"
|
||||||
_ "github.com/mattn/go-sqlite3"
|
_ "github.com/mattn/go-sqlite3"
|
||||||
"github.com/pierrec/lz4/v4"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestChecksum(t *testing.T) {
|
func TestChecksum(t *testing.T) {
|
||||||
@@ -80,26 +78,3 @@ func fileEqual(tb testing.TB, x, y string) bool {
|
|||||||
|
|
||||||
return bytes.Equal(bx, by)
|
return bytes.Equal(bx, by)
|
||||||
}
|
}
|
||||||
|
|
||||||
func compressLZ4(tb testing.TB, b []byte) []byte {
|
|
||||||
tb.Helper()
|
|
||||||
|
|
||||||
var buf bytes.Buffer
|
|
||||||
zw := lz4.NewWriter(&buf)
|
|
||||||
if _, err := zw.Write(b); err != nil {
|
|
||||||
tb.Fatal(err)
|
|
||||||
} else if err := zw.Close(); err != nil {
|
|
||||||
tb.Fatal(err)
|
|
||||||
}
|
|
||||||
return buf.Bytes()
|
|
||||||
}
|
|
||||||
|
|
||||||
func decompressLZ4(tb testing.TB, b []byte) []byte {
|
|
||||||
tb.Helper()
|
|
||||||
|
|
||||||
buf, err := io.ReadAll(lz4.NewReader(bytes.NewReader(b)))
|
|
||||||
if err != nil {
|
|
||||||
tb.Fatal(err)
|
|
||||||
}
|
|
||||||
return buf
|
|
||||||
}
|
|
||||||
|
|||||||
12
replica.go
12
replica.go
@@ -105,14 +105,14 @@ func (r *Replica) DB() *DB { return r.db }
|
|||||||
func (r *Replica) Client() ReplicaClient { return r.client }
|
func (r *Replica) Client() ReplicaClient { return r.client }
|
||||||
|
|
||||||
// Starts replicating in a background goroutine.
|
// Starts replicating in a background goroutine.
|
||||||
func (r *Replica) Start(ctx context.Context) error {
|
func (r *Replica) Start(ctx context.Context) {
|
||||||
// Ignore if replica is being used sychronously.
|
// Ignore if replica is being used sychronously.
|
||||||
if !r.MonitorEnabled {
|
if !r.MonitorEnabled {
|
||||||
return nil
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// Stop previous replication.
|
// Stop previous replication.
|
||||||
r.Stop(false)
|
_ = r.Stop(false)
|
||||||
|
|
||||||
// Wrap context with cancelation.
|
// Wrap context with cancelation.
|
||||||
ctx, r.cancel = context.WithCancel(ctx)
|
ctx, r.cancel = context.WithCancel(ctx)
|
||||||
@@ -123,8 +123,6 @@ func (r *Replica) Start(ctx context.Context) error {
|
|||||||
go func() { defer r.wg.Done(); r.retainer(ctx) }()
|
go func() { defer r.wg.Done(); r.retainer(ctx) }()
|
||||||
go func() { defer r.wg.Done(); r.snapshotter(ctx) }()
|
go func() { defer r.wg.Done(); r.snapshotter(ctx) }()
|
||||||
go func() { defer r.wg.Done(); r.validator(ctx) }()
|
go func() { defer r.wg.Done(); r.validator(ctx) }()
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Stop cancels any outstanding replication and blocks until finished.
|
// Stop cancels any outstanding replication and blocks until finished.
|
||||||
@@ -512,10 +510,10 @@ func (r *Replica) Snapshot(ctx context.Context) (info SnapshotInfo, err error) {
|
|||||||
defer zr.Close()
|
defer zr.Close()
|
||||||
|
|
||||||
if _, err := io.Copy(zr, r.f); err != nil {
|
if _, err := io.Copy(zr, r.f); err != nil {
|
||||||
pw.CloseWithError(err)
|
_ = pw.CloseWithError(err)
|
||||||
return err
|
return err
|
||||||
} else if err := zr.Close(); err != nil {
|
} else if err := zr.Close(); err != nil {
|
||||||
pw.CloseWithError(err)
|
_ = pw.CloseWithError(err)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
return pw.Close()
|
return pw.Close()
|
||||||
|
|||||||
@@ -11,6 +11,7 @@ import (
|
|||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"github.com/benbjohnson/litestream"
|
"github.com/benbjohnson/litestream"
|
||||||
|
"github.com/benbjohnson/litestream/internal/testingutil"
|
||||||
"github.com/benbjohnson/litestream/mock"
|
"github.com/benbjohnson/litestream/mock"
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -193,7 +194,7 @@ func testWALDownloader(t *testing.T, parallelism int) {
|
|||||||
filename := filepath.Join(tempDir, "generations", "0000000000000000", "wal", fmt.Sprintf("%08x", i), "00000000.wal.lz4")
|
filename := filepath.Join(tempDir, "generations", "0000000000000000", "wal", fmt.Sprintf("%08x", i), "00000000.wal.lz4")
|
||||||
if err := os.MkdirAll(filepath.Dir(filename), 0777); err != nil {
|
if err := os.MkdirAll(filepath.Dir(filename), 0777); err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
} else if err := os.WriteFile(filename, compressLZ4(t, []byte(fmt.Sprint(i))), 0666); err != nil {
|
} else if err := os.WriteFile(filename, testingutil.CompressLZ4(t, []byte(fmt.Sprint(i))), 0666); err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user