mirror of
https://github.com/minio/minio.git
synced 2025-01-24 13:13:16 -05:00
Introduce simpler GetMultipartInfo call for performance (#9722)
Advantages avoids 100's of stats which are needed for each upload operation in FS/NAS gateway mode when uploading a large multipart object, dramatically increases performance for multipart uploads by avoiding recursive calls. For other gateway's simplifies the approach since azure, gcs, hdfs gateway's don't capture any specific metadata during upload which needs handler validation for encryption/compression. Erasure coding was already optimized, additionally just avoids small allocations of large data structure. Fixes #7206
This commit is contained in:
parent
231c5cf6de
commit
b330c2c57e
@ -350,6 +350,50 @@ func (fs *FSObjects) PutObjectPart(ctx context.Context, bucket, object, uploadID
|
||||
}, nil
|
||||
}
|
||||
|
||||
// GetMultipartInfo returns multipart metadata uploaded during newMultipartUpload, used
|
||||
// by callers to verify object states
|
||||
// - encrypted
|
||||
// - compressed
|
||||
func (fs *FSObjects) GetMultipartInfo(ctx context.Context, bucket, object, uploadID string, opts ObjectOptions) (MultipartInfo, error) {
|
||||
minfo := MultipartInfo{
|
||||
Bucket: bucket,
|
||||
Object: object,
|
||||
UploadID: uploadID,
|
||||
}
|
||||
|
||||
if err := checkListPartsArgs(ctx, bucket, object, fs); err != nil {
|
||||
return minfo, toObjectErr(err)
|
||||
}
|
||||
|
||||
// Check if bucket exists
|
||||
if _, err := fs.statBucketDir(ctx, bucket); err != nil {
|
||||
return minfo, toObjectErr(err, bucket)
|
||||
}
|
||||
|
||||
uploadIDDir := fs.getUploadIDDir(bucket, object, uploadID)
|
||||
if _, err := fsStatFile(ctx, pathJoin(uploadIDDir, fs.metaJSONFile)); err != nil {
|
||||
if err == errFileNotFound || err == errFileAccessDenied {
|
||||
return minfo, InvalidUploadID{UploadID: uploadID}
|
||||
}
|
||||
return minfo, toObjectErr(err, bucket, object)
|
||||
}
|
||||
|
||||
fsMetaBytes, err := ioutil.ReadFile(pathJoin(uploadIDDir, fs.metaJSONFile))
|
||||
if err != nil {
|
||||
logger.LogIf(ctx, err)
|
||||
return minfo, toObjectErr(err, bucket, object)
|
||||
}
|
||||
|
||||
var fsMeta fsMetaV1
|
||||
var json = jsoniter.ConfigCompatibleWithStandardLibrary
|
||||
if err = json.Unmarshal(fsMetaBytes, &fsMeta); err != nil {
|
||||
return minfo, toObjectErr(err, bucket, object)
|
||||
}
|
||||
|
||||
minfo.UserDefined = fsMeta.Meta
|
||||
return minfo, nil
|
||||
}
|
||||
|
||||
// ListObjectParts - lists all previously uploaded parts for a given
|
||||
// object and uploadID. Takes additional input of part-number-marker
|
||||
// to indicate where the listing should begin from.
|
||||
|
@ -130,7 +130,6 @@ func FromMinioClientListPartsInfo(lopr minio.ListObjectPartsResult) ListPartsInf
|
||||
NextPartNumberMarker: lopr.NextPartNumberMarker,
|
||||
MaxParts: lopr.MaxParts,
|
||||
IsTruncated: lopr.IsTruncated,
|
||||
EncodingType: lopr.EncodingType,
|
||||
Parts: fromMinioClientObjectParts(lopr.ObjectParts),
|
||||
}
|
||||
}
|
||||
|
@ -83,6 +83,12 @@ func (a GatewayUnsupported) PutObjectPart(ctx context.Context, bucket string, ob
|
||||
return pi, NotImplemented{}
|
||||
}
|
||||
|
||||
// GetMultipartInfo returns metadata associated with the uploadId
|
||||
func (a GatewayUnsupported) GetMultipartInfo(ctx context.Context, bucket string, object string, uploadID string, opts ObjectOptions) (MultipartInfo, error) {
|
||||
logger.LogIf(ctx, NotImplemented{})
|
||||
return MultipartInfo{}, NotImplemented{}
|
||||
}
|
||||
|
||||
// ListObjectParts returns all object parts for specified object in specified bucket
|
||||
func (a GatewayUnsupported) ListObjectParts(ctx context.Context, bucket string, object string, uploadID string, partNumberMarker int, maxParts int, opts ObjectOptions) (lpi ListPartsInfo, err error) {
|
||||
logger.LogIf(ctx, NotImplemented{})
|
||||
|
@ -1092,6 +1092,18 @@ func (a *azureObjects) PutObjectPart(ctx context.Context, bucket, object, upload
|
||||
return info, nil
|
||||
}
|
||||
|
||||
// GetMultipartInfo returns multipart info of the uploadId of the object
|
||||
func (a *azureObjects) GetMultipartInfo(ctx context.Context, bucket, object, uploadID string, opts minio.ObjectOptions) (result minio.MultipartInfo, err error) {
|
||||
if err = a.checkUploadIDExists(ctx, bucket, object, uploadID); err != nil {
|
||||
return result, err
|
||||
}
|
||||
|
||||
result.Bucket = bucket
|
||||
result.Object = object
|
||||
result.UploadID = uploadID
|
||||
return result, nil
|
||||
}
|
||||
|
||||
// ListObjectParts - Use Azure equivalent `ContainerURL.ListBlobsHierarchySegment`.
|
||||
func (a *azureObjects) ListObjectParts(ctx context.Context, bucket, object, uploadID string, partNumberMarker int, maxParts int, opts minio.ObjectOptions) (result minio.ListPartsInfo, err error) {
|
||||
if err = a.checkUploadIDExists(ctx, bucket, object, uploadID); err != nil {
|
||||
|
@ -1152,6 +1152,14 @@ func gcsGetPartInfo(ctx context.Context, attrs *storage.ObjectAttrs) (minio.Part
|
||||
}, nil
|
||||
}
|
||||
|
||||
// GetMultipartInfo returns multipart info of the uploadId of the object
|
||||
func (l *gcsGateway) GetMultipartInfo(ctx context.Context, bucket, object, uploadID string, opts minio.ObjectOptions) (result minio.MultipartInfo, err error) {
|
||||
result.Bucket = bucket
|
||||
result.Object = object
|
||||
result.UploadID = uploadID
|
||||
return result, nil
|
||||
}
|
||||
|
||||
// ListObjectParts returns all object parts for specified object in specified bucket
|
||||
func (l *gcsGateway) ListObjectParts(ctx context.Context, bucket string, key string, uploadID string, partNumberMarker int, maxParts int, opts minio.ObjectOptions) (minio.ListPartsInfo, error) {
|
||||
it := l.client.Bucket(bucket).Objects(ctx, &storage.Query{
|
||||
|
@ -634,6 +634,23 @@ func (n *hdfsObjects) checkUploadIDExists(ctx context.Context, bucket, object, u
|
||||
return nil
|
||||
}
|
||||
|
||||
// GetMultipartInfo returns multipart info of the uploadId of the object
|
||||
func (n *hdfsObjects) GetMultipartInfo(ctx context.Context, bucket, object, uploadID string, opts minio.ObjectOptions) (result minio.MultipartInfo, err error) {
|
||||
_, err = n.clnt.Stat(minio.PathJoin(hdfsSeparator, bucket))
|
||||
if err != nil {
|
||||
return result, hdfsToObjectErr(ctx, err, bucket)
|
||||
}
|
||||
|
||||
if err = n.checkUploadIDExists(ctx, bucket, object, uploadID); err != nil {
|
||||
return result, err
|
||||
}
|
||||
|
||||
result.Bucket = bucket
|
||||
result.Object = object
|
||||
result.UploadID = uploadID
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func (n *hdfsObjects) ListObjectParts(ctx context.Context, bucket, object, uploadID string, partNumberMarker int, maxParts int, opts minio.ObjectOptions) (result minio.ListPartsInfo, err error) {
|
||||
_, err = n.clnt.Stat(minio.PathJoin(hdfsSeparator, bucket))
|
||||
if err != nil {
|
||||
|
@ -528,6 +528,21 @@ func (l *s3EncObjects) CopyObjectPart(ctx context.Context, srcBucket, srcObject,
|
||||
return l.PutObjectPart(ctx, destBucket, destObject, uploadID, partID, srcInfo.PutObjReader, dstOpts)
|
||||
}
|
||||
|
||||
// GetMultipartInfo returns multipart info of the uploadId of the object
|
||||
func (l *s3EncObjects) GetMultipartInfo(ctx context.Context, bucket, object, uploadID string, opts minio.ObjectOptions) (result minio.MultipartInfo, err error) {
|
||||
result.Bucket = bucket
|
||||
result.Object = object
|
||||
result.UploadID = uploadID
|
||||
// We do not store parts uploaded so far in the dare.meta. Only CompleteMultipartUpload finalizes the parts under upload prefix.Otherwise,
|
||||
// there could be situations of dare.meta getting corrupted by competing upload parts.
|
||||
dm, err := l.getGWMetadata(ctx, bucket, getTmpDareMetaPath(object, uploadID))
|
||||
if err != nil {
|
||||
return l.s3Objects.GetMultipartInfo(ctx, bucket, object, uploadID, opts)
|
||||
}
|
||||
result.UserDefined = dm.ToObjectInfo(bucket, object).UserDefined
|
||||
return result, nil
|
||||
}
|
||||
|
||||
// ListObjectParts returns all object parts for specified object in specified bucket
|
||||
func (l *s3EncObjects) ListObjectParts(ctx context.Context, bucket string, object string, uploadID string, partNumberMarker int, maxParts int, opts minio.ObjectOptions) (lpi minio.ListPartsInfo, e error) {
|
||||
// We do not store parts uploaded so far in the dare.meta. Only CompleteMultipartUpload finalizes the parts under upload prefix.Otherwise,
|
||||
|
@ -612,14 +612,40 @@ func (l *s3Objects) CopyObjectPart(ctx context.Context, srcBucket, srcObject, de
|
||||
return p, nil
|
||||
}
|
||||
|
||||
// GetMultipartInfo returns multipart info of the uploadId of the object
|
||||
func (l *s3Objects) GetMultipartInfo(ctx context.Context, bucket, object, uploadID string, opts minio.ObjectOptions) (result minio.MultipartInfo, err error) {
|
||||
result.Bucket = bucket
|
||||
result.Object = object
|
||||
result.UploadID = uploadID
|
||||
return result, nil
|
||||
}
|
||||
|
||||
// ListObjectParts returns all object parts for specified object in specified bucket
|
||||
func (l *s3Objects) ListObjectParts(ctx context.Context, bucket string, object string, uploadID string, partNumberMarker int, maxParts int, opts minio.ObjectOptions) (lpi minio.ListPartsInfo, e error) {
|
||||
result, err := l.Client.ListObjectParts(bucket, object, uploadID, partNumberMarker, maxParts)
|
||||
if err != nil {
|
||||
return lpi, minio.ErrorRespToObjectError(err, bucket, object)
|
||||
return lpi, err
|
||||
}
|
||||
lpi = minio.FromMinioClientListPartsInfo(result)
|
||||
if lpi.IsTruncated && maxParts > len(lpi.Parts) {
|
||||
partNumberMarker = lpi.NextPartNumberMarker
|
||||
for {
|
||||
result, err = l.Client.ListObjectParts(bucket, object, uploadID, partNumberMarker, maxParts)
|
||||
if err != nil {
|
||||
return lpi, err
|
||||
}
|
||||
|
||||
return minio.FromMinioClientListPartsInfo(result), nil
|
||||
nlpi := minio.FromMinioClientListPartsInfo(result)
|
||||
|
||||
partNumberMarker = nlpi.NextPartNumberMarker
|
||||
|
||||
lpi.Parts = append(lpi.Parts, nlpi.Parts...)
|
||||
if !nlpi.IsTruncated {
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
return lpi, nil
|
||||
}
|
||||
|
||||
// AbortMultipartUpload aborts a ongoing multipart upload
|
||||
|
@ -194,6 +194,7 @@ type ObjectInfo struct {
|
||||
PutObjReader *PutObjReader `json:"-"`
|
||||
|
||||
metadataOnly bool
|
||||
keyRotation bool
|
||||
|
||||
// Date and time when the object was last accessed.
|
||||
AccTime time.Time
|
||||
@ -202,6 +203,28 @@ type ObjectInfo struct {
|
||||
backendType BackendType
|
||||
}
|
||||
|
||||
// MultipartInfo captures metadata information about the uploadId
|
||||
// this data structure is used primarily for some internal purposes
|
||||
// for verifying upload type such as was the upload
|
||||
// - encrypted
|
||||
// - compressed
|
||||
type MultipartInfo struct {
|
||||
// Name of the bucket.
|
||||
Bucket string
|
||||
|
||||
// Name of the object.
|
||||
Object string
|
||||
|
||||
// Upload ID identifying the multipart upload whose parts are being listed.
|
||||
UploadID string
|
||||
|
||||
// Date and time at which the multipart upload was initiated.
|
||||
Initiated time.Time
|
||||
|
||||
// Any metadata set during InitMultipartUpload, including encryption headers.
|
||||
UserDefined map[string]string
|
||||
}
|
||||
|
||||
// ListPartsInfo - represents list of all parts.
|
||||
type ListPartsInfo struct {
|
||||
// Name of the bucket.
|
||||
@ -235,8 +258,6 @@ type ListPartsInfo struct {
|
||||
|
||||
// Any metadata set during InitMultipartUpload, including encryption headers.
|
||||
UserDefined map[string]string
|
||||
|
||||
EncodingType string // Not supported yet.
|
||||
}
|
||||
|
||||
// Lookup - returns if uploadID is valid
|
||||
@ -362,20 +383,6 @@ type PartInfo struct {
|
||||
ActualSize int64
|
||||
}
|
||||
|
||||
// MultipartInfo - represents metadata in progress multipart upload.
|
||||
type MultipartInfo struct {
|
||||
// Object name for which the multipart upload was initiated.
|
||||
Object string
|
||||
|
||||
// Unique identifier for this multipart upload.
|
||||
UploadID string
|
||||
|
||||
// Date and time at which the multipart upload was initiated.
|
||||
Initiated time.Time
|
||||
|
||||
StorageClass string // Not supported yet.
|
||||
}
|
||||
|
||||
// CompletePart - represents the part that was completed, this is sent by the client
|
||||
// during CompleteMultipartUpload request.
|
||||
type CompletePart struct {
|
||||
|
@ -92,6 +92,7 @@ type ObjectLayer interface {
|
||||
CopyObjectPart(ctx context.Context, srcBucket, srcObject, destBucket, destObject string, uploadID string, partID int,
|
||||
startOffset int64, length int64, srcInfo ObjectInfo, srcOpts, dstOpts ObjectOptions) (info PartInfo, err error)
|
||||
PutObjectPart(ctx context.Context, bucket, object, uploadID string, partID int, data *PutObjReader, opts ObjectOptions) (info PartInfo, err error)
|
||||
GetMultipartInfo(ctx context.Context, bucket, object, uploadID string, opts ObjectOptions) (info MultipartInfo, err error)
|
||||
ListObjectParts(ctx context.Context, bucket, object, uploadID string, partNumberMarker int, maxParts int, opts ObjectOptions) (result ListPartsInfo, err error)
|
||||
AbortMultipartUpload(ctx context.Context, bucket, object, uploadID string) error
|
||||
CompleteMultipartUpload(ctx context.Context, bucket, object, uploadID string, uploadedParts []CompletePart, opts ObjectOptions) (objInfo ObjectInfo, err error)
|
||||
|
@ -1008,12 +1008,13 @@ func (api objectAPIHandlers) CopyObjectHandler(w http.ResponseWriter, r *http.Re
|
||||
|
||||
// Since we are rotating the keys, make sure to update the metadata.
|
||||
srcInfo.metadataOnly = true
|
||||
srcInfo.keyRotation = true
|
||||
} else {
|
||||
if isSourceEncrypted || isTargetEncrypted {
|
||||
// We are not only copying just metadata instead
|
||||
// we are creating a new object at this point, even
|
||||
// if source and destination are same objects.
|
||||
if !srcInfo.metadataOnly {
|
||||
if !srcInfo.keyRotation {
|
||||
srcInfo.metadataOnly = false
|
||||
}
|
||||
}
|
||||
@ -1858,15 +1859,14 @@ func (api objectAPIHandlers) CopyObjectPartHandler(w http.ResponseWriter, r *htt
|
||||
actualPartSize = length
|
||||
var reader io.Reader
|
||||
|
||||
var li ListPartsInfo
|
||||
li, err = objectAPI.ListObjectParts(ctx, dstBucket, dstObject, uploadID, 0, 1, dstOpts)
|
||||
mi, err := objectAPI.GetMultipartInfo(ctx, dstBucket, dstObject, uploadID, dstOpts)
|
||||
if err != nil {
|
||||
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r))
|
||||
return
|
||||
}
|
||||
|
||||
// Read compression metadata preserved in the init multipart for the decision.
|
||||
_, isCompressed := li.UserDefined[ReservedMetadataPrefix+"compression"]
|
||||
_, isCompressed := mi.UserDefined[ReservedMetadataPrefix+"compression"]
|
||||
// Compress only if the compression is enabled during initial multipart.
|
||||
if isCompressed {
|
||||
s2c := newS2CompressReader(gr)
|
||||
@ -1883,7 +1883,7 @@ func (api objectAPIHandlers) CopyObjectPartHandler(w http.ResponseWriter, r *htt
|
||||
return
|
||||
}
|
||||
|
||||
dstOpts, err = copyDstOpts(ctx, r, dstBucket, dstObject, li.UserDefined)
|
||||
dstOpts, err = copyDstOpts(ctx, r, dstBucket, dstObject, mi.UserDefined)
|
||||
if err != nil {
|
||||
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r))
|
||||
return
|
||||
@ -1892,14 +1892,14 @@ func (api objectAPIHandlers) CopyObjectPartHandler(w http.ResponseWriter, r *htt
|
||||
rawReader := srcInfo.Reader
|
||||
pReader := NewPutObjReader(rawReader, nil, nil)
|
||||
|
||||
isEncrypted := crypto.IsEncrypted(li.UserDefined)
|
||||
isEncrypted := crypto.IsEncrypted(mi.UserDefined)
|
||||
var objectEncryptionKey crypto.ObjectKey
|
||||
if objectAPI.IsEncryptionSupported() && !isCompressed && isEncrypted {
|
||||
if !crypto.SSEC.IsRequested(r.Header) && crypto.SSEC.IsEncrypted(li.UserDefined) {
|
||||
if !crypto.SSEC.IsRequested(r.Header) && crypto.SSEC.IsEncrypted(mi.UserDefined) {
|
||||
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrSSEMultipartEncrypted), r.URL, guessIsBrowserReq(r))
|
||||
return
|
||||
}
|
||||
if crypto.S3.IsEncrypted(li.UserDefined) && crypto.SSEC.IsRequested(r.Header) {
|
||||
if crypto.S3.IsEncrypted(mi.UserDefined) && crypto.SSEC.IsRequested(r.Header) {
|
||||
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrSSEMultipartEncrypted), r.URL, guessIsBrowserReq(r))
|
||||
return
|
||||
}
|
||||
@ -1911,7 +1911,7 @@ func (api objectAPIHandlers) CopyObjectPartHandler(w http.ResponseWriter, r *htt
|
||||
return
|
||||
}
|
||||
}
|
||||
key, err = decryptObjectInfo(key, dstBucket, dstObject, li.UserDefined)
|
||||
key, err = decryptObjectInfo(key, dstBucket, dstObject, mi.UserDefined)
|
||||
if err != nil {
|
||||
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r))
|
||||
return
|
||||
@ -2094,15 +2094,15 @@ func (api objectAPIHandlers) PutObjectPartHandler(w http.ResponseWriter, r *http
|
||||
return
|
||||
}
|
||||
}
|
||||
var li ListPartsInfo
|
||||
li, err = objectAPI.ListObjectParts(ctx, bucket, object, uploadID, 0, 1, opts)
|
||||
|
||||
mi, err := objectAPI.GetMultipartInfo(ctx, bucket, object, uploadID, opts)
|
||||
if err != nil {
|
||||
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r))
|
||||
return
|
||||
}
|
||||
|
||||
// Read compression metadata preserved in the init multipart for the decision.
|
||||
_, isCompressed := li.UserDefined[ReservedMetadataPrefix+"compression"]
|
||||
_, isCompressed := mi.UserDefined[ReservedMetadataPrefix+"compression"]
|
||||
|
||||
if objectAPI.IsCompressionSupported() && isCompressed {
|
||||
actualReader, err := hash.NewReader(reader, size, md5hex, sha256hex, actualSize, globalCLIContext.StrictS3Compat)
|
||||
@ -2128,15 +2128,15 @@ func (api objectAPIHandlers) PutObjectPartHandler(w http.ResponseWriter, r *http
|
||||
rawReader := hashReader
|
||||
pReader := NewPutObjReader(rawReader, nil, nil)
|
||||
|
||||
isEncrypted := crypto.IsEncrypted(li.UserDefined)
|
||||
isEncrypted := crypto.IsEncrypted(mi.UserDefined)
|
||||
var objectEncryptionKey crypto.ObjectKey
|
||||
if objectAPI.IsEncryptionSupported() && !isCompressed && isEncrypted {
|
||||
if !crypto.SSEC.IsRequested(r.Header) && crypto.SSEC.IsEncrypted(li.UserDefined) {
|
||||
if !crypto.SSEC.IsRequested(r.Header) && crypto.SSEC.IsEncrypted(mi.UserDefined) {
|
||||
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrSSEMultipartEncrypted), r.URL, guessIsBrowserReq(r))
|
||||
return
|
||||
}
|
||||
|
||||
opts, err = putOpts(ctx, r, bucket, object, li.UserDefined)
|
||||
opts, err = putOpts(ctx, r, bucket, object, mi.UserDefined)
|
||||
if err != nil {
|
||||
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r))
|
||||
return
|
||||
@ -2152,7 +2152,7 @@ func (api objectAPIHandlers) PutObjectPartHandler(w http.ResponseWriter, r *http
|
||||
}
|
||||
|
||||
// Calculating object encryption key
|
||||
key, err = decryptObjectInfo(key, bucket, object, li.UserDefined)
|
||||
key, err = decryptObjectInfo(key, bucket, object, mi.UserDefined)
|
||||
if err != nil {
|
||||
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r))
|
||||
return
|
||||
@ -2438,19 +2438,18 @@ func (api objectAPIHandlers) CompleteMultipartUploadHandler(w http.ResponseWrite
|
||||
var opts ObjectOptions
|
||||
var isEncrypted, ssec bool
|
||||
if objectAPI.IsEncryptionSupported() {
|
||||
var li ListPartsInfo
|
||||
li, err = objectAPI.ListObjectParts(ctx, bucket, object, uploadID, 0, 1, opts)
|
||||
mi, err := objectAPI.GetMultipartInfo(ctx, bucket, object, uploadID, opts)
|
||||
if err != nil {
|
||||
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r))
|
||||
return
|
||||
}
|
||||
if crypto.IsEncrypted(li.UserDefined) {
|
||||
if crypto.IsEncrypted(mi.UserDefined) {
|
||||
var key []byte
|
||||
isEncrypted = true
|
||||
ssec = crypto.SSEC.IsEncrypted(li.UserDefined)
|
||||
if crypto.S3.IsEncrypted(li.UserDefined) {
|
||||
ssec = crypto.SSEC.IsEncrypted(mi.UserDefined)
|
||||
if crypto.S3.IsEncrypted(mi.UserDefined) {
|
||||
// Calculating object encryption key
|
||||
objectEncryptionKey, err = decryptObjectInfo(key, bucket, object, li.UserDefined)
|
||||
objectEncryptionKey, err = decryptObjectInfo(key, bucket, object, mi.UserDefined)
|
||||
if err != nil {
|
||||
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r))
|
||||
return
|
||||
@ -2461,21 +2460,14 @@ func (api objectAPIHandlers) CompleteMultipartUploadHandler(w http.ResponseWrite
|
||||
|
||||
partsMap := make(map[string]PartInfo)
|
||||
if isEncrypted {
|
||||
var partNumberMarker int
|
||||
maxParts := 1000
|
||||
for {
|
||||
listPartsInfo, err := objectAPI.ListObjectParts(ctx, bucket, object, uploadID, partNumberMarker, maxParts, opts)
|
||||
if err != nil {
|
||||
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r))
|
||||
return
|
||||
}
|
||||
for _, part := range listPartsInfo.Parts {
|
||||
partsMap[strconv.Itoa(part.PartNumber)] = part
|
||||
}
|
||||
partNumberMarker = listPartsInfo.NextPartNumberMarker
|
||||
if !listPartsInfo.IsTruncated {
|
||||
break
|
||||
}
|
||||
maxParts := 10000
|
||||
listPartsInfo, err := objectAPI.ListObjectParts(ctx, bucket, object, uploadID, 0, maxParts, opts)
|
||||
if err != nil {
|
||||
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r))
|
||||
return
|
||||
}
|
||||
for _, part := range listPartsInfo.Parts {
|
||||
partsMap[strconv.Itoa(part.PartNumber)] = part
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1169,6 +1169,11 @@ func (s *xlSets) PutObjectPart(ctx context.Context, bucket, object, uploadID str
|
||||
return s.getHashedSet(object).PutObjectPart(ctx, bucket, object, uploadID, partID, data, opts)
|
||||
}
|
||||
|
||||
// GetMultipartInfo - return multipart metadata info uploaded at hashedSet.
|
||||
func (s *xlSets) GetMultipartInfo(ctx context.Context, bucket, object, uploadID string, opts ObjectOptions) (result MultipartInfo, err error) {
|
||||
return s.getHashedSet(object).GetMultipartInfo(ctx, bucket, object, uploadID, opts)
|
||||
}
|
||||
|
||||
// ListObjectParts - lists all uploaded parts to an object in hashedSet.
|
||||
func (s *xlSets) ListObjectParts(ctx context.Context, bucket, object, uploadID string, partNumberMarker int, maxParts int, opts ObjectOptions) (result ListPartsInfo, err error) {
|
||||
return s.getHashedSet(object).ListObjectParts(ctx, bucket, object, uploadID, partNumberMarker, maxParts, opts)
|
||||
|
@ -412,6 +412,51 @@ func (xl xlObjects) PutObjectPart(ctx context.Context, bucket, object, uploadID
|
||||
}, nil
|
||||
}
|
||||
|
||||
// GetMultipartInfo returns multipart metadata uploaded during newMultipartUpload, used
|
||||
// by callers to verify object states
|
||||
// - encrypted
|
||||
// - compressed
|
||||
func (xl xlObjects) GetMultipartInfo(ctx context.Context, bucket, object, uploadID string, opts ObjectOptions) (MultipartInfo, error) {
|
||||
result := MultipartInfo{
|
||||
Bucket: bucket,
|
||||
Object: object,
|
||||
UploadID: uploadID,
|
||||
}
|
||||
|
||||
if err := xl.checkUploadIDExists(ctx, bucket, object, uploadID); err != nil {
|
||||
return result, toObjectErr(err, bucket, object, uploadID)
|
||||
}
|
||||
|
||||
uploadIDPath := xl.getUploadIDDir(bucket, object, uploadID)
|
||||
|
||||
storageDisks := xl.getDisks()
|
||||
|
||||
// Read metadata associated with the object from all disks.
|
||||
partsMetadata, errs := readAllXLMetadata(ctx, storageDisks, minioMetaMultipartBucket, uploadIDPath)
|
||||
|
||||
// get Quorum for this object
|
||||
_, writeQuorum, err := objectQuorumFromMeta(ctx, xl, partsMetadata, errs)
|
||||
if err != nil {
|
||||
return result, toObjectErr(err, minioMetaMultipartBucket, uploadIDPath)
|
||||
}
|
||||
|
||||
reducedErr := reduceWriteQuorumErrs(ctx, errs, objectOpIgnoredErrs, writeQuorum)
|
||||
if reducedErr == errXLWriteQuorum {
|
||||
return result, toObjectErr(reducedErr, minioMetaMultipartBucket, uploadIDPath)
|
||||
}
|
||||
|
||||
_, modTime := listOnlineDisks(storageDisks, partsMetadata, errs)
|
||||
|
||||
// Pick one from the first valid metadata.
|
||||
xlMeta, err := pickValidXLMeta(ctx, partsMetadata, modTime, writeQuorum)
|
||||
if err != nil {
|
||||
return result, err
|
||||
}
|
||||
|
||||
result.UserDefined = xlMeta.Meta
|
||||
return result, nil
|
||||
}
|
||||
|
||||
// ListObjectParts - lists all previously uploaded parts for a given
|
||||
// object and uploadID. Takes additional input of part-number-marker
|
||||
// to indicate where the listing should begin from.
|
||||
|
@ -1046,13 +1046,17 @@ func (z *xlZones) PutObjectPart(ctx context.Context, bucket, object, uploadID st
|
||||
return z.zones[0].PutObjectPart(ctx, bucket, object, uploadID, partID, data, opts)
|
||||
}
|
||||
for _, zone := range z.zones {
|
||||
result, err := zone.ListMultipartUploads(ctx, bucket, object, "", "", "", maxUploadsList)
|
||||
if err != nil {
|
||||
return PartInfo{}, err
|
||||
}
|
||||
if result.Lookup(uploadID) {
|
||||
_, err := zone.GetMultipartInfo(ctx, bucket, object, uploadID, opts)
|
||||
if err == nil {
|
||||
return zone.PutObjectPart(ctx, bucket, object, uploadID, partID, data, opts)
|
||||
}
|
||||
switch err.(type) {
|
||||
case InvalidUploadID:
|
||||
// Look for information on the next zone
|
||||
continue
|
||||
}
|
||||
// Any other unhandled errors such as quorum return.
|
||||
return PartInfo{}, err
|
||||
}
|
||||
|
||||
return PartInfo{}, InvalidUploadID{
|
||||
@ -1062,6 +1066,41 @@ func (z *xlZones) PutObjectPart(ctx context.Context, bucket, object, uploadID st
|
||||
}
|
||||
}
|
||||
|
||||
func (z *xlZones) GetMultipartInfo(ctx context.Context, bucket, object, uploadID string, opts ObjectOptions) (MultipartInfo, error) {
|
||||
if err := checkListPartsArgs(ctx, bucket, object, z); err != nil {
|
||||
return MultipartInfo{}, err
|
||||
}
|
||||
|
||||
uploadIDLock := z.NewNSLock(ctx, bucket, pathJoin(object, uploadID))
|
||||
if err := uploadIDLock.GetRLock(globalOperationTimeout); err != nil {
|
||||
return MultipartInfo{}, err
|
||||
}
|
||||
defer uploadIDLock.RUnlock()
|
||||
|
||||
if z.SingleZone() {
|
||||
return z.zones[0].GetMultipartInfo(ctx, bucket, object, uploadID, opts)
|
||||
}
|
||||
for _, zone := range z.zones {
|
||||
mi, err := zone.GetMultipartInfo(ctx, bucket, object, uploadID, opts)
|
||||
if err == nil {
|
||||
return mi, nil
|
||||
}
|
||||
switch err.(type) {
|
||||
case InvalidUploadID:
|
||||
// upload id not found, continue to the next zone.
|
||||
continue
|
||||
}
|
||||
// any other unhandled error return right here.
|
||||
return MultipartInfo{}, err
|
||||
}
|
||||
return MultipartInfo{}, InvalidUploadID{
|
||||
Bucket: bucket,
|
||||
Object: object,
|
||||
UploadID: uploadID,
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
// ListObjectParts - lists all uploaded parts to an object in hashedSet.
|
||||
func (z *xlZones) ListObjectParts(ctx context.Context, bucket, object, uploadID string, partNumberMarker int, maxParts int, opts ObjectOptions) (ListPartsInfo, error) {
|
||||
if err := checkListPartsArgs(ctx, bucket, object, z); err != nil {
|
||||
@ -1078,13 +1117,15 @@ func (z *xlZones) ListObjectParts(ctx context.Context, bucket, object, uploadID
|
||||
return z.zones[0].ListObjectParts(ctx, bucket, object, uploadID, partNumberMarker, maxParts, opts)
|
||||
}
|
||||
for _, zone := range z.zones {
|
||||
result, err := zone.ListMultipartUploads(ctx, bucket, object, "", "", "", maxUploadsList)
|
||||
if err != nil {
|
||||
return ListPartsInfo{}, err
|
||||
}
|
||||
if result.Lookup(uploadID) {
|
||||
_, err := zone.GetMultipartInfo(ctx, bucket, object, uploadID, opts)
|
||||
if err == nil {
|
||||
return zone.ListObjectParts(ctx, bucket, object, uploadID, partNumberMarker, maxParts, opts)
|
||||
}
|
||||
switch err.(type) {
|
||||
case InvalidUploadID:
|
||||
continue
|
||||
}
|
||||
return ListPartsInfo{}, err
|
||||
}
|
||||
return ListPartsInfo{}, InvalidUploadID{
|
||||
Bucket: bucket,
|
||||
@ -1110,13 +1151,16 @@ func (z *xlZones) AbortMultipartUpload(ctx context.Context, bucket, object, uplo
|
||||
}
|
||||
|
||||
for _, zone := range z.zones {
|
||||
result, err := zone.ListMultipartUploads(ctx, bucket, object, "", "", "", maxUploadsList)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if result.Lookup(uploadID) {
|
||||
_, err := zone.GetMultipartInfo(ctx, bucket, object, uploadID, ObjectOptions{})
|
||||
if err == nil {
|
||||
return zone.AbortMultipartUpload(ctx, bucket, object, uploadID)
|
||||
}
|
||||
switch err.(type) {
|
||||
case InvalidUploadID:
|
||||
// upload id not found move to next zone
|
||||
continue
|
||||
}
|
||||
return err
|
||||
}
|
||||
return InvalidUploadID{
|
||||
Bucket: bucket,
|
||||
|
Loading…
x
Reference in New Issue
Block a user