Add context support for gateway b2, manta, gcs

This commit is contained in:
kannappanr
2018-03-14 17:31:14 -07:00
committed by kannappanr
parent e452377b24
commit bdb1a90720
3 changed files with 88 additions and 99 deletions

View File

@@ -199,18 +199,18 @@ func b2ToObjectError(err error, params ...string) error {
// Shutdown saves any gateway metadata to disk
// if necessary and reload upon next restart.
func (l *b2Objects) Shutdown() error {
func (l *b2Objects) Shutdown(ctx context.Context) error {
// TODO
return nil
}
// StorageInfo is not relevant to B2 backend.
func (l *b2Objects) StorageInfo() (si minio.StorageInfo) {
func (l *b2Objects) StorageInfo(ctx context.Context) (si minio.StorageInfo) {
return si
}
// MakeBucket creates a new container on B2 backend.
func (l *b2Objects) MakeBucketWithLocation(bucket, location string) error {
func (l *b2Objects) MakeBucketWithLocation(ctx context.Context, bucket, location string) error {
// location is ignored for B2 backend.
// All buckets are set to private by default.
@@ -218,7 +218,7 @@ func (l *b2Objects) MakeBucketWithLocation(bucket, location string) error {
return b2ToObjectError(errors.Trace(err), bucket)
}
func (l *b2Objects) reAuthorizeAccount() error {
func (l *b2Objects) reAuthorizeAccount(ctx context.Context) error {
client, err := b2.AuthorizeAccount(l.ctx, l.creds.AccessKey, l.creds.SecretKey, b2.Transport(minio.NewCustomHTTPTransport()))
if err != nil {
return err
@@ -233,18 +233,18 @@ func (l *b2Objects) reAuthorizeAccount() error {
// the account and updates the B2 client safely. Once successfully
// authorized performs the call again and returns list of buckets.
// For any errors which are not actionable we return an error.
func (l *b2Objects) listBuckets(err error) ([]*b2.Bucket, error) {
func (l *b2Objects) listBuckets(ctx context.Context, err error) ([]*b2.Bucket, error) {
if err != nil {
if b2.Action(err) != b2.ReAuthenticate {
return nil, err
}
if rerr := l.reAuthorizeAccount(); rerr != nil {
if rerr := l.reAuthorizeAccount(ctx); rerr != nil {
return nil, rerr
}
}
bktList, lerr := l.b2Client.ListBuckets(l.ctx)
if lerr != nil {
return l.listBuckets(lerr)
return l.listBuckets(ctx, lerr)
}
return bktList, nil
}
@@ -253,8 +253,8 @@ func (l *b2Objects) listBuckets(err error) ([]*b2.Bucket, error) {
// for performing an API operation. B2 API doesn't
// provide a direct way to access the bucket so we need
// to employ following technique.
func (l *b2Objects) Bucket(bucket string) (*b2.Bucket, error) {
bktList, err := l.listBuckets(nil)
func (l *b2Objects) Bucket(ctx context.Context, bucket string) (*b2.Bucket, error) {
bktList, err := l.listBuckets(ctx, nil)
if err != nil {
return nil, b2ToObjectError(errors.Trace(err), bucket)
}
@@ -267,8 +267,8 @@ func (l *b2Objects) Bucket(bucket string) (*b2.Bucket, error) {
}
// GetBucketInfo gets bucket metadata..
func (l *b2Objects) GetBucketInfo(bucket string) (bi minio.BucketInfo, err error) {
if _, err = l.Bucket(bucket); err != nil {
func (l *b2Objects) GetBucketInfo(ctx context.Context, bucket string) (bi minio.BucketInfo, err error) {
if _, err = l.Bucket(ctx, bucket); err != nil {
return bi, err
}
return minio.BucketInfo{
@@ -278,8 +278,8 @@ func (l *b2Objects) GetBucketInfo(bucket string) (bi minio.BucketInfo, err error
}
// ListBuckets lists all B2 buckets
func (l *b2Objects) ListBuckets() ([]minio.BucketInfo, error) {
bktList, err := l.listBuckets(nil)
func (l *b2Objects) ListBuckets(ctx context.Context) ([]minio.BucketInfo, error) {
bktList, err := l.listBuckets(ctx, nil)
if err != nil {
return nil, err
}
@@ -294,8 +294,8 @@ func (l *b2Objects) ListBuckets() ([]minio.BucketInfo, error) {
}
// DeleteBucket deletes a bucket on B2
func (l *b2Objects) DeleteBucket(bucket string) error {
bkt, err := l.Bucket(bucket)
func (l *b2Objects) DeleteBucket(ctx context.Context, bucket string) error {
bkt, err := l.Bucket(ctx, bucket)
if err != nil {
return err
}
@@ -304,8 +304,8 @@ func (l *b2Objects) DeleteBucket(bucket string) error {
}
// ListObjects lists all objects in B2 bucket filtered by prefix, returns upto at max 1000 entries at a time.
func (l *b2Objects) ListObjects(bucket string, prefix string, marker string, delimiter string, maxKeys int) (loi minio.ListObjectsInfo, err error) {
bkt, err := l.Bucket(bucket)
func (l *b2Objects) ListObjects(ctx context.Context, bucket string, prefix string, marker string, delimiter string, maxKeys int) (loi minio.ListObjectsInfo, err error) {
bkt, err := l.Bucket(ctx, bucket)
if err != nil {
return loi, err
}
@@ -335,10 +335,10 @@ func (l *b2Objects) ListObjects(bucket string, prefix string, marker string, del
}
// ListObjectsV2 lists all objects in B2 bucket filtered by prefix, returns upto max 1000 entries at a time.
func (l *b2Objects) ListObjectsV2(bucket, prefix, continuationToken, delimiter string, maxKeys int,
func (l *b2Objects) ListObjectsV2(ctx context.Context, bucket, prefix, continuationToken, delimiter string, maxKeys int,
fetchOwner bool, startAfter string) (loi minio.ListObjectsV2Info, err error) {
// fetchOwner, startAfter are not supported and unused.
bkt, err := l.Bucket(bucket)
bkt, err := l.Bucket(ctx, bucket)
if err != nil {
return loi, err
}
@@ -374,8 +374,8 @@ func (l *b2Objects) ListObjectsV2(bucket, prefix, continuationToken, delimiter s
//
// startOffset indicates the starting read location of the object.
// length indicates the total length of the object.
func (l *b2Objects) GetObject(bucket string, object string, startOffset int64, length int64, writer io.Writer, etag string) error {
bkt, err := l.Bucket(bucket)
func (l *b2Objects) GetObject(ctx context.Context, bucket string, object string, startOffset int64, length int64, writer io.Writer, etag string) error {
bkt, err := l.Bucket(ctx, bucket)
if err != nil {
return err
}
@@ -389,8 +389,8 @@ func (l *b2Objects) GetObject(bucket string, object string, startOffset int64, l
}
// GetObjectInfo reads object info and replies back ObjectInfo
func (l *b2Objects) GetObjectInfo(bucket string, object string) (objInfo minio.ObjectInfo, err error) {
bkt, err := l.Bucket(bucket)
func (l *b2Objects) GetObjectInfo(ctx context.Context, bucket string, object string) (objInfo minio.ObjectInfo, err error) {
bkt, err := l.Bucket(ctx, bucket)
if err != nil {
return objInfo, err
}
@@ -478,8 +478,8 @@ func (nb *Reader) Read(p []byte) (int, error) {
}
// PutObject uploads the single upload to B2 backend by using *b2_upload_file* API, uploads upto 5GiB.
func (l *b2Objects) PutObject(bucket string, object string, data *h2.Reader, metadata map[string]string) (objInfo minio.ObjectInfo, err error) {
bkt, err := l.Bucket(bucket)
func (l *b2Objects) PutObject(ctx context.Context, bucket string, object string, data *h2.Reader, metadata map[string]string) (objInfo minio.ObjectInfo, err error) {
bkt, err := l.Bucket(ctx, bucket)
if err != nil {
return objInfo, err
}
@@ -517,8 +517,8 @@ func (l *b2Objects) PutObject(bucket string, object string, data *h2.Reader, met
}
// DeleteObject deletes a blob in bucket
func (l *b2Objects) DeleteObject(bucket string, object string) error {
bkt, err := l.Bucket(bucket)
func (l *b2Objects) DeleteObject(ctx context.Context, bucket string, object string) error {
bkt, err := l.Bucket(ctx, bucket)
if err != nil {
return err
}
@@ -533,11 +533,11 @@ func (l *b2Objects) DeleteObject(bucket string, object string) error {
}
// ListMultipartUploads lists all multipart uploads.
func (l *b2Objects) ListMultipartUploads(bucket string, prefix string, keyMarker string, uploadIDMarker string,
func (l *b2Objects) ListMultipartUploads(ctx context.Context, bucket string, prefix string, keyMarker string, uploadIDMarker string,
delimiter string, maxUploads int) (lmi minio.ListMultipartsInfo, err error) {
// keyMarker, prefix, delimiter are all ignored, Backblaze B2 doesn't support any
// of these parameters only equivalent parameter is uploadIDMarker.
bkt, err := l.Bucket(bucket)
bkt, err := l.Bucket(ctx, bucket)
if err != nil {
return lmi, err
}
@@ -572,9 +572,9 @@ func (l *b2Objects) ListMultipartUploads(bucket string, prefix string, keyMarker
// Each large file must consist of at least 2 parts, and all of the parts except the
// last one must be at least 5MB in size. The last part must contain at least one byte.
// For more information - https://www.backblaze.com/b2/docs/large_files.html
func (l *b2Objects) NewMultipartUpload(bucket string, object string, metadata map[string]string) (string, error) {
func (l *b2Objects) NewMultipartUpload(ctx context.Context, bucket string, object string, metadata map[string]string) (string, error) {
var uploadID string
bkt, err := l.Bucket(bucket)
bkt, err := l.Bucket(ctx, bucket)
if err != nil {
return uploadID, err
}
@@ -590,8 +590,8 @@ func (l *b2Objects) NewMultipartUpload(bucket string, object string, metadata ma
}
// PutObjectPart puts a part of object in bucket, uses B2's LargeFile upload API.
func (l *b2Objects) PutObjectPart(bucket string, object string, uploadID string, partID int, data *h2.Reader) (pi minio.PartInfo, err error) {
bkt, err := l.Bucket(bucket)
func (l *b2Objects) PutObjectPart(ctx context.Context, bucket string, object string, uploadID string, partID int, data *h2.Reader) (pi minio.PartInfo, err error) {
bkt, err := l.Bucket(ctx, bucket)
if err != nil {
return pi, err
}
@@ -616,8 +616,8 @@ func (l *b2Objects) PutObjectPart(bucket string, object string, uploadID string,
}
// ListObjectParts returns all object parts for specified object in specified bucket, uses B2's LargeFile upload API.
func (l *b2Objects) ListObjectParts(bucket string, object string, uploadID string, partNumberMarker int, maxParts int) (lpi minio.ListPartsInfo, err error) {
bkt, err := l.Bucket(bucket)
func (l *b2Objects) ListObjectParts(ctx context.Context, bucket string, object string, uploadID string, partNumberMarker int, maxParts int) (lpi minio.ListPartsInfo, err error) {
bkt, err := l.Bucket(ctx, bucket)
if err != nil {
return lpi, err
}
@@ -649,8 +649,8 @@ func (l *b2Objects) ListObjectParts(bucket string, object string, uploadID strin
}
// AbortMultipartUpload aborts a on going multipart upload, uses B2's LargeFile upload API.
func (l *b2Objects) AbortMultipartUpload(bucket string, object string, uploadID string) error {
bkt, err := l.Bucket(bucket)
func (l *b2Objects) AbortMultipartUpload(ctx context.Context, bucket string, object string, uploadID string) error {
bkt, err := l.Bucket(ctx, bucket)
if err != nil {
return err
}
@@ -659,8 +659,8 @@ func (l *b2Objects) AbortMultipartUpload(bucket string, object string, uploadID
}
// CompleteMultipartUpload completes ongoing multipart upload and finalizes object, uses B2's LargeFile upload API.
func (l *b2Objects) CompleteMultipartUpload(bucket string, object string, uploadID string, uploadedParts []minio.CompletePart) (oi minio.ObjectInfo, err error) {
bkt, err := l.Bucket(bucket)
func (l *b2Objects) CompleteMultipartUpload(ctx context.Context, bucket string, object string, uploadID string, uploadedParts []minio.CompletePart) (oi minio.ObjectInfo, err error) {
bkt, err := l.Bucket(ctx, bucket)
if err != nil {
return oi, err
}
@@ -680,14 +680,14 @@ func (l *b2Objects) CompleteMultipartUpload(bucket string, object string, upload
return oi, b2ToObjectError(errors.Trace(err), bucket, object, uploadID)
}
return l.GetObjectInfo(bucket, object)
return l.GetObjectInfo(ctx, bucket, object)
}
// SetBucketPolicy - B2 supports 2 types of bucket policies:
// bucketType.AllPublic - bucketTypeReadOnly means that anybody can download the files is the bucket;
// bucketType.AllPrivate - bucketTypePrivate means that you need an authorization token to download them.
// Default is AllPrivate for all buckets.
func (l *b2Objects) SetBucketPolicy(bucket string, policyInfo policy.BucketAccessPolicy) error {
func (l *b2Objects) SetBucketPolicy(ctx context.Context, bucket string, policyInfo policy.BucketAccessPolicy) error {
var policies []minio.BucketAccessPolicy
for prefix, policy := range policy.GetPolicies(policyInfo.Statements, bucket, "") {
@@ -706,7 +706,7 @@ func (l *b2Objects) SetBucketPolicy(bucket string, policyInfo policy.BucketAcces
if policies[0].Policy != policy.BucketPolicyReadOnly {
return errors.Trace(minio.NotImplemented{})
}
bkt, err := l.Bucket(bucket)
bkt, err := l.Bucket(ctx, bucket)
if err != nil {
return err
}
@@ -717,9 +717,9 @@ func (l *b2Objects) SetBucketPolicy(bucket string, policyInfo policy.BucketAcces
// GetBucketPolicy, returns the current bucketType from B2 backend and convert
// it into S3 compatible bucket policy info.
func (l *b2Objects) GetBucketPolicy(bucket string) (policy.BucketAccessPolicy, error) {
func (l *b2Objects) GetBucketPolicy(ctx context.Context, bucket string) (policy.BucketAccessPolicy, error) {
policyInfo := policy.BucketAccessPolicy{Version: "2012-10-17"}
bkt, err := l.Bucket(bucket)
bkt, err := l.Bucket(ctx, bucket)
if err != nil {
return policyInfo, err
}
@@ -734,8 +734,8 @@ func (l *b2Objects) GetBucketPolicy(bucket string) (policy.BucketAccessPolicy, e
}
// DeleteBucketPolicy - resets the bucketType of bucket on B2 to 'allPrivate'.
func (l *b2Objects) DeleteBucketPolicy(bucket string) error {
bkt, err := l.Bucket(bucket)
func (l *b2Objects) DeleteBucketPolicy(ctx context.Context, bucket string) error {
bkt, err := l.Bucket(ctx, bucket)
if err != nil {
return err
}