Add multipart support in SSE-C encryption (#5576)

*) Add Put/Get support of multipart in encryption
*) Add GET Range support for encryption
*) Add CopyPart encrypted support
*) Support decrypting of large single PUT object
This commit is contained in:
Anis Elleuch
2018-03-01 20:37:57 +01:00
committed by kannappanr
parent d32f90fe95
commit 120b061966
15 changed files with 594 additions and 229 deletions

View File

@@ -20,7 +20,6 @@ import (
"encoding/hex"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"os"
pathutil "path"
@@ -232,6 +231,7 @@ func (fs *FSObjects) NewMultipartUpload(bucket, object string, meta map[string]s
if err = ioutil.WriteFile(pathJoin(uploadIDDir, fsMetaJSONFile), fsMetaBytes, 0644); err != nil {
return "", errors.Trace(err)
}
return uploadID, nil
}
@@ -246,30 +246,26 @@ func (fs *FSObjects) CopyObjectPart(srcBucket, srcObject, dstBucket, dstObject,
}
// Initialize pipe.
pipeReader, pipeWriter := io.Pipe()
go func() {
if gerr := fs.GetObject(srcBucket, srcObject, startOffset, length, pipeWriter, srcInfo.ETag); gerr != nil {
errorIf(gerr, "Unable to read %s/%s.", srcBucket, srcObject)
pipeWriter.CloseWithError(gerr)
if gerr := fs.GetObject(srcBucket, srcObject, startOffset, length, srcInfo.Writer, srcInfo.ETag); gerr != nil {
if gerr = srcInfo.Writer.Close(); gerr != nil {
errorIf(gerr, "Unable to read %s/%s.", srcBucket, srcObject)
return
}
return
}
// Close writer explicitly signalling we wrote all data.
if gerr := srcInfo.Writer.Close(); gerr != nil {
errorIf(gerr, "Unable to read %s/%s.", srcBucket, srcObject)
return
}
pipeWriter.Close() // Close writer explicitly signalling we wrote all data.
}()
hashReader, err := hash.NewReader(pipeReader, length, "", "")
partInfo, err := fs.PutObjectPart(dstBucket, dstObject, uploadID, partID, srcInfo.Reader)
if err != nil {
return pi, toObjectErr(err, dstBucket, dstObject)
}
partInfo, err := fs.PutObjectPart(dstBucket, dstObject, uploadID, partID, hashReader)
if err != nil {
return pi, toObjectErr(err, dstBucket, dstObject)
}
// Explicitly close the reader.
pipeReader.Close()
return partInfo, nil
}
@@ -392,27 +388,28 @@ func (fs *FSObjects) ListObjectParts(bucket, object, uploadID string, partNumber
if entry == fsMetaJSONFile {
continue
}
partNumber, etag1, err := fs.decodePartFile(entry)
if err != nil {
return result, toObjectErr(errors.Trace(err))
partNumber, etag1, derr := fs.decodePartFile(entry)
if derr != nil {
return result, toObjectErr(errors.Trace(derr))
}
etag2, ok := partsMap[partNumber]
if !ok {
partsMap[partNumber] = etag1
continue
}
stat1, err := fsStatFile(pathJoin(uploadIDDir, fs.encodePartFile(partNumber, etag1)))
if err != nil {
return result, toObjectErr(errors.Trace(err))
stat1, serr := fsStatFile(pathJoin(uploadIDDir, fs.encodePartFile(partNumber, etag1)))
if serr != nil {
return result, toObjectErr(errors.Trace(serr))
}
stat2, err := fsStatFile(pathJoin(uploadIDDir, fs.encodePartFile(partNumber, etag2)))
if err != nil {
return result, toObjectErr(errors.Trace(err))
stat2, serr := fsStatFile(pathJoin(uploadIDDir, fs.encodePartFile(partNumber, etag2)))
if serr != nil {
return result, toObjectErr(errors.Trace(serr))
}
if stat1.ModTime().After(stat2.ModTime()) {
partsMap[partNumber] = etag1
}
}
var parts []PartInfo
for partNumber, etag := range partsMap {
parts = append(parts, PartInfo{PartNumber: partNumber, ETag: etag})
@@ -444,13 +441,21 @@ func (fs *FSObjects) ListObjectParts(bucket, object, uploadID string, partNumber
}
}
for i, part := range result.Parts {
stat, err := fsStatFile(pathJoin(uploadIDDir, fs.encodePartFile(part.PartNumber, part.ETag)))
var stat os.FileInfo
stat, err = fsStatFile(pathJoin(uploadIDDir, fs.encodePartFile(part.PartNumber, part.ETag)))
if err != nil {
return result, toObjectErr(errors.Trace(err))
}
result.Parts[i].LastModified = stat.ModTime()
result.Parts[i].Size = stat.Size()
}
fsMetaBytes, err := ioutil.ReadFile(pathJoin(uploadIDDir, fsMetaJSONFile))
if err != nil {
return result, errors.Trace(err)
}
result.UserDefined = parseFSMetaMap(fsMetaBytes)
return result, nil
}
@@ -492,6 +497,11 @@ func (fs *FSObjects) CompleteMultipartUpload(bucket string, object string, uploa
partSize := int64(-1) // Used later to ensure that all parts sizes are same.
fsMeta := fsMetaV1{}
// Allocate parts similar to incoming slice.
fsMeta.Parts = make([]objectPartInfo, len(parts))
// Validate all parts and then commit to disk.
for i, part := range parts {
partPath := pathJoin(uploadIDDir, fs.encodePartFile(part.PartNumber, part.ETag))
@@ -506,6 +516,13 @@ func (fs *FSObjects) CompleteMultipartUpload(bucket string, object string, uploa
if partSize == -1 {
partSize = fi.Size()
}
fsMeta.Parts[i] = objectPartInfo{
Number: part.PartNumber,
ETag: part.ETag,
Size: fi.Size(),
}
if i == len(parts)-1 {
break
}
@@ -590,7 +607,6 @@ func (fs *FSObjects) CompleteMultipartUpload(bucket string, object string, uploa
}
defer metaFile.Close()
fsMeta := fsMetaV1{}
// Read saved fs metadata for ongoing multipart.
fsMetaBuf, err := ioutil.ReadFile(pathJoin(uploadIDDir, fsMetaJSONFile))
if err != nil {