Client side encryption support for remote storage (#468)
This commit is contained in:
71
replica.go
71
replica.go
@@ -15,6 +15,7 @@ import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"filippo.io/age"
|
||||
"github.com/benbjohnson/litestream/internal"
|
||||
"github.com/pierrec/lz4/v4"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
@@ -67,6 +68,10 @@ type Replica struct {
|
||||
// If true, replica monitors database for changes automatically.
|
||||
// Set to false if replica is being used synchronously (such as in tests).
|
||||
MonitorEnabled bool
|
||||
|
||||
// Encryption identities and recipients
|
||||
AgeIdentities []age.Identity
|
||||
AgeRecipients []age.Recipient
|
||||
}
|
||||
|
||||
func NewReplica(db *DB, name string) *Replica {
|
||||
@@ -221,8 +226,20 @@ func (r *Replica) syncWAL(ctx context.Context) (err error) {
|
||||
return err
|
||||
})
|
||||
|
||||
var ew io.WriteCloser = pw
|
||||
|
||||
// Add encryption if we have recipients.
|
||||
if len(r.AgeRecipients) > 0 {
|
||||
var err error
|
||||
ew, err = age.Encrypt(pw, r.AgeRecipients...)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer ew.Close()
|
||||
}
|
||||
|
||||
// Wrap writer to LZ4 compress.
|
||||
zw := lz4.NewWriter(pw)
|
||||
zw := lz4.NewWriter(ew)
|
||||
|
||||
// Track total WAL bytes written to replica client.
|
||||
walBytesCounter := replicaWALBytesCounterVec.WithLabelValues(r.db.Path(), r.Name())
|
||||
@@ -270,9 +287,11 @@ func (r *Replica) syncWAL(ctx context.Context) (err error) {
|
||||
walBytesCounter.Add(float64(n))
|
||||
}
|
||||
|
||||
// Flush LZ4 writer and close pipe.
|
||||
// Flush LZ4 writer, encryption writer and close pipe.
|
||||
if err := zw.Close(); err != nil {
|
||||
return err
|
||||
} else if err := ew.Close(); err != nil {
|
||||
return err
|
||||
} else if err := pw.Close(); err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -334,6 +353,15 @@ func (r *Replica) calcPos(ctx context.Context, generation string) (pos Pos, err
|
||||
}
|
||||
defer rd.Close()
|
||||
|
||||
if len(r.AgeIdentities) > 0 {
|
||||
drd, err := age.Decrypt(rd, r.AgeIdentities...)
|
||||
if err != nil {
|
||||
return pos, err
|
||||
}
|
||||
|
||||
rd = io.NopCloser(drd)
|
||||
}
|
||||
|
||||
n, err := io.Copy(ioutil.Discard, lz4.NewReader(rd))
|
||||
if err != nil {
|
||||
return pos, err
|
||||
@@ -470,7 +498,23 @@ func (r *Replica) Snapshot(ctx context.Context) (info SnapshotInfo, err error) {
|
||||
// Copy the database file to the LZ4 writer in a separate goroutine.
|
||||
var g errgroup.Group
|
||||
g.Go(func() error {
|
||||
zr := lz4.NewWriter(pw)
|
||||
// We need to ensure the pipe is closed.
|
||||
defer pw.Close()
|
||||
|
||||
var wc io.WriteCloser = pw
|
||||
|
||||
// Add encryption if we have recipients.
|
||||
if len(r.AgeRecipients) > 0 {
|
||||
var err error
|
||||
wc, err = age.Encrypt(pw, r.AgeRecipients...)
|
||||
if err != nil {
|
||||
pw.CloseWithError(err)
|
||||
return err
|
||||
}
|
||||
defer wc.Close()
|
||||
}
|
||||
|
||||
zr := lz4.NewWriter(wc)
|
||||
defer zr.Close()
|
||||
|
||||
if _, err := io.Copy(zr, r.f); err != nil {
|
||||
@@ -480,7 +524,7 @@ func (r *Replica) Snapshot(ctx context.Context) (info SnapshotInfo, err error) {
|
||||
pw.CloseWithError(err)
|
||||
return err
|
||||
}
|
||||
return pw.Close()
|
||||
return wc.Close()
|
||||
})
|
||||
|
||||
// Delegate write to client & wait for writer goroutine to finish.
|
||||
@@ -1259,6 +1303,15 @@ func (r *Replica) restoreSnapshot(ctx context.Context, generation string, index
|
||||
}
|
||||
defer rd.Close()
|
||||
|
||||
if len(r.AgeIdentities) > 0 {
|
||||
drd, err := age.Decrypt(rd, r.AgeIdentities...)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
rd = io.NopCloser(drd)
|
||||
}
|
||||
|
||||
if _, err := io.Copy(f, lz4.NewReader(rd)); err != nil {
|
||||
return err
|
||||
} else if err := f.Sync(); err != nil {
|
||||
@@ -1285,6 +1338,16 @@ func (r *Replica) downloadWAL(ctx context.Context, generation string, index int,
|
||||
return err
|
||||
}
|
||||
defer rd.Close()
|
||||
|
||||
if len(r.AgeIdentities) > 0 {
|
||||
drd, err := age.Decrypt(rd, r.AgeIdentities...)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
rd = io.NopCloser(drd)
|
||||
}
|
||||
|
||||
readers = append(readers, lz4.NewReader(rd))
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user