multipart: remove proper MD5, rather create MD5 based on parts to be s3 compatible.

This increases the performance phenominally.
This commit is contained in:
Harshavardhana
2016-03-01 20:01:40 -08:00
parent 5f2cfdfbe2
commit f111997184
6 changed files with 60 additions and 26 deletions

View File

@@ -66,7 +66,6 @@ func testMultipartObjectCreation(c *check.C, create func() Filesystem) {
completedParts := CompleteMultipartUpload{}
completedParts.Part = make([]CompletePart, 0)
finalHasher := md5.New()
for i := 1; i <= 10; i++ {
randomPerm := rand.Perm(10)
randomString := ""
@@ -75,7 +74,6 @@ func testMultipartObjectCreation(c *check.C, create func() Filesystem) {
}
hasher := md5.New()
finalHasher.Write([]byte(randomString))
hasher.Write([]byte(randomString))
expectedmd5Sum := base64.StdEncoding.EncodeToString(hasher.Sum(nil))
expectedmd5Sumhex := hex.EncodeToString(hasher.Sum(nil))
@@ -87,12 +85,11 @@ func testMultipartObjectCreation(c *check.C, create func() Filesystem) {
c.Assert(calculatedmd5sum, check.Equals, expectedmd5Sumhex)
completedParts.Part = append(completedParts.Part, CompletePart{PartNumber: i, ETag: calculatedmd5sum})
}
finalExpectedmd5SumHex := hex.EncodeToString(finalHasher.Sum(nil))
completedPartsBytes, e := xml.Marshal(completedParts)
c.Assert(e, check.IsNil)
objectMetadata, err := fs.CompleteMultipartUpload("bucket", "key", uploadID, bytes.NewReader(completedPartsBytes), nil)
c.Assert(err, check.IsNil)
c.Assert(objectMetadata.MD5, check.Equals, finalExpectedmd5SumHex)
c.Assert(objectMetadata.MD5, check.Equals, "9b7d6f13ba00e24d0b02de92e814891b-10")
}
func testMultipartObjectAbort(c *check.C, create func() Filesystem) {

View File

@@ -65,7 +65,6 @@ func testMultipartObjectCreation(c *check.C, create func() Filesystem) {
completedParts := CompleteMultipartUpload{}
completedParts.Part = make([]CompletePart, 0)
finalHasher := md5.New()
for i := 1; i <= 10; i++ {
randomPerm := rand.Perm(10)
randomString := ""
@@ -74,7 +73,6 @@ func testMultipartObjectCreation(c *check.C, create func() Filesystem) {
}
hasher := md5.New()
finalHasher.Write([]byte(randomString))
hasher.Write([]byte(randomString))
expectedmd5Sum := base64.StdEncoding.EncodeToString(hasher.Sum(nil))
expectedmd5Sumhex := hex.EncodeToString(hasher.Sum(nil))
@@ -86,12 +84,11 @@ func testMultipartObjectCreation(c *check.C, create func() Filesystem) {
c.Assert(calculatedmd5sum, check.Equals, expectedmd5Sumhex)
completedParts.Part = append(completedParts.Part, CompletePart{PartNumber: i, ETag: calculatedmd5sum})
}
finalExpectedmd5SumHex := hex.EncodeToString(finalHasher.Sum(nil))
completedPartsBytes, e := xml.Marshal(completedParts)
c.Assert(e, check.IsNil)
objectMetadata, err := fs.CompleteMultipartUpload("bucket", "key", uploadID, bytes.NewReader(completedPartsBytes), nil)
c.Assert(err, check.IsNil)
c.Assert(objectMetadata.MD5, check.Equals, finalExpectedmd5SumHex)
c.Assert(objectMetadata.MD5, check.Equals, "9b7d6f13ba00e24d0b02de92e814891b-10")
}
func testMultipartObjectAbort(c *check.C, create func() Filesystem) {

View File

@@ -141,6 +141,22 @@ func doPartsMatch(parts []CompletePart, savedParts []PartMetadata) bool {
return true
}
// Create an s3 compatible MD5sum for complete multipart transaction.
func makeS3MD5(md5Strs ...string) (string, *probe.Error) {
var finalMD5Bytes []byte
for _, md5Str := range md5Strs {
md5Bytes, e := hex.DecodeString(md5Str)
if e != nil {
return "", probe.NewError(e)
}
finalMD5Bytes = append(finalMD5Bytes, md5Bytes...)
}
md5Hasher := md5.New()
md5Hasher.Write(finalMD5Bytes)
s3MD5 := fmt.Sprintf("%s-%d", hex.EncodeToString(md5Hasher.Sum(nil)), len(md5Strs))
return s3MD5, nil
}
type multiCloser struct {
Closers []io.Closer
}
@@ -177,7 +193,9 @@ func saveParts(partPathPrefix string, mw io.Writer, parts []CompletePart) *probe
var partReaders []io.Reader
var partClosers []io.Closer
for _, part := range parts {
// Trim prefix
md5Sum := strings.TrimPrefix(part.ETag, "\"")
// Trim suffix
md5Sum = strings.TrimSuffix(md5Sum, "\"")
partFile, e := os.OpenFile(partPathPrefix+md5Sum+fmt.Sprintf("$%d-$multiparts", part.PartNumber), os.O_RDONLY, 0600)
if e != nil {
@@ -199,7 +217,8 @@ func saveParts(partPathPrefix string, mw io.Writer, parts []CompletePart) *probe
defer closer.Close()
reader := io.MultiReader(partReaders...)
readBuffer := make([]byte, 4*1024*1024)
readBufferSize := 8 * 1024 * 1024 // 8MiB
readBuffer := make([]byte, readBufferSize) // Allocate 8MiB buffer.
if _, e := io.CopyBuffer(mw, reader, readBuffer); e != nil {
return probe.NewError(e)
}
@@ -465,16 +484,14 @@ func (fs Filesystem) CompleteMultipartUpload(bucket, object, uploadID string, da
}
objectPath := filepath.Join(bucketPath, object)
file, e := atomic.FileCreateWithPrefix(objectPath, "$tmpobject")
objectWriter, e := atomic.FileCreateWithPrefix(objectPath, "$tmpobject")
if e != nil {
return ObjectMetadata{}, probe.NewError(e)
}
md5Hasher := md5.New()
objectWriter := io.MultiWriter(file, md5Hasher)
partBytes, e := ioutil.ReadAll(data)
if e != nil {
file.CloseAndPurge()
objectWriter.CloseAndPurge()
return ObjectMetadata{}, probe.NewError(e)
}
if signature != nil {
@@ -482,21 +499,21 @@ func (fs Filesystem) CompleteMultipartUpload(bucket, object, uploadID string, da
sh.Write(partBytes)
ok, err := signature.DoesSignatureMatch(hex.EncodeToString(sh.Sum(nil)))
if err != nil {
file.CloseAndPurge()
objectWriter.CloseAndPurge()
return ObjectMetadata{}, err.Trace()
}
if !ok {
file.CloseAndPurge()
objectWriter.CloseAndPurge()
return ObjectMetadata{}, probe.NewError(SignDoesNotMatch{})
}
}
completeMultipartUpload := &CompleteMultipartUpload{}
if e = xml.Unmarshal(partBytes, completeMultipartUpload); e != nil {
file.CloseAndPurge()
objectWriter.CloseAndPurge()
return ObjectMetadata{}, probe.NewError(MalformedXML{})
}
if !sort.IsSorted(completedParts(completeMultipartUpload.Part)) {
file.CloseAndPurge()
objectWriter.CloseAndPurge()
return ObjectMetadata{}, probe.NewError(InvalidPartOrder{})
}
@@ -509,28 +526,42 @@ func (fs Filesystem) CompleteMultipartUpload(bucket, object, uploadID string, da
fs.rwLock.RUnlock()
if !doPartsMatch(parts, savedParts) {
file.CloseAndPurge()
objectWriter.CloseAndPurge()
return ObjectMetadata{}, probe.NewError(InvalidPart{})
}
// Parts successfully validated, save all the parts.
partPathPrefix := objectPath + uploadID
if err := saveParts(partPathPrefix, objectWriter, parts); err != nil {
file.CloseAndPurge()
objectWriter.CloseAndPurge()
return ObjectMetadata{}, err.Trace(partPathPrefix)
}
// Successfully saved, remove all parts.
removeParts(partPathPrefix, savedParts)
var md5Strs []string
for _, part := range savedParts {
md5Strs = append(md5Strs, part.ETag)
}
// Save the s3 md5.
s3MD5, err := makeS3MD5(md5Strs...)
if err != nil {
objectWriter.CloseAndPurge()
return ObjectMetadata{}, err.Trace(md5Strs...)
}
// Successfully saved multipart, remove all parts in a routine.
go removeParts(partPathPrefix, savedParts)
// Critical region requiring write lock.
fs.rwLock.Lock()
delete(fs.multiparts.ActiveSession, uploadID)
if err := saveMultipartsSession(*fs.multiparts); err != nil {
fs.rwLock.Unlock()
file.CloseAndPurge()
objectWriter.CloseAndPurge()
return ObjectMetadata{}, err.Trace(partPathPrefix)
}
file.Close()
if e = objectWriter.Close(); e != nil {
fs.rwLock.Unlock()
return ObjectMetadata{}, probe.NewError(e)
}
fs.rwLock.Unlock()
// Send stat again to get object metadata.
@@ -538,6 +569,7 @@ func (fs Filesystem) CompleteMultipartUpload(bucket, object, uploadID string, da
if e != nil {
return ObjectMetadata{}, probe.NewError(e)
}
contentType := "application/octet-stream"
if objectExt := filepath.Ext(objectPath); objectExt != "" {
content, ok := mimedb.DB[strings.ToLower(strings.TrimPrefix(objectExt, "."))]
@@ -551,7 +583,7 @@ func (fs Filesystem) CompleteMultipartUpload(bucket, object, uploadID string, da
LastModified: st.ModTime(),
Size: st.Size(),
ContentType: contentType,
MD5: hex.EncodeToString(md5Hasher.Sum(nil)),
MD5: s3MD5,
}
return newObject, nil
}