Add context to the object-interface methods.

Make necessary changes to xl fs azure sia
This commit is contained in:
Krishna Srinivas
2018-03-14 12:01:47 -07:00
committed by kannappanr
parent 9083bc152e
commit e452377b24
58 changed files with 844 additions and 610 deletions

View File

@@ -17,6 +17,7 @@
package cmd
import (
"context"
"fmt"
"hash/crc32"
"io"
@@ -241,11 +242,11 @@ func newXLSets(endpoints EndpointList, format *formatXLV3, setCount int, drivesP
}
// StorageInfo - combines output of StorageInfo across all erasure coded object sets.
func (s *xlSets) StorageInfo() StorageInfo {
func (s *xlSets) StorageInfo(ctx context.Context) StorageInfo {
var storageInfo StorageInfo
storageInfo.Backend.Type = Erasure
for _, set := range s.sets {
lstorageInfo := set.StorageInfo()
lstorageInfo := set.StorageInfo(ctx)
storageInfo.Total = storageInfo.Total + lstorageInfo.Total
storageInfo.Free = storageInfo.Free + lstorageInfo.Free
storageInfo.Backend.OnlineDisks = storageInfo.Backend.OnlineDisks + lstorageInfo.Backend.OnlineDisks
@@ -310,13 +311,13 @@ func (s *xlSets) StorageInfo() StorageInfo {
// Shutdown shutsdown all erasure coded sets in parallel
// returns error upon first error.
func (s *xlSets) Shutdown() error {
func (s *xlSets) Shutdown(ctx context.Context) error {
g := errgroup.WithNErrs(len(s.sets))
for index := range s.sets {
index := index
g.Go(func() error {
return s.sets[index].Shutdown()
return s.sets[index].Shutdown(ctx)
}, index)
}
@@ -332,14 +333,14 @@ func (s *xlSets) Shutdown() error {
// MakeBucketLocation - creates a new bucket across all sets simultaneously
// even if one of the sets fail to create buckets, we proceed to undo a
// successful operation.
func (s *xlSets) MakeBucketWithLocation(bucket, location string) error {
func (s *xlSets) MakeBucketWithLocation(ctx context.Context, bucket, location string) error {
g := errgroup.WithNErrs(len(s.sets))
// Create buckets in parallel across all sets.
for index := range s.sets {
index := index
g.Go(func() error {
return s.sets[index].MakeBucketWithLocation(bucket, location)
return s.sets[index].MakeBucketWithLocation(ctx, bucket, location)
}, index)
}
@@ -367,7 +368,7 @@ func undoMakeBucketSets(bucket string, sets []*xlObjects, errs []error) {
index := index
if errs[index] == nil {
g.Go(func() error {
return sets[index].DeleteBucket(bucket)
return sets[index].DeleteBucket(nil, bucket)
}, index)
}
}
@@ -403,13 +404,13 @@ func (s *xlSets) getHashedSet(input string) (set *xlObjects) {
}
// GetBucketInfo - returns bucket info from one of the erasure coded set.
func (s *xlSets) GetBucketInfo(bucket string) (bucketInfo BucketInfo, err error) {
return s.getHashedSet(bucket).GetBucketInfo(bucket)
func (s *xlSets) GetBucketInfo(ctx context.Context, bucket string) (bucketInfo BucketInfo, err error) {
return s.getHashedSet(bucket).GetBucketInfo(ctx, bucket)
}
// ListObjectsV2 lists all objects in bucket filtered by prefix
func (s *xlSets) ListObjectsV2(bucket, prefix, continuationToken, delimiter string, maxKeys int, fetchOwner bool, startAfter string) (result ListObjectsV2Info, err error) {
loi, err := s.ListObjects(bucket, prefix, continuationToken, delimiter, maxKeys)
func (s *xlSets) ListObjectsV2(ctx context.Context, bucket, prefix, continuationToken, delimiter string, maxKeys int, fetchOwner bool, startAfter string) (result ListObjectsV2Info, err error) {
loi, err := s.ListObjects(ctx, bucket, prefix, continuationToken, delimiter, maxKeys)
if err != nil {
return result, err
}
@@ -425,12 +426,12 @@ func (s *xlSets) ListObjectsV2(bucket, prefix, continuationToken, delimiter stri
}
// SetBucketPolicy persist the new policy on the bucket.
func (s *xlSets) SetBucketPolicy(bucket string, policy policy.BucketAccessPolicy) error {
func (s *xlSets) SetBucketPolicy(ctx context.Context, bucket string, policy policy.BucketAccessPolicy) error {
return persistAndNotifyBucketPolicyChange(bucket, false, policy, s)
}
// GetBucketPolicy will return a policy on a bucket
func (s *xlSets) GetBucketPolicy(bucket string) (policy.BucketAccessPolicy, error) {
func (s *xlSets) GetBucketPolicy(ctx context.Context, bucket string) (policy.BucketAccessPolicy, error) {
// fetch bucket policy from cache.
bpolicy := s.bucketPolicies.GetBucketPolicy(bucket)
if reflect.DeepEqual(bpolicy, emptyBucketPolicy) {
@@ -440,12 +441,12 @@ func (s *xlSets) GetBucketPolicy(bucket string) (policy.BucketAccessPolicy, erro
}
// DeleteBucketPolicy deletes all policies on bucket
func (s *xlSets) DeleteBucketPolicy(bucket string) error {
func (s *xlSets) DeleteBucketPolicy(ctx context.Context, bucket string) error {
return persistAndNotifyBucketPolicyChange(bucket, true, emptyBucketPolicy, s)
}
// RefreshBucketPolicy refreshes policy cache from disk
func (s *xlSets) RefreshBucketPolicy(bucket string) error {
func (s *xlSets) RefreshBucketPolicy(ctx context.Context, bucket string) error {
policy, err := ReadBucketPolicy(bucket, s)
if err != nil {
if reflect.DeepEqual(policy, emptyBucketPolicy) {
@@ -469,14 +470,14 @@ func (s *xlSets) IsEncryptionSupported() bool {
// DeleteBucket - deletes a bucket on all sets simultaneously,
// even if one of the sets fail to delete buckets, we proceed to
// undo a successful operation.
func (s *xlSets) DeleteBucket(bucket string) error {
func (s *xlSets) DeleteBucket(ctx context.Context, bucket string) error {
g := errgroup.WithNErrs(len(s.sets))
// Delete buckets in parallel across all sets.
for index := range s.sets {
index := index
g.Go(func() error {
return s.sets[index].DeleteBucket(bucket)
return s.sets[index].DeleteBucket(ctx, bucket)
}, index)
}
@@ -508,7 +509,7 @@ func undoDeleteBucketSets(bucket string, sets []*xlObjects, errs []error) {
index := index
if errs[index] == nil {
g.Go(func() error {
return sets[index].MakeBucketWithLocation(bucket, "")
return sets[index].MakeBucketWithLocation(nil, bucket, "")
}, index)
}
}
@@ -519,42 +520,42 @@ func undoDeleteBucketSets(bucket string, sets []*xlObjects, errs []error) {
// List all buckets from one of the set, we are not doing merge
// sort here just for simplification. As per design it is assumed
// that all buckets are present on all sets.
func (s *xlSets) ListBuckets() (buckets []BucketInfo, err error) {
func (s *xlSets) ListBuckets(ctx context.Context) (buckets []BucketInfo, err error) {
// Always lists from the same set signified by the empty string.
return s.getHashedSet("").ListBuckets()
return s.getHashedSet("").ListBuckets(ctx)
}
// --- Object Operations ---
// GetObject - reads an object from the hashedSet based on the object name.
func (s *xlSets) GetObject(bucket, object string, startOffset int64, length int64, writer io.Writer, etag string) error {
return s.getHashedSet(object).GetObject(bucket, object, startOffset, length, writer, etag)
func (s *xlSets) GetObject(ctx context.Context, bucket, object string, startOffset int64, length int64, writer io.Writer, etag string) error {
return s.getHashedSet(object).GetObject(ctx, bucket, object, startOffset, length, writer, etag)
}
// PutObject - writes an object to hashedSet based on the object name.
func (s *xlSets) PutObject(bucket string, object string, data *hash.Reader, metadata map[string]string) (objInfo ObjectInfo, err error) {
return s.getHashedSet(object).PutObject(bucket, object, data, metadata)
func (s *xlSets) PutObject(ctx context.Context, bucket string, object string, data *hash.Reader, metadata map[string]string) (objInfo ObjectInfo, err error) {
return s.getHashedSet(object).PutObject(ctx, bucket, object, data, metadata)
}
// GetObjectInfo - reads object metadata from the hashedSet based on the object name.
func (s *xlSets) GetObjectInfo(bucket, object string) (objInfo ObjectInfo, err error) {
return s.getHashedSet(object).GetObjectInfo(bucket, object)
func (s *xlSets) GetObjectInfo(ctx context.Context, bucket, object string) (objInfo ObjectInfo, err error) {
return s.getHashedSet(object).GetObjectInfo(ctx, bucket, object)
}
// DeleteObject - deletes an object from the hashedSet based on the object name.
func (s *xlSets) DeleteObject(bucket string, object string) (err error) {
return s.getHashedSet(object).DeleteObject(bucket, object)
func (s *xlSets) DeleteObject(ctx context.Context, bucket string, object string) (err error) {
return s.getHashedSet(object).DeleteObject(ctx, bucket, object)
}
// CopyObject - copies objects from one hashedSet to another hashedSet, on server side.
func (s *xlSets) CopyObject(srcBucket, srcObject, destBucket, destObject string, srcInfo ObjectInfo) (objInfo ObjectInfo, err error) {
func (s *xlSets) CopyObject(ctx context.Context, srcBucket, srcObject, destBucket, destObject string, srcInfo ObjectInfo) (objInfo ObjectInfo, err error) {
srcSet := s.getHashedSet(srcObject)
destSet := s.getHashedSet(destObject)
// Check if this request is only metadata update.
cpSrcDstSame := isStringEqual(pathJoin(srcBucket, srcObject), pathJoin(destBucket, destObject))
if cpSrcDstSame && srcInfo.metadataOnly {
return srcSet.CopyObject(srcBucket, srcObject, destBucket, destObject, srcInfo)
return srcSet.CopyObject(ctx, srcBucket, srcObject, destBucket, destObject, srcInfo)
}
// Hold write lock on destination since in both cases
@@ -672,7 +673,7 @@ func listDirSetsFactory(isLeaf isLeafFunc, treeWalkIgnoredErrs []error, sets ...
// ListObjects - implements listing of objects across sets, each set is independently
// listed and subsequently merge lexically sorted inside listDirSetsFactory(). Resulting
// value through the walk channel receives the data properly lexically sorted.
func (s *xlSets) ListObjects(bucket, prefix, marker, delimiter string, maxKeys int) (result ListObjectsInfo, err error) {
func (s *xlSets) ListObjects(ctx context.Context, bucket, prefix, marker, delimiter string, maxKeys int) (result ListObjectsInfo, err error) {
// validate all the inputs for listObjects
if err = checkListObjsArgs(bucket, prefix, marker, delimiter, s); err != nil {
return result, err
@@ -766,26 +767,26 @@ func (s *xlSets) ListObjects(bucket, prefix, marker, delimiter string, maxKeys i
return result, nil
}
func (s *xlSets) ListMultipartUploads(bucket, prefix, keyMarker, uploadIDMarker, delimiter string, maxUploads int) (result ListMultipartsInfo, err error) {
func (s *xlSets) ListMultipartUploads(ctx context.Context, bucket, prefix, keyMarker, uploadIDMarker, delimiter string, maxUploads int) (result ListMultipartsInfo, err error) {
// In list multipart uploads we are going to treat input prefix as the object,
// this means that we are not supporting directory navigation.
return s.getHashedSet(prefix).ListMultipartUploads(bucket, prefix, keyMarker, uploadIDMarker, delimiter, maxUploads)
return s.getHashedSet(prefix).ListMultipartUploads(ctx, bucket, prefix, keyMarker, uploadIDMarker, delimiter, maxUploads)
}
// Initiate a new multipart upload on a hashedSet based on object name.
func (s *xlSets) NewMultipartUpload(bucket, object string, metadata map[string]string) (uploadID string, err error) {
return s.getHashedSet(object).NewMultipartUpload(bucket, object, metadata)
func (s *xlSets) NewMultipartUpload(ctx context.Context, bucket, object string, metadata map[string]string) (uploadID string, err error) {
return s.getHashedSet(object).NewMultipartUpload(ctx, bucket, object, metadata)
}
// Copies a part of an object from source hashedSet to destination hashedSet.
func (s *xlSets) CopyObjectPart(srcBucket, srcObject, destBucket, destObject string, uploadID string, partID int,
func (s *xlSets) CopyObjectPart(ctx context.Context, srcBucket, srcObject, destBucket, destObject string, uploadID string, partID int,
startOffset int64, length int64, srcInfo ObjectInfo) (partInfo PartInfo, err error) {
srcSet := s.getHashedSet(srcObject)
destSet := s.getHashedSet(destObject)
go func() {
if gerr := srcSet.GetObject(srcBucket, srcObject, startOffset, length, srcInfo.Writer, srcInfo.ETag); gerr != nil {
if gerr := srcSet.GetObject(ctx, srcBucket, srcObject, startOffset, length, srcInfo.Writer, srcInfo.ETag); gerr != nil {
if gerr = srcInfo.Writer.Close(); gerr != nil {
errorIf(gerr, "Unable to read %s of the object `%s/%s`.", srcBucket, srcObject)
return
@@ -797,27 +798,27 @@ func (s *xlSets) CopyObjectPart(srcBucket, srcObject, destBucket, destObject str
}
}()
return destSet.PutObjectPart(destBucket, destObject, uploadID, partID, srcInfo.Reader)
return destSet.PutObjectPart(ctx, destBucket, destObject, uploadID, partID, srcInfo.Reader)
}
// PutObjectPart - writes part of an object to hashedSet based on the object name.
func (s *xlSets) PutObjectPart(bucket, object, uploadID string, partID int, data *hash.Reader) (info PartInfo, err error) {
return s.getHashedSet(object).PutObjectPart(bucket, object, uploadID, partID, data)
func (s *xlSets) PutObjectPart(ctx context.Context, bucket, object, uploadID string, partID int, data *hash.Reader) (info PartInfo, err error) {
return s.getHashedSet(object).PutObjectPart(ctx, bucket, object, uploadID, partID, data)
}
// ListObjectParts - lists all uploaded parts to an object in hashedSet.
func (s *xlSets) ListObjectParts(bucket, object, uploadID string, partNumberMarker int, maxParts int) (result ListPartsInfo, err error) {
return s.getHashedSet(object).ListObjectParts(bucket, object, uploadID, partNumberMarker, maxParts)
func (s *xlSets) ListObjectParts(ctx context.Context, bucket, object, uploadID string, partNumberMarker int, maxParts int) (result ListPartsInfo, err error) {
return s.getHashedSet(object).ListObjectParts(ctx, bucket, object, uploadID, partNumberMarker, maxParts)
}
// Aborts an in-progress multipart operation on hashedSet based on the object name.
func (s *xlSets) AbortMultipartUpload(bucket, object, uploadID string) error {
return s.getHashedSet(object).AbortMultipartUpload(bucket, object, uploadID)
func (s *xlSets) AbortMultipartUpload(ctx context.Context, bucket, object, uploadID string) error {
return s.getHashedSet(object).AbortMultipartUpload(ctx, bucket, object, uploadID)
}
// CompleteMultipartUpload - completes a pending multipart transaction, on hashedSet based on object name.
func (s *xlSets) CompleteMultipartUpload(bucket, object, uploadID string, uploadedParts []CompletePart) (objInfo ObjectInfo, err error) {
return s.getHashedSet(object).CompleteMultipartUpload(bucket, object, uploadID, uploadedParts)
func (s *xlSets) CompleteMultipartUpload(ctx context.Context, bucket, object, uploadID string, uploadedParts []CompletePart) (objInfo ObjectInfo, err error) {
return s.getHashedSet(object).CompleteMultipartUpload(ctx, bucket, object, uploadID, uploadedParts)
}
/*
@@ -911,7 +912,7 @@ func formatsToDrivesInfo(endpoints EndpointList, formats []*formatXLV3, sErrs []
// HealFormat - heals missing `format.json` on freshly or corrupted
// disks (missing format.json but does have erasure coded data in it).
func (s *xlSets) HealFormat(dryRun bool) (madmin.HealResultItem, error) {
func (s *xlSets) HealFormat(ctx context.Context, dryRun bool) (madmin.HealResultItem, error) {
// Acquire lock on format.json
formatLock := s.getHashedSet(formatConfigFile).nsMutex.NewNSLock(minioMetaBucket, formatConfigFile)
if err := formatLock.GetLock(globalHealingTimeout); err != nil {
@@ -1041,7 +1042,7 @@ func (s *xlSets) HealFormat(dryRun bool) (madmin.HealResultItem, error) {
}
// HealBucket - heals inconsistent buckets and bucket metadata on all sets.
func (s *xlSets) HealBucket(bucket string, dryRun bool) (results []madmin.HealResultItem, err error) {
func (s *xlSets) HealBucket(ctx context.Context, bucket string, dryRun bool) (results []madmin.HealResultItem, err error) {
// Initialize heal result info
res := madmin.HealResultItem{
Type: madmin.HealItemBucket,
@@ -1052,7 +1053,7 @@ func (s *xlSets) HealBucket(bucket string, dryRun bool) (results []madmin.HealRe
for _, s := range s.sets {
var setResults []madmin.HealResultItem
setResults, _ = s.HealBucket(bucket, dryRun)
setResults, _ = s.HealBucket(ctx, bucket, dryRun)
for _, setResult := range setResults {
if setResult.Type == madmin.HealItemBucket {
for _, v := range setResult.Before.Drives {
@@ -1108,12 +1109,12 @@ func (s *xlSets) HealBucket(bucket string, dryRun bool) (results []madmin.HealRe
}
// HealObject - heals inconsistent object on a hashedSet based on object name.
func (s *xlSets) HealObject(bucket, object string, dryRun bool) (madmin.HealResultItem, error) {
return s.getHashedSet(object).HealObject(bucket, object, dryRun)
func (s *xlSets) HealObject(ctx context.Context, bucket, object string, dryRun bool) (madmin.HealResultItem, error) {
return s.getHashedSet(object).HealObject(ctx, bucket, object, dryRun)
}
// Lists all buckets which need healing.
func (s *xlSets) ListBucketsHeal() ([]BucketInfo, error) {
func (s *xlSets) ListBucketsHeal(ctx context.Context) ([]BucketInfo, error) {
listBuckets := []BucketInfo{}
var healBuckets = map[string]BucketInfo{}
for _, set := range s.sets {
@@ -1308,7 +1309,7 @@ func (s *xlSets) listObjectsHeal(bucket, prefix, marker, delimiter string, maxKe
}
// This is not implemented yet, will be implemented later to comply with Admin API refactor.
func (s *xlSets) ListObjectsHeal(bucket, prefix, marker, delimiter string, maxKeys int) (loi ListObjectsInfo, err error) {
func (s *xlSets) ListObjectsHeal(ctx context.Context, bucket, prefix, marker, delimiter string, maxKeys int) (loi ListObjectsInfo, err error) {
if err = checkListObjsArgs(bucket, prefix, marker, delimiter, s); err != nil {
return loi, err
}
@@ -1343,10 +1344,10 @@ func (s *xlSets) ListObjectsHeal(bucket, prefix, marker, delimiter string, maxKe
}
// ListLocks from all sets, aggregate them and return.
func (s *xlSets) ListLocks(bucket, prefix string, duration time.Duration) (lockInfo []VolumeLockInfo, err error) {
func (s *xlSets) ListLocks(ctx context.Context, bucket, prefix string, duration time.Duration) (lockInfo []VolumeLockInfo, err error) {
for _, set := range s.sets {
var setLockInfo []VolumeLockInfo
setLockInfo, err = set.ListLocks(bucket, prefix, duration)
setLockInfo, err = set.ListLocks(ctx, bucket, prefix, duration)
if err != nil {
return nil, err
}
@@ -1356,9 +1357,9 @@ func (s *xlSets) ListLocks(bucket, prefix string, duration time.Duration) (lockI
}
// Clear all requested locks on all sets.
func (s *xlSets) ClearLocks(lockInfo []VolumeLockInfo) error {
func (s *xlSets) ClearLocks(ctx context.Context, lockInfo []VolumeLockInfo) error {
for _, set := range s.sets {
set.ClearLocks(lockInfo)
set.ClearLocks(ctx, lockInfo)
}
return nil
}