From 294846cce29ccbd757f4de98531ba0c95a4b3a07 Mon Sep 17 00:00:00 2001 From: Ben Johnson Date: Wed, 13 Jan 2021 16:38:00 -0700 Subject: [PATCH] Add context to s3 --- replica.go | 4 ++-- s3/s3.go | 25 ++++++++++++------------- 2 files changed, 14 insertions(+), 15 deletions(-) diff --git a/replica.go b/replica.go index aedb47d..e50c691 100644 --- a/replica.go +++ b/replica.go @@ -35,7 +35,7 @@ type Replica interface { LastPos() Pos // Returns the computed position of the replica for a given generation. - CalcPos(generation string) (Pos, error) + CalcPos(ctx context.Context, generation string) (Pos, error) // Returns a list of generation names for the replica. Generations(ctx context.Context) ([]string, error) @@ -422,7 +422,7 @@ func (r *FileReplica) retainer(ctx context.Context) { // CalcPos returns the position for the replica for the current generation. // Returns a zero value if there is no active generation. -func (r *FileReplica) CalcPos(generation string) (pos Pos, err error) { +func (r *FileReplica) CalcPos(ctx context.Context, generation string) (pos Pos, err error) { pos.Generation = generation // Find maximum snapshot index. diff --git a/s3/s3.go b/s3/s3.go index 9366af4..fea4bf8 100644 --- a/s3/s3.go +++ b/s3/s3.go @@ -42,7 +42,6 @@ type Replica struct { pos litestream.Pos // last position wg sync.WaitGroup - ctx context.Context cancel func() // AWS authentication keys. @@ -143,7 +142,7 @@ func (r *Replica) Generations(ctx context.Context) ([]string, error) { } var generations []string - if err := r.s3.ListObjectsPages(&s3.ListObjectsInput{ + if err := r.s3.ListObjectsPagesWithContext(ctx, &s3.ListObjectsInput{ Bucket: aws.String(r.Bucket), Prefix: aws.String(path.Join(r.Path, "generations") + "/"), Delimiter: aws.String("/"), @@ -170,7 +169,7 @@ func (r *Replica) GenerationStats(ctx context.Context, generation string) (stats } // Determine stats for all snapshots. - n, min, max, err := r.snapshotStats(generation) + n, min, max, err := r.snapshotStats(ctx, generation) if err != nil { return stats, err } @@ -178,7 +177,7 @@ func (r *Replica) GenerationStats(ctx context.Context, generation string) (stats stats.CreatedAt, stats.UpdatedAt = min, max // Update stats if we have WAL files. - n, min, max, err = r.walStats(generation) + n, min, max, err = r.walStats(ctx, generation) if err != nil { return stats, err } else if n == 0 { @@ -195,8 +194,8 @@ func (r *Replica) GenerationStats(ctx context.Context, generation string) (stats return stats, nil } -func (r *Replica) snapshotStats(generation string) (n int, min, max time.Time, err error) { - if err := r.s3.ListObjectsPages(&s3.ListObjectsInput{ +func (r *Replica) snapshotStats(ctx context.Context, generation string) (n int, min, max time.Time, err error) { + if err := r.s3.ListObjectsPagesWithContext(ctx, &s3.ListObjectsInput{ Bucket: aws.String(r.Bucket), Prefix: aws.String(r.SnapshotDir(generation) + "/"), Delimiter: aws.String("/"), @@ -222,8 +221,8 @@ func (r *Replica) snapshotStats(generation string) (n int, min, max time.Time, e return n, min, max, nil } -func (r *Replica) walStats(generation string) (n int, min, max time.Time, err error) { - if err := r.s3.ListObjectsPages(&s3.ListObjectsInput{ +func (r *Replica) walStats(ctx context.Context, generation string) (n int, min, max time.Time, err error) { + if err := r.s3.ListObjectsPagesWithContext(ctx, &s3.ListObjectsInput{ Bucket: aws.String(r.Bucket), Prefix: aws.String(r.WALDir(generation) + "/"), Delimiter: aws.String("/"), @@ -262,7 +261,7 @@ func (r *Replica) Snapshots(ctx context.Context) ([]*litestream.SnapshotInfo, er var infos []*litestream.SnapshotInfo for _, generation := range generations { - if err := r.s3.ListObjectsPages(&s3.ListObjectsInput{ + if err := r.s3.ListObjectsPagesWithContext(ctx, &s3.ListObjectsInput{ Bucket: aws.String(r.Bucket), Prefix: aws.String(r.SnapshotDir(generation) + "/"), Delimiter: aws.String("/"), @@ -306,7 +305,7 @@ func (r *Replica) WALs(ctx context.Context) ([]*litestream.WALInfo, error) { var infos []*litestream.WALInfo for _, generation := range generations { var prev *litestream.WALInfo - if err := r.s3.ListObjectsPages(&s3.ListObjectsInput{ + if err := r.s3.ListObjectsPagesWithContext(ctx, &s3.ListObjectsInput{ Bucket: aws.String(r.Bucket), Prefix: aws.String(r.WALDir(generation) + "/"), Delimiter: aws.String("/"), @@ -432,7 +431,7 @@ func (r *Replica) CalcPos(generation string) (pos litestream.Pos, err error) { index := -1 var offset int64 - if err := r.s3.ListObjectsPages(&s3.ListObjectsInput{ + if err := r.s3.ListObjectsPagesWithContext(ctx, &s3.ListObjectsInput{ Bucket: aws.String(r.Bucket), Prefix: aws.String(r.WALDir(generation) + "/"), Delimiter: aws.String("/"), @@ -669,7 +668,7 @@ func (r *Replica) WALReader(ctx context.Context, generation string, index int) ( var keys []string var offset int64 var innerErr error - if err := r.s3.ListObjectsPages(&s3.ListObjectsInput{ + if err := r.s3.ListObjectsPagesWithContext(ctx, &s3.ListObjectsInput{ Bucket: aws.String(r.Bucket), Prefix: aws.String(path.Join(r.WALDir(generation), fmt.Sprintf("%016x_", index))), }, func(page *s3.ListObjectsOutput, lastPage bool) bool { @@ -782,7 +781,7 @@ func (r *Replica) EnforceRetention(ctx context.Context) (err error) { func (r *Replica) deleteGenerationBefore(ctx context.Context, generation string, index int) (err error) { // Collect all files for the generation. var objIDs []*s3.ObjectIdentifier - if err := r.s3.ListObjectsPages(&s3.ListObjectsInput{ + if err := r.s3.ListObjectsPagesWithContext(ctx, &s3.ListObjectsInput{ Bucket: aws.String(r.Bucket), Prefix: aws.String(r.GenerationDir(generation)), }, func(page *s3.ListObjectsOutput, lastPage bool) bool {