Add listparts support

This commit is contained in:
Harshavardhana
2015-05-09 11:41:26 -07:00
parent ef793dcf03
commit 18c7f620cc
9 changed files with 274 additions and 44 deletions

View File

@@ -47,8 +47,14 @@ type memoryDriver struct {
}
type storedBucket struct {
bucketMetadata drivers.BucketMetadata
objectMetadata map[string]drivers.ObjectMetadata
bucketMetadata drivers.BucketMetadata
objectMetadata map[string]drivers.ObjectMetadata
multiPartSession map[string]multiPartSession
}
type multiPartSession struct {
totalParts int
uploadID string
}
const (
@@ -533,19 +539,20 @@ func (memory *memoryDriver) NewMultipartUpload(bucket, key, contentType string)
}
memory.lock.RUnlock()
memory.lock.Lock()
id := []byte(strconv.FormatInt(rand.Int63(), 10) + bucket + key + time.Now().String())
uploadIDSum := sha512.Sum512(id)
uploadID := base64.URLEncoding.EncodeToString(uploadIDSum[:])
md5sumBytes := md5.Sum([]byte(uploadID))
// CreateObject expects in base64 which is coming over http request header
// while all response headers with ETag are hex encoding
md5sum := base64.StdEncoding.EncodeToString(md5sumBytes[:])
// Create UploadID session, this is a temporary work around to instantiate a session.
// It would not be valid in future, since we need to work out proper sessions so that
// we can cleanly abort session and propagate failures.
_, err := memory.CreateObject(bucket, key+"?uploadId="+uploadID, contentType, md5sum, int64(len(uploadID)), bytes.NewBufferString(uploadID))
return uploadID, iodine.New(err, nil)
storedBucket.multiPartSession = make(map[string]multiPartSession)
storedBucket.multiPartSession[key] = multiPartSession{
uploadID: uploadID,
totalParts: 0,
}
memory.storedBuckets[bucket] = storedBucket
memory.lock.Unlock()
return uploadID, nil
}
func getMultipartKey(key string, uploadID string, partNumber int) string {
@@ -554,17 +561,29 @@ func getMultipartKey(key string, uploadID string, partNumber int) string {
func (memory *memoryDriver) CreateObjectPart(bucket, key, uploadID string, partID int, contentType, expectedMD5Sum string, size int64, data io.Reader) (string, error) {
// Verify upload id
_, ok := memory.objects.Get(bucket + "/" + key + "?uploadId=" + uploadID)
if !ok {
memory.lock.RLock()
storedBucket := memory.storedBuckets[bucket]
if storedBucket.multiPartSession[key].uploadID != uploadID {
memory.lock.RUnlock()
return "", iodine.New(drivers.InvalidUploadID{UploadID: uploadID}, nil)
}
return memory.CreateObject(bucket, getMultipartKey(key, uploadID, partID), "", expectedMD5Sum, size, data)
memory.lock.RUnlock()
etag, err := memory.CreateObject(bucket, getMultipartKey(key, uploadID, partID), "", expectedMD5Sum, size, data)
if err != nil {
return "", iodine.New(err, nil)
}
// once successful, update totalParts
multiPartSession := storedBucket.multiPartSession[key]
multiPartSession.totalParts++
storedBucket.multiPartSession[key] = multiPartSession
return etag, nil
}
func (memory *memoryDriver) cleanupMultipartSession(bucket, key, uploadID string) {
memory.lock.Lock()
defer memory.lock.Unlock()
memory.objects.Delete(bucket + "/" + key + "?uploadId=" + uploadID)
delete(memory.storedBuckets[bucket].multiPartSession, key)
}
func (memory *memoryDriver) cleanupMultiparts(bucket, key, uploadID string, parts map[int]string) {
@@ -577,28 +596,29 @@ func (memory *memoryDriver) cleanupMultiparts(bucket, key, uploadID string, part
}
func (memory *memoryDriver) CompleteMultipartUpload(bucket, key, uploadID string, parts map[int]string) (string, error) {
// Verify upload id
_, ok := memory.objects.Get(bucket + "/" + key + "?uploadId=" + uploadID)
if !ok {
return "", iodine.New(drivers.InvalidUploadID{UploadID: uploadID}, nil)
}
if !drivers.IsValidBucket(bucket) {
return "", iodine.New(drivers.BucketNameInvalid{Bucket: bucket}, nil)
}
if !drivers.IsValidObjectName(key) {
return "", iodine.New(drivers.ObjectNameInvalid{Object: key}, nil)
}
// Verify upload id
memory.lock.RLock()
if _, ok := memory.storedBuckets[bucket]; ok == false {
memory.lock.RUnlock()
return "", iodine.New(drivers.BucketNotFound{Bucket: bucket}, nil)
}
storedBucket := memory.storedBuckets[bucket]
if storedBucket.multiPartSession[key].uploadID != uploadID {
memory.lock.RUnlock()
return "", iodine.New(drivers.InvalidUploadID{UploadID: uploadID}, nil)
}
memory.lock.RUnlock()
memory.lock.Lock()
var size int64
for i := range parts {
object, ok := memory.storedBuckets[bucket].objectMetadata[bucket+"/"+getMultipartKey(key, uploadID, i)]
object, ok := storedBucket.objectMetadata[bucket+"/"+getMultipartKey(key, uploadID, i)]
if !ok {
memory.lock.Unlock()
return "", iodine.New(errors.New("missing part: "+strconv.Itoa(i)), nil)
@@ -643,3 +663,51 @@ func (memory *memoryDriver) CompleteMultipartUpload(bucket, key, uploadID string
memory.cleanupMultipartSession(bucket, key, uploadID)
return etag, nil
}
// partNumber is a sortable interface for Part slice
type partNumber []*drivers.PartMetadata
func (a partNumber) Len() int { return len(a) }
func (a partNumber) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a partNumber) Less(i, j int) bool { return a[i].PartNumber < a[j].PartNumber }
func (memory *memoryDriver) ListObjectParts(bucket, key, uploadID string) (drivers.ObjectResourcesMetadata, error) {
// Verify upload id
memory.lock.RLock()
defer memory.lock.RUnlock()
if _, ok := memory.storedBuckets[bucket]; ok == false {
return drivers.ObjectResourcesMetadata{}, iodine.New(drivers.BucketNotFound{Bucket: bucket}, nil)
}
storedBucket := memory.storedBuckets[bucket]
if storedBucket.multiPartSession[key].uploadID != uploadID {
return drivers.ObjectResourcesMetadata{}, iodine.New(drivers.InvalidUploadID{UploadID: uploadID}, nil)
}
// TODO support PartNumberMarker and NextPartNumberMarker
objectResourcesMetadata := drivers.ObjectResourcesMetadata{}
objectResourcesMetadata.UploadID = uploadID
objectResourcesMetadata.Bucket = bucket
objectResourcesMetadata.Key = key
objectResourcesMetadata.MaxParts = 1000
var parts []*drivers.PartMetadata
for i := 1; i <= storedBucket.multiPartSession[key].totalParts; i++ {
if len(parts) > objectResourcesMetadata.MaxParts {
sort.Sort(partNumber(parts))
objectResourcesMetadata.IsTruncated = true
objectResourcesMetadata.Part = parts
return objectResourcesMetadata, nil
}
object, ok := storedBucket.objectMetadata[bucket+"/"+getMultipartKey(key, uploadID, i)]
if !ok {
return drivers.ObjectResourcesMetadata{}, iodine.New(errors.New("missing part: "+strconv.Itoa(i)), nil)
}
partMetadata := &drivers.PartMetadata{}
partMetadata.Size = object.Size
partMetadata.LastModified = object.Created
partMetadata.ETag = object.Md5
partMetadata.PartNumber = i
parts = append(parts, partMetadata)
}
sort.Sort(partNumber(parts))
objectResourcesMetadata.Part = parts
return objectResourcesMetadata, nil
}