From f930ffe9e2fd698b8df3a38fb2bb5e1af22faacf Mon Sep 17 00:00:00 2001 From: Pontus Leitzler Date: Wed, 28 Nov 2018 09:19:51 +0100 Subject: [PATCH] Fix gcs context (#6869) --- cmd/gateway/gcs/gateway-gcs.go | 80 +++++++++++++++++----------------- 1 file changed, 39 insertions(+), 41 deletions(-) diff --git a/cmd/gateway/gcs/gateway-gcs.go b/cmd/gateway/gcs/gateway-gcs.go index 19edc7312..165b2d7a3 100644 --- a/cmd/gateway/gcs/gateway-gcs.go +++ b/cmd/gateway/gcs/gateway-gcs.go @@ -206,11 +206,10 @@ func (g *GCS) NewGatewayLayer(creds auth.Credentials) (minio.ObjectLayer, error) gcs := &gcsGateway{ client: client, projectID: g.projectID, - ctx: ctx, } // Start background process to cleanup old files in minio.sys.tmp - go gcs.CleanupGCSMinioSysTmp() + go gcs.CleanupGCSMinioSysTmp(ctx) return gcs, nil } @@ -349,7 +348,6 @@ type gcsGateway struct { minio.GatewayUnsupported client *storage.Client projectID string - ctx context.Context } // Returns projectID from the GOOGLE_APPLICATION_CREDENTIALS file. @@ -366,8 +364,8 @@ func gcsParseProjectID(credsFile string) (projectID string, err error) { } // Cleanup old files in minio.sys.tmp of the given bucket. -func (l *gcsGateway) CleanupGCSMinioSysTmpBucket(bucket string) { - it := l.client.Bucket(bucket).Objects(l.ctx, &storage.Query{Prefix: minio.GatewayMinioSysTmp, Versions: false}) +func (l *gcsGateway) CleanupGCSMinioSysTmpBucket(ctx context.Context, bucket string) { + it := l.client.Bucket(bucket).Objects(ctx, &storage.Query{Prefix: minio.GatewayMinioSysTmp, Versions: false}) for { attrs, err := it.Next() if err != nil { @@ -380,7 +378,7 @@ func (l *gcsGateway) CleanupGCSMinioSysTmpBucket(bucket string) { } if time.Since(attrs.Updated) > gcsMultipartExpiry { // Delete files older than 2 weeks. - err := l.client.Bucket(bucket).Object(attrs.Name).Delete(l.ctx) + err := l.client.Bucket(bucket).Object(attrs.Name).Delete(ctx) if err != nil { reqInfo := &logger.ReqInfo{BucketName: bucket, ObjectName: attrs.Name} ctx := logger.SetReqInfo(context.Background(), reqInfo) @@ -392,9 +390,9 @@ func (l *gcsGateway) CleanupGCSMinioSysTmpBucket(bucket string) { } // Cleanup old files in minio.sys.tmp of all buckets. -func (l *gcsGateway) CleanupGCSMinioSysTmp() { +func (l *gcsGateway) CleanupGCSMinioSysTmp(ctx context.Context) { for { - it := l.client.Buckets(l.ctx, l.projectID) + it := l.client.Buckets(ctx, l.projectID) for { attrs, err := it.Next() if err != nil { @@ -404,7 +402,7 @@ func (l *gcsGateway) CleanupGCSMinioSysTmp() { } break } - l.CleanupGCSMinioSysTmpBucket(attrs.Name) + l.CleanupGCSMinioSysTmpBucket(ctx, attrs.Name) } // Run the cleanup loop every 1 day. time.Sleep(gcsCleanupInterval) @@ -431,7 +429,7 @@ func (l *gcsGateway) MakeBucketWithLocation(ctx context.Context, bucket, locatio location = "us" } - err := bkt.Create(l.ctx, l.projectID, &storage.BucketAttrs{ + err := bkt.Create(ctx, l.projectID, &storage.BucketAttrs{ Location: location, }) logger.LogIf(ctx, err) @@ -440,7 +438,7 @@ func (l *gcsGateway) MakeBucketWithLocation(ctx context.Context, bucket, locatio // GetBucketInfo - Get bucket metadata.. func (l *gcsGateway) GetBucketInfo(ctx context.Context, bucket string) (minio.BucketInfo, error) { - attrs, err := l.client.Bucket(bucket).Attrs(l.ctx) + attrs, err := l.client.Bucket(bucket).Attrs(ctx) if err != nil { logger.LogIf(ctx, err) return minio.BucketInfo{}, gcsToObjectError(err, bucket) @@ -454,7 +452,7 @@ func (l *gcsGateway) GetBucketInfo(ctx context.Context, bucket string) (minio.Bu // ListBuckets lists all buckets under your project-id on GCS. func (l *gcsGateway) ListBuckets(ctx context.Context) (buckets []minio.BucketInfo, err error) { - it := l.client.Buckets(l.ctx, l.projectID) + it := l.client.Buckets(ctx, l.projectID) // Iterate and capture all the buckets. for { @@ -479,7 +477,7 @@ func (l *gcsGateway) ListBuckets(ctx context.Context) (buckets []minio.BucketInf // DeleteBucket delete a bucket on GCS. func (l *gcsGateway) DeleteBucket(ctx context.Context, bucket string) error { - itObject := l.client.Bucket(bucket).Objects(l.ctx, &storage.Query{ + itObject := l.client.Bucket(bucket).Objects(ctx, &storage.Query{ Delimiter: "/", Versions: false, }) @@ -509,7 +507,7 @@ func (l *gcsGateway) DeleteBucket(ctx context.Context, bucket string) error { } if gcsMinioPathFound { // Remove minio.sys.tmp before deleting the bucket. - itObject = l.client.Bucket(bucket).Objects(l.ctx, &storage.Query{Versions: false, Prefix: minio.GatewayMinioSysTmp}) + itObject = l.client.Bucket(bucket).Objects(ctx, &storage.Query{Versions: false, Prefix: minio.GatewayMinioSysTmp}) for { objAttrs, err := itObject.Next() if err == iterator.Done { @@ -519,14 +517,14 @@ func (l *gcsGateway) DeleteBucket(ctx context.Context, bucket string) error { logger.LogIf(ctx, err) return gcsToObjectError(err) } - err = l.client.Bucket(bucket).Object(objAttrs.Name).Delete(l.ctx) + err = l.client.Bucket(bucket).Object(objAttrs.Name).Delete(ctx) if err != nil { logger.LogIf(ctx, err) return gcsToObjectError(err) } } } - err := l.client.Bucket(bucket).Delete(l.ctx) + err := l.client.Bucket(bucket).Delete(ctx) logger.LogIf(ctx, err) return gcsToObjectError(err, bucket) } @@ -561,7 +559,7 @@ func (l *gcsGateway) ListObjects(ctx context.Context, bucket string, prefix stri return minio.ListObjectsInfo{}, nil } - it := l.client.Bucket(bucket).Objects(l.ctx, &storage.Query{ + it := l.client.Bucket(bucket).Objects(ctx, &storage.Query{ Delimiter: delimiter, Prefix: prefix, Versions: false, @@ -668,7 +666,7 @@ func (l *gcsGateway) ListObjectsV2(ctx context.Context, bucket, prefix, continua return minio.ListObjectsV2Info{ContinuationToken: continuationToken}, nil } - it := l.client.Bucket(bucket).Objects(l.ctx, &storage.Query{ + it := l.client.Bucket(bucket).Objects(ctx, &storage.Query{ Delimiter: delimiter, Prefix: prefix, Versions: false, @@ -769,7 +767,7 @@ func (l *gcsGateway) GetObjectNInfo(ctx context.Context, bucket, object string, func (l *gcsGateway) GetObject(ctx context.Context, bucket string, key string, startOffset int64, length int64, writer io.Writer, etag string, opts minio.ObjectOptions) error { // if we want to mimic S3 behavior exactly, we need to verify if bucket exists first, // otherwise gcs will just return object not exist in case of non-existing bucket - if _, err := l.client.Bucket(bucket).Attrs(l.ctx); err != nil { + if _, err := l.client.Bucket(bucket).Attrs(ctx); err != nil { logger.LogIf(ctx, err) return gcsToObjectError(err, bucket) } @@ -781,7 +779,7 @@ func (l *gcsGateway) GetObject(ctx context.Context, bucket string, key string, s // Calling ReadCompressed with true accomplishes that. object := l.client.Bucket(bucket).Object(key).ReadCompressed(true) - r, err := object.NewRangeReader(l.ctx, startOffset, length) + r, err := object.NewRangeReader(ctx, startOffset, length) if err != nil { logger.LogIf(ctx, err) return gcsToObjectError(err, bucket, key) @@ -864,12 +862,12 @@ func applyMetadataToGCSAttrs(metadata map[string]string, attrs *storage.ObjectAt func (l *gcsGateway) GetObjectInfo(ctx context.Context, bucket string, object string, opts minio.ObjectOptions) (minio.ObjectInfo, error) { // if we want to mimic S3 behavior exactly, we need to verify if bucket exists first, // otherwise gcs will just return object not exist in case of non-existing bucket - if _, err := l.client.Bucket(bucket).Attrs(l.ctx); err != nil { + if _, err := l.client.Bucket(bucket).Attrs(ctx); err != nil { logger.LogIf(ctx, err) return minio.ObjectInfo{}, gcsToObjectError(err, bucket) } - attrs, err := l.client.Bucket(bucket).Object(object).Attrs(l.ctx) + attrs, err := l.client.Bucket(bucket).Object(object).Attrs(ctx) if err != nil { logger.LogIf(ctx, err) return minio.ObjectInfo{}, gcsToObjectError(err, bucket, object) @@ -884,14 +882,14 @@ func (l *gcsGateway) PutObject(ctx context.Context, bucket string, key string, r // if we want to mimic S3 behavior exactly, we need to verify if bucket exists first, // otherwise gcs will just return object not exist in case of non-existing bucket - if _, err := l.client.Bucket(bucket).Attrs(l.ctx); err != nil { + if _, err := l.client.Bucket(bucket).Attrs(ctx); err != nil { logger.LogIf(ctx, err) return minio.ObjectInfo{}, gcsToObjectError(err, bucket) } object := l.client.Bucket(bucket).Object(key) - w := object.NewWriter(l.ctx) + w := object.NewWriter(ctx) // Disable "chunked" uploading in GCS client if the size of the data to be uploaded is below // the current chunk-size of the writer. This avoids an unnecessary memory allocation. if data.Size() < int64(w.ChunkSize) { @@ -909,7 +907,7 @@ func (l *gcsGateway) PutObject(ctx context.Context, bucket string, key string, r // Close the object writer upon success. w.Close() - attrs, err := object.Attrs(l.ctx) + attrs, err := object.Attrs(ctx) if err != nil { logger.LogIf(ctx, err) return minio.ObjectInfo{}, gcsToObjectError(err, bucket, key) @@ -928,7 +926,7 @@ func (l *gcsGateway) CopyObject(ctx context.Context, srcBucket string, srcObject copier := dst.CopierFrom(src) applyMetadataToGCSAttrs(srcInfo.UserDefined, &copier.ObjectAttrs) - attrs, err := copier.Run(l.ctx) + attrs, err := copier.Run(ctx) if err != nil { logger.LogIf(ctx, err) return minio.ObjectInfo{}, gcsToObjectError(err, destBucket, destObject) @@ -939,7 +937,7 @@ func (l *gcsGateway) CopyObject(ctx context.Context, srcBucket string, srcObject // DeleteObject - Deletes a blob in bucket func (l *gcsGateway) DeleteObject(ctx context.Context, bucket string, object string) error { - err := l.client.Bucket(bucket).Object(object).Delete(l.ctx) + err := l.client.Bucket(bucket).Object(object).Delete(ctx) if err != nil { logger.LogIf(ctx, err) return gcsToObjectError(err, bucket, object) @@ -956,7 +954,7 @@ func (l *gcsGateway) NewMultipartUpload(ctx context.Context, bucket string, key // generate name for part zero meta := gcsMultipartMetaName(uploadID) - w := l.client.Bucket(bucket).Object(meta).NewWriter(l.ctx) + w := l.client.Bucket(bucket).Object(meta).NewWriter(ctx) defer w.Close() applyMetadataToGCSAttrs(metadata, &w.ObjectAttrs) @@ -1056,7 +1054,7 @@ func (l *gcsGateway) ListMultipartUploads(ctx context.Context, bucket string, pr // Checks if minio.sys.tmp/multipart/v1//gcs.json exists, returns // an object layer compatible error upon any error. func (l *gcsGateway) checkUploadIDExists(ctx context.Context, bucket string, key string, uploadID string) error { - _, err := l.client.Bucket(bucket).Object(gcsMultipartMetaName(uploadID)).Attrs(l.ctx) + _, err := l.client.Bucket(bucket).Object(gcsMultipartMetaName(uploadID)).Attrs(ctx) logger.LogIf(ctx, err) return gcsToObjectError(err, bucket, key, uploadID) } @@ -1073,7 +1071,7 @@ func (l *gcsGateway) PutObjectPart(ctx context.Context, bucket string, key strin etag = minio.GenETag() } object := l.client.Bucket(bucket).Object(gcsMultipartDataName(uploadID, partNumber, etag)) - w := object.NewWriter(l.ctx) + w := object.NewWriter(ctx) // Disable "chunked" uploading in GCS client. If enabled, it can cause a corner case // where it tries to upload 0 bytes in the last chunk and get error from server. w.ChunkSize = 0 @@ -1186,7 +1184,7 @@ func (l *gcsGateway) cleanupMultipartUpload(ctx context.Context, bucket, key, up prefix := fmt.Sprintf("%s/%s/", gcsMinioMultipartPathV1, uploadID) // iterate through all parts and delete them - it := l.client.Bucket(bucket).Objects(l.ctx, &storage.Query{Prefix: prefix, Versions: false}) + it := l.client.Bucket(bucket).Objects(ctx, &storage.Query{Prefix: prefix, Versions: false}) for { attrs, err := it.Next() @@ -1200,7 +1198,7 @@ func (l *gcsGateway) cleanupMultipartUpload(ctx context.Context, bucket, key, up object := l.client.Bucket(bucket).Object(attrs.Name) // Ignore the error as parallel AbortMultipartUpload might have deleted it. - object.Delete(l.ctx) + object.Delete(ctx) } return nil @@ -1222,13 +1220,13 @@ func (l *gcsGateway) CompleteMultipartUpload(ctx context.Context, bucket string, meta := gcsMultipartMetaName(uploadID) object := l.client.Bucket(bucket).Object(meta) - partZeroAttrs, err := object.Attrs(l.ctx) + partZeroAttrs, err := object.Attrs(ctx) if err != nil { logger.LogIf(ctx, err) return minio.ObjectInfo{}, gcsToObjectError(err, bucket, key, uploadID) } - r, err := object.NewReader(l.ctx) + r, err := object.NewReader(ctx) if err != nil { logger.LogIf(ctx, err) return minio.ObjectInfo{}, gcsToObjectError(err, bucket, key) @@ -1259,7 +1257,7 @@ func (l *gcsGateway) CompleteMultipartUpload(ctx context.Context, bucket string, for i, uploadedPart := range uploadedParts { parts = append(parts, l.client.Bucket(bucket).Object(gcsMultipartDataName(uploadID, uploadedPart.PartNumber, uploadedPart.ETag))) - partAttr, pErr := l.client.Bucket(bucket).Object(gcsMultipartDataName(uploadID, uploadedPart.PartNumber, uploadedPart.ETag)).Attrs(l.ctx) + partAttr, pErr := l.client.Bucket(bucket).Object(gcsMultipartDataName(uploadID, uploadedPart.PartNumber, uploadedPart.ETag)).Attrs(ctx) if pErr != nil { logger.LogIf(ctx, pErr) return minio.ObjectInfo{}, gcsToObjectError(pErr, bucket, key, uploadID) @@ -1305,7 +1303,7 @@ func (l *gcsGateway) CompleteMultipartUpload(ctx context.Context, bucket string, composer.ContentType = partZeroAttrs.ContentType composer.Metadata = partZeroAttrs.Metadata - if _, err = composer.Run(l.ctx); err != nil { + if _, err = composer.Run(ctx); err != nil { logger.LogIf(ctx, err) return minio.ObjectInfo{}, gcsToObjectError(err, bucket, key) } @@ -1322,7 +1320,7 @@ func (l *gcsGateway) CompleteMultipartUpload(ctx context.Context, bucket string, composer.ContentDisposition = partZeroAttrs.ContentDisposition composer.ContentLanguage = partZeroAttrs.ContentLanguage composer.Metadata = partZeroAttrs.Metadata - attrs, err := composer.Run(l.ctx) + attrs, err := composer.Run(ctx) if err != nil { logger.LogIf(ctx, err) return minio.ObjectInfo{}, gcsToObjectError(err, bucket, key) @@ -1362,7 +1360,7 @@ func (l *gcsGateway) SetBucketPolicy(ctx context.Context, bucket string, bucketP acl := l.client.Bucket(bucket).ACL() if policies[0].Policy == miniogopolicy.BucketPolicyNone { - if err := acl.Delete(l.ctx, storage.AllUsers); err != nil { + if err := acl.Delete(ctx, storage.AllUsers); err != nil { logger.LogIf(ctx, err) return gcsToObjectError(err, bucket) } @@ -1380,7 +1378,7 @@ func (l *gcsGateway) SetBucketPolicy(ctx context.Context, bucket string, bucketP return minio.NotImplemented{} } - if err := acl.Set(l.ctx, storage.AllUsers, role); err != nil { + if err := acl.Set(ctx, storage.AllUsers, role); err != nil { logger.LogIf(ctx, err) return gcsToObjectError(err, bucket) } @@ -1390,7 +1388,7 @@ func (l *gcsGateway) SetBucketPolicy(ctx context.Context, bucket string, bucketP // GetBucketPolicy - Get policy on bucket func (l *gcsGateway) GetBucketPolicy(ctx context.Context, bucket string) (*policy.Policy, error) { - rules, err := l.client.Bucket(bucket).ACL().List(l.ctx) + rules, err := l.client.Bucket(bucket).ACL().List(ctx) if err != nil { return nil, gcsToObjectError(err, bucket) } @@ -1449,7 +1447,7 @@ func (l *gcsGateway) GetBucketPolicy(ctx context.Context, bucket string) (*polic // DeleteBucketPolicy - Delete all policies on bucket func (l *gcsGateway) DeleteBucketPolicy(ctx context.Context, bucket string) error { // This only removes the storage.AllUsers policies - if err := l.client.Bucket(bucket).ACL().Delete(l.ctx, storage.AllUsers); err != nil { + if err := l.client.Bucket(bucket).ACL().Delete(ctx, storage.AllUsers); err != nil { return gcsToObjectError(err, bucket) }