Implement CopyObjectPart API (#3663)

This API is implemented to allow copying data from an
existing source object to an ongoing multipart operation

http://docs.aws.amazon.com/AmazonS3/latest/API/mpUploadUploadPartCopy.html

Fixes #3662
This commit is contained in:
Harshavardhana
2017-01-31 09:38:34 -08:00
committed by GitHub
parent cb48517a78
commit 77a192a7b5
14 changed files with 633 additions and 73 deletions

View File

@@ -451,17 +451,49 @@ func partToAppend(fsMeta fsMetaV1, fsAppendMeta fsMetaV1) (part objectPartInfo,
return fsMeta.Parts[nextPartIndex], true
}
// CopyObjectPart - similar to PutObjectPart but reads data from an existing
// object. Internally incoming data is written to '.minio.sys/tmp' location
// and safely renamed to '.minio.sys/multipart' for reach parts.
func (fs fsObjects) CopyObjectPart(srcBucket, srcObject, dstBucket, dstObject, uploadID string, partID int, startOffset int64, length int64) (PartInfo, error) {
if err := checkNewMultipartArgs(srcBucket, srcObject, fs); err != nil {
return PartInfo{}, err
}
// Initialize pipe.
pipeReader, pipeWriter := io.Pipe()
go func() {
startOffset := int64(0) // Read the whole file.
if gerr := fs.GetObject(srcBucket, srcObject, startOffset, length, pipeWriter); gerr != nil {
errorIf(gerr, "Unable to read %s/%s.", srcBucket, srcObject)
pipeWriter.CloseWithError(gerr)
return
}
pipeWriter.Close() // Close writer explicitly signalling we wrote all data.
}()
partInfo, err := fs.PutObjectPart(dstBucket, dstObject, uploadID, partID, length, pipeReader, "", "")
if err != nil {
return PartInfo{}, toObjectErr(err, dstBucket, dstObject)
}
// Explicitly close the reader.
pipeReader.Close()
return partInfo, nil
}
// PutObjectPart - reads incoming data until EOF for the part file on
// an ongoing multipart transaction. Internally incoming data is
// written to '.minio.sys/tmp' location and safely renamed to
// '.minio.sys/multipart' for reach parts.
func (fs fsObjects) PutObjectPart(bucket, object, uploadID string, partID int, size int64, data io.Reader, md5Hex string, sha256sum string) (string, error) {
func (fs fsObjects) PutObjectPart(bucket, object, uploadID string, partID int, size int64, data io.Reader, md5Hex string, sha256sum string) (PartInfo, error) {
if err := checkPutObjectPartArgs(bucket, object, fs); err != nil {
return "", err
return PartInfo{}, err
}
if _, err := fs.statBucketDir(bucket); err != nil {
return "", toObjectErr(err, bucket)
return PartInfo{}, toObjectErr(err, bucket)
}
// Hold the lock so that two parallel complete-multipart-uploads
@@ -474,9 +506,9 @@ func (fs fsObjects) PutObjectPart(bucket, object, uploadID string, partID int, s
uploadsPath := pathJoin(fs.fsPath, minioMetaMultipartBucket, bucket, object, uploadsJSONFile)
if _, err := fs.rwPool.Open(uploadsPath); err != nil {
if err == errFileNotFound || err == errFileAccessDenied {
return "", traceError(InvalidUploadID{UploadID: uploadID})
return PartInfo{}, traceError(InvalidUploadID{UploadID: uploadID})
}
return "", toObjectErr(traceError(err), bucket, object)
return PartInfo{}, toObjectErr(traceError(err), bucket, object)
}
defer fs.rwPool.Close(uploadsPath)
@@ -487,16 +519,16 @@ func (fs fsObjects) PutObjectPart(bucket, object, uploadID string, partID int, s
rwlk, err := fs.rwPool.Write(fsMetaPath)
if err != nil {
if err == errFileNotFound || err == errFileAccessDenied {
return "", traceError(InvalidUploadID{UploadID: uploadID})
return PartInfo{}, traceError(InvalidUploadID{UploadID: uploadID})
}
return "", toObjectErr(traceError(err), bucket, object)
return PartInfo{}, toObjectErr(traceError(err), bucket, object)
}
defer rwlk.Close()
fsMeta := fsMetaV1{}
_, err = fsMeta.ReadFrom(rwlk)
if err != nil {
return "", toObjectErr(err, minioMetaMultipartBucket, fsMetaPath)
return PartInfo{}, toObjectErr(err, minioMetaMultipartBucket, fsMetaPath)
}
partSuffix := fmt.Sprintf("object%d", partID)
@@ -534,14 +566,14 @@ func (fs fsObjects) PutObjectPart(bucket, object, uploadID string, partID int, s
bytesWritten, cErr := fsCreateFile(fsPartPath, teeReader, buf, size)
if cErr != nil {
fsRemoveFile(fsPartPath)
return "", toObjectErr(cErr, minioMetaTmpBucket, tmpPartPath)
return PartInfo{}, toObjectErr(cErr, minioMetaTmpBucket, tmpPartPath)
}
// Should return IncompleteBody{} error when reader has fewer
// bytes than specified in request header.
if bytesWritten < size {
fsRemoveFile(fsPartPath)
return "", traceError(IncompleteBody{})
return PartInfo{}, traceError(IncompleteBody{})
}
// Delete temporary part in case of failure. If
@@ -552,14 +584,14 @@ func (fs fsObjects) PutObjectPart(bucket, object, uploadID string, partID int, s
newMD5Hex := hex.EncodeToString(md5Writer.Sum(nil))
if md5Hex != "" {
if newMD5Hex != md5Hex {
return "", traceError(BadDigest{md5Hex, newMD5Hex})
return PartInfo{}, traceError(BadDigest{md5Hex, newMD5Hex})
}
}
if sha256sum != "" {
newSHA256sum := hex.EncodeToString(sha256Writer.Sum(nil))
if newSHA256sum != sha256sum {
return "", traceError(SHA256Mismatch{})
return PartInfo{}, traceError(SHA256Mismatch{})
}
}
@@ -572,14 +604,20 @@ func (fs fsObjects) PutObjectPart(bucket, object, uploadID string, partID int, s
fsNSPartPath := pathJoin(fs.fsPath, minioMetaMultipartBucket, partPath)
if err = fsRenameFile(fsPartPath, fsNSPartPath); err != nil {
partLock.Unlock()
return "", toObjectErr(err, minioMetaMultipartBucket, partPath)
return PartInfo{}, toObjectErr(err, minioMetaMultipartBucket, partPath)
}
// Save the object part info in `fs.json`.
fsMeta.AddObjectPart(partID, partSuffix, newMD5Hex, size)
if _, err = fsMeta.WriteTo(rwlk); err != nil {
partLock.Unlock()
return "", toObjectErr(err, minioMetaMultipartBucket, uploadIDPath)
return PartInfo{}, toObjectErr(err, minioMetaMultipartBucket, uploadIDPath)
}
partNamePath := pathJoin(fs.fsPath, minioMetaMultipartBucket, uploadIDPath, partSuffix)
fi, err := fsStatFile(partNamePath)
if err != nil {
return PartInfo{}, toObjectErr(err, minioMetaMultipartBucket, partSuffix)
}
// Append the part in background.
@@ -593,7 +631,12 @@ func (fs fsObjects) PutObjectPart(bucket, object, uploadID string, partID int, s
partLock.Unlock()
}()
return newMD5Hex, nil
return PartInfo{
PartNumber: partID,
LastModified: fi.ModTime(),
ETag: newMD5Hex,
Size: fi.Size(),
}, nil
}
// listObjectParts - wrapper scanning through
@@ -635,7 +678,7 @@ func (fs fsObjects) listObjectParts(bucket, object, uploadID string, partNumberM
if err != nil {
return ListPartsInfo{}, toObjectErr(err, minioMetaMultipartBucket, partNamePath)
}
result.Parts = append(result.Parts, partInfo{
result.Parts = append(result.Parts, PartInfo{
PartNumber: part.Number,
ETag: part.ETag,
LastModified: fi.ModTime(),