diff --git a/go.mod b/go.mod index 9afca13..4e8ab2c 100644 --- a/go.mod +++ b/go.mod @@ -4,6 +4,7 @@ go 1.15 require ( github.com/aws/aws-sdk-go v1.27.0 + github.com/davecgh/go-spew v1.1.1 github.com/mattn/go-sqlite3 v1.14.5 github.com/prometheus/client_golang v1.9.0 gopkg.in/yaml.v2 v2.4.0 diff --git a/go.sum b/go.sum index b4cfcf4..40988cd 100644 --- a/go.sum +++ b/go.sum @@ -41,6 +41,7 @@ github.com/coreos/pkg v0.0.0-20160727233714-3ac0863d7acf/go.mod h1:E3G3o1h8I7cfc github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU= github.com/creack/pty v1.1.7/go.mod h1:lj5s0c3V2DBrqTV7llrYr5NG6My20zk30Fl46Y7DoTY= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ= github.com/dustin/go-humanize v0.0.0-20171111073723-bb3d318650d4/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= diff --git a/replica.go b/replica.go index 6448890..aedb47d 100644 --- a/replica.go +++ b/replica.go @@ -569,7 +569,7 @@ func (r *FileReplica) syncWAL(ctx context.Context) (err error) { return err } - // Create a temporary file to write into so we don't have partial writes. + // TODO: Create a temporary file to write into so we don't have partial writes. w, err := os.OpenFile(filename, os.O_RDWR|os.O_CREATE, 0600) if err != nil { return err diff --git a/s3/s3.go b/s3/s3.go index f9aee20..90754a2 100644 --- a/s3/s3.go +++ b/s3/s3.go @@ -1,6 +1,7 @@ package s3 import ( + "bytes" "compress/gzip" "context" "fmt" @@ -13,11 +14,13 @@ import ( "time" "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/credentials" "github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/service/s3" "github.com/aws/aws-sdk-go/service/s3/s3manager" "github.com/benbjohnson/litestream" "github.com/benbjohnson/litestream/internal" + "github.com/davecgh/go-spew/spew" ) const ( @@ -80,7 +83,7 @@ func (r *Replica) Name() string { // Type returns the type of replica. func (r *Replica) Type() string { - return "file" + return "s3" } // LastPos returns the last successfully replicated position. @@ -92,12 +95,12 @@ func (r *Replica) LastPos() litestream.Pos { // GenerationDir returns the path to a generation's root directory. func (r *Replica) GenerationDir(generation string) string { - return path.Join("/", r.Path, "generations", generation) + return path.Join(r.Path, "generations", generation) } // SnapshotDir returns the path to a generation's snapshot directory. func (r *Replica) SnapshotDir(generation string) string { - return path.Join("/", r.Path, r.GenerationDir(generation), "snapshots") + return path.Join(r.GenerationDir(generation), "snapshots") } // SnapshotPath returns the path to a snapshot file. @@ -131,11 +134,6 @@ func (r *Replica) WALDir(generation string) string { return path.Join(r.GenerationDir(generation), "wal") } -// WALPath returns the path to a WAL file. -func (r *Replica) WALPath(generation string, index int) string { - return path.Join(r.WALDir(generation), fmt.Sprintf("%016x.wal", index)) -} - // Generations returns a list of available generation names. func (r *Replica) Generations(ctx context.Context) ([]string, error) { if err := r.Init(ctx); err != nil { @@ -145,15 +143,15 @@ func (r *Replica) Generations(ctx context.Context) ([]string, error) { var generations []string if err := r.s3.ListObjectsPages(&s3.ListObjectsInput{ Bucket: aws.String(r.Bucket), - Prefix: aws.String(path.Join("/", r.Path, "generations")), + Prefix: aws.String(path.Join(r.Path, "generations") + "/"), Delimiter: aws.String("/"), }, func(page *s3.ListObjectsOutput, lastPage bool) bool { - for _, obj := range page.Contents { - key := path.Base(*obj.Key) - if !litestream.IsGenerationName(key) { + for _, prefix := range page.CommonPrefixes { + name := path.Base(*prefix.Prefix) + if !litestream.IsGenerationName(name) { continue } - generations = append(generations, key) + generations = append(generations, name) } return true }); err != nil { @@ -198,7 +196,7 @@ func (r *Replica) GenerationStats(ctx context.Context, generation string) (stats func (r *Replica) snapshotStats(generation string) (n int, min, max time.Time, err error) { if err := r.s3.ListObjectsPages(&s3.ListObjectsInput{ Bucket: aws.String(r.Bucket), - Prefix: aws.String(r.SnapshotDir(generation)), + Prefix: aws.String(r.SnapshotDir(generation) + "/"), Delimiter: aws.String("/"), }, func(page *s3.ListObjectsOutput, lastPage bool) bool { for _, obj := range page.Contents { @@ -225,7 +223,7 @@ func (r *Replica) snapshotStats(generation string) (n int, min, max time.Time, e func (r *Replica) walStats(generation string) (n int, min, max time.Time, err error) { if err := r.s3.ListObjectsPages(&s3.ListObjectsInput{ Bucket: aws.String(r.Bucket), - Prefix: aws.String(r.WALDir(generation)), + Prefix: aws.String(r.WALDir(generation) + "/"), Delimiter: aws.String("/"), }, func(page *s3.ListObjectsOutput, lastPage bool) bool { for _, obj := range page.Contents { @@ -264,7 +262,7 @@ func (r *Replica) Snapshots(ctx context.Context) ([]*litestream.SnapshotInfo, er for _, generation := range generations { if err := r.s3.ListObjectsPages(&s3.ListObjectsInput{ Bucket: aws.String(r.Bucket), - Prefix: aws.String(r.SnapshotDir(generation)), + Prefix: aws.String(r.SnapshotDir(generation) + "/"), Delimiter: aws.String("/"), }, func(page *s3.ListObjectsOutput, lastPage bool) bool { for _, obj := range page.Contents { @@ -308,7 +306,7 @@ func (r *Replica) WALs(ctx context.Context) ([]*litestream.WALInfo, error) { var prev *litestream.WALInfo if err := r.s3.ListObjectsPages(&s3.ListObjectsInput{ Bucket: aws.String(r.Bucket), - Prefix: aws.String(r.WALDir(generation)), + Prefix: aws.String(r.WALDir(generation) + "/"), Delimiter: aws.String("/"), }, func(page *s3.ListObjectsOutput, lastPage bool) bool { for _, obj := range page.Contents { @@ -419,6 +417,8 @@ func (r *Replica) retainer(ctx context.Context) { // CalcPos returns the position for the replica for the current generation. // Returns a zero value if there is no active generation. func (r *Replica) CalcPos(generation string) (pos litestream.Pos, err error) { + println("dbg/calcpos", generation) + if err := r.Init(context.Background()); err != nil { return pos, err } @@ -429,14 +429,18 @@ func (r *Replica) CalcPos(generation string) (pos litestream.Pos, err error) { if pos.Index, err = r.MaxSnapshotIndex(generation); err != nil { return litestream.Pos{}, err } + println("dbg/calcpos.snapshotindex", pos.Index) index := -1 var offset int64 if err := r.s3.ListObjectsPages(&s3.ListObjectsInput{ Bucket: aws.String(r.Bucket), - Prefix: aws.String(r.WALDir(generation)), + Prefix: aws.String(r.WALDir(generation) + "/"), Delimiter: aws.String("/"), }, func(page *s3.ListObjectsOutput, lastPage bool) bool { + println("dbg/calcpos.page") + spew.Dump(page) + for _, obj := range page.Contents { key := path.Base(*obj.Key) @@ -485,7 +489,13 @@ func (r *Replica) snapshot(ctx context.Context, generation string, index int) er pr, pw := io.Pipe() gw := gzip.NewWriter(pw) - go func() { io.Copy(gw, f) }() + go func() { + if _, err := io.Copy(gw, f); err != nil { + _ = pw.CloseWithError(err) + return + } + _ = pw.CloseWithError(gw.Close()) + }() snapshotPath := r.SnapshotPath(generation, index) @@ -496,6 +506,7 @@ func (r *Replica) snapshot(ctx context.Context, generation string, index int) er }); err != nil { return err } + return nil } @@ -524,7 +535,10 @@ func (r *Replica) Init(ctx context.Context) (err error) { return nil } - sess, err := session.NewSession(&aws.Config{Region: aws.String(r.Region)}) + sess, err := session.NewSession(&aws.Config{ + Credentials: credentials.NewStaticCredentials(r.AccessKeyID, r.SecretAccessKey, ""), + Region: aws.String(r.Region), + }) if err != nil { return fmt.Errorf("cannot create aws session: %w", err) } @@ -534,6 +548,7 @@ func (r *Replica) Init(ctx context.Context) (err error) { } func (r *Replica) Sync(ctx context.Context) (err error) { + println("dbg/s3.sync") if err := r.Init(ctx); err != nil { return err } @@ -589,27 +604,31 @@ func (r *Replica) syncWAL(ctx context.Context) (err error) { } defer rd.Close() - // Ensure parent directory exists for WAL file. - filename := r.WALPath(rd.Pos().Generation, rd.Pos().Index) - if err := os.MkdirAll(path.Dir(filename), 0700); err != nil { - return err - } - - // Create a temporary file to write into so we don't have partial writes. - w, err := os.OpenFile(filename, os.O_RDWR|os.O_CREATE, 0600) + // Read to intermediate buffer to determine size. + b, err := ioutil.ReadAll(rd) if err != nil { return err } - defer w.Close() - // Seek, copy & sync WAL contents. - if _, err := w.Seek(rd.Pos().Offset, io.SeekStart); err != nil { + var buf bytes.Buffer + gw := gzip.NewWriter(&buf) + if _, err := gw.Write(b); err != nil { return err - } else if _, err := io.Copy(w, rd); err != nil { - return err - } else if err := w.Sync(); err != nil { - return err - } else if err := w.Close(); err != nil { + } + + // Build a WAL path with the index/offset as well as size so we can ensure + // that files are contiguous without having to decompress. + walPath := path.Join( + r.WALDir(rd.Pos().Generation), + litestream.FormatWALPathWithOffsetSize(rd.Pos().Index, rd.Pos().Offset, int64(len(b)))+".gz", + ) + println("dbg/syncwal", walPath) + + if _, err := r.uploader.Upload(&s3manager.UploadInput{ + Bucket: aws.String(r.Bucket), + Key: aws.String(walPath), + Body: &buf, + }); err != nil { return err } @@ -661,30 +680,33 @@ func (r *Replica) SnapshotReader(ctx context.Context, generation string, index i // WALReader returns a reader for WAL data at the given index. // Returns os.ErrNotExist if no matching index is found. func (r *Replica) WALReader(ctx context.Context, generation string, index int) (io.ReadCloser, error) { - filename := r.WALPath(generation, index) + panic("TODO") + /* + filename := r.WALPath(generation, index) - // Attempt to read uncompressed file first. - f, err := os.Open(filename) - if err == nil { - return f, nil // file exist, return - } else if err != nil && !os.IsNotExist(err) { - return nil, err - } + // Attempt to read uncompressed file first. + f, err := os.Open(filename) + if err == nil { + return f, nil // file exist, return + } else if err != nil && !os.IsNotExist(err) { + return nil, err + } - // Otherwise read the compressed file. Return error if file doesn't exist. - f, err = os.Open(filename + ".gz") - if err != nil { - return nil, err - } + // Otherwise read the compressed file. Return error if file doesn't exist. + f, err = os.Open(filename + ".gz") + if err != nil { + return nil, err + } - // If compressed, wrap in a gzip reader and return with wrapper to - // ensure that the underlying file is closed. - rd, err := gzip.NewReader(f) - if err != nil { - f.Close() - return nil, err - } - return internal.NewReadCloser(rd, f), nil + // If compressed, wrap in a gzip reader and return with wrapper to + // ensure that the underlying file is closed. + rd, err := gzip.NewReader(f) + if err != nil { + f.Close() + return nil, err + } + return internal.NewReadCloser(rd, f), nil + */ } // EnforceRetention forces a new snapshot once the retention interval has passed.