Compare commits
11 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
c1ae968188 | ||
|
|
9f0e50ddf7 | ||
|
|
fe9ab5c517 | ||
|
|
d02ba97453 | ||
|
|
b1abd6bd99 | ||
|
|
fd892eef6d | ||
|
|
1bfcaa4a17 | ||
|
|
a369b05ee4 | ||
|
|
e0493f979a | ||
|
|
016546a3d5 | ||
|
|
10f97f90f2 |
14
.github/workflows/commit.yml
vendored
14
.github/workflows/commit.yml
vendored
@@ -3,12 +3,12 @@ on: push
|
|||||||
jobs:
|
jobs:
|
||||||
build:
|
build:
|
||||||
name: Build & Unit Test
|
name: Build & Unit Test
|
||||||
runs-on: ubuntu-22.04
|
runs-on: ubuntu-latest
|
||||||
steps:
|
steps:
|
||||||
- uses: actions/checkout@v2
|
- uses: actions/checkout@v2
|
||||||
- uses: actions/setup-go@v2
|
- uses: actions/setup-go@v2
|
||||||
with:
|
with:
|
||||||
go-version: '1.20'
|
go-version: '1.21'
|
||||||
- uses: actions/cache@v2
|
- uses: actions/cache@v2
|
||||||
with:
|
with:
|
||||||
path: ~/go/pkg/mod
|
path: ~/go/pkg/mod
|
||||||
@@ -32,7 +32,7 @@ jobs:
|
|||||||
|
|
||||||
# long-running-test:
|
# long-running-test:
|
||||||
# name: Run Long Running Unit Test
|
# name: Run Long Running Unit Test
|
||||||
# runs-on: ubuntu-22.04
|
# runs-on: ubuntu-latest
|
||||||
# steps:
|
# steps:
|
||||||
# - uses: actions/checkout@v2
|
# - uses: actions/checkout@v2
|
||||||
# - uses: actions/setup-go@v2
|
# - uses: actions/setup-go@v2
|
||||||
@@ -49,7 +49,7 @@ jobs:
|
|||||||
|
|
||||||
# s3-integration-test:
|
# s3-integration-test:
|
||||||
# name: Run S3 Integration Tests
|
# name: Run S3 Integration Tests
|
||||||
# runs-on: ubuntu-18.04
|
# runs-on: ubuntu-latest
|
||||||
# needs: build
|
# needs: build
|
||||||
# steps:
|
# steps:
|
||||||
# - uses: actions/download-artifact@v2
|
# - uses: actions/download-artifact@v2
|
||||||
@@ -66,7 +66,7 @@ jobs:
|
|||||||
|
|
||||||
# gcp-integration-test:
|
# gcp-integration-test:
|
||||||
# name: Run GCP Integration Tests
|
# name: Run GCP Integration Tests
|
||||||
# runs-on: ubuntu-18.04
|
# runs-on: ubuntu-latest
|
||||||
# needs: build
|
# needs: build
|
||||||
# steps:
|
# steps:
|
||||||
# - name: Extract GCP credentials
|
# - name: Extract GCP credentials
|
||||||
@@ -87,7 +87,7 @@ jobs:
|
|||||||
|
|
||||||
# abs-integration-test:
|
# abs-integration-test:
|
||||||
# name: Run Azure Blob Store Integration Tests
|
# name: Run Azure Blob Store Integration Tests
|
||||||
# runs-on: ubuntu-18.04
|
# runs-on: ubuntu-latest
|
||||||
# needs: build
|
# needs: build
|
||||||
# steps:
|
# steps:
|
||||||
# - uses: actions/download-artifact@v2
|
# - uses: actions/download-artifact@v2
|
||||||
@@ -103,7 +103,7 @@ jobs:
|
|||||||
|
|
||||||
# sftp-integration-test:
|
# sftp-integration-test:
|
||||||
# name: Run SFTP Integration Tests
|
# name: Run SFTP Integration Tests
|
||||||
# runs-on: ubuntu-18.04
|
# runs-on: ubuntu-latest
|
||||||
# needs: build
|
# needs: build
|
||||||
# steps:
|
# steps:
|
||||||
# - name: Extract SSH key
|
# - name: Extract SSH key
|
||||||
|
|||||||
2
.github/workflows/release.docker.yml
vendored
2
.github/workflows/release.docker.yml
vendored
@@ -15,7 +15,7 @@ jobs:
|
|||||||
docker:
|
docker:
|
||||||
runs-on: ubuntu-latest
|
runs-on: ubuntu-latest
|
||||||
env:
|
env:
|
||||||
PLATFORMS: "linux/amd64,linux/arm64"
|
PLATFORMS: "linux/amd64,linux/arm64,linux/arm/v7"
|
||||||
VERSION: "${{ github.event_name == 'release' && github.event.release.name || github.sha }}"
|
VERSION: "${{ github.event_name == 'release' && github.event.release.name || github.sha }}"
|
||||||
|
|
||||||
steps:
|
steps:
|
||||||
|
|||||||
6
.github/workflows/release.linux.yml
vendored
6
.github/workflows/release.linux.yml
vendored
@@ -6,7 +6,7 @@ on:
|
|||||||
name: release (linux)
|
name: release (linux)
|
||||||
jobs:
|
jobs:
|
||||||
build:
|
build:
|
||||||
runs-on: ubuntu-18.04
|
runs-on: ubuntu-latest
|
||||||
strategy:
|
strategy:
|
||||||
matrix:
|
matrix:
|
||||||
include:
|
include:
|
||||||
@@ -31,7 +31,7 @@ jobs:
|
|||||||
- uses: actions/checkout@v2
|
- uses: actions/checkout@v2
|
||||||
- uses: actions/setup-go@v2
|
- uses: actions/setup-go@v2
|
||||||
with:
|
with:
|
||||||
go-version: '1.16'
|
go-version: '1.21'
|
||||||
|
|
||||||
- id: release
|
- id: release
|
||||||
uses: bruceadams/get-release@v1.2.2
|
uses: bruceadams/get-release@v1.2.2
|
||||||
@@ -54,7 +54,7 @@ jobs:
|
|||||||
mkdir -p dist
|
mkdir -p dist
|
||||||
cp etc/litestream.yml etc/litestream.service dist
|
cp etc/litestream.yml etc/litestream.service dist
|
||||||
cat etc/nfpm.yml | LITESTREAM_VERSION=${{ steps.release.outputs.tag_name }} envsubst > dist/nfpm.yml
|
cat etc/nfpm.yml | LITESTREAM_VERSION=${{ steps.release.outputs.tag_name }} envsubst > dist/nfpm.yml
|
||||||
CGO_ENABLED=1 go build -ldflags "-s -w -X 'main.Version=${{ steps.release.outputs.tag_name }}'" -o dist/litestream ./cmd/litestream
|
CGO_ENABLED=1 go build -ldflags "-s -w -extldflags "-static" -X 'main.Version=${{ steps.release.outputs.tag_name }}'" -tags osusergo,netgo,sqlite_omit_load_extension -o dist/litestream ./cmd/litestream
|
||||||
|
|
||||||
cd dist
|
cd dist
|
||||||
tar -czvf litestream-${{ steps.release.outputs.tag_name }}-${{ env.GOOS }}-${{ env.GOARCH }}${{ env.GOARM }}.tar.gz litestream
|
tar -czvf litestream-${{ steps.release.outputs.tag_name }}-${{ env.GOOS }}-${{ env.GOARCH }}${{ env.GOARM }}.tar.gz litestream
|
||||||
|
|||||||
62
.github/workflows/release.linux_static.yml
vendored
62
.github/workflows/release.linux_static.yml
vendored
@@ -1,62 +0,0 @@
|
|||||||
on:
|
|
||||||
release:
|
|
||||||
types:
|
|
||||||
- created
|
|
||||||
|
|
||||||
name: release (linux/static)
|
|
||||||
jobs:
|
|
||||||
build:
|
|
||||||
runs-on: ubuntu-18.04
|
|
||||||
strategy:
|
|
||||||
matrix:
|
|
||||||
include:
|
|
||||||
- arch: amd64
|
|
||||||
cc: gcc
|
|
||||||
- arch: arm64
|
|
||||||
cc: aarch64-linux-gnu-gcc
|
|
||||||
- arch: arm
|
|
||||||
arm: 6
|
|
||||||
cc: arm-linux-gnueabi-gcc
|
|
||||||
- arch: arm
|
|
||||||
arm: 7
|
|
||||||
cc: arm-linux-gnueabihf-gcc
|
|
||||||
|
|
||||||
env:
|
|
||||||
GOOS: linux
|
|
||||||
GOARCH: ${{ matrix.arch }}
|
|
||||||
GOARM: ${{ matrix.arm }}
|
|
||||||
CC: ${{ matrix.cc }}
|
|
||||||
|
|
||||||
steps:
|
|
||||||
- uses: actions/checkout@v2
|
|
||||||
- uses: actions/setup-go@v2
|
|
||||||
with:
|
|
||||||
go-version: '1.16'
|
|
||||||
|
|
||||||
- id: release
|
|
||||||
uses: bruceadams/get-release@v1.2.2
|
|
||||||
env:
|
|
||||||
GITHUB_TOKEN: ${{ github.token }}
|
|
||||||
|
|
||||||
- name: Install cross-compilers
|
|
||||||
run: |
|
|
||||||
sudo apt-get update
|
|
||||||
sudo apt-get install -y gcc-aarch64-linux-gnu gcc-arm-linux-gnueabihf gcc-arm-linux-gnueabi
|
|
||||||
|
|
||||||
- name: Build litestream
|
|
||||||
run: |
|
|
||||||
rm -rf dist
|
|
||||||
mkdir -p dist
|
|
||||||
CGO_ENABLED=1 go build -ldflags "-s -w -extldflags "-static" -X 'main.Version=${{ steps.release.outputs.tag_name }}'" -tags osusergo,netgo,sqlite_omit_load_extension -o dist/litestream ./cmd/litestream
|
|
||||||
cd dist
|
|
||||||
tar -czvf litestream-${{ steps.release.outputs.tag_name }}-${{ env.GOOS }}-${{ env.GOARCH }}${{ env.GOARM }}-static.tar.gz litestream
|
|
||||||
|
|
||||||
- name: Upload release tarball
|
|
||||||
uses: actions/upload-release-asset@v1.0.2
|
|
||||||
env:
|
|
||||||
GITHUB_TOKEN: ${{ github.token }}
|
|
||||||
with:
|
|
||||||
upload_url: ${{ steps.release.outputs.upload_url }}
|
|
||||||
asset_path: ./dist/litestream-${{ steps.release.outputs.tag_name }}-${{ env.GOOS }}-${{ env.GOARCH }}${{ env.GOARM }}-static.tar.gz
|
|
||||||
asset_name: litestream-${{ steps.release.outputs.tag_name }}-${{ env.GOOS }}-${{ env.GOARCH }}${{ env.GOARM }}-static.tar.gz
|
|
||||||
asset_content_type: application/gzip
|
|
||||||
@@ -1,4 +1,4 @@
|
|||||||
FROM golang:1.20.1 as builder
|
FROM golang:1.21.3 as builder
|
||||||
|
|
||||||
WORKDIR /src/litestream
|
WORKDIR /src/litestream
|
||||||
COPY . .
|
COPY . .
|
||||||
|
|||||||
5
Makefile
5
Makefile
@@ -25,6 +25,11 @@ endif
|
|||||||
mv dist/litestream.zip dist/litestream-${LITESTREAM_VERSION}-darwin-amd64.zip
|
mv dist/litestream.zip dist/litestream-${LITESTREAM_VERSION}-darwin-amd64.zip
|
||||||
openssl dgst -sha256 dist/litestream-${LITESTREAM_VERSION}-darwin-amd64.zip
|
openssl dgst -sha256 dist/litestream-${LITESTREAM_VERSION}-darwin-amd64.zip
|
||||||
|
|
||||||
|
GOOS=darwin GOARCH=arm64 CC="gcc -target arm64-apple-macos11" CGO_ENABLED=1 go build -v -ldflags "-s -w -X 'main.Version=${LITESTREAM_VERSION}'" -o dist/litestream ./cmd/litestream
|
||||||
|
gon etc/gon.hcl
|
||||||
|
mv dist/litestream.zip dist/litestream-${LITESTREAM_VERSION}-darwin-arm64.zip
|
||||||
|
openssl dgst -sha256 dist/litestream-${LITESTREAM_VERSION}-darwin-arm64.zip
|
||||||
|
|
||||||
clean:
|
clean:
|
||||||
rm -rf dist
|
rm -rf dist
|
||||||
|
|
||||||
|
|||||||
@@ -4,7 +4,6 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
"flag"
|
"flag"
|
||||||
"fmt"
|
"fmt"
|
||||||
"log"
|
|
||||||
"os"
|
"os"
|
||||||
"text/tabwriter"
|
"text/tabwriter"
|
||||||
"time"
|
"time"
|
||||||
@@ -87,7 +86,7 @@ func (c *GenerationsCommand) Run(ctx context.Context, args []string) (err error)
|
|||||||
for _, r := range replicas {
|
for _, r := range replicas {
|
||||||
generations, err := r.Client.Generations(ctx)
|
generations, err := r.Client.Generations(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Printf("%s: cannot list generations: %s", r.Name(), err)
|
r.Logger().Error("cannot list generations", "error", err)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -95,7 +94,7 @@ func (c *GenerationsCommand) Run(ctx context.Context, args []string) (err error)
|
|||||||
for _, generation := range generations {
|
for _, generation := range generations {
|
||||||
createdAt, updatedAt, err := r.GenerationTimeBounds(ctx, generation)
|
createdAt, updatedAt, err := r.GenerationTimeBounds(ctx, generation)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Printf("%s: cannot determine generation time bounds: %s", r.Name(), err)
|
r.Logger().Error("cannot determine generation time bounds", "error", err)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -6,7 +6,7 @@ import (
|
|||||||
"flag"
|
"flag"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
"log"
|
"log/slog"
|
||||||
"net/url"
|
"net/url"
|
||||||
"os"
|
"os"
|
||||||
"os/user"
|
"os/user"
|
||||||
@@ -37,13 +37,11 @@ var (
|
|||||||
var errStop = errors.New("stop")
|
var errStop = errors.New("stop")
|
||||||
|
|
||||||
func main() {
|
func main() {
|
||||||
log.SetFlags(0)
|
|
||||||
|
|
||||||
m := NewMain()
|
m := NewMain()
|
||||||
if err := m.Run(context.Background(), os.Args[1:]); err == flag.ErrHelp || err == errStop {
|
if err := m.Run(context.Background(), os.Args[1:]); err == flag.ErrHelp || err == errStop {
|
||||||
os.Exit(1)
|
os.Exit(1)
|
||||||
} else if err != nil {
|
} else if err != nil {
|
||||||
log.Println(err)
|
slog.Error("failed to run", "error", err)
|
||||||
os.Exit(1)
|
os.Exit(1)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -95,17 +93,17 @@ func (m *Main) Run(ctx context.Context, args []string) (err error) {
|
|||||||
// Wait for signal to stop program.
|
// Wait for signal to stop program.
|
||||||
select {
|
select {
|
||||||
case err = <-c.execCh:
|
case err = <-c.execCh:
|
||||||
fmt.Println("subprocess exited, litestream shutting down")
|
slog.Info("subprocess exited, litestream shutting down")
|
||||||
case sig := <-signalCh:
|
case sig := <-signalCh:
|
||||||
fmt.Println("signal received, litestream shutting down")
|
slog.Info("signal received, litestream shutting down")
|
||||||
|
|
||||||
if c.cmd != nil {
|
if c.cmd != nil {
|
||||||
fmt.Println("sending signal to exec process")
|
slog.Info("sending signal to exec process")
|
||||||
if err := c.cmd.Process.Signal(sig); err != nil {
|
if err := c.cmd.Process.Signal(sig); err != nil {
|
||||||
return fmt.Errorf("cannot signal exec process: %w", err)
|
return fmt.Errorf("cannot signal exec process: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
fmt.Println("waiting for exec process to close")
|
slog.Info("waiting for exec process to close")
|
||||||
if err := <-c.execCh; err != nil && !strings.HasPrefix(err.Error(), "signal:") {
|
if err := <-c.execCh; err != nil && !strings.HasPrefix(err.Error(), "signal:") {
|
||||||
return fmt.Errorf("cannot wait for exec process: %w", err)
|
return fmt.Errorf("cannot wait for exec process: %w", err)
|
||||||
}
|
}
|
||||||
@@ -116,7 +114,7 @@ func (m *Main) Run(ctx context.Context, args []string) (err error) {
|
|||||||
if e := c.Close(); e != nil && err == nil {
|
if e := c.Close(); e != nil && err == nil {
|
||||||
err = e
|
err = e
|
||||||
}
|
}
|
||||||
fmt.Println("litestream shut down")
|
slog.Info("litestream shut down")
|
||||||
return err
|
return err
|
||||||
|
|
||||||
case "restore":
|
case "restore":
|
||||||
@@ -172,6 +170,16 @@ type Config struct {
|
|||||||
// Global S3 settings
|
// Global S3 settings
|
||||||
AccessKeyID string `yaml:"access-key-id"`
|
AccessKeyID string `yaml:"access-key-id"`
|
||||||
SecretAccessKey string `yaml:"secret-access-key"`
|
SecretAccessKey string `yaml:"secret-access-key"`
|
||||||
|
|
||||||
|
// Logging
|
||||||
|
Logging LoggingConfig `yaml:"logging"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// LoggingConfig configures logging.
|
||||||
|
type LoggingConfig struct {
|
||||||
|
Level string `yaml:"level"`
|
||||||
|
Type string `yaml:"type"`
|
||||||
|
Stderr bool `yaml:"stderr"`
|
||||||
}
|
}
|
||||||
|
|
||||||
// propagateGlobalSettings copies global S3 settings to replica configs.
|
// propagateGlobalSettings copies global S3 settings to replica configs.
|
||||||
@@ -241,6 +249,36 @@ func ReadConfigFile(filename string, expandEnv bool) (_ Config, err error) {
|
|||||||
// Propage settings from global config to replica configs.
|
// Propage settings from global config to replica configs.
|
||||||
config.propagateGlobalSettings()
|
config.propagateGlobalSettings()
|
||||||
|
|
||||||
|
// Configure logging.
|
||||||
|
logOutput := os.Stdout
|
||||||
|
if config.Logging.Stderr {
|
||||||
|
logOutput = os.Stderr
|
||||||
|
}
|
||||||
|
|
||||||
|
logOptions := slog.HandlerOptions{
|
||||||
|
Level: slog.LevelInfo,
|
||||||
|
}
|
||||||
|
|
||||||
|
switch strings.ToUpper(config.Logging.Level) {
|
||||||
|
case "DEBUG":
|
||||||
|
logOptions.Level = slog.LevelDebug
|
||||||
|
case "WARN", "WARNING":
|
||||||
|
logOptions.Level = slog.LevelWarn
|
||||||
|
case "ERROR":
|
||||||
|
logOptions.Level = slog.LevelError
|
||||||
|
}
|
||||||
|
|
||||||
|
var logHandler slog.Handler
|
||||||
|
switch config.Logging.Type {
|
||||||
|
case "json":
|
||||||
|
logHandler = slog.NewJSONHandler(logOutput, &logOptions)
|
||||||
|
case "text", "":
|
||||||
|
logHandler = slog.NewTextHandler(logOutput, &logOptions)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Set global default logger.
|
||||||
|
slog.SetDefault(slog.New(logHandler))
|
||||||
|
|
||||||
return config, nil
|
return config, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -1,4 +1,4 @@
|
|||||||
// +build !windows
|
//go:build !windows
|
||||||
|
|
||||||
package main
|
package main
|
||||||
|
|
||||||
|
|||||||
@@ -1,11 +1,10 @@
|
|||||||
// +build windows
|
//go:build windows
|
||||||
|
|
||||||
package main
|
package main
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"io"
|
"io"
|
||||||
"log"
|
|
||||||
"os"
|
"os"
|
||||||
"os/signal"
|
"os/signal"
|
||||||
|
|
||||||
@@ -63,13 +62,13 @@ func (s *windowsService) Execute(args []string, r <-chan svc.ChangeRequest, stat
|
|||||||
// Instantiate replication command and load configuration.
|
// Instantiate replication command and load configuration.
|
||||||
c := NewReplicateCommand()
|
c := NewReplicateCommand()
|
||||||
if c.Config, err = ReadConfigFile(DefaultConfigPath(), true); err != nil {
|
if c.Config, err = ReadConfigFile(DefaultConfigPath(), true); err != nil {
|
||||||
log.Printf("cannot load configuration: %s", err)
|
slog.Error("cannot load configuration", "error", err)
|
||||||
return true, 1
|
return true, 1
|
||||||
}
|
}
|
||||||
|
|
||||||
// Execute replication command.
|
// Execute replication command.
|
||||||
if err := c.Run(s.ctx); err != nil {
|
if err := c.Run(s.ctx); err != nil {
|
||||||
log.Printf("cannot replicate: %s", err)
|
slog.Error("cannot replicate", "error", err)
|
||||||
statusCh <- svc.Status{State: svc.StopPending}
|
statusCh <- svc.Status{State: svc.StopPending}
|
||||||
return true, 2
|
return true, 2
|
||||||
}
|
}
|
||||||
@@ -88,7 +87,7 @@ func (s *windowsService) Execute(args []string, r <-chan svc.ChangeRequest, stat
|
|||||||
case svc.Interrogate:
|
case svc.Interrogate:
|
||||||
statusCh <- req.CurrentStatus
|
statusCh <- req.CurrentStatus
|
||||||
default:
|
default:
|
||||||
log.Printf("Litestream service received unexpected change request cmd: %d", req.Cmd)
|
slog.Error("Litestream service received unexpected change request", "cmd", req.Cmd)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -4,7 +4,7 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
"flag"
|
"flag"
|
||||||
"fmt"
|
"fmt"
|
||||||
"log"
|
"log/slog"
|
||||||
"net"
|
"net"
|
||||||
"net/http"
|
"net/http"
|
||||||
_ "net/http/pprof"
|
_ "net/http/pprof"
|
||||||
@@ -42,7 +42,6 @@ func NewReplicateCommand() *ReplicateCommand {
|
|||||||
func (c *ReplicateCommand) ParseFlags(ctx context.Context, args []string) (err error) {
|
func (c *ReplicateCommand) ParseFlags(ctx context.Context, args []string) (err error) {
|
||||||
fs := flag.NewFlagSet("litestream-replicate", flag.ContinueOnError)
|
fs := flag.NewFlagSet("litestream-replicate", flag.ContinueOnError)
|
||||||
execFlag := fs.String("exec", "", "execute subcommand")
|
execFlag := fs.String("exec", "", "execute subcommand")
|
||||||
tracePath := fs.String("trace", "", "trace path")
|
|
||||||
configPath, noExpandEnv := registerConfigFlag(fs)
|
configPath, noExpandEnv := registerConfigFlag(fs)
|
||||||
fs.Usage = c.Usage
|
fs.Usage = c.Usage
|
||||||
if err := fs.Parse(args); err != nil {
|
if err := fs.Parse(args); err != nil {
|
||||||
@@ -80,27 +79,17 @@ func (c *ReplicateCommand) ParseFlags(ctx context.Context, args []string) (err e
|
|||||||
c.Config.Exec = *execFlag
|
c.Config.Exec = *execFlag
|
||||||
}
|
}
|
||||||
|
|
||||||
// Enable trace logging.
|
|
||||||
if *tracePath != "" {
|
|
||||||
f, err := os.Create(*tracePath)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
defer f.Close()
|
|
||||||
litestream.Tracef = log.New(f, "", log.LstdFlags|log.Lmicroseconds|log.LUTC|log.Lshortfile).Printf
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Run loads all databases specified in the configuration.
|
// Run loads all databases specified in the configuration.
|
||||||
func (c *ReplicateCommand) Run() (err error) {
|
func (c *ReplicateCommand) Run() (err error) {
|
||||||
// Display version information.
|
// Display version information.
|
||||||
log.Printf("litestream %s", Version)
|
slog.Info("litestream", "version", Version)
|
||||||
|
|
||||||
// Setup databases.
|
// Setup databases.
|
||||||
if len(c.Config.DBs) == 0 {
|
if len(c.Config.DBs) == 0 {
|
||||||
log.Println("no databases specified in configuration")
|
slog.Error("no databases specified in configuration")
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, dbConfig := range c.Config.DBs {
|
for _, dbConfig := range c.Config.DBs {
|
||||||
@@ -118,21 +107,22 @@ func (c *ReplicateCommand) Run() (err error) {
|
|||||||
|
|
||||||
// Notify user that initialization is done.
|
// Notify user that initialization is done.
|
||||||
for _, db := range c.DBs {
|
for _, db := range c.DBs {
|
||||||
log.Printf("initialized db: %s", db.Path())
|
slog.Info("initialized db", "path", db.Path())
|
||||||
for _, r := range db.Replicas {
|
for _, r := range db.Replicas {
|
||||||
|
slog := slog.With("name", r.Name(), "type", r.Client.Type(), "sync-interval", r.SyncInterval)
|
||||||
switch client := r.Client.(type) {
|
switch client := r.Client.(type) {
|
||||||
case *file.ReplicaClient:
|
case *file.ReplicaClient:
|
||||||
log.Printf("replicating to: name=%q type=%q path=%q", r.Name(), client.Type(), client.Path())
|
slog.Info("replicating to", "path", client.Path())
|
||||||
case *s3.ReplicaClient:
|
case *s3.ReplicaClient:
|
||||||
log.Printf("replicating to: name=%q type=%q bucket=%q path=%q region=%q endpoint=%q sync-interval=%s", r.Name(), client.Type(), client.Bucket, client.Path, client.Region, client.Endpoint, r.SyncInterval)
|
slog.Info("replicating to", "bucket", client.Bucket, "path", client.Path, "region", client.Region, "endpoint", client.Endpoint)
|
||||||
case *gcs.ReplicaClient:
|
case *gcs.ReplicaClient:
|
||||||
log.Printf("replicating to: name=%q type=%q bucket=%q path=%q sync-interval=%s", r.Name(), client.Type(), client.Bucket, client.Path, r.SyncInterval)
|
slog.Info("replicating to", "bucket", client.Bucket, "path", client.Path)
|
||||||
case *abs.ReplicaClient:
|
case *abs.ReplicaClient:
|
||||||
log.Printf("replicating to: name=%q type=%q bucket=%q path=%q endpoint=%q sync-interval=%s", r.Name(), client.Type(), client.Bucket, client.Path, client.Endpoint, r.SyncInterval)
|
slog.Info("replicating to", "bucket", client.Bucket, "path", client.Path, "endpoint", client.Endpoint)
|
||||||
case *sftp.ReplicaClient:
|
case *sftp.ReplicaClient:
|
||||||
log.Printf("replicating to: name=%q type=%q host=%q user=%q path=%q sync-interval=%s", r.Name(), client.Type(), client.Host, client.User, client.Path, r.SyncInterval)
|
slog.Info("replicating to", "host", client.Host, "user", client.User, "path", client.Path)
|
||||||
default:
|
default:
|
||||||
log.Printf("replicating to: name=%q type=%q", r.Name(), client.Type())
|
slog.Info("replicating to")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -146,11 +136,11 @@ func (c *ReplicateCommand) Run() (err error) {
|
|||||||
hostport = net.JoinHostPort("localhost", port)
|
hostport = net.JoinHostPort("localhost", port)
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Printf("serving metrics on http://%s/metrics", hostport)
|
slog.Info("serving metrics on", "url", fmt.Sprintf("http://%s/metrics", hostport))
|
||||||
go func() {
|
go func() {
|
||||||
http.Handle("/metrics", promhttp.Handler())
|
http.Handle("/metrics", promhttp.Handler())
|
||||||
if err := http.ListenAndServe(c.Config.Addr, nil); err != nil {
|
if err := http.ListenAndServe(c.Config.Addr, nil); err != nil {
|
||||||
log.Printf("cannot start metrics server: %s", err)
|
slog.Error("cannot start metrics server", "error", err)
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
}
|
}
|
||||||
@@ -179,7 +169,7 @@ func (c *ReplicateCommand) Run() (err error) {
|
|||||||
func (c *ReplicateCommand) Close() (err error) {
|
func (c *ReplicateCommand) Close() (err error) {
|
||||||
for _, db := range c.DBs {
|
for _, db := range c.DBs {
|
||||||
if e := db.Close(); e != nil {
|
if e := db.Close(); e != nil {
|
||||||
log.Printf("error closing db: path=%s err=%s", db.Path(), e)
|
db.Logger.Error("error closing db", "error", e)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
err = e
|
err = e
|
||||||
}
|
}
|
||||||
@@ -215,8 +205,5 @@ Arguments:
|
|||||||
-no-expand-env
|
-no-expand-env
|
||||||
Disables environment variable expansion in configuration file.
|
Disables environment variable expansion in configuration file.
|
||||||
|
|
||||||
-trace PATH
|
|
||||||
Write verbose trace logging to PATH.
|
|
||||||
|
|
||||||
`[1:], DefaultConfigPath())
|
`[1:], DefaultConfigPath())
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -5,7 +5,7 @@ import (
|
|||||||
"errors"
|
"errors"
|
||||||
"flag"
|
"flag"
|
||||||
"fmt"
|
"fmt"
|
||||||
"log"
|
"log/slog"
|
||||||
"os"
|
"os"
|
||||||
"strconv"
|
"strconv"
|
||||||
"time"
|
"time"
|
||||||
@@ -19,7 +19,6 @@ type RestoreCommand struct{}
|
|||||||
// Run executes the command.
|
// Run executes the command.
|
||||||
func (c *RestoreCommand) Run(ctx context.Context, args []string) (err error) {
|
func (c *RestoreCommand) Run(ctx context.Context, args []string) (err error) {
|
||||||
opt := litestream.NewRestoreOptions()
|
opt := litestream.NewRestoreOptions()
|
||||||
opt.Verbose = true
|
|
||||||
|
|
||||||
fs := flag.NewFlagSet("litestream-restore", flag.ContinueOnError)
|
fs := flag.NewFlagSet("litestream-restore", flag.ContinueOnError)
|
||||||
configPath, noExpandEnv := registerConfigFlag(fs)
|
configPath, noExpandEnv := registerConfigFlag(fs)
|
||||||
@@ -31,7 +30,6 @@ func (c *RestoreCommand) Run(ctx context.Context, args []string) (err error) {
|
|||||||
ifDBNotExists := fs.Bool("if-db-not-exists", false, "")
|
ifDBNotExists := fs.Bool("if-db-not-exists", false, "")
|
||||||
ifReplicaExists := fs.Bool("if-replica-exists", false, "")
|
ifReplicaExists := fs.Bool("if-replica-exists", false, "")
|
||||||
timestampStr := fs.String("timestamp", "", "timestamp")
|
timestampStr := fs.String("timestamp", "", "timestamp")
|
||||||
verbose := fs.Bool("v", false, "verbose output")
|
|
||||||
fs.Usage = c.Usage
|
fs.Usage = c.Usage
|
||||||
if err := fs.Parse(args); err != nil {
|
if err := fs.Parse(args); err != nil {
|
||||||
return err
|
return err
|
||||||
@@ -48,11 +46,6 @@ func (c *RestoreCommand) Run(ctx context.Context, args []string) (err error) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Instantiate logger if verbose output is enabled.
|
|
||||||
if *verbose {
|
|
||||||
opt.Logger = log.New(os.Stderr, "", log.LstdFlags|log.Lmicroseconds)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Determine replica & generation to restore from.
|
// Determine replica & generation to restore from.
|
||||||
var r *litestream.Replica
|
var r *litestream.Replica
|
||||||
if isURL(fs.Arg(0)) {
|
if isURL(fs.Arg(0)) {
|
||||||
@@ -60,7 +53,7 @@ func (c *RestoreCommand) Run(ctx context.Context, args []string) (err error) {
|
|||||||
return fmt.Errorf("cannot specify a replica URL and the -config flag")
|
return fmt.Errorf("cannot specify a replica URL and the -config flag")
|
||||||
}
|
}
|
||||||
if r, err = c.loadFromURL(ctx, fs.Arg(0), *ifDBNotExists, &opt); err == errSkipDBExists {
|
if r, err = c.loadFromURL(ctx, fs.Arg(0), *ifDBNotExists, &opt); err == errSkipDBExists {
|
||||||
fmt.Println("database already exists, skipping")
|
slog.Info("database already exists, skipping")
|
||||||
return nil
|
return nil
|
||||||
} else if err != nil {
|
} else if err != nil {
|
||||||
return err
|
return err
|
||||||
@@ -70,7 +63,7 @@ func (c *RestoreCommand) Run(ctx context.Context, args []string) (err error) {
|
|||||||
*configPath = DefaultConfigPath()
|
*configPath = DefaultConfigPath()
|
||||||
}
|
}
|
||||||
if r, err = c.loadFromConfig(ctx, fs.Arg(0), *configPath, !*noExpandEnv, *ifDBNotExists, &opt); err == errSkipDBExists {
|
if r, err = c.loadFromConfig(ctx, fs.Arg(0), *configPath, !*noExpandEnv, *ifDBNotExists, &opt); err == errSkipDBExists {
|
||||||
fmt.Println("database already exists, skipping")
|
slog.Info("database already exists, skipping")
|
||||||
return nil
|
return nil
|
||||||
} else if err != nil {
|
} else if err != nil {
|
||||||
return err
|
return err
|
||||||
@@ -81,7 +74,7 @@ func (c *RestoreCommand) Run(ctx context.Context, args []string) (err error) {
|
|||||||
// If optional flag set, return success. Useful for automated recovery.
|
// If optional flag set, return success. Useful for automated recovery.
|
||||||
if opt.Generation == "" {
|
if opt.Generation == "" {
|
||||||
if *ifReplicaExists {
|
if *ifReplicaExists {
|
||||||
fmt.Println("no matching backups found")
|
slog.Info("no matching backups found")
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
return fmt.Errorf("no matching backups found")
|
return fmt.Errorf("no matching backups found")
|
||||||
|
|||||||
@@ -4,7 +4,7 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
"flag"
|
"flag"
|
||||||
"fmt"
|
"fmt"
|
||||||
"log"
|
"log/slog"
|
||||||
"os"
|
"os"
|
||||||
"text/tabwriter"
|
"text/tabwriter"
|
||||||
"time"
|
"time"
|
||||||
@@ -82,7 +82,7 @@ func (c *SnapshotsCommand) Run(ctx context.Context, args []string) (err error) {
|
|||||||
for _, r := range replicas {
|
for _, r := range replicas {
|
||||||
infos, err := r.Snapshots(ctx)
|
infos, err := r.Snapshots(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Printf("cannot determine snapshots: %s", err)
|
slog.Error("cannot determine snapshots", "error", err)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
for _, info := range infos {
|
for _, info := range infos {
|
||||||
|
|||||||
@@ -4,7 +4,6 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
"flag"
|
"flag"
|
||||||
"fmt"
|
"fmt"
|
||||||
"log"
|
|
||||||
"os"
|
"os"
|
||||||
"text/tabwriter"
|
"text/tabwriter"
|
||||||
"time"
|
"time"
|
||||||
@@ -86,7 +85,7 @@ func (c *WALCommand) Run(ctx context.Context, args []string) (err error) {
|
|||||||
generations = []string{*generation}
|
generations = []string{*generation}
|
||||||
} else {
|
} else {
|
||||||
if generations, err = r.Client.Generations(ctx); err != nil {
|
if generations, err = r.Client.Generations(ctx); err != nil {
|
||||||
log.Printf("%s: cannot determine generations: %s", r.Name(), err)
|
r.Logger().Error("cannot determine generations", "error", err)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -113,7 +112,7 @@ func (c *WALCommand) Run(ctx context.Context, args []string) (err error) {
|
|||||||
}
|
}
|
||||||
return itr.Close()
|
return itr.Close()
|
||||||
}(); err != nil {
|
}(); err != nil {
|
||||||
log.Printf("%s: cannot fetch wal segments: %s", r.Name(), err)
|
r.Logger().Error("cannot fetch wal segments", "error", err)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
161
db.go
161
db.go
@@ -11,7 +11,7 @@ import (
|
|||||||
"hash/crc64"
|
"hash/crc64"
|
||||||
"io"
|
"io"
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
"log"
|
"log/slog"
|
||||||
"math"
|
"math"
|
||||||
"math/rand"
|
"math/rand"
|
||||||
"os"
|
"os"
|
||||||
@@ -31,6 +31,7 @@ const (
|
|||||||
DefaultCheckpointInterval = 1 * time.Minute
|
DefaultCheckpointInterval = 1 * time.Minute
|
||||||
DefaultMinCheckpointPageN = 1000
|
DefaultMinCheckpointPageN = 1000
|
||||||
DefaultMaxCheckpointPageN = 10000
|
DefaultMaxCheckpointPageN = 10000
|
||||||
|
DefaultTruncatePageN = 500000
|
||||||
)
|
)
|
||||||
|
|
||||||
// MaxIndex is the maximum possible WAL index.
|
// MaxIndex is the maximum possible WAL index.
|
||||||
@@ -85,6 +86,16 @@ type DB struct {
|
|||||||
// unbounded if there are always read transactions occurring.
|
// unbounded if there are always read transactions occurring.
|
||||||
MaxCheckpointPageN int
|
MaxCheckpointPageN int
|
||||||
|
|
||||||
|
// Threshold of WAL size, in pages, before a forced truncation checkpoint.
|
||||||
|
// A forced truncation checkpoint will block new transactions and wait for
|
||||||
|
// existing transactions to finish before issuing a checkpoint and
|
||||||
|
// truncating the WAL.
|
||||||
|
//
|
||||||
|
// If zero, no truncates are forced. This can cause the WAL to grow
|
||||||
|
// unbounded if there's a sudden spike of changes between other
|
||||||
|
// checkpoints.
|
||||||
|
TruncatePageN int
|
||||||
|
|
||||||
// Time between automatic checkpoints in the WAL. This is done to allow
|
// Time between automatic checkpoints in the WAL. This is done to allow
|
||||||
// more fine-grained WAL files so that restores can be performed with
|
// more fine-grained WAL files so that restores can be performed with
|
||||||
// better precision.
|
// better precision.
|
||||||
@@ -97,8 +108,8 @@ type DB struct {
|
|||||||
// Must be set before calling Open().
|
// Must be set before calling Open().
|
||||||
Replicas []*Replica
|
Replicas []*Replica
|
||||||
|
|
||||||
// Where to send log messages, defaults to log.Default()
|
// Where to send log messages, defaults to global slog with databas epath.
|
||||||
Logger *log.Logger
|
Logger *slog.Logger
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewDB returns a new instance of DB for a given path.
|
// NewDB returns a new instance of DB for a given path.
|
||||||
@@ -112,10 +123,10 @@ func NewDB(path string) *DB {
|
|||||||
|
|
||||||
MinCheckpointPageN: DefaultMinCheckpointPageN,
|
MinCheckpointPageN: DefaultMinCheckpointPageN,
|
||||||
MaxCheckpointPageN: DefaultMaxCheckpointPageN,
|
MaxCheckpointPageN: DefaultMaxCheckpointPageN,
|
||||||
|
TruncatePageN: DefaultTruncatePageN,
|
||||||
CheckpointInterval: DefaultCheckpointInterval,
|
CheckpointInterval: DefaultCheckpointInterval,
|
||||||
MonitorInterval: DefaultMonitorInterval,
|
MonitorInterval: DefaultMonitorInterval,
|
||||||
|
Logger: slog.With("db", path),
|
||||||
Logger: log.Default(),
|
|
||||||
}
|
}
|
||||||
|
|
||||||
db.dbSizeGauge = dbSizeGaugeVec.WithLabelValues(db.path)
|
db.dbSizeGauge = dbSizeGaugeVec.WithLabelValues(db.path)
|
||||||
@@ -461,7 +472,7 @@ func (db *DB) init() (err error) {
|
|||||||
|
|
||||||
// If we have an existing shadow WAL, ensure the headers match.
|
// If we have an existing shadow WAL, ensure the headers match.
|
||||||
if err := db.verifyHeadersMatch(); err != nil {
|
if err := db.verifyHeadersMatch(); err != nil {
|
||||||
db.Logger.Printf("%s: init: cannot determine last wal position, clearing generation; %s", db.path, err)
|
db.Logger.Warn("init: cannot determine last wal position, clearing generation", "error", err)
|
||||||
if err := os.Remove(db.GenerationNamePath()); err != nil && !os.IsNotExist(err) {
|
if err := os.Remove(db.GenerationNamePath()); err != nil && !os.IsNotExist(err) {
|
||||||
return fmt.Errorf("remove generation name: %w", err)
|
return fmt.Errorf("remove generation name: %w", err)
|
||||||
}
|
}
|
||||||
@@ -703,7 +714,7 @@ func (db *DB) Sync(ctx context.Context) (err error) {
|
|||||||
if err := db.init(); err != nil {
|
if err := db.init(); err != nil {
|
||||||
return err
|
return err
|
||||||
} else if db.db == nil {
|
} else if db.db == nil {
|
||||||
Tracef("%s: sync: no database found", db.path)
|
db.Logger.Debug("sync: no database found")
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -729,7 +740,7 @@ func (db *DB) Sync(ctx context.Context) (err error) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("cannot verify wal state: %w", err)
|
return fmt.Errorf("cannot verify wal state: %w", err)
|
||||||
}
|
}
|
||||||
Tracef("%s: sync: info=%#v", db.path, info)
|
db.Logger.Debug("sync", "info", &info)
|
||||||
|
|
||||||
// Track if anything in the shadow WAL changes and then notify at the end.
|
// Track if anything in the shadow WAL changes and then notify at the end.
|
||||||
changed := info.walSize != info.shadowWALSize || info.restart || info.reason != ""
|
changed := info.walSize != info.shadowWALSize || info.restart || info.reason != ""
|
||||||
@@ -740,7 +751,7 @@ func (db *DB) Sync(ctx context.Context) (err error) {
|
|||||||
if info.generation, err = db.createGeneration(); err != nil {
|
if info.generation, err = db.createGeneration(); err != nil {
|
||||||
return fmt.Errorf("create generation: %w", err)
|
return fmt.Errorf("create generation: %w", err)
|
||||||
}
|
}
|
||||||
db.Logger.Printf("%s: sync: new generation %q, %s", db.path, info.generation, info.reason)
|
db.Logger.Info("sync: new generation", "generation", info.generation, "reason", info.reason)
|
||||||
|
|
||||||
// Clear shadow wal info.
|
// Clear shadow wal info.
|
||||||
info.shadowWALPath = db.ShadowWALPath(info.generation, 0)
|
info.shadowWALPath = db.ShadowWALPath(info.generation, 0)
|
||||||
@@ -751,7 +762,7 @@ func (db *DB) Sync(ctx context.Context) (err error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Synchronize real WAL with current shadow WAL.
|
// Synchronize real WAL with current shadow WAL.
|
||||||
newWALSize, err := db.syncWAL(info)
|
origWALSize, newWALSize, err := db.syncWAL(info)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("sync wal: %w", err)
|
return fmt.Errorf("sync wal: %w", err)
|
||||||
}
|
}
|
||||||
@@ -760,7 +771,9 @@ func (db *DB) Sync(ctx context.Context) (err error) {
|
|||||||
// If WAL size is greater than min threshold, attempt checkpoint.
|
// If WAL size is greater than min threshold, attempt checkpoint.
|
||||||
var checkpoint bool
|
var checkpoint bool
|
||||||
checkpointMode := CheckpointModePassive
|
checkpointMode := CheckpointModePassive
|
||||||
if db.MaxCheckpointPageN > 0 && newWALSize >= calcWALSize(db.pageSize, db.MaxCheckpointPageN) {
|
if db.TruncatePageN > 0 && origWALSize >= calcWALSize(db.pageSize, db.TruncatePageN) {
|
||||||
|
checkpoint, checkpointMode = true, CheckpointModeTruncate
|
||||||
|
} else if db.MaxCheckpointPageN > 0 && newWALSize >= calcWALSize(db.pageSize, db.MaxCheckpointPageN) {
|
||||||
checkpoint, checkpointMode = true, CheckpointModeRestart
|
checkpoint, checkpointMode = true, CheckpointModeRestart
|
||||||
} else if newWALSize >= calcWALSize(db.pageSize, db.MinCheckpointPageN) {
|
} else if newWALSize >= calcWALSize(db.pageSize, db.MinCheckpointPageN) {
|
||||||
checkpoint = true
|
checkpoint = true
|
||||||
@@ -794,7 +807,7 @@ func (db *DB) Sync(ctx context.Context) (err error) {
|
|||||||
db.notify = make(chan struct{})
|
db.notify = make(chan struct{})
|
||||||
}
|
}
|
||||||
|
|
||||||
Tracef("%s: sync: ok", db.path)
|
db.Logger.Debug("sync: ok")
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
@@ -919,29 +932,29 @@ type syncInfo struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// syncWAL copies pending bytes from the real WAL to the shadow WAL.
|
// syncWAL copies pending bytes from the real WAL to the shadow WAL.
|
||||||
func (db *DB) syncWAL(info syncInfo) (newSize int64, err error) {
|
func (db *DB) syncWAL(info syncInfo) (origSize int64, newSize int64, err error) {
|
||||||
// Copy WAL starting from end of shadow WAL. Exit if no new shadow WAL needed.
|
// Copy WAL starting from end of shadow WAL. Exit if no new shadow WAL needed.
|
||||||
newSize, err = db.copyToShadowWAL(info.shadowWALPath)
|
origSize, newSize, err = db.copyToShadowWAL(info.shadowWALPath)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return newSize, fmt.Errorf("cannot copy to shadow wal: %w", err)
|
return origSize, newSize, fmt.Errorf("cannot copy to shadow wal: %w", err)
|
||||||
} else if !info.restart {
|
} else if !info.restart {
|
||||||
return newSize, nil // If no restart required, exit.
|
return origSize, newSize, nil // If no restart required, exit.
|
||||||
}
|
}
|
||||||
|
|
||||||
// Parse index of current shadow WAL file.
|
// Parse index of current shadow WAL file.
|
||||||
dir, base := filepath.Split(info.shadowWALPath)
|
dir, base := filepath.Split(info.shadowWALPath)
|
||||||
index, err := ParseWALPath(base)
|
index, err := ParseWALPath(base)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, fmt.Errorf("cannot parse shadow wal filename: %s", base)
|
return 0, 0, fmt.Errorf("cannot parse shadow wal filename: %s", base)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Start a new shadow WAL file with next index.
|
// Start a new shadow WAL file with next index.
|
||||||
newShadowWALPath := filepath.Join(dir, FormatWALPath(index+1))
|
newShadowWALPath := filepath.Join(dir, FormatWALPath(index+1))
|
||||||
newSize, err = db.initShadowWALFile(newShadowWALPath)
|
newSize, err = db.initShadowWALFile(newShadowWALPath)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, fmt.Errorf("cannot init shadow wal file: name=%s err=%w", newShadowWALPath, err)
|
return 0, 0, fmt.Errorf("cannot init shadow wal file: name=%s err=%w", newShadowWALPath, err)
|
||||||
}
|
}
|
||||||
return newSize, nil
|
return origSize, newSize, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (db *DB) initShadowWALFile(filename string) (int64, error) {
|
func (db *DB) initShadowWALFile(filename string) (int64, error) {
|
||||||
@@ -977,80 +990,96 @@ func (db *DB) initShadowWALFile(filename string) (int64, error) {
|
|||||||
_ = os.Chown(filename, uid, gid)
|
_ = os.Chown(filename, uid, gid)
|
||||||
|
|
||||||
// Copy as much shadow WAL as available.
|
// Copy as much shadow WAL as available.
|
||||||
newSize, err := db.copyToShadowWAL(filename)
|
_, newSize, err := db.copyToShadowWAL(filename)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, fmt.Errorf("cannot copy to new shadow wal: %w", err)
|
return 0, fmt.Errorf("cannot copy to new shadow wal: %w", err)
|
||||||
}
|
}
|
||||||
return newSize, nil
|
return newSize, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (db *DB) copyToShadowWAL(filename string) (newSize int64, err error) {
|
func (db *DB) copyToShadowWAL(filename string) (origWalSize int64, newSize int64, err error) {
|
||||||
Tracef("%s: copy-shadow: %s", db.path, filename)
|
logger := db.Logger.With("filename", filename)
|
||||||
|
logger.Debug("copy-shadow")
|
||||||
|
|
||||||
r, err := os.Open(db.WALPath())
|
r, err := os.Open(db.WALPath())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, err
|
return 0, 0, err
|
||||||
}
|
}
|
||||||
defer r.Close()
|
defer r.Close()
|
||||||
|
|
||||||
|
fi, err := r.Stat()
|
||||||
|
if err != nil {
|
||||||
|
return 0, 0, err
|
||||||
|
}
|
||||||
|
origWalSize = frameAlign(fi.Size(), db.pageSize)
|
||||||
|
|
||||||
w, err := os.OpenFile(filename, os.O_RDWR, 0666)
|
w, err := os.OpenFile(filename, os.O_RDWR, 0666)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, err
|
return 0, 0, err
|
||||||
}
|
}
|
||||||
defer w.Close()
|
defer w.Close()
|
||||||
|
|
||||||
fi, err := w.Stat()
|
fi, err = w.Stat()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, err
|
return 0, 0, err
|
||||||
}
|
}
|
||||||
origSize := frameAlign(fi.Size(), db.pageSize)
|
origSize := frameAlign(fi.Size(), db.pageSize)
|
||||||
|
|
||||||
// Read shadow WAL header to determine byte order for checksum & salt.
|
// Read shadow WAL header to determine byte order for checksum & salt.
|
||||||
hdr := make([]byte, WALHeaderSize)
|
hdr := make([]byte, WALHeaderSize)
|
||||||
if _, err := io.ReadFull(w, hdr); err != nil {
|
if _, err := io.ReadFull(w, hdr); err != nil {
|
||||||
return 0, fmt.Errorf("read header: %w", err)
|
return 0, 0, fmt.Errorf("read header: %w", err)
|
||||||
}
|
}
|
||||||
hsalt0 := binary.BigEndian.Uint32(hdr[16:])
|
hsalt0 := binary.BigEndian.Uint32(hdr[16:])
|
||||||
hsalt1 := binary.BigEndian.Uint32(hdr[20:])
|
hsalt1 := binary.BigEndian.Uint32(hdr[20:])
|
||||||
|
|
||||||
bo, err := headerByteOrder(hdr)
|
bo, err := headerByteOrder(hdr)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, err
|
return 0, 0, err
|
||||||
}
|
}
|
||||||
|
|
||||||
// Read previous checksum.
|
// Read previous checksum.
|
||||||
chksum0, chksum1, err := readLastChecksumFrom(w, db.pageSize)
|
chksum0, chksum1, err := readLastChecksumFrom(w, db.pageSize)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, fmt.Errorf("last checksum: %w", err)
|
return 0, 0, fmt.Errorf("last checksum: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Write to a temporary shadow file.
|
||||||
|
tempFilename := filename + ".tmp"
|
||||||
|
defer os.Remove(tempFilename)
|
||||||
|
|
||||||
|
f, err := internal.CreateFile(tempFilename, db.fileInfo)
|
||||||
|
if err != nil {
|
||||||
|
return 0, 0, fmt.Errorf("create temp file: %w", err)
|
||||||
|
}
|
||||||
|
defer f.Close()
|
||||||
|
|
||||||
// Seek to correct position on real wal.
|
// Seek to correct position on real wal.
|
||||||
if _, err := r.Seek(origSize, io.SeekStart); err != nil {
|
if _, err := r.Seek(origSize, io.SeekStart); err != nil {
|
||||||
return 0, fmt.Errorf("real wal seek: %w", err)
|
return 0, 0, fmt.Errorf("real wal seek: %w", err)
|
||||||
} else if _, err := w.Seek(origSize, io.SeekStart); err != nil {
|
} else if _, err := w.Seek(origSize, io.SeekStart); err != nil {
|
||||||
return 0, fmt.Errorf("shadow wal seek: %w", err)
|
return 0, 0, fmt.Errorf("shadow wal seek: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Read through WAL from last position to find the page of the last
|
// Read through WAL from last position to find the page of the last
|
||||||
// committed transaction.
|
// committed transaction.
|
||||||
frame := make([]byte, db.pageSize+WALFrameHeaderSize)
|
frame := make([]byte, db.pageSize+WALFrameHeaderSize)
|
||||||
var buf bytes.Buffer
|
|
||||||
offset := origSize
|
offset := origSize
|
||||||
lastCommitSize := origSize
|
lastCommitSize := origSize
|
||||||
for {
|
for {
|
||||||
// Read next page from WAL file.
|
// Read next page from WAL file.
|
||||||
if _, err := io.ReadFull(r, frame); err == io.EOF || err == io.ErrUnexpectedEOF {
|
if _, err := io.ReadFull(r, frame); err == io.EOF || err == io.ErrUnexpectedEOF {
|
||||||
Tracef("%s: copy-shadow: break %s @ %d; err=%s", db.path, filename, offset, err)
|
logger.Debug("copy-shadow: break", "offset", offset, "error", err)
|
||||||
break // end of file or partial page
|
break // end of file or partial page
|
||||||
} else if err != nil {
|
} else if err != nil {
|
||||||
return 0, fmt.Errorf("read wal: %w", err)
|
return 0, 0, fmt.Errorf("read wal: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Read frame salt & compare to header salt. Stop reading on mismatch.
|
// Read frame salt & compare to header salt. Stop reading on mismatch.
|
||||||
salt0 := binary.BigEndian.Uint32(frame[8:])
|
salt0 := binary.BigEndian.Uint32(frame[8:])
|
||||||
salt1 := binary.BigEndian.Uint32(frame[12:])
|
salt1 := binary.BigEndian.Uint32(frame[12:])
|
||||||
if salt0 != hsalt0 || salt1 != hsalt1 {
|
if salt0 != hsalt0 || salt1 != hsalt1 {
|
||||||
Tracef("%s: copy-shadow: break: salt mismatch", db.path)
|
logger.Debug("copy-shadow: break: salt mismatch")
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -1060,38 +1089,60 @@ func (db *DB) copyToShadowWAL(filename string) (newSize int64, err error) {
|
|||||||
chksum0, chksum1 = Checksum(bo, chksum0, chksum1, frame[:8]) // frame header
|
chksum0, chksum1 = Checksum(bo, chksum0, chksum1, frame[:8]) // frame header
|
||||||
chksum0, chksum1 = Checksum(bo, chksum0, chksum1, frame[24:]) // frame data
|
chksum0, chksum1 = Checksum(bo, chksum0, chksum1, frame[24:]) // frame data
|
||||||
if chksum0 != fchksum0 || chksum1 != fchksum1 {
|
if chksum0 != fchksum0 || chksum1 != fchksum1 {
|
||||||
Tracef("%s: copy shadow: checksum mismatch, skipping: offset=%d (%x,%x) != (%x,%x)", db.path, offset, chksum0, chksum1, fchksum0, fchksum1)
|
logger.Debug("copy shadow: checksum mismatch, skipping", "offset", offset, "check", fmt.Sprintf("(%x,%x) != (%x,%x)", chksum0, chksum1, fchksum0, fchksum1))
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
|
|
||||||
// Add page to the new size of the shadow WAL.
|
// Write page to temporary WAL file.
|
||||||
buf.Write(frame)
|
if _, err := f.Write(frame); err != nil {
|
||||||
|
return 0, 0, fmt.Errorf("write temp shadow wal: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
Tracef("%s: copy-shadow: ok %s offset=%d salt=%x %x", db.path, filename, offset, salt0, salt1)
|
logger.Debug("copy-shadow: ok", "offset", offset, "salt", fmt.Sprintf("%x %x", salt0, salt1))
|
||||||
offset += int64(len(frame))
|
offset += int64(len(frame))
|
||||||
|
|
||||||
// Flush to shadow WAL if commit record.
|
// Update new size if written frame was a commit record.
|
||||||
newDBSize := binary.BigEndian.Uint32(frame[4:])
|
newDBSize := binary.BigEndian.Uint32(frame[4:])
|
||||||
if newDBSize != 0 {
|
if newDBSize != 0 {
|
||||||
if _, err := buf.WriteTo(w); err != nil {
|
|
||||||
return 0, fmt.Errorf("write shadow wal: %w", err)
|
|
||||||
}
|
|
||||||
buf.Reset()
|
|
||||||
lastCommitSize = offset
|
lastCommitSize = offset
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Sync & close.
|
// If no WAL writes found, exit.
|
||||||
|
if origSize == lastCommitSize {
|
||||||
|
return origSize, lastCommitSize, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
walByteN := lastCommitSize - origSize
|
||||||
|
|
||||||
|
// Move to beginning of temporary file.
|
||||||
|
if _, err := f.Seek(0, io.SeekStart); err != nil {
|
||||||
|
return 0, 0, fmt.Errorf("temp file seek: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Copy from temporary file to shadow WAL.
|
||||||
|
if _, err := io.Copy(w, &io.LimitedReader{R: f, N: walByteN}); err != nil {
|
||||||
|
return 0, 0, fmt.Errorf("write shadow file: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Close & remove temporary file.
|
||||||
|
if err := f.Close(); err != nil {
|
||||||
|
return 0, 0, err
|
||||||
|
} else if err := os.Remove(tempFilename); err != nil {
|
||||||
|
return 0, 0, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Sync & close shadow WAL.
|
||||||
if err := w.Sync(); err != nil {
|
if err := w.Sync(); err != nil {
|
||||||
return 0, err
|
return 0, 0, err
|
||||||
} else if err := w.Close(); err != nil {
|
} else if err := w.Close(); err != nil {
|
||||||
return 0, err
|
return 0, 0, err
|
||||||
}
|
}
|
||||||
|
|
||||||
// Track total number of bytes written to WAL.
|
// Track total number of bytes written to WAL.
|
||||||
db.totalWALBytesCounter.Add(float64(lastCommitSize - origSize))
|
db.totalWALBytesCounter.Add(float64(walByteN))
|
||||||
|
|
||||||
return lastCommitSize, nil
|
return origWalSize, lastCommitSize, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// ShadowWALReader opens a reader for a shadow WAL file at a given position.
|
// ShadowWALReader opens a reader for a shadow WAL file at a given position.
|
||||||
@@ -1266,7 +1317,7 @@ func (db *DB) checkpoint(ctx context.Context, generation, mode string) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Copy shadow WAL before checkpoint to copy as much as possible.
|
// Copy shadow WAL before checkpoint to copy as much as possible.
|
||||||
if _, err := db.copyToShadowWAL(shadowWALPath); err != nil {
|
if _, _, err := db.copyToShadowWAL(shadowWALPath); err != nil {
|
||||||
return fmt.Errorf("cannot copy to end of shadow wal before checkpoint: %w", err)
|
return fmt.Errorf("cannot copy to end of shadow wal before checkpoint: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -1301,7 +1352,7 @@ func (db *DB) checkpoint(ctx context.Context, generation, mode string) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Copy the end of the previous WAL before starting a new shadow WAL.
|
// Copy the end of the previous WAL before starting a new shadow WAL.
|
||||||
if _, err := db.copyToShadowWAL(shadowWALPath); err != nil {
|
if _, _, err := db.copyToShadowWAL(shadowWALPath); err != nil {
|
||||||
return fmt.Errorf("cannot copy to end of shadow wal: %w", err)
|
return fmt.Errorf("cannot copy to end of shadow wal: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -1360,7 +1411,7 @@ func (db *DB) execCheckpoint(mode string) (err error) {
|
|||||||
if err := db.db.QueryRow(rawsql).Scan(&row[0], &row[1], &row[2]); err != nil {
|
if err := db.db.QueryRow(rawsql).Scan(&row[0], &row[1], &row[2]); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
Tracef("%s: checkpoint: mode=%v (%d,%d,%d)", db.path, mode, row[0], row[1], row[2])
|
db.Logger.Debug("checkpoint", "mode", mode, "result", fmt.Sprintf("%d,%d,%d", row[0], row[1], row[2]))
|
||||||
|
|
||||||
// Reacquire the read lock immediately after the checkpoint.
|
// Reacquire the read lock immediately after the checkpoint.
|
||||||
if err := db.acquireReadLock(); err != nil {
|
if err := db.acquireReadLock(); err != nil {
|
||||||
@@ -1385,7 +1436,7 @@ func (db *DB) monitor() {
|
|||||||
|
|
||||||
// Sync the database to the shadow WAL.
|
// Sync the database to the shadow WAL.
|
||||||
if err := db.Sync(db.ctx); err != nil && !errors.Is(err, context.Canceled) {
|
if err := db.Sync(db.ctx); err != nil && !errors.Is(err, context.Canceled) {
|
||||||
db.Logger.Printf("%s: sync error: %s", db.path, err)
|
db.Logger.Error("sync error", "error", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -1529,10 +1580,6 @@ type RestoreOptions struct {
|
|||||||
|
|
||||||
// Specifies how many WAL files are downloaded in parallel during restore.
|
// Specifies how many WAL files are downloaded in parallel during restore.
|
||||||
Parallelism int
|
Parallelism int
|
||||||
|
|
||||||
// Logging settings.
|
|
||||||
Logger *log.Logger
|
|
||||||
Verbose bool
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewRestoreOptions returns a new instance of RestoreOptions with defaults.
|
// NewRestoreOptions returns a new instance of RestoreOptions with defaults.
|
||||||
|
|||||||
2
go.mod
2
go.mod
@@ -1,6 +1,6 @@
|
|||||||
module github.com/benbjohnson/litestream
|
module github.com/benbjohnson/litestream
|
||||||
|
|
||||||
go 1.19
|
go 1.21
|
||||||
|
|
||||||
require (
|
require (
|
||||||
cloud.google.com/go/storage v1.31.0
|
cloud.google.com/go/storage v1.31.0
|
||||||
|
|||||||
4
go.sum
4
go.sum
@@ -87,6 +87,7 @@ github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/
|
|||||||
github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
|
github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
|
||||||
github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
|
github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
|
||||||
github.com/google/martian/v3 v3.3.2 h1:IqNFLAmvJOgVlpdEBiQbDc2EwKW77amAycfTuWKdfvw=
|
github.com/google/martian/v3 v3.3.2 h1:IqNFLAmvJOgVlpdEBiQbDc2EwKW77amAycfTuWKdfvw=
|
||||||
|
github.com/google/martian/v3 v3.3.2/go.mod h1:oBOf6HBosgwRXnUGWUB05QECsc6uvmMiJ3+6W4l/CUk=
|
||||||
github.com/google/s2a-go v0.1.4 h1:1kZ/sQM3srePvKs3tXAvQzo66XfcReoqFpIpIccE7Oc=
|
github.com/google/s2a-go v0.1.4 h1:1kZ/sQM3srePvKs3tXAvQzo66XfcReoqFpIpIccE7Oc=
|
||||||
github.com/google/s2a-go v0.1.4/go.mod h1:Ej+mSEMGRnqRzjc7VtF+jdBwYG5fuJfiZ8ELkjEwM0A=
|
github.com/google/s2a-go v0.1.4/go.mod h1:Ej+mSEMGRnqRzjc7VtF+jdBwYG5fuJfiZ8ELkjEwM0A=
|
||||||
github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
|
github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
|
||||||
@@ -106,6 +107,7 @@ github.com/kr/fs v0.1.0 h1:Jskdu9ieNAYnjxsi0LbQp1ulIKZV1LAFgK1tWhpZgl8=
|
|||||||
github.com/kr/fs v0.1.0/go.mod h1:FFnZGqtBN9Gxj7eW1uZ42v5BccTP0vu6NEaFoC2HwRg=
|
github.com/kr/fs v0.1.0/go.mod h1:FFnZGqtBN9Gxj7eW1uZ42v5BccTP0vu6NEaFoC2HwRg=
|
||||||
github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI=
|
github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI=
|
||||||
github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
|
github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
|
||||||
|
github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk=
|
||||||
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
|
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
|
||||||
github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
|
github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
|
||||||
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
|
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
|
||||||
@@ -137,6 +139,7 @@ github.com/prometheus/procfs v0.11.1 h1:xRC8Iq1yyca5ypa9n1EZnWZkt7dwcoRPQwX/5gwa
|
|||||||
github.com/prometheus/procfs v0.11.1/go.mod h1:eesXgaPo1q7lBpVMoMy0ZOFTth9hBn4W/y0/p/ScXhY=
|
github.com/prometheus/procfs v0.11.1/go.mod h1:eesXgaPo1q7lBpVMoMy0ZOFTth9hBn4W/y0/p/ScXhY=
|
||||||
github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ=
|
github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ=
|
||||||
github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ=
|
github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ=
|
||||||
|
github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog=
|
||||||
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
|
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
|
||||||
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
|
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
|
||||||
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
|
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
|
||||||
@@ -212,6 +215,7 @@ golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9sn
|
|||||||
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
|
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
|
||||||
golang.org/x/term v0.1.0/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
|
golang.org/x/term v0.1.0/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
|
||||||
golang.org/x/term v0.11.0 h1:F9tnn/DA/Im8nCwm+fX+1/eBwi4qFjRT++MhtVC4ZX0=
|
golang.org/x/term v0.11.0 h1:F9tnn/DA/Im8nCwm+fX+1/eBwi4qFjRT++MhtVC4ZX0=
|
||||||
|
golang.org/x/term v0.11.0/go.mod h1:zC9APTIj3jG3FdV/Ons+XE1riIZXG4aZ4GTHiPZJPIU=
|
||||||
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
|
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
|
||||||
golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
|
golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
|
||||||
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
|
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
|
||||||
|
|||||||
@@ -1,3 +1,4 @@
|
|||||||
|
//go:build aix || darwin || dragonfly || freebsd || linux || netbsd || openbsd || solaris
|
||||||
// +build aix darwin dragonfly freebsd linux netbsd openbsd solaris
|
// +build aix darwin dragonfly freebsd linux netbsd openbsd solaris
|
||||||
|
|
||||||
package internal
|
package internal
|
||||||
|
|||||||
@@ -1,3 +1,4 @@
|
|||||||
|
//go:build windows
|
||||||
// +build windows
|
// +build windows
|
||||||
|
|
||||||
package internal
|
package internal
|
||||||
|
|||||||
@@ -540,9 +540,6 @@ func isHexChar(ch rune) bool {
|
|||||||
return (ch >= '0' && ch <= '9') || (ch >= 'a' && ch <= 'f')
|
return (ch >= '0' && ch <= '9') || (ch >= 'a' && ch <= 'f')
|
||||||
}
|
}
|
||||||
|
|
||||||
// Tracef is used for low-level tracing.
|
|
||||||
var Tracef = func(format string, a ...interface{}) {}
|
|
||||||
|
|
||||||
func assert(condition bool, message string) {
|
func assert(condition bool, message string) {
|
||||||
if !condition {
|
if !condition {
|
||||||
panic("assertion failed: " + message)
|
panic("assertion failed: " + message)
|
||||||
|
|||||||
105
replica.go
105
replica.go
@@ -7,7 +7,7 @@ import (
|
|||||||
"hash/crc64"
|
"hash/crc64"
|
||||||
"io"
|
"io"
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
"log"
|
"log/slog"
|
||||||
"math"
|
"math"
|
||||||
"os"
|
"os"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
@@ -72,9 +72,6 @@ type Replica struct {
|
|||||||
// Encryption identities and recipients
|
// Encryption identities and recipients
|
||||||
AgeIdentities []age.Identity
|
AgeIdentities []age.Identity
|
||||||
AgeRecipients []age.Recipient
|
AgeRecipients []age.Recipient
|
||||||
|
|
||||||
// The logger to send logging messages to. Defaults to log.Default()
|
|
||||||
Logger *log.Logger
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewReplica(db *DB, name string) *Replica {
|
func NewReplica(db *DB, name string) *Replica {
|
||||||
@@ -87,7 +84,6 @@ func NewReplica(db *DB, name string) *Replica {
|
|||||||
Retention: DefaultRetention,
|
Retention: DefaultRetention,
|
||||||
RetentionCheckInterval: DefaultRetentionCheckInterval,
|
RetentionCheckInterval: DefaultRetentionCheckInterval,
|
||||||
MonitorEnabled: true,
|
MonitorEnabled: true,
|
||||||
Logger: log.Default(),
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return r
|
return r
|
||||||
@@ -101,6 +97,11 @@ func (r *Replica) Name() string {
|
|||||||
return r.name
|
return r.name
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Logger returns the DB sub-logger for this replica.
|
||||||
|
func (r *Replica) Logger() *slog.Logger {
|
||||||
|
return r.db.Logger.With("replica", r.Name())
|
||||||
|
}
|
||||||
|
|
||||||
// DB returns a reference to the database the replica is attached to, if any.
|
// DB returns a reference to the database the replica is attached to, if any.
|
||||||
func (r *Replica) DB() *DB { return r.db }
|
func (r *Replica) DB() *DB { return r.db }
|
||||||
|
|
||||||
@@ -166,7 +167,7 @@ func (r *Replica) Sync(ctx context.Context) (err error) {
|
|||||||
}
|
}
|
||||||
generation := dpos.Generation
|
generation := dpos.Generation
|
||||||
|
|
||||||
Tracef("%s(%s): replica sync: db.pos=%s", r.db.Path(), r.Name(), dpos)
|
r.Logger().Debug("replica sync", "position", dpos.String())
|
||||||
|
|
||||||
// Create a new snapshot and update the current replica position if
|
// Create a new snapshot and update the current replica position if
|
||||||
// the generation on the database has changed.
|
// the generation on the database has changed.
|
||||||
@@ -188,7 +189,7 @@ func (r *Replica) Sync(ctx context.Context) (err error) {
|
|||||||
return fmt.Errorf("cannot determine replica position: %s", err)
|
return fmt.Errorf("cannot determine replica position: %s", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
Tracef("%s(%s): replica sync: calc new pos: %s", r.db.Path(), r.Name(), pos)
|
r.Logger().Debug("replica sync: calc new pos", "position", pos.String())
|
||||||
r.mu.Lock()
|
r.mu.Lock()
|
||||||
r.pos = pos
|
r.pos = pos
|
||||||
r.mu.Unlock()
|
r.mu.Unlock()
|
||||||
@@ -222,6 +223,12 @@ func (r *Replica) syncWAL(ctx context.Context) (err error) {
|
|||||||
// Obtain initial position from shadow reader.
|
// Obtain initial position from shadow reader.
|
||||||
// It may have moved to the next index if previous position was at the end.
|
// It may have moved to the next index if previous position was at the end.
|
||||||
pos := rd.Pos()
|
pos := rd.Pos()
|
||||||
|
initialPos := pos
|
||||||
|
startTime := time.Now()
|
||||||
|
var bytesWritten int
|
||||||
|
|
||||||
|
logger := r.Logger()
|
||||||
|
logger.Info("write wal segment", "position", initialPos.String())
|
||||||
|
|
||||||
// Copy through pipe into client from the starting position.
|
// Copy through pipe into client from the starting position.
|
||||||
var g errgroup.Group
|
var g errgroup.Group
|
||||||
@@ -263,6 +270,7 @@ func (r *Replica) syncWAL(ctx context.Context) (err error) {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
walBytesCounter.Add(float64(n))
|
walBytesCounter.Add(float64(n))
|
||||||
|
bytesWritten += n
|
||||||
}
|
}
|
||||||
|
|
||||||
// Copy frames.
|
// Copy frames.
|
||||||
@@ -289,6 +297,7 @@ func (r *Replica) syncWAL(ctx context.Context) (err error) {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
walBytesCounter.Add(float64(n))
|
walBytesCounter.Add(float64(n))
|
||||||
|
bytesWritten += n
|
||||||
}
|
}
|
||||||
|
|
||||||
// Flush LZ4 writer, encryption writer and close pipe.
|
// Flush LZ4 writer, encryption writer and close pipe.
|
||||||
@@ -314,6 +323,7 @@ func (r *Replica) syncWAL(ctx context.Context) (err error) {
|
|||||||
replicaWALIndexGaugeVec.WithLabelValues(r.db.Path(), r.Name()).Set(float64(rd.Pos().Index))
|
replicaWALIndexGaugeVec.WithLabelValues(r.db.Path(), r.Name()).Set(float64(rd.Pos().Index))
|
||||||
replicaWALOffsetGaugeVec.WithLabelValues(r.db.Path(), r.Name()).Set(float64(rd.Pos().Offset))
|
replicaWALOffsetGaugeVec.WithLabelValues(r.db.Path(), r.Name()).Set(float64(rd.Pos().Offset))
|
||||||
|
|
||||||
|
logger.Info("wal segment written", "position", initialPos.String(), "elapsed", time.Since(startTime).String(), "sz", bytesWritten)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -535,6 +545,10 @@ func (r *Replica) Snapshot(ctx context.Context) (info SnapshotInfo, err error) {
|
|||||||
return wc.Close()
|
return wc.Close()
|
||||||
})
|
})
|
||||||
|
|
||||||
|
logger := r.Logger()
|
||||||
|
logger.Info("write snapshot", "position", pos.String())
|
||||||
|
|
||||||
|
startTime := time.Now()
|
||||||
// Delegate write to client & wait for writer goroutine to finish.
|
// Delegate write to client & wait for writer goroutine to finish.
|
||||||
if info, err = r.Client.WriteSnapshot(ctx, pos.Generation, pos.Index, pr); err != nil {
|
if info, err = r.Client.WriteSnapshot(ctx, pos.Generation, pos.Index, pr); err != nil {
|
||||||
return info, err
|
return info, err
|
||||||
@@ -542,8 +556,7 @@ func (r *Replica) Snapshot(ctx context.Context) (info SnapshotInfo, err error) {
|
|||||||
return info, err
|
return info, err
|
||||||
}
|
}
|
||||||
|
|
||||||
r.Logger.Printf("%s(%s): snapshot written %s/%08x", r.db.Path(), r.Name(), pos.Generation, pos.Index)
|
logger.Info("snapshot written", "position", pos.String(), "elapsed", time.Since(startTime).String(), "sz", info.Size)
|
||||||
|
|
||||||
return info, nil
|
return info, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -610,7 +623,7 @@ func (r *Replica) deleteSnapshotsBeforeIndex(ctx context.Context, generation str
|
|||||||
if err := r.Client.DeleteSnapshot(ctx, info.Generation, info.Index); err != nil {
|
if err := r.Client.DeleteSnapshot(ctx, info.Generation, info.Index); err != nil {
|
||||||
return fmt.Errorf("delete snapshot %s/%08x: %w", info.Generation, info.Index, err)
|
return fmt.Errorf("delete snapshot %s/%08x: %w", info.Generation, info.Index, err)
|
||||||
}
|
}
|
||||||
r.Logger.Printf("%s(%s): snapshot deleted %s/%08x", r.db.Path(), r.Name(), generation, index)
|
r.Logger().Info("snapshot deleted", "generation", generation, "index", index)
|
||||||
}
|
}
|
||||||
|
|
||||||
return itr.Close()
|
return itr.Close()
|
||||||
@@ -642,8 +655,8 @@ func (r *Replica) deleteWALSegmentsBeforeIndex(ctx context.Context, generation s
|
|||||||
if err := r.Client.DeleteWALSegments(ctx, a); err != nil {
|
if err := r.Client.DeleteWALSegments(ctx, a); err != nil {
|
||||||
return fmt.Errorf("delete wal segments: %w", err)
|
return fmt.Errorf("delete wal segments: %w", err)
|
||||||
}
|
}
|
||||||
r.Logger.Printf("%s(%s): wal segmented deleted before %s/%08x: n=%d", r.db.Path(), r.Name(), generation, index, len(a))
|
|
||||||
|
|
||||||
|
r.Logger().Info("wal segmented deleted before", "generation", generation, "index", index, "n", len(a))
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -679,7 +692,7 @@ func (r *Replica) monitor(ctx context.Context) {
|
|||||||
|
|
||||||
// Synchronize the shadow wal into the replication directory.
|
// Synchronize the shadow wal into the replication directory.
|
||||||
if err := r.Sync(ctx); err != nil {
|
if err := r.Sync(ctx); err != nil {
|
||||||
r.Logger.Printf("%s(%s): monitor error: %s", r.db.Path(), r.Name(), err)
|
r.Logger().Error("monitor error", "error", err)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -707,7 +720,7 @@ func (r *Replica) retainer(ctx context.Context) {
|
|||||||
return
|
return
|
||||||
case <-ticker.C:
|
case <-ticker.C:
|
||||||
if err := r.EnforceRetention(ctx); err != nil {
|
if err := r.EnforceRetention(ctx); err != nil {
|
||||||
r.Logger.Printf("%s(%s): retainer error: %s", r.db.Path(), r.Name(), err)
|
r.Logger().Error("retainer error", "error", err)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -720,6 +733,31 @@ func (r *Replica) snapshotter(ctx context.Context) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
logger := r.Logger()
|
||||||
|
if pos, err := r.db.Pos(); err != nil {
|
||||||
|
logger.Error("snapshotter cannot determine generation", "error", err)
|
||||||
|
} else if !pos.IsZero() {
|
||||||
|
if snapshot, err := r.maxSnapshot(ctx, pos.Generation); err != nil {
|
||||||
|
logger.Error("snapshotter cannot determine latest snapshot", "error", err)
|
||||||
|
} else if snapshot != nil {
|
||||||
|
nextSnapshot := r.SnapshotInterval - time.Since(snapshot.CreatedAt)
|
||||||
|
if nextSnapshot < 0 {
|
||||||
|
nextSnapshot = 0
|
||||||
|
}
|
||||||
|
|
||||||
|
logger.Info("snapshot interval adjusted", "previous", snapshot.CreatedAt.Format(time.RFC3339), "next", nextSnapshot.String())
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return
|
||||||
|
case <-time.After(nextSnapshot):
|
||||||
|
if _, err := r.Snapshot(ctx); err != nil && err != ErrNoGeneration {
|
||||||
|
logger.Error("snapshotter error", "error", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
ticker := time.NewTicker(r.SnapshotInterval)
|
ticker := time.NewTicker(r.SnapshotInterval)
|
||||||
defer ticker.Stop()
|
defer ticker.Stop()
|
||||||
|
|
||||||
@@ -729,7 +767,7 @@ func (r *Replica) snapshotter(ctx context.Context) {
|
|||||||
return
|
return
|
||||||
case <-ticker.C:
|
case <-ticker.C:
|
||||||
if _, err := r.Snapshot(ctx); err != nil && err != ErrNoGeneration {
|
if _, err := r.Snapshot(ctx); err != nil && err != ErrNoGeneration {
|
||||||
r.Logger.Printf("%s(%s): snapshotter error: %s", r.db.Path(), r.Name(), err)
|
r.Logger().Error("snapshotter error", "error", err)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -757,7 +795,7 @@ func (r *Replica) validator(ctx context.Context) {
|
|||||||
return
|
return
|
||||||
case <-ticker.C:
|
case <-ticker.C:
|
||||||
if err := r.Validate(ctx); err != nil {
|
if err := r.Validate(ctx); err != nil {
|
||||||
r.Logger.Printf("%s(%s): validation error: %s", r.db.Path(), r.Name(), err)
|
r.Logger().Error("validation error", "error", err)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -794,7 +832,6 @@ func (r *Replica) Validate(ctx context.Context) error {
|
|||||||
ReplicaName: r.Name(),
|
ReplicaName: r.Name(),
|
||||||
Generation: pos.Generation,
|
Generation: pos.Generation,
|
||||||
Index: pos.Index - 1,
|
Index: pos.Index - 1,
|
||||||
Logger: r.Logger,
|
|
||||||
}); err != nil {
|
}); err != nil {
|
||||||
return fmt.Errorf("cannot restore: %w", err)
|
return fmt.Errorf("cannot restore: %w", err)
|
||||||
}
|
}
|
||||||
@@ -819,7 +856,7 @@ func (r *Replica) Validate(ctx context.Context) error {
|
|||||||
if mismatch {
|
if mismatch {
|
||||||
status = "mismatch"
|
status = "mismatch"
|
||||||
}
|
}
|
||||||
r.Logger.Printf("%s(%s): validator: status=%s db=%016x replica=%016x pos=%s", db.Path(), r.Name(), status, chksum0, chksum1, pos)
|
r.Logger().Info("validator", "status", status, "db", fmt.Sprintf("%016x", chksum0), "replica", fmt.Sprintf("%016x", chksum1), "position", pos.String())
|
||||||
|
|
||||||
// Validate checksums match.
|
// Validate checksums match.
|
||||||
if mismatch {
|
if mismatch {
|
||||||
@@ -837,8 +874,6 @@ func (r *Replica) Validate(ctx context.Context) error {
|
|||||||
|
|
||||||
// waitForReplica blocks until replica reaches at least the given position.
|
// waitForReplica blocks until replica reaches at least the given position.
|
||||||
func (r *Replica) waitForReplica(ctx context.Context, pos Pos) error {
|
func (r *Replica) waitForReplica(ctx context.Context, pos Pos) error {
|
||||||
db := r.DB()
|
|
||||||
|
|
||||||
ticker := time.NewTicker(500 * time.Millisecond)
|
ticker := time.NewTicker(500 * time.Millisecond)
|
||||||
defer ticker.Stop()
|
defer ticker.Stop()
|
||||||
|
|
||||||
@@ -861,7 +896,7 @@ func (r *Replica) waitForReplica(ctx context.Context, pos Pos) error {
|
|||||||
// Obtain current position of replica, check if past target position.
|
// Obtain current position of replica, check if past target position.
|
||||||
curr := r.Pos()
|
curr := r.Pos()
|
||||||
if curr.IsZero() {
|
if curr.IsZero() {
|
||||||
r.Logger.Printf("%s(%s): validator: no replica position available", db.Path(), r.Name())
|
r.Logger().Info("validator: no replica position available")
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -1013,17 +1048,6 @@ func (r *Replica) Restore(ctx context.Context, opt RestoreOptions) (err error) {
|
|||||||
return fmt.Errorf("cannot specify index & timestamp to restore")
|
return fmt.Errorf("cannot specify index & timestamp to restore")
|
||||||
}
|
}
|
||||||
|
|
||||||
// Ensure logger exists.
|
|
||||||
logger := opt.Logger
|
|
||||||
if logger == nil {
|
|
||||||
logger = r.Logger
|
|
||||||
}
|
|
||||||
|
|
||||||
logPrefix := r.Name()
|
|
||||||
if db := r.DB(); db != nil {
|
|
||||||
logPrefix = fmt.Sprintf("%s(%s)", db.Path(), r.Name())
|
|
||||||
}
|
|
||||||
|
|
||||||
// Ensure output path does not already exist.
|
// Ensure output path does not already exist.
|
||||||
if _, err := os.Stat(opt.OutputPath); err == nil {
|
if _, err := os.Stat(opt.OutputPath); err == nil {
|
||||||
return fmt.Errorf("cannot restore, output path already exists: %s", opt.OutputPath)
|
return fmt.Errorf("cannot restore, output path already exists: %s", opt.OutputPath)
|
||||||
@@ -1070,19 +1094,19 @@ func (r *Replica) Restore(ctx context.Context, opt RestoreOptions) (err error) {
|
|||||||
tmpPath := opt.OutputPath + ".tmp"
|
tmpPath := opt.OutputPath + ".tmp"
|
||||||
|
|
||||||
// Copy snapshot to output path.
|
// Copy snapshot to output path.
|
||||||
logger.Printf("%s: restoring snapshot %s/%08x to %s", logPrefix, opt.Generation, minWALIndex, tmpPath)
|
r.Logger().Info("restoring snapshot", "generation", opt.Generation, "index", minWALIndex, "path", tmpPath)
|
||||||
if err := r.restoreSnapshot(ctx, pos.Generation, pos.Index, tmpPath); err != nil {
|
if err := r.restoreSnapshot(ctx, pos.Generation, pos.Index, tmpPath); err != nil {
|
||||||
return fmt.Errorf("cannot restore snapshot: %w", err)
|
return fmt.Errorf("cannot restore snapshot: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// If no WAL files available, move snapshot to final path & exit early.
|
// If no WAL files available, move snapshot to final path & exit early.
|
||||||
if snapshotOnly {
|
if snapshotOnly {
|
||||||
logger.Printf("%s: snapshot only, finalizing database", logPrefix)
|
r.Logger().Info("snapshot only, finalizing database")
|
||||||
return os.Rename(tmpPath, opt.OutputPath)
|
return os.Rename(tmpPath, opt.OutputPath)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Begin processing WAL files.
|
// Begin processing WAL files.
|
||||||
logger.Printf("%s: restoring wal files: generation=%s index=[%08x,%08x]", logPrefix, opt.Generation, minWALIndex, maxWALIndex)
|
r.Logger().Info("restoring wal files", "generation", opt.Generation, "index_min", minWALIndex, "index_max", maxWALIndex)
|
||||||
|
|
||||||
// Fill input channel with all WAL indexes to be loaded in order.
|
// Fill input channel with all WAL indexes to be loaded in order.
|
||||||
// Verify every index has at least one offset.
|
// Verify every index has at least one offset.
|
||||||
@@ -1138,9 +1162,9 @@ func (r *Replica) Restore(ctx context.Context, opt RestoreOptions) (err error) {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
logger.Printf("%s: downloaded wal %s/%08x elapsed=%s",
|
r.Logger().Info("downloaded wal",
|
||||||
logPrefix, opt.Generation, index,
|
"generation", opt.Generation, "index", index,
|
||||||
time.Since(startTime).String(),
|
"elapsed", time.Since(startTime).String(),
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -1167,10 +1191,7 @@ func (r *Replica) Restore(ctx context.Context, opt RestoreOptions) (err error) {
|
|||||||
if err = applyWAL(ctx, index, tmpPath); err != nil {
|
if err = applyWAL(ctx, index, tmpPath); err != nil {
|
||||||
return fmt.Errorf("cannot apply wal: %w", err)
|
return fmt.Errorf("cannot apply wal: %w", err)
|
||||||
}
|
}
|
||||||
logger.Printf("%s: applied wal %s/%08x elapsed=%s",
|
r.Logger().Info("applied wal", "generation", opt.Generation, "index", index, "elapsed", time.Since(startTime).String())
|
||||||
logPrefix, opt.Generation, index,
|
|
||||||
time.Since(startTime).String(),
|
|
||||||
)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Ensure all goroutines finish. All errors should have been handled during
|
// Ensure all goroutines finish. All errors should have been handled during
|
||||||
@@ -1180,7 +1201,7 @@ func (r *Replica) Restore(ctx context.Context, opt RestoreOptions) (err error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Copy file to final location.
|
// Copy file to final location.
|
||||||
logger.Printf("%s: renaming database from temporary location", logPrefix)
|
r.Logger().Info("renaming database from temporary location")
|
||||||
if err := os.Rename(tmpPath, opt.OutputPath); err != nil {
|
if err := os.Rename(tmpPath, opt.OutputPath); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -716,6 +716,9 @@ func ParseHost(s string) (bucket, region, endpoint string, forcePathStyle bool)
|
|||||||
} else if a := digitalOceanRegex.FindStringSubmatch(host); a != nil {
|
} else if a := digitalOceanRegex.FindStringSubmatch(host); a != nil {
|
||||||
bucket, region = a[1], a[2]
|
bucket, region = a[1], a[2]
|
||||||
endpoint = fmt.Sprintf("%s.digitaloceanspaces.com", region)
|
endpoint = fmt.Sprintf("%s.digitaloceanspaces.com", region)
|
||||||
|
} else if a := scalewayRegex.FindStringSubmatch(host); a != nil {
|
||||||
|
bucket, region = a[1], a[2]
|
||||||
|
endpoint = fmt.Sprintf("s3.%s.scw.cloud", region)
|
||||||
} else if a := linodeRegex.FindStringSubmatch(host); a != nil {
|
} else if a := linodeRegex.FindStringSubmatch(host); a != nil {
|
||||||
bucket, region = a[1], a[2]
|
bucket, region = a[1], a[2]
|
||||||
endpoint = fmt.Sprintf("%s.linodeobjects.com", region)
|
endpoint = fmt.Sprintf("%s.linodeobjects.com", region)
|
||||||
@@ -742,6 +745,7 @@ var (
|
|||||||
backblazeRegex = regexp.MustCompile(`^(?:(.+)\.)?s3.([^.]+)\.backblazeb2.com$`)
|
backblazeRegex = regexp.MustCompile(`^(?:(.+)\.)?s3.([^.]+)\.backblazeb2.com$`)
|
||||||
filebaseRegex = regexp.MustCompile(`^(?:(.+)\.)?s3.filebase.com$`)
|
filebaseRegex = regexp.MustCompile(`^(?:(.+)\.)?s3.filebase.com$`)
|
||||||
digitalOceanRegex = regexp.MustCompile(`^(?:(.+)\.)?([^.]+)\.digitaloceanspaces.com$`)
|
digitalOceanRegex = regexp.MustCompile(`^(?:(.+)\.)?([^.]+)\.digitaloceanspaces.com$`)
|
||||||
|
scalewayRegex = regexp.MustCompile(`^(?:(.+)\.)?s3.([^.]+)\.scw\.cloud$`)
|
||||||
linodeRegex = regexp.MustCompile(`^(?:(.+)\.)?([^.]+)\.linodeobjects.com$`)
|
linodeRegex = regexp.MustCompile(`^(?:(.+)\.)?([^.]+)\.linodeobjects.com$`)
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -761,7 +765,7 @@ func deleteOutputError(out *s3.DeleteObjectsOutput) error {
|
|||||||
case 1:
|
case 1:
|
||||||
return fmt.Errorf("deleting object %s: %s - %s", *out.Errors[0].Key, *out.Errors[0].Code, *out.Errors[0].Message)
|
return fmt.Errorf("deleting object %s: %s - %s", *out.Errors[0].Key, *out.Errors[0].Code, *out.Errors[0].Message)
|
||||||
default:
|
default:
|
||||||
return fmt.Errorf("%d errors occured deleting objects, %s: %s - (%s (and %d others)",
|
return fmt.Errorf("%d errors occurred deleting objects, %s: %s - (%s (and %d others)",
|
||||||
len(out.Errors), *out.Errors[0].Key, *out.Errors[0].Code, *out.Errors[0].Message, len(out.Errors)-1)
|
len(out.Errors), *out.Errors[0].Key, *out.Errors[0].Code, *out.Errors[0].Message, len(out.Errors)-1)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user