Add context to s3

This commit is contained in:
Ben Johnson
2021-01-13 16:38:00 -07:00
parent 9eb7bd41c2
commit 294846cce2
2 changed files with 14 additions and 15 deletions

View File

@@ -35,7 +35,7 @@ type Replica interface {
LastPos() Pos LastPos() Pos
// Returns the computed position of the replica for a given generation. // Returns the computed position of the replica for a given generation.
CalcPos(generation string) (Pos, error) CalcPos(ctx context.Context, generation string) (Pos, error)
// Returns a list of generation names for the replica. // Returns a list of generation names for the replica.
Generations(ctx context.Context) ([]string, error) Generations(ctx context.Context) ([]string, error)
@@ -422,7 +422,7 @@ func (r *FileReplica) retainer(ctx context.Context) {
// CalcPos returns the position for the replica for the current generation. // CalcPos returns the position for the replica for the current generation.
// Returns a zero value if there is no active generation. // Returns a zero value if there is no active generation.
func (r *FileReplica) CalcPos(generation string) (pos Pos, err error) { func (r *FileReplica) CalcPos(ctx context.Context, generation string) (pos Pos, err error) {
pos.Generation = generation pos.Generation = generation
// Find maximum snapshot index. // Find maximum snapshot index.

View File

@@ -42,7 +42,6 @@ type Replica struct {
pos litestream.Pos // last position pos litestream.Pos // last position
wg sync.WaitGroup wg sync.WaitGroup
ctx context.Context
cancel func() cancel func()
// AWS authentication keys. // AWS authentication keys.
@@ -143,7 +142,7 @@ func (r *Replica) Generations(ctx context.Context) ([]string, error) {
} }
var generations []string var generations []string
if err := r.s3.ListObjectsPages(&s3.ListObjectsInput{ if err := r.s3.ListObjectsPagesWithContext(ctx, &s3.ListObjectsInput{
Bucket: aws.String(r.Bucket), Bucket: aws.String(r.Bucket),
Prefix: aws.String(path.Join(r.Path, "generations") + "/"), Prefix: aws.String(path.Join(r.Path, "generations") + "/"),
Delimiter: aws.String("/"), Delimiter: aws.String("/"),
@@ -170,7 +169,7 @@ func (r *Replica) GenerationStats(ctx context.Context, generation string) (stats
} }
// Determine stats for all snapshots. // Determine stats for all snapshots.
n, min, max, err := r.snapshotStats(generation) n, min, max, err := r.snapshotStats(ctx, generation)
if err != nil { if err != nil {
return stats, err return stats, err
} }
@@ -178,7 +177,7 @@ func (r *Replica) GenerationStats(ctx context.Context, generation string) (stats
stats.CreatedAt, stats.UpdatedAt = min, max stats.CreatedAt, stats.UpdatedAt = min, max
// Update stats if we have WAL files. // Update stats if we have WAL files.
n, min, max, err = r.walStats(generation) n, min, max, err = r.walStats(ctx, generation)
if err != nil { if err != nil {
return stats, err return stats, err
} else if n == 0 { } else if n == 0 {
@@ -195,8 +194,8 @@ func (r *Replica) GenerationStats(ctx context.Context, generation string) (stats
return stats, nil return stats, nil
} }
func (r *Replica) snapshotStats(generation string) (n int, min, max time.Time, err error) { func (r *Replica) snapshotStats(ctx context.Context, generation string) (n int, min, max time.Time, err error) {
if err := r.s3.ListObjectsPages(&s3.ListObjectsInput{ if err := r.s3.ListObjectsPagesWithContext(ctx, &s3.ListObjectsInput{
Bucket: aws.String(r.Bucket), Bucket: aws.String(r.Bucket),
Prefix: aws.String(r.SnapshotDir(generation) + "/"), Prefix: aws.String(r.SnapshotDir(generation) + "/"),
Delimiter: aws.String("/"), Delimiter: aws.String("/"),
@@ -222,8 +221,8 @@ func (r *Replica) snapshotStats(generation string) (n int, min, max time.Time, e
return n, min, max, nil return n, min, max, nil
} }
func (r *Replica) walStats(generation string) (n int, min, max time.Time, err error) { func (r *Replica) walStats(ctx context.Context, generation string) (n int, min, max time.Time, err error) {
if err := r.s3.ListObjectsPages(&s3.ListObjectsInput{ if err := r.s3.ListObjectsPagesWithContext(ctx, &s3.ListObjectsInput{
Bucket: aws.String(r.Bucket), Bucket: aws.String(r.Bucket),
Prefix: aws.String(r.WALDir(generation) + "/"), Prefix: aws.String(r.WALDir(generation) + "/"),
Delimiter: aws.String("/"), Delimiter: aws.String("/"),
@@ -262,7 +261,7 @@ func (r *Replica) Snapshots(ctx context.Context) ([]*litestream.SnapshotInfo, er
var infos []*litestream.SnapshotInfo var infos []*litestream.SnapshotInfo
for _, generation := range generations { for _, generation := range generations {
if err := r.s3.ListObjectsPages(&s3.ListObjectsInput{ if err := r.s3.ListObjectsPagesWithContext(ctx, &s3.ListObjectsInput{
Bucket: aws.String(r.Bucket), Bucket: aws.String(r.Bucket),
Prefix: aws.String(r.SnapshotDir(generation) + "/"), Prefix: aws.String(r.SnapshotDir(generation) + "/"),
Delimiter: aws.String("/"), Delimiter: aws.String("/"),
@@ -306,7 +305,7 @@ func (r *Replica) WALs(ctx context.Context) ([]*litestream.WALInfo, error) {
var infos []*litestream.WALInfo var infos []*litestream.WALInfo
for _, generation := range generations { for _, generation := range generations {
var prev *litestream.WALInfo var prev *litestream.WALInfo
if err := r.s3.ListObjectsPages(&s3.ListObjectsInput{ if err := r.s3.ListObjectsPagesWithContext(ctx, &s3.ListObjectsInput{
Bucket: aws.String(r.Bucket), Bucket: aws.String(r.Bucket),
Prefix: aws.String(r.WALDir(generation) + "/"), Prefix: aws.String(r.WALDir(generation) + "/"),
Delimiter: aws.String("/"), Delimiter: aws.String("/"),
@@ -432,7 +431,7 @@ func (r *Replica) CalcPos(generation string) (pos litestream.Pos, err error) {
index := -1 index := -1
var offset int64 var offset int64
if err := r.s3.ListObjectsPages(&s3.ListObjectsInput{ if err := r.s3.ListObjectsPagesWithContext(ctx, &s3.ListObjectsInput{
Bucket: aws.String(r.Bucket), Bucket: aws.String(r.Bucket),
Prefix: aws.String(r.WALDir(generation) + "/"), Prefix: aws.String(r.WALDir(generation) + "/"),
Delimiter: aws.String("/"), Delimiter: aws.String("/"),
@@ -669,7 +668,7 @@ func (r *Replica) WALReader(ctx context.Context, generation string, index int) (
var keys []string var keys []string
var offset int64 var offset int64
var innerErr error var innerErr error
if err := r.s3.ListObjectsPages(&s3.ListObjectsInput{ if err := r.s3.ListObjectsPagesWithContext(ctx, &s3.ListObjectsInput{
Bucket: aws.String(r.Bucket), Bucket: aws.String(r.Bucket),
Prefix: aws.String(path.Join(r.WALDir(generation), fmt.Sprintf("%016x_", index))), Prefix: aws.String(path.Join(r.WALDir(generation), fmt.Sprintf("%016x_", index))),
}, func(page *s3.ListObjectsOutput, lastPage bool) bool { }, func(page *s3.ListObjectsOutput, lastPage bool) bool {
@@ -782,7 +781,7 @@ func (r *Replica) EnforceRetention(ctx context.Context) (err error) {
func (r *Replica) deleteGenerationBefore(ctx context.Context, generation string, index int) (err error) { func (r *Replica) deleteGenerationBefore(ctx context.Context, generation string, index int) (err error) {
// Collect all files for the generation. // Collect all files for the generation.
var objIDs []*s3.ObjectIdentifier var objIDs []*s3.ObjectIdentifier
if err := r.s3.ListObjectsPages(&s3.ListObjectsInput{ if err := r.s3.ListObjectsPagesWithContext(ctx, &s3.ListObjectsInput{
Bucket: aws.String(r.Bucket), Bucket: aws.String(r.Bucket),
Prefix: aws.String(r.GenerationDir(generation)), Prefix: aws.String(r.GenerationDir(generation)),
}, func(page *s3.ListObjectsOutput, lastPage bool) bool { }, func(page *s3.ListObjectsOutput, lastPage bool) bool {