mirror of
https://github.com/minio/minio.git
synced 2025-01-26 14:13:16 -05:00
obj: Return objectInfo for CompleteMultipartUpload(). (#3587)
This patch avoids doing GetObjectInfo() in similar way how we did for PutOject().
This commit is contained in:
parent
1c699d8d3f
commit
98a6a2bcab
@ -585,19 +585,19 @@ func (fs fsObjects) ListObjectParts(bucket, object, uploadID string, partNumberM
|
||||
// md5sums of all the parts.
|
||||
//
|
||||
// Implements S3 compatible Complete multipart API.
|
||||
func (fs fsObjects) CompleteMultipartUpload(bucket string, object string, uploadID string, parts []completePart) (string, error) {
|
||||
func (fs fsObjects) CompleteMultipartUpload(bucket string, object string, uploadID string, parts []completePart) (ObjectInfo, error) {
|
||||
if err := checkCompleteMultipartArgs(bucket, object, fs); err != nil {
|
||||
return "", err
|
||||
return ObjectInfo{}, err
|
||||
}
|
||||
|
||||
if _, err := fs.statBucketDir(bucket); err != nil {
|
||||
return "", toObjectErr(err, bucket)
|
||||
return ObjectInfo{}, toObjectErr(err, bucket)
|
||||
}
|
||||
|
||||
// Calculate s3 compatible md5sum for complete multipart.
|
||||
s3MD5, err := getCompleteMultipartMD5(parts)
|
||||
if err != nil {
|
||||
return "", err
|
||||
return ObjectInfo{}, err
|
||||
}
|
||||
|
||||
uploadIDPath := pathJoin(bucket, object, uploadID)
|
||||
@ -612,9 +612,9 @@ func (fs fsObjects) CompleteMultipartUpload(bucket string, object string, upload
|
||||
rlk, err := fs.rwPool.Open(fsMetaPathMultipart)
|
||||
if err != nil {
|
||||
if err == errFileNotFound || err == errFileAccessDenied {
|
||||
return "", traceError(InvalidUploadID{UploadID: uploadID})
|
||||
return ObjectInfo{}, traceError(InvalidUploadID{UploadID: uploadID})
|
||||
}
|
||||
return "", toObjectErr(traceError(err), bucket, object)
|
||||
return ObjectInfo{}, toObjectErr(traceError(err), bucket, object)
|
||||
}
|
||||
|
||||
// Disallow any parallel abort or complete multipart operations.
|
||||
@ -622,9 +622,9 @@ func (fs fsObjects) CompleteMultipartUpload(bucket string, object string, upload
|
||||
if err != nil {
|
||||
fs.rwPool.Close(fsMetaPathMultipart)
|
||||
if err == errFileNotFound || err == errFileAccessDenied {
|
||||
return "", traceError(InvalidUploadID{UploadID: uploadID})
|
||||
return ObjectInfo{}, traceError(InvalidUploadID{UploadID: uploadID})
|
||||
}
|
||||
return "", toObjectErr(traceError(err), bucket, object)
|
||||
return ObjectInfo{}, toObjectErr(traceError(err), bucket, object)
|
||||
}
|
||||
defer rwlk.Close()
|
||||
|
||||
@ -633,7 +633,7 @@ func (fs fsObjects) CompleteMultipartUpload(bucket string, object string, upload
|
||||
_, err = fsMeta.ReadFrom(io.NewSectionReader(rlk, 0, rlk.Size()))
|
||||
if err != nil {
|
||||
fs.rwPool.Close(fsMetaPathMultipart)
|
||||
return "", toObjectErr(err, minioMetaMultipartBucket, fsMetaPathMultipart)
|
||||
return ObjectInfo{}, toObjectErr(err, minioMetaMultipartBucket, fsMetaPathMultipart)
|
||||
}
|
||||
|
||||
// Wait for any competing PutObject() operation on bucket/object, since same namespace
|
||||
@ -642,10 +642,12 @@ func (fs fsObjects) CompleteMultipartUpload(bucket string, object string, upload
|
||||
metaFile, err := fs.rwPool.Create(fsMetaPath)
|
||||
if err != nil {
|
||||
fs.rwPool.Close(fsMetaPathMultipart)
|
||||
return "", toObjectErr(traceError(err), bucket, object)
|
||||
return ObjectInfo{}, toObjectErr(traceError(err), bucket, object)
|
||||
}
|
||||
defer metaFile.Close()
|
||||
|
||||
fsNSObjPath := pathJoin(fs.fsPath, bucket, object)
|
||||
|
||||
// This lock is held during rename of the appended tmp file to the actual
|
||||
// location so that any competing GetObject/PutObject/DeleteObject do not race.
|
||||
appendFallback := true // In case background-append did not append the required parts.
|
||||
@ -655,10 +657,9 @@ func (fs fsObjects) CompleteMultipartUpload(bucket string, object string, upload
|
||||
if err == nil {
|
||||
appendFallback = false
|
||||
fsTmpObjPath := pathJoin(fs.fsPath, minioMetaTmpBucket, fs.fsUUID, uploadID)
|
||||
fsNSObjPath := pathJoin(fs.fsPath, bucket, object)
|
||||
if err = fsRenameFile(fsTmpObjPath, fsNSObjPath); err != nil {
|
||||
fs.rwPool.Close(fsMetaPathMultipart)
|
||||
return "", toObjectErr(err, minioMetaTmpBucket, uploadID)
|
||||
return ObjectInfo{}, toObjectErr(err, minioMetaTmpBucket, uploadID)
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -681,18 +682,18 @@ func (fs fsObjects) CompleteMultipartUpload(bucket string, object string, upload
|
||||
partIdx := fsMeta.ObjectPartIndex(part.PartNumber)
|
||||
if partIdx == -1 {
|
||||
fs.rwPool.Close(fsMetaPathMultipart)
|
||||
return "", traceError(InvalidPart{})
|
||||
return ObjectInfo{}, traceError(InvalidPart{})
|
||||
}
|
||||
|
||||
if fsMeta.Parts[partIdx].ETag != part.ETag {
|
||||
fs.rwPool.Close(fsMetaPathMultipart)
|
||||
return "", traceError(BadDigest{})
|
||||
return ObjectInfo{}, traceError(BadDigest{})
|
||||
}
|
||||
|
||||
// All parts except the last part has to be atleast 5MB.
|
||||
if (i < len(parts)-1) && !isMinAllowedPartSize(fsMeta.Parts[partIdx].Size) {
|
||||
fs.rwPool.Close(fsMetaPathMultipart)
|
||||
return "", traceError(PartTooSmall{
|
||||
return ObjectInfo{}, traceError(PartTooSmall{
|
||||
PartNumber: part.PartNumber,
|
||||
PartSize: fsMeta.Parts[partIdx].Size,
|
||||
PartETag: part.ETag,
|
||||
@ -709,9 +710,9 @@ func (fs fsObjects) CompleteMultipartUpload(bucket string, object string, upload
|
||||
if err != nil {
|
||||
fs.rwPool.Close(fsMetaPathMultipart)
|
||||
if err == errFileNotFound {
|
||||
return "", traceError(InvalidPart{})
|
||||
return ObjectInfo{}, traceError(InvalidPart{})
|
||||
}
|
||||
return "", toObjectErr(traceError(err), minioMetaMultipartBucket, partSuffix)
|
||||
return ObjectInfo{}, toObjectErr(traceError(err), minioMetaMultipartBucket, partSuffix)
|
||||
}
|
||||
|
||||
// No need to hold a lock, this is a unique file and will be only written
|
||||
@ -721,7 +722,7 @@ func (fs fsObjects) CompleteMultipartUpload(bucket string, object string, upload
|
||||
if err != nil {
|
||||
reader.Close()
|
||||
fs.rwPool.Close(fsMetaPathMultipart)
|
||||
return "", toObjectErr(traceError(err), bucket, object)
|
||||
return ObjectInfo{}, toObjectErr(traceError(err), bucket, object)
|
||||
}
|
||||
|
||||
_, err = io.CopyBuffer(wfile, reader, buf)
|
||||
@ -729,17 +730,16 @@ func (fs fsObjects) CompleteMultipartUpload(bucket string, object string, upload
|
||||
wfile.Close()
|
||||
reader.Close()
|
||||
fs.rwPool.Close(fsMetaPathMultipart)
|
||||
return "", toObjectErr(traceError(err), bucket, object)
|
||||
return ObjectInfo{}, toObjectErr(traceError(err), bucket, object)
|
||||
}
|
||||
|
||||
wfile.Close()
|
||||
reader.Close()
|
||||
}
|
||||
|
||||
fsNSObjPath := pathJoin(fs.fsPath, bucket, object)
|
||||
if err = fsRenameFile(fsTmpObjPath, fsNSObjPath); err != nil {
|
||||
fs.rwPool.Close(fsMetaPathMultipart)
|
||||
return "", toObjectErr(err, minioMetaTmpBucket, uploadID)
|
||||
return ObjectInfo{}, toObjectErr(err, minioMetaTmpBucket, uploadID)
|
||||
}
|
||||
}
|
||||
|
||||
@ -755,7 +755,7 @@ func (fs fsObjects) CompleteMultipartUpload(bucket string, object string, upload
|
||||
// Write all the set metadata.
|
||||
if _, err = fsMeta.WriteTo(metaFile); err != nil {
|
||||
fs.rwPool.Close(fsMetaPathMultipart)
|
||||
return "", toObjectErr(err, bucket, object)
|
||||
return ObjectInfo{}, toObjectErr(err, bucket, object)
|
||||
}
|
||||
|
||||
// Close lock held on bucket/object/uploadid/fs.json,
|
||||
@ -767,16 +767,21 @@ func (fs fsObjects) CompleteMultipartUpload(bucket string, object string, upload
|
||||
multipartObjectDir := pathJoin(fs.fsPath, minioMetaMultipartBucket, bucket, object)
|
||||
multipartUploadIDDir := pathJoin(multipartObjectDir, uploadID)
|
||||
if err = fsRemoveUploadIDPath(multipartObjectDir, multipartUploadIDDir); err != nil {
|
||||
return "", toObjectErr(traceError(err), bucket, object)
|
||||
return ObjectInfo{}, toObjectErr(traceError(err), bucket, object)
|
||||
}
|
||||
|
||||
// Remove entry from `uploads.json`.
|
||||
if err = fs.removeUploadID(bucket, object, uploadID, rwlk); err != nil {
|
||||
return "", toObjectErr(err, minioMetaMultipartBucket, pathutil.Join(bucket, object))
|
||||
return ObjectInfo{}, toObjectErr(err, minioMetaMultipartBucket, pathutil.Join(bucket, object))
|
||||
}
|
||||
|
||||
// Return md5sum.
|
||||
return s3MD5, nil
|
||||
fi, err := fsStatFile(fsNSObjPath)
|
||||
if err != nil {
|
||||
return ObjectInfo{}, toObjectErr(traceError(err), bucket, object)
|
||||
}
|
||||
|
||||
// Return object info.
|
||||
return fsMeta.ToObjectInfo(bucket, object, fi), nil
|
||||
}
|
||||
|
||||
// AbortMultipartUpload - aborts an ongoing multipart operation
|
||||
|
@ -44,7 +44,7 @@ type ObjectLayer interface {
|
||||
PutObjectPart(bucket, object, uploadID string, partID int, size int64, data io.Reader, md5Hex string, sha256sum string) (md5 string, err error)
|
||||
ListObjectParts(bucket, object, uploadID string, partNumberMarker int, maxParts int) (result ListPartsInfo, err error)
|
||||
AbortMultipartUpload(bucket, object, uploadID string) error
|
||||
CompleteMultipartUpload(bucket, object, uploadID string, uploadedParts []completePart) (md5 string, err error)
|
||||
CompleteMultipartUpload(bucket, object, uploadID string, uploadedParts []completePart) (objInfo ObjectInfo, err error)
|
||||
|
||||
// Healing operations.
|
||||
HealBucket(bucket string) error
|
||||
|
@ -1930,7 +1930,7 @@ func testObjectCompleteMultipartUpload(obj ObjectLayer, instanceType string, t T
|
||||
if actualErr == nil && testCase.shouldPass {
|
||||
|
||||
// Asserting IsTruncated.
|
||||
if actualResult != testCase.expectedS3MD5 {
|
||||
if actualResult.MD5Sum != testCase.expectedS3MD5 {
|
||||
t.Errorf("Test %d: %s: Expected the result to be \"%v\", but found it to \"%v\"", i+1, instanceType, testCase.expectedS3MD5, actualResult)
|
||||
}
|
||||
}
|
||||
|
@ -733,7 +733,6 @@ func (api objectAPIHandlers) CompleteMultipartUploadHandler(w http.ResponseWrite
|
||||
// Get upload id.
|
||||
uploadID, _, _, _ := getObjectResources(r.URL.Query())
|
||||
|
||||
var md5Sum string
|
||||
completeMultipartBytes, err := ioutil.ReadAll(r.Body)
|
||||
if err != nil {
|
||||
errorIf(err, "Unable to complete multipart upload.")
|
||||
@ -768,7 +767,7 @@ func (api objectAPIHandlers) CompleteMultipartUploadHandler(w http.ResponseWrite
|
||||
destLock.Lock()
|
||||
defer destLock.Unlock()
|
||||
|
||||
md5Sum, err = objectAPI.CompleteMultipartUpload(bucket, object, uploadID, completeParts)
|
||||
objInfo, err := objectAPI.CompleteMultipartUpload(bucket, object, uploadID, completeParts)
|
||||
if err != nil {
|
||||
errorIf(err, "Unable to complete multipart upload.")
|
||||
err = errorCause(err)
|
||||
@ -786,7 +785,7 @@ func (api objectAPIHandlers) CompleteMultipartUploadHandler(w http.ResponseWrite
|
||||
// Get object location.
|
||||
location := getLocation(r)
|
||||
// Generate complete multipart response.
|
||||
response := generateCompleteMultpartUploadResponse(bucket, object, location, md5Sum)
|
||||
response := generateCompleteMultpartUploadResponse(bucket, object, location, objInfo.MD5Sum)
|
||||
encodedSuccessResponse := encodeResponse(response)
|
||||
if err != nil {
|
||||
errorIf(err, "Unable to parse CompleteMultipartUpload response")
|
||||
@ -795,18 +794,11 @@ func (api objectAPIHandlers) CompleteMultipartUploadHandler(w http.ResponseWrite
|
||||
}
|
||||
|
||||
// Set etag.
|
||||
w.Header().Set("ETag", "\""+md5Sum+"\"")
|
||||
w.Header().Set("ETag", "\""+objInfo.MD5Sum+"\"")
|
||||
|
||||
// Write success response.
|
||||
writeSuccessResponseXML(w, encodedSuccessResponse)
|
||||
|
||||
// Fetch object info for notifications.
|
||||
objInfo, err := objectAPI.GetObjectInfo(bucket, object)
|
||||
if err != nil {
|
||||
errorIf(err, "Unable to fetch object info for \"%s\"", path.Join(bucket, object))
|
||||
return
|
||||
}
|
||||
|
||||
// Notify object created event.
|
||||
eventNotify(eventData{
|
||||
Type: ObjectCreatedCompleteMultipartUpload,
|
||||
|
@ -116,11 +116,11 @@ func testMultipartObjectCreation(obj ObjectLayer, instanceType string, c TestErr
|
||||
}
|
||||
completedParts.Parts = append(completedParts.Parts, completePart{PartNumber: i, ETag: calculatedMD5sum})
|
||||
}
|
||||
md5Sum, err := obj.CompleteMultipartUpload("bucket", "key", uploadID, completedParts.Parts)
|
||||
objInfo, err := obj.CompleteMultipartUpload("bucket", "key", uploadID, completedParts.Parts)
|
||||
if err != nil {
|
||||
c.Fatalf("%s: <ERROR> %s", instanceType, err)
|
||||
}
|
||||
if md5Sum != "7d364cb728ce42a74a96d22949beefb2-10" {
|
||||
if objInfo.MD5Sum != "7d364cb728ce42a74a96d22949beefb2-10" {
|
||||
c.Errorf("Md5 mismtch")
|
||||
}
|
||||
}
|
||||
|
@ -575,9 +575,9 @@ func (xl xlObjects) ListObjectParts(bucket, object, uploadID string, partNumberM
|
||||
// md5sums of all the parts.
|
||||
//
|
||||
// Implements S3 compatible Complete multipart API.
|
||||
func (xl xlObjects) CompleteMultipartUpload(bucket string, object string, uploadID string, parts []completePart) (string, error) {
|
||||
func (xl xlObjects) CompleteMultipartUpload(bucket string, object string, uploadID string, parts []completePart) (ObjectInfo, error) {
|
||||
if err := checkCompleteMultipartArgs(bucket, object, xl); err != nil {
|
||||
return "", err
|
||||
return ObjectInfo{}, err
|
||||
}
|
||||
|
||||
// Hold lock so that
|
||||
@ -592,12 +592,12 @@ func (xl xlObjects) CompleteMultipartUpload(bucket string, object string, upload
|
||||
defer uploadIDLock.Unlock()
|
||||
|
||||
if !xl.isUploadIDExists(bucket, object, uploadID) {
|
||||
return "", traceError(InvalidUploadID{UploadID: uploadID})
|
||||
return ObjectInfo{}, traceError(InvalidUploadID{UploadID: uploadID})
|
||||
}
|
||||
// Calculate s3 compatible md5sum for complete multipart.
|
||||
s3MD5, err := getCompleteMultipartMD5(parts)
|
||||
if err != nil {
|
||||
return "", err
|
||||
return ObjectInfo{}, err
|
||||
}
|
||||
|
||||
uploadIDPath := pathJoin(bucket, object, uploadID)
|
||||
@ -606,7 +606,7 @@ func (xl xlObjects) CompleteMultipartUpload(bucket string, object string, upload
|
||||
partsMetadata, errs := readAllXLMetadata(xl.storageDisks, minioMetaMultipartBucket, uploadIDPath)
|
||||
// Do we have writeQuorum?.
|
||||
if !isDiskQuorum(errs, xl.writeQuorum) {
|
||||
return "", toObjectErr(traceError(errXLWriteQuorum), bucket, object)
|
||||
return ObjectInfo{}, toObjectErr(traceError(errXLWriteQuorum), bucket, object)
|
||||
}
|
||||
|
||||
onlineDisks, modTime := listOnlineDisks(xl.storageDisks, partsMetadata, errs)
|
||||
@ -617,7 +617,7 @@ func (xl xlObjects) CompleteMultipartUpload(bucket string, object string, upload
|
||||
// Pick one from the first valid metadata.
|
||||
xlMeta, err := pickValidXLMeta(partsMetadata, modTime)
|
||||
if err != nil {
|
||||
return "", err
|
||||
return ObjectInfo{}, err
|
||||
}
|
||||
|
||||
// Order online disks in accordance with distribution order.
|
||||
@ -637,17 +637,17 @@ func (xl xlObjects) CompleteMultipartUpload(bucket string, object string, upload
|
||||
partIdx := objectPartIndex(currentXLMeta.Parts, part.PartNumber)
|
||||
// All parts should have same part number.
|
||||
if partIdx == -1 {
|
||||
return "", traceError(InvalidPart{})
|
||||
return ObjectInfo{}, traceError(InvalidPart{})
|
||||
}
|
||||
|
||||
// All parts should have same ETag as previously generated.
|
||||
if currentXLMeta.Parts[partIdx].ETag != part.ETag {
|
||||
return "", traceError(BadDigest{})
|
||||
return ObjectInfo{}, traceError(BadDigest{})
|
||||
}
|
||||
|
||||
// All parts except the last part has to be atleast 5MB.
|
||||
if (i < len(parts)-1) && !isMinAllowedPartSize(currentXLMeta.Parts[partIdx].Size) {
|
||||
return "", traceError(PartTooSmall{
|
||||
return ObjectInfo{}, traceError(PartTooSmall{
|
||||
PartNumber: part.PartNumber,
|
||||
PartSize: currentXLMeta.Parts[partIdx].Size,
|
||||
PartETag: part.ETag,
|
||||
@ -675,7 +675,7 @@ func (xl xlObjects) CompleteMultipartUpload(bucket string, object string, upload
|
||||
|
||||
// Check if an object is present as one of the parent dir.
|
||||
if xl.parentDirIsObject(bucket, path.Dir(object)) {
|
||||
return "", toObjectErr(traceError(errFileAccessDenied), bucket, object)
|
||||
return ObjectInfo{}, toObjectErr(traceError(errFileAccessDenied), bucket, object)
|
||||
}
|
||||
|
||||
// Save the final object size and modtime.
|
||||
@ -697,11 +697,11 @@ func (xl xlObjects) CompleteMultipartUpload(bucket string, object string, upload
|
||||
|
||||
// Write unique `xl.json` for each disk.
|
||||
if err = writeUniqueXLMetadata(onlineDisks, minioMetaTmpBucket, tempUploadIDPath, partsMetadata, xl.writeQuorum); err != nil {
|
||||
return "", toObjectErr(err, minioMetaTmpBucket, tempUploadIDPath)
|
||||
return ObjectInfo{}, toObjectErr(err, minioMetaTmpBucket, tempUploadIDPath)
|
||||
}
|
||||
rErr := commitXLMetadata(onlineDisks, minioMetaTmpBucket, tempUploadIDPath, minioMetaMultipartBucket, uploadIDPath, xl.writeQuorum)
|
||||
if rErr != nil {
|
||||
return "", toObjectErr(rErr, minioMetaMultipartBucket, uploadIDPath)
|
||||
return ObjectInfo{}, toObjectErr(rErr, minioMetaMultipartBucket, uploadIDPath)
|
||||
}
|
||||
|
||||
defer func() {
|
||||
@ -725,7 +725,7 @@ func (xl xlObjects) CompleteMultipartUpload(bucket string, object string, upload
|
||||
// regardless of `xl.json` status and rolled back in case of errors.
|
||||
err = renameObject(xl.storageDisks, bucket, object, minioMetaTmpBucket, uniqueID, xl.writeQuorum)
|
||||
if err != nil {
|
||||
return "", toObjectErr(err, bucket, object)
|
||||
return ObjectInfo{}, toObjectErr(err, bucket, object)
|
||||
}
|
||||
}
|
||||
|
||||
@ -744,7 +744,7 @@ func (xl xlObjects) CompleteMultipartUpload(bucket string, object string, upload
|
||||
|
||||
// Rename the multipart object to final location.
|
||||
if err = renameObject(onlineDisks, minioMetaMultipartBucket, uploadIDPath, bucket, object, xl.writeQuorum); err != nil {
|
||||
return "", toObjectErr(err, bucket, object)
|
||||
return ObjectInfo{}, toObjectErr(err, bucket, object)
|
||||
}
|
||||
|
||||
// Delete the previously successfully renamed object.
|
||||
@ -760,11 +760,23 @@ func (xl xlObjects) CompleteMultipartUpload(bucket string, object string, upload
|
||||
|
||||
// remove entry from uploads.json with quorum
|
||||
if err = xl.removeUploadID(bucket, object, uploadID); err != nil {
|
||||
return "", toObjectErr(err, minioMetaMultipartBucket, path.Join(bucket, object))
|
||||
return ObjectInfo{}, toObjectErr(err, minioMetaMultipartBucket, path.Join(bucket, object))
|
||||
}
|
||||
|
||||
// Return md5sum.
|
||||
return s3MD5, nil
|
||||
objInfo := ObjectInfo{
|
||||
IsDir: false,
|
||||
Bucket: bucket,
|
||||
Name: object,
|
||||
Size: xlMeta.Stat.Size,
|
||||
ModTime: xlMeta.Stat.ModTime,
|
||||
MD5Sum: xlMeta.Meta["md5Sum"],
|
||||
ContentType: xlMeta.Meta["content-type"],
|
||||
ContentEncoding: xlMeta.Meta["content-encoding"],
|
||||
UserDefined: xlMeta.Meta,
|
||||
}
|
||||
|
||||
// Success, return object info.
|
||||
return objInfo, nil
|
||||
}
|
||||
|
||||
// abortMultipartUpload - wrapper for purging an ongoing multipart
|
||||
|
@ -638,6 +638,8 @@ func (xl xlObjects) PutObject(bucket string, object string, size int64, data io.
|
||||
ContentEncoding: xlMeta.Meta["content-encoding"],
|
||||
UserDefined: xlMeta.Meta,
|
||||
}
|
||||
|
||||
// Success, return object info.
|
||||
return objInfo, nil
|
||||
}
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user