multipart: Code cleanup

- More locking cleanup. Fix naming convention.
- Simplify concatenation and blocking calls.
This commit is contained in:
Harshavardhana
2016-02-05 02:15:48 -08:00
parent d79fcb1800
commit a4c005ce30
15 changed files with 219 additions and 208 deletions

View File

@@ -44,6 +44,8 @@ import (
// isValidUploadID - is upload id.
func (fs Filesystem) isValidUploadID(object, uploadID string) bool {
fs.rwLock.RLock()
defer fs.rwLock.RUnlock()
s, ok := fs.multiparts.ActiveSession[object]
if !ok {
return false
@@ -56,9 +58,6 @@ func (fs Filesystem) isValidUploadID(object, uploadID string) bool {
// ListMultipartUploads - list incomplete multipart sessions for a given BucketMultipartResourcesMetadata
func (fs Filesystem) ListMultipartUploads(bucket string, resources BucketMultipartResourcesMetadata) (BucketMultipartResourcesMetadata, *probe.Error) {
fs.rwLock.RLock()
defer fs.rwLock.RUnlock()
// Input validation.
if !IsValidBucketName(bucket) {
return BucketMultipartResourcesMetadata{}, probe.NewError(BucketNameInvalid{Bucket: bucket})
@@ -73,6 +72,8 @@ func (fs Filesystem) ListMultipartUploads(bucket string, resources BucketMultipa
return BucketMultipartResourcesMetadata{}, probe.NewError(e)
}
var uploads []*UploadMetadata
fs.rwLock.RLock()
defer fs.rwLock.RUnlock()
for object, session := range fs.multiparts.ActiveSession {
if strings.HasPrefix(object, resources.Prefix) {
if len(uploads) > resources.MaxUploads {
@@ -117,34 +118,81 @@ func (fs Filesystem) ListMultipartUploads(bucket string, resources BucketMultipa
return resources, nil
}
// concatenate parts.
func (fs Filesystem) concatParts(parts *CompleteMultipartUpload, objectPath string, mw io.Writer) *probe.Error {
for _, part := range parts.Part {
partFile, e := os.OpenFile(objectPath+fmt.Sprintf("$%d-$multiparts", part.PartNumber), os.O_RDONLY, 0600)
defer partFile.Close()
// verify if parts sent over the network do really match with what we
// have for the session.
func doPartsMatch(parts []CompletePart, savedParts []*PartMetadata) bool {
if parts == nil || savedParts == nil {
return false
}
// Range of incoming parts and compare them with saved parts.
for i, part := range parts {
if strings.Trim(part.ETag, "\"") != savedParts[i].ETag {
return false
}
}
return true
}
type multiCloser struct {
Closers []io.Closer
}
func (m multiCloser) Close() error {
for _, c := range m.Closers {
if e := c.Close(); e != nil {
return e
}
}
return nil
}
// MultiCloser - returns a Closer that's the logical
// concatenation of the provided input closers. They're closed
// sequentially. If any of the closers return a non-nil error, Close
// will return that error.
func MultiCloser(closers ...io.Closer) io.Closer {
return multiCloser{closers}
}
// removeParts - remove all parts.
func removeParts(partPathPrefix string, parts []*PartMetadata) *probe.Error {
for _, part := range parts {
if e := os.Remove(partPathPrefix + fmt.Sprintf("$%d-$multiparts", part.PartNumber)); e != nil {
return probe.NewError(e)
}
}
if e := os.Remove(partPathPrefix + "$multiparts"); e != nil {
return probe.NewError(e)
}
return nil
}
// saveParts - concantenate and save all parts.
func saveParts(partPathPrefix string, mw io.Writer, parts []CompletePart) *probe.Error {
var partReaders []io.Reader
var partClosers []io.Closer
for _, part := range parts {
partFile, e := os.OpenFile(partPathPrefix+fmt.Sprintf("$%d-$multiparts", part.PartNumber), os.O_RDONLY, 0600)
if e != nil {
return probe.NewError(e)
}
partReaders = append(partReaders, partFile)
partClosers = append(partClosers, partFile)
}
// Concatenate a list of closers and close upon return.
closer := MultiCloser(partClosers...)
defer closer.Close()
recvMD5 := part.ETag
// Complete multipart request header md5sum per part is hex
// encoded trim it and decode if possible.
if _, e = hex.DecodeString(strings.Trim(recvMD5, "\"")); e != nil {
return probe.NewError(InvalidDigest{Md5: recvMD5})
}
if _, e = io.Copy(mw, partFile); e != nil {
return probe.NewError(e)
}
reader := io.MultiReader(partReaders...)
readBuffer := make([]byte, 4*1024*1024)
if _, e := io.CopyBuffer(mw, reader, readBuffer); e != nil {
return probe.NewError(e)
}
return nil
}
// NewMultipartUpload - initiate a new multipart session
func (fs Filesystem) NewMultipartUpload(bucket, object string) (string, *probe.Error) {
fs.rwLock.Lock()
defer fs.rwLock.Unlock()
di, e := disk.GetInfo(fs.path)
if e != nil {
return "", probe.NewError(e)
@@ -192,12 +240,6 @@ func (fs Filesystem) NewMultipartUpload(bucket, object string) (string, *probe.E
uploadIDSum := sha512.Sum512(id)
uploadID := base64.URLEncoding.EncodeToString(uploadIDSum[:])[:47]
multiPartfile, e := os.OpenFile(objectPath+"$multiparts", os.O_WRONLY|os.O_CREATE, 0600)
if e != nil {
return "", probe.NewError(e)
}
defer multiPartfile.Close()
// Initialize multipart session.
mpartSession := &MultipartSession{}
mpartSession.TotalParts = 0
@@ -205,14 +247,23 @@ func (fs Filesystem) NewMultipartUpload(bucket, object string) (string, *probe.E
mpartSession.Initiated = time.Now().UTC()
var parts []*PartMetadata
mpartSession.Parts = parts
fs.multiparts.ActiveSession[object] = mpartSession
encoder := json.NewEncoder(multiPartfile)
if e = encoder.Encode(mpartSession); e != nil {
fs.rwLock.Lock()
fs.multiparts.ActiveSession[object] = mpartSession
fs.rwLock.Unlock()
mpartSessionBytes, e := json.Marshal(mpartSession)
if e != nil {
return "", probe.NewError(e)
}
if err := saveMultipartsSession(fs.multiparts); err != nil {
return "", err.Trace()
partPathPrefix := objectPath + uploadID
if e = ioutil.WriteFile(partPathPrefix+"$multiparts", mpartSessionBytes, 0600); e != nil {
return "", probe.NewError(e)
}
if err := saveMultipartsSession(*fs.multiparts); err != nil {
return "", err.Trace(partPathPrefix)
}
return uploadID, nil
}
@@ -226,9 +277,6 @@ func (a partNumber) Less(i, j int) bool { return a[i].PartNumber < a[j].PartNumb
// CreateObjectPart - create a part in a multipart session
func (fs Filesystem) CreateObjectPart(bucket, object, uploadID, expectedMD5Sum string, partID int, size int64, data io.Reader, signature *Signature) (string, *probe.Error) {
fs.rwLock.Lock()
defer fs.rwLock.Unlock()
di, err := disk.GetInfo(fs.path)
if err != nil {
return "", probe.NewError(err)
@@ -266,7 +314,7 @@ func (fs Filesystem) CreateObjectPart(bucket, object, uploadID, expectedMD5Sum s
expectedMD5SumBytes, err = base64.StdEncoding.DecodeString(strings.TrimSpace(expectedMD5Sum))
if err != nil {
// Pro-actively close the connection
return "", probe.NewError(InvalidDigest{Md5: expectedMD5Sum})
return "", probe.NewError(InvalidDigest{MD5: expectedMD5Sum})
}
expectedMD5Sum = hex.EncodeToString(expectedMD5SumBytes)
}
@@ -282,29 +330,32 @@ func (fs Filesystem) CreateObjectPart(bucket, object, uploadID, expectedMD5Sum s
}
objectPath := filepath.Join(bucketPath, object)
partPath := objectPath + fmt.Sprintf("$%d-$multiparts", partID)
partPathPrefix := objectPath + uploadID
partPath := partPathPrefix + fmt.Sprintf("$%d-$multiparts", partID)
partFile, e := atomic.FileCreateWithPrefix(partPath, "$multiparts")
if e != nil {
return "", probe.NewError(e)
}
h := md5.New()
sh := sha256.New()
mw := io.MultiWriter(partFile, h, sh)
if _, e = io.CopyN(mw, data, size); e != nil {
md5Hasher := md5.New()
sha256Hasher := sha256.New()
partWriter := io.MultiWriter(partFile, md5Hasher, sha256Hasher)
if _, e = io.CopyN(partWriter, data, size); e != nil {
partFile.CloseAndPurge()
return "", probe.NewError(e)
}
md5sum := hex.EncodeToString(h.Sum(nil))
md5sum := hex.EncodeToString(md5Hasher.Sum(nil))
// Verify if the written object is equal to what is expected, only
// if it is requested as such.
if strings.TrimSpace(expectedMD5Sum) != "" {
if !isMD5SumEqual(strings.TrimSpace(expectedMD5Sum), md5sum) {
partFile.CloseAndPurge()
return "", probe.NewError(BadDigest{Md5: expectedMD5Sum, Bucket: bucket, Object: object})
return "", probe.NewError(BadDigest{MD5: expectedMD5Sum, Bucket: bucket, Object: object})
}
}
if signature != nil {
ok, err := signature.DoesSignatureMatch(hex.EncodeToString(sh.Sum(nil)))
ok, err := signature.DoesSignatureMatch(hex.EncodeToString(sha256Hasher.Sum(nil)))
if err != nil {
partFile.CloseAndPurge()
return "", err.Trace()
@@ -326,24 +377,30 @@ func (fs Filesystem) CreateObjectPart(bucket, object, uploadID, expectedMD5Sum s
partMetadata.Size = fi.Size()
partMetadata.LastModified = fi.ModTime()
multiPartfile, e := os.OpenFile(objectPath+"$multiparts", os.O_RDWR|os.O_APPEND, 0600)
multipartSessionBytes, e := ioutil.ReadFile(partPathPrefix + "$multiparts")
if e != nil {
return "", probe.NewError(e)
}
defer multiPartfile.Close()
var deserializedMultipartSession MultipartSession
decoder := json.NewDecoder(multiPartfile)
if e = decoder.Decode(&deserializedMultipartSession); e != nil {
if e = json.Unmarshal(multipartSessionBytes, &deserializedMultipartSession); e != nil {
return "", probe.NewError(e)
}
deserializedMultipartSession.Parts = append(deserializedMultipartSession.Parts, &partMetadata)
deserializedMultipartSession.TotalParts++
fs.multiparts.ActiveSession[object] = &deserializedMultipartSession
fs.rwLock.Lock()
fs.multiparts.ActiveSession[object] = &deserializedMultipartSession
fs.rwLock.Unlock()
// Sort by part number before saving.
sort.Sort(partNumber(deserializedMultipartSession.Parts))
encoder := json.NewEncoder(multiPartfile)
if e = encoder.Encode(&deserializedMultipartSession); e != nil {
multipartSessionBytes, e = json.Marshal(deserializedMultipartSession)
if e != nil {
return "", probe.NewError(e)
}
if e = ioutil.WriteFile(partPathPrefix+"$multiparts", multipartSessionBytes, 0600); e != nil {
return "", probe.NewError(e)
}
return partMetadata.ETag, nil
@@ -351,9 +408,6 @@ func (fs Filesystem) CreateObjectPart(bucket, object, uploadID, expectedMD5Sum s
// CompleteMultipartUpload - complete a multipart upload and persist the data
func (fs Filesystem) CompleteMultipartUpload(bucket, object, uploadID string, data io.Reader, signature *Signature) (ObjectMetadata, *probe.Error) {
fs.rwLock.Lock()
defer fs.rwLock.Unlock()
// Check bucket name is valid.
if !IsValidBucketName(bucket) {
return ObjectMetadata{}, probe.NewError(BucketNameInvalid{Bucket: bucket})
@@ -384,8 +438,8 @@ func (fs Filesystem) CompleteMultipartUpload(bucket, object, uploadID string, da
if e != nil {
return ObjectMetadata{}, probe.NewError(e)
}
h := md5.New()
mw := io.MultiWriter(file, h)
md5Hasher := md5.New()
objectWriter := io.MultiWriter(file, md5Hasher)
partBytes, e := ioutil.ReadAll(data)
if e != nil {
@@ -405,35 +459,45 @@ func (fs Filesystem) CompleteMultipartUpload(bucket, object, uploadID string, da
return ObjectMetadata{}, probe.NewError(SignatureDoesNotMatch{})
}
}
parts := &CompleteMultipartUpload{}
if e := xml.Unmarshal(partBytes, parts); e != nil {
completeMultipartUpload := &CompleteMultipartUpload{}
if e := xml.Unmarshal(partBytes, completeMultipartUpload); e != nil {
file.CloseAndPurge()
return ObjectMetadata{}, probe.NewError(MalformedXML{})
}
if !sort.IsSorted(completedParts(parts.Part)) {
if !sort.IsSorted(completedParts(completeMultipartUpload.Part)) {
file.CloseAndPurge()
return ObjectMetadata{}, probe.NewError(InvalidPartOrder{})
}
if err := fs.concatParts(parts, objectPath, mw); err != nil {
// Save parts for verification.
parts := completeMultipartUpload.Part
fs.rwLock.RLock()
savedParts := fs.multiparts.ActiveSession[object].Parts
fs.rwLock.RUnlock()
if !doPartsMatch(parts, savedParts) {
file.CloseAndPurge()
return ObjectMetadata{}, err.Trace()
return ObjectMetadata{}, probe.NewError(InvalidPart{})
}
partPathPrefix := objectPath + uploadID
if err := saveParts(partPathPrefix, objectWriter, parts); err != nil {
file.CloseAndPurge()
return ObjectMetadata{}, err.Trace(partPathPrefix)
}
if err := removeParts(partPathPrefix, savedParts); err != nil {
file.CloseAndPurge()
return ObjectMetadata{}, err.Trace(partPathPrefix)
}
fs.rwLock.Lock()
delete(fs.multiparts.ActiveSession, object)
for _, part := range parts.Part {
if e = os.Remove(objectPath + fmt.Sprintf("$%d-$multiparts", part.PartNumber)); e != nil {
file.CloseAndPurge()
return ObjectMetadata{}, probe.NewError(e)
}
}
if e := os.Remove(objectPath + "$multiparts"); e != nil {
fs.rwLock.Unlock()
if err := saveMultipartsSession(*fs.multiparts); err != nil {
file.CloseAndPurge()
return ObjectMetadata{}, probe.NewError(e)
}
if e := saveMultipartsSession(fs.multiparts); e != nil {
file.CloseAndPurge()
return ObjectMetadata{}, e.Trace()
return ObjectMetadata{}, err.Trace(partPathPrefix)
}
file.Close()
@@ -451,16 +515,13 @@ func (fs Filesystem) CompleteMultipartUpload(bucket, object, uploadID string, da
Created: st.ModTime(),
Size: st.Size(),
ContentType: contentType,
Md5: hex.EncodeToString(h.Sum(nil)),
MD5: hex.EncodeToString(md5Hasher.Sum(nil)),
}
return newObject, nil
}
// ListObjectParts - list parts from incomplete multipart session for a given ObjectResourcesMetadata
func (fs Filesystem) ListObjectParts(bucket, object string, resources ObjectResourcesMetadata) (ObjectResourcesMetadata, *probe.Error) {
fs.rwLock.Lock()
defer fs.rwLock.Unlock()
// Check bucket name is valid.
if !IsValidBucketName(bucket) {
return ObjectResourcesMetadata{}, probe.NewError(BucketNameInvalid{Bucket: bucket})
@@ -498,17 +559,16 @@ func (fs Filesystem) ListObjectParts(bucket, object string, resources ObjectReso
}
objectPath := filepath.Join(bucketPath, object)
multiPartfile, e := os.OpenFile(objectPath+"$multiparts", os.O_RDONLY, 0600)
partPathPrefix := objectPath + resources.UploadID
multipartSessionBytes, e := ioutil.ReadFile(partPathPrefix + "$multiparts")
if e != nil {
return ObjectResourcesMetadata{}, probe.NewError(e)
}
defer multiPartfile.Close()
var deserializedMultipartSession MultipartSession
decoder := json.NewDecoder(multiPartfile)
if e = decoder.Decode(&deserializedMultipartSession); e != nil {
if e = json.Unmarshal(multipartSessionBytes, &deserializedMultipartSession); e != nil {
return ObjectResourcesMetadata{}, probe.NewError(e)
}
var parts []*PartMetadata
for i := startPartNumber; i <= deserializedMultipartSession.TotalParts; i++ {
if len(parts) > objectResourcesMetadata.MaxParts {
@@ -527,9 +587,6 @@ func (fs Filesystem) ListObjectParts(bucket, object string, resources ObjectReso
// AbortMultipartUpload - abort an incomplete multipart session
func (fs Filesystem) AbortMultipartUpload(bucket, object, uploadID string) *probe.Error {
fs.rwLock.Lock()
defer fs.rwLock.Unlock()
// Check bucket name valid.
if !IsValidBucketName(bucket) {
return probe.NewError(BucketNameInvalid{Bucket: bucket})
@@ -555,15 +612,20 @@ func (fs Filesystem) AbortMultipartUpload(bucket, object, uploadID string) *prob
}
objectPath := filepath.Join(bucketPath, object)
for _, part := range fs.multiparts.ActiveSession[object].Parts {
e := os.RemoveAll(objectPath + fmt.Sprintf("$%d-$multiparts", part.PartNumber))
if e != nil {
return probe.NewError(e)
}
partPathPrefix := objectPath + uploadID
fs.rwLock.RLock()
savedParts := fs.multiparts.ActiveSession[object].Parts
fs.rwLock.RUnlock()
if err := removeParts(partPathPrefix, savedParts); err != nil {
return err.Trace(partPathPrefix)
}
fs.rwLock.Lock()
delete(fs.multiparts.ActiveSession, object)
if e := os.RemoveAll(objectPath + "$multiparts"); e != nil {
return probe.NewError(e)
fs.rwLock.Unlock()
if err := saveMultipartsSession(*fs.multiparts); err != nil {
return err.Trace(partPathPrefix)
}
return nil
}