mirror of
https://github.com/minio/minio.git
synced 2025-11-22 02:35:30 -05:00
@@ -53,7 +53,7 @@ func (fs Filesystem) isValidUploadID(object, uploadID string) (ok bool) {
|
||||
return
|
||||
}
|
||||
|
||||
// byObjectMetadataKey is a sortable interface for UploadMetadata slice
|
||||
// byObjectInfoKey is a sortable interface for UploadMetadata slice
|
||||
type byUploadMetadataKey []*UploadMetadata
|
||||
|
||||
func (b byUploadMetadataKey) Len() int { return len(b) }
|
||||
@@ -456,20 +456,20 @@ func (fs Filesystem) CreateObjectPart(bucket, object, uploadID, expectedMD5Sum s
|
||||
}
|
||||
|
||||
// CompleteMultipartUpload - complete a multipart upload and persist the data
|
||||
func (fs Filesystem) CompleteMultipartUpload(bucket, object, uploadID string, data io.Reader, signature *signature4.Sign) (ObjectMetadata, *probe.Error) {
|
||||
func (fs Filesystem) CompleteMultipartUpload(bucket, object, uploadID string, data io.Reader, signature *signature4.Sign) (ObjectInfo, *probe.Error) {
|
||||
// Check bucket name is valid.
|
||||
if !IsValidBucketName(bucket) {
|
||||
return ObjectMetadata{}, probe.NewError(BucketNameInvalid{Bucket: bucket})
|
||||
return ObjectInfo{}, probe.NewError(BucketNameInvalid{Bucket: bucket})
|
||||
}
|
||||
|
||||
// Verify object path is legal.
|
||||
if !IsValidObjectName(object) {
|
||||
return ObjectMetadata{}, probe.NewError(ObjectNameInvalid{Bucket: bucket, Object: object})
|
||||
return ObjectInfo{}, probe.NewError(ObjectNameInvalid{Bucket: bucket, Object: object})
|
||||
}
|
||||
|
||||
// Verify if valid upload for incoming object.
|
||||
if !fs.isValidUploadID(object, uploadID) {
|
||||
return ObjectMetadata{}, probe.NewError(InvalidUploadID{UploadID: uploadID})
|
||||
return ObjectInfo{}, probe.NewError(InvalidUploadID{UploadID: uploadID})
|
||||
}
|
||||
|
||||
bucket = fs.denormalizeBucket(bucket)
|
||||
@@ -477,21 +477,21 @@ func (fs Filesystem) CompleteMultipartUpload(bucket, object, uploadID string, da
|
||||
if _, e := os.Stat(bucketPath); e != nil {
|
||||
// Check bucket exists.
|
||||
if os.IsNotExist(e) {
|
||||
return ObjectMetadata{}, probe.NewError(BucketNotFound{Bucket: bucket})
|
||||
return ObjectInfo{}, probe.NewError(BucketNotFound{Bucket: bucket})
|
||||
}
|
||||
return ObjectMetadata{}, probe.NewError(InternalError{})
|
||||
return ObjectInfo{}, probe.NewError(InternalError{})
|
||||
}
|
||||
|
||||
objectPath := filepath.Join(bucketPath, object)
|
||||
objectWriter, e := atomic.FileCreateWithPrefix(objectPath, "$tmpobject")
|
||||
if e != nil {
|
||||
return ObjectMetadata{}, probe.NewError(e)
|
||||
return ObjectInfo{}, probe.NewError(e)
|
||||
}
|
||||
|
||||
partBytes, e := ioutil.ReadAll(data)
|
||||
if e != nil {
|
||||
objectWriter.CloseAndPurge()
|
||||
return ObjectMetadata{}, probe.NewError(e)
|
||||
return ObjectInfo{}, probe.NewError(e)
|
||||
}
|
||||
if signature != nil {
|
||||
sh := sha256.New()
|
||||
@@ -499,21 +499,21 @@ func (fs Filesystem) CompleteMultipartUpload(bucket, object, uploadID string, da
|
||||
ok, err := signature.DoesSignatureMatch(hex.EncodeToString(sh.Sum(nil)))
|
||||
if err != nil {
|
||||
objectWriter.CloseAndPurge()
|
||||
return ObjectMetadata{}, err.Trace()
|
||||
return ObjectInfo{}, err.Trace()
|
||||
}
|
||||
if !ok {
|
||||
objectWriter.CloseAndPurge()
|
||||
return ObjectMetadata{}, probe.NewError(SignDoesNotMatch{})
|
||||
return ObjectInfo{}, probe.NewError(SignDoesNotMatch{})
|
||||
}
|
||||
}
|
||||
completeMultipartUpload := &CompleteMultipartUpload{}
|
||||
if e = xml.Unmarshal(partBytes, completeMultipartUpload); e != nil {
|
||||
objectWriter.CloseAndPurge()
|
||||
return ObjectMetadata{}, probe.NewError(MalformedXML{})
|
||||
return ObjectInfo{}, probe.NewError(MalformedXML{})
|
||||
}
|
||||
if !sort.IsSorted(completedParts(completeMultipartUpload.Part)) {
|
||||
objectWriter.CloseAndPurge()
|
||||
return ObjectMetadata{}, probe.NewError(InvalidPartOrder{})
|
||||
return ObjectInfo{}, probe.NewError(InvalidPartOrder{})
|
||||
}
|
||||
|
||||
// Save parts for verification.
|
||||
@@ -526,14 +526,14 @@ func (fs Filesystem) CompleteMultipartUpload(bucket, object, uploadID string, da
|
||||
|
||||
if !doPartsMatch(parts, savedParts) {
|
||||
objectWriter.CloseAndPurge()
|
||||
return ObjectMetadata{}, probe.NewError(InvalidPart{})
|
||||
return ObjectInfo{}, probe.NewError(InvalidPart{})
|
||||
}
|
||||
|
||||
// Parts successfully validated, save all the parts.
|
||||
partPathPrefix := objectPath + uploadID
|
||||
if err := saveParts(partPathPrefix, objectWriter, parts); err != nil {
|
||||
objectWriter.CloseAndPurge()
|
||||
return ObjectMetadata{}, err.Trace(partPathPrefix)
|
||||
return ObjectInfo{}, err.Trace(partPathPrefix)
|
||||
}
|
||||
var md5Strs []string
|
||||
for _, part := range savedParts {
|
||||
@@ -543,7 +543,7 @@ func (fs Filesystem) CompleteMultipartUpload(bucket, object, uploadID string, da
|
||||
s3MD5, err := makeS3MD5(md5Strs...)
|
||||
if err != nil {
|
||||
objectWriter.CloseAndPurge()
|
||||
return ObjectMetadata{}, err.Trace(md5Strs...)
|
||||
return ObjectInfo{}, err.Trace(md5Strs...)
|
||||
}
|
||||
|
||||
// Successfully saved multipart, remove all parts in a routine.
|
||||
@@ -555,18 +555,18 @@ func (fs Filesystem) CompleteMultipartUpload(bucket, object, uploadID string, da
|
||||
if err := saveMultipartsSession(*fs.multiparts); err != nil {
|
||||
fs.rwLock.Unlock()
|
||||
objectWriter.CloseAndPurge()
|
||||
return ObjectMetadata{}, err.Trace(partPathPrefix)
|
||||
return ObjectInfo{}, err.Trace(partPathPrefix)
|
||||
}
|
||||
if e = objectWriter.Close(); e != nil {
|
||||
fs.rwLock.Unlock()
|
||||
return ObjectMetadata{}, probe.NewError(e)
|
||||
return ObjectInfo{}, probe.NewError(e)
|
||||
}
|
||||
fs.rwLock.Unlock()
|
||||
|
||||
// Send stat again to get object metadata.
|
||||
st, e := os.Stat(objectPath)
|
||||
if e != nil {
|
||||
return ObjectMetadata{}, probe.NewError(e)
|
||||
return ObjectInfo{}, probe.NewError(e)
|
||||
}
|
||||
|
||||
contentType := "application/octet-stream"
|
||||
@@ -576,13 +576,13 @@ func (fs Filesystem) CompleteMultipartUpload(bucket, object, uploadID string, da
|
||||
contentType = content.ContentType
|
||||
}
|
||||
}
|
||||
newObject := ObjectMetadata{
|
||||
newObject := ObjectInfo{
|
||||
Bucket: bucket,
|
||||
Object: object,
|
||||
LastModified: st.ModTime(),
|
||||
Name: object,
|
||||
ModifiedTime: st.ModTime(),
|
||||
Size: st.Size(),
|
||||
ContentType: contentType,
|
||||
MD5: s3MD5,
|
||||
MD5Sum: s3MD5,
|
||||
}
|
||||
return newObject, nil
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user