diff --git a/pkg/api/api_definitions.go b/pkg/api/api_definitions.go index 8398eb1e6..e6ed5e4c0 100644 --- a/pkg/api/api_definitions.go +++ b/pkg/api/api_definitions.go @@ -55,6 +55,29 @@ type ListObjectsResponse struct { Prefix string } +// ListPartsResponse - format for list parts response +type ListPartsResponse struct { + XMLName xml.Name `xml:"http://doc.s3.amazonaws.com/2006-03-01 ListPartsResult" json:"-"` + + Bucket string + Key string + UploadID string `xml:"UploadId"` + + Initiator Initiator + Owner Owner + + // The class of storage used to store the object. + StorageClass string + + PartNumberMarker int + NextPartNumberMarker int + MaxParts int + IsTruncated bool + + // List of parts + Part []*Part +} + // ListBucketsResponse - format for list buckets response type ListBucketsResponse struct { XMLName xml.Name `xml:"http://doc.s3.amazonaws.com/2006-03-01 ListAllMyBucketsResult" json:"-"` @@ -76,6 +99,14 @@ type Bucket struct { CreationDate string } +// Part container for part metadata +type Part struct { + PartNumber int + ETag string + LastModified string + Size int64 +} + // Object container for object metadata type Object struct { ETag string @@ -89,6 +120,9 @@ type Object struct { StorageClass string } +// Initiator inherit from Owner struct, fields are same +type Initiator Owner + // Owner - bucket owner/principal type Owner struct { ID string @@ -126,12 +160,6 @@ type CompleteMultipartUploadResult struct { ETag string } -// Part description of a multipart part -type Part struct { - PartNumber int - ETag string -} - // List of not implemented bucket queries var notimplementedBucketResourceNames = map[string]bool{ "policy": true, diff --git a/pkg/api/api_object_handlers.go b/pkg/api/api_object_handlers.go index 2f2d240de..621caf8c8 100644 --- a/pkg/api/api_object_handlers.go +++ b/pkg/api/api_object_handlers.go @@ -345,6 +345,37 @@ func (server *minioAPI) putObjectPartHandler(w http.ResponseWriter, req *http.Re } } +func (server *minioAPI) listObjectPartsHandler(w http.ResponseWriter, req *http.Request) { + acceptsContentType := getContentType(req) + if acceptsContentType == unknownContentType { + writeErrorResponse(w, req, NotAcceptable, acceptsContentType, req.URL.Path) + return + } + vars := mux.Vars(req) + bucket := vars["bucket"] + object := vars["object"] + uploadID := vars["uploadId"] + + objectResourcesMetadata, err := server.driver.ListObjectParts(bucket, object, uploadID) + switch err := iodine.ToError(err).(type) { + case nil: + response := generateListPartsResult(objectResourcesMetadata) + encodedSuccessResponse := encodeSuccessResponse(response, acceptsContentType) + // write headers + setCommonHeaders(w, getContentTypeString(acceptsContentType)) + // set content-length to the size of the body + w.Header().Set("Content-Length", strconv.Itoa(len(encodedSuccessResponse))) + w.WriteHeader(http.StatusOK) + // write body + w.Write(encodedSuccessResponse) + case drivers.InvalidUploadID: + writeErrorResponse(w, req, NoSuchUpload, acceptsContentType, req.URL.Path) + default: + log.Println(err) + writeErrorResponse(w, req, InternalError, acceptsContentType, req.URL.Path) + } +} + func (server *minioAPI) completeMultipartUploadHandler(w http.ResponseWriter, req *http.Request) { acceptsContentType := getContentType(req) if acceptsContentType == unknownContentType { diff --git a/pkg/api/api_response.go b/pkg/api/api_response.go index 245c29537..458962b32 100644 --- a/pkg/api/api_response.go +++ b/pkg/api/api_response.go @@ -112,6 +112,7 @@ func generateListObjectsResponse(bucket string, objects []drivers.ObjectMetadata return data } +// generateInitiateMultipartUploadResult func generateInitiateMultipartUploadResult(bucket, key, uploadID string) InitiateMultipartUploadResult { return InitiateMultipartUploadResult{ Bucket: bucket, @@ -120,6 +121,7 @@ func generateInitiateMultipartUploadResult(bucket, key, uploadID string) Initiat } } +// generateCompleteMultipartUploadResult func generateCompleteMultpartUploadResult(bucket, key, location, etag string) CompleteMultipartUploadResult { return CompleteMultipartUploadResult{ Location: location, @@ -129,13 +131,50 @@ func generateCompleteMultpartUploadResult(bucket, key, location, etag string) Co } } -// writeSuccessResponse - write success headers +// partNumber +type partNumber []*Part + +func (b partNumber) Len() int { return len(b) } +func (b partNumber) Swap(i, j int) { b[i], b[j] = b[j], b[i] } +func (b partNumber) Less(i, j int) bool { return b[i].PartNumber < b[j].PartNumber } + +// generateListPartsResult +func generateListPartsResult(objectMetadata drivers.ObjectResourcesMetadata) ListPartsResponse { + // TODO - support EncodingType in xml decoding + listPartsResponse := ListPartsResponse{} + listPartsResponse.Bucket = objectMetadata.Bucket + listPartsResponse.Key = objectMetadata.Key + listPartsResponse.UploadID = objectMetadata.UploadID + listPartsResponse.StorageClass = "STANDARD" + listPartsResponse.Initiator.ID = "minio" + listPartsResponse.Initiator.DisplayName = "minio" + listPartsResponse.Owner.ID = "minio" + listPartsResponse.Owner.DisplayName = "minio" + + listPartsResponse.MaxParts = objectMetadata.MaxParts + listPartsResponse.PartNumberMarker = objectMetadata.PartNumberMarker + listPartsResponse.IsTruncated = objectMetadata.IsTruncated + listPartsResponse.NextPartNumberMarker = objectMetadata.NextPartNumberMarker + + listPartsResponse.Part = make([]*Part, len(objectMetadata.Part)) + for _, part := range objectMetadata.Part { + newPart := &Part{} + newPart.PartNumber = part.PartNumber + newPart.ETag = part.ETag + newPart.Size = part.Size + newPart.LastModified = part.LastModified.Format(iso8601Format) + listPartsResponse.Part = append(listPartsResponse.Part, newPart) + } + return listPartsResponse +} + +// writeSuccessResponse write success headers func writeSuccessResponse(w http.ResponseWriter, acceptsContentType contentType) { setCommonHeaders(w, getContentTypeString(acceptsContentType)) w.WriteHeader(http.StatusOK) } -// writeErrorRespone - write error headers +// writeErrorRespone write error headers func writeErrorResponse(w http.ResponseWriter, req *http.Request, errorType int, acceptsContentType contentType, resource string) { error := getErrorCode(errorType) // generate error response diff --git a/pkg/api/api_router.go b/pkg/api/api_router.go index 517d64961..e7f6a68ca 100644 --- a/pkg/api/api_router.go +++ b/pkg/api/api_router.go @@ -45,14 +45,16 @@ func HTTPHandler(driver drivers.Driver) http.Handler { mux.HandleFunc("/{bucket}", api.listObjectsHandler).Methods("GET") mux.HandleFunc("/{bucket}", api.putBucketHandler).Methods("PUT") mux.HandleFunc("/{bucket}", api.headBucketHandler).Methods("HEAD") - mux.HandleFunc("/{bucket}/{object:.*}", api.getObjectHandler).Methods("GET") mux.HandleFunc("/{bucket}/{object:.*}", api.headObjectHandler).Methods("HEAD") if featureflags.Get(featureflags.MultipartPutObject) { log.Println("Enabling feature", featureflags.MultipartPutObject) - mux.HandleFunc("/{bucket}/{object:.*}", api.putObjectPartHandler).Queries("partNumber", "{partNumber:[0-9]+}", "uploadId", "{uploadId:.*}").Methods("PUT") + mux.HandleFunc("/{bucket}/{object:.*}", api.putObjectPartHandler).Queries("partNumber", + "{partNumber:[0-9]+}", "uploadId", "{uploadId:.*}").Methods("PUT") + mux.HandleFunc("/{bucket}/{object:.*}", api.listObjectPartsHandler).Queries("uploadId", "{uploadId:.*}").Methods("GET") mux.HandleFunc("/{bucket}/{object:.*}", api.completeMultipartUploadHandler).Queries("uploadId", "{uploadId:.*}").Methods("POST") mux.HandleFunc("/{bucket}/{object:.*}", api.newMultipartUploadHandler).Methods("POST") } + mux.HandleFunc("/{bucket}/{object:.*}", api.getObjectHandler).Methods("GET") mux.HandleFunc("/{bucket}/{object:.*}", api.putObjectHandler).Methods("PUT") var conf = config.Config{} diff --git a/pkg/api/resources.go b/pkg/api/resources.go index 626408682..74986050f 100644 --- a/pkg/api/resources.go +++ b/pkg/api/resources.go @@ -42,6 +42,23 @@ func getBucketResources(values url.Values) (v drivers.BucketResourcesMetadata) { return } +// parse object url queries +func getObjectResources(values url.Values) (v drivers.ObjectResourcesMetadata) { + for key, value := range values { + switch true { + case key == "uploadId": + v.UploadID = value[0] + case key == "part-number-marker": + v.PartNumberMarker, _ = strconv.Atoi(value[0]) + case key == "max-parts": + v.MaxParts, _ = strconv.Atoi(value[0]) + case key == "encoding-type": + v.EncodingType = value[0] + } + } + return +} + // check if req query values have acl func isRequestBucketACL(values url.Values) bool { for key := range values { diff --git a/pkg/storage/drivers/donut/donut.go b/pkg/storage/drivers/donut/donut.go index 79d77121b..ffe69e690 100644 --- a/pkg/storage/drivers/donut/donut.go +++ b/pkg/storage/drivers/donut/donut.go @@ -411,3 +411,7 @@ func (d donutDriver) CreateObjectPart(bucket, key, uploadID string, partID int, func (d donutDriver) CompleteMultipartUpload(bucket, key, uploadID string, parts map[int]string) (string, error) { return "", iodine.New(errors.New("Not Implemented"), nil) } + +func (d donutDriver) ListObjectParts(bucket, key, uploadID string) (drivers.ObjectResourcesMetadata, error) { + return drivers.ObjectResourcesMetadata{}, iodine.New(errors.New("Not Implemented"), nil) +} diff --git a/pkg/storage/drivers/driver.go b/pkg/storage/drivers/driver.go index 602bdb7fe..5955456ca 100644 --- a/pkg/storage/drivers/driver.go +++ b/pkg/storage/drivers/driver.go @@ -35,14 +35,15 @@ type Driver interface { // Object Operations GetObject(w io.Writer, bucket, object string) (int64, error) GetPartialObject(w io.Writer, bucket, object string, start, length int64) (int64, error) - GetObjectMetadata(bucket string, object string, prefix string) (ObjectMetadata, error) + GetObjectMetadata(bucket, key, prefix string) (ObjectMetadata, error) ListObjects(bucket string, resources BucketResourcesMetadata) ([]ObjectMetadata, BucketResourcesMetadata, error) - CreateObject(bucket string, key string, contentType string, md5sum string, size int64, data io.Reader) (string, error) + CreateObject(bucket, key, contentType, md5sum string, size int64, data io.Reader) (string, error) // Object Multipart Operations - NewMultipartUpload(bucket string, key string, contentType string) (string, error) - CreateObjectPart(bucket string, key string, uploadID string, partID int, contentType string, md5sum string, size int64, data io.Reader) (string, error) - CompleteMultipartUpload(bucket string, key string, uploadID string, parts map[int]string) (string, error) + NewMultipartUpload(bucket, key, contentType string) (string, error) + CreateObjectPart(bucket, key, uploadID string, partID int, contentType string, md5sum string, size int64, data io.Reader) (string, error) + CompleteMultipartUpload(bucket, key, uploadID string, parts map[int]string) (string, error) + ListObjectParts(bucket, key, uploadID string) (ObjectResourcesMetadata, error) } // BucketACL - bucket level access control @@ -103,6 +104,37 @@ const ( DefaultMode ) +// PartMetadata - various types of individual part resources +type PartMetadata struct { + PartNumber int + LastModified time.Time + ETag string + Size int64 +} + +// ObjectResourcesMetadata - various types of object resources +type ObjectResourcesMetadata struct { + Bucket string + EncodingType string + Key string + UploadID string + Initiator struct { + ID string + DisplayName string + } + Owner struct { + ID string + DisplayName string + } + StorageClass string + PartNumberMarker int + NextPartNumberMarker int + MaxParts int + IsTruncated bool + + Part []*PartMetadata +} + // BucketResourcesMetadata - various types of bucket resources type BucketResourcesMetadata struct { Prefix string diff --git a/pkg/storage/drivers/memory/memory.go b/pkg/storage/drivers/memory/memory.go index e86a02673..f35972673 100644 --- a/pkg/storage/drivers/memory/memory.go +++ b/pkg/storage/drivers/memory/memory.go @@ -47,8 +47,14 @@ type memoryDriver struct { } type storedBucket struct { - bucketMetadata drivers.BucketMetadata - objectMetadata map[string]drivers.ObjectMetadata + bucketMetadata drivers.BucketMetadata + objectMetadata map[string]drivers.ObjectMetadata + multiPartSession map[string]multiPartSession +} + +type multiPartSession struct { + totalParts int + uploadID string } const ( @@ -533,19 +539,20 @@ func (memory *memoryDriver) NewMultipartUpload(bucket, key, contentType string) } memory.lock.RUnlock() + memory.lock.Lock() id := []byte(strconv.FormatInt(rand.Int63(), 10) + bucket + key + time.Now().String()) uploadIDSum := sha512.Sum512(id) uploadID := base64.URLEncoding.EncodeToString(uploadIDSum[:]) - md5sumBytes := md5.Sum([]byte(uploadID)) - // CreateObject expects in base64 which is coming over http request header - // while all response headers with ETag are hex encoding - md5sum := base64.StdEncoding.EncodeToString(md5sumBytes[:]) - // Create UploadID session, this is a temporary work around to instantiate a session. - // It would not be valid in future, since we need to work out proper sessions so that - // we can cleanly abort session and propagate failures. - _, err := memory.CreateObject(bucket, key+"?uploadId="+uploadID, contentType, md5sum, int64(len(uploadID)), bytes.NewBufferString(uploadID)) - return uploadID, iodine.New(err, nil) + storedBucket.multiPartSession = make(map[string]multiPartSession) + storedBucket.multiPartSession[key] = multiPartSession{ + uploadID: uploadID, + totalParts: 0, + } + memory.storedBuckets[bucket] = storedBucket + memory.lock.Unlock() + + return uploadID, nil } func getMultipartKey(key string, uploadID string, partNumber int) string { @@ -554,17 +561,29 @@ func getMultipartKey(key string, uploadID string, partNumber int) string { func (memory *memoryDriver) CreateObjectPart(bucket, key, uploadID string, partID int, contentType, expectedMD5Sum string, size int64, data io.Reader) (string, error) { // Verify upload id - _, ok := memory.objects.Get(bucket + "/" + key + "?uploadId=" + uploadID) - if !ok { + memory.lock.RLock() + storedBucket := memory.storedBuckets[bucket] + if storedBucket.multiPartSession[key].uploadID != uploadID { + memory.lock.RUnlock() return "", iodine.New(drivers.InvalidUploadID{UploadID: uploadID}, nil) } - return memory.CreateObject(bucket, getMultipartKey(key, uploadID, partID), "", expectedMD5Sum, size, data) + memory.lock.RUnlock() + + etag, err := memory.CreateObject(bucket, getMultipartKey(key, uploadID, partID), "", expectedMD5Sum, size, data) + if err != nil { + return "", iodine.New(err, nil) + } + // once successful, update totalParts + multiPartSession := storedBucket.multiPartSession[key] + multiPartSession.totalParts++ + storedBucket.multiPartSession[key] = multiPartSession + return etag, nil } func (memory *memoryDriver) cleanupMultipartSession(bucket, key, uploadID string) { memory.lock.Lock() defer memory.lock.Unlock() - memory.objects.Delete(bucket + "/" + key + "?uploadId=" + uploadID) + delete(memory.storedBuckets[bucket].multiPartSession, key) } func (memory *memoryDriver) cleanupMultiparts(bucket, key, uploadID string, parts map[int]string) { @@ -577,28 +596,29 @@ func (memory *memoryDriver) cleanupMultiparts(bucket, key, uploadID string, part } func (memory *memoryDriver) CompleteMultipartUpload(bucket, key, uploadID string, parts map[int]string) (string, error) { - // Verify upload id - _, ok := memory.objects.Get(bucket + "/" + key + "?uploadId=" + uploadID) - if !ok { - return "", iodine.New(drivers.InvalidUploadID{UploadID: uploadID}, nil) - } if !drivers.IsValidBucket(bucket) { return "", iodine.New(drivers.BucketNameInvalid{Bucket: bucket}, nil) } if !drivers.IsValidObjectName(key) { return "", iodine.New(drivers.ObjectNameInvalid{Object: key}, nil) } + // Verify upload id memory.lock.RLock() if _, ok := memory.storedBuckets[bucket]; ok == false { memory.lock.RUnlock() return "", iodine.New(drivers.BucketNotFound{Bucket: bucket}, nil) } + storedBucket := memory.storedBuckets[bucket] + if storedBucket.multiPartSession[key].uploadID != uploadID { + memory.lock.RUnlock() + return "", iodine.New(drivers.InvalidUploadID{UploadID: uploadID}, nil) + } memory.lock.RUnlock() memory.lock.Lock() var size int64 for i := range parts { - object, ok := memory.storedBuckets[bucket].objectMetadata[bucket+"/"+getMultipartKey(key, uploadID, i)] + object, ok := storedBucket.objectMetadata[bucket+"/"+getMultipartKey(key, uploadID, i)] if !ok { memory.lock.Unlock() return "", iodine.New(errors.New("missing part: "+strconv.Itoa(i)), nil) @@ -643,3 +663,51 @@ func (memory *memoryDriver) CompleteMultipartUpload(bucket, key, uploadID string memory.cleanupMultipartSession(bucket, key, uploadID) return etag, nil } + +// partNumber is a sortable interface for Part slice +type partNumber []*drivers.PartMetadata + +func (a partNumber) Len() int { return len(a) } +func (a partNumber) Swap(i, j int) { a[i], a[j] = a[j], a[i] } +func (a partNumber) Less(i, j int) bool { return a[i].PartNumber < a[j].PartNumber } + +func (memory *memoryDriver) ListObjectParts(bucket, key, uploadID string) (drivers.ObjectResourcesMetadata, error) { + // Verify upload id + memory.lock.RLock() + defer memory.lock.RUnlock() + if _, ok := memory.storedBuckets[bucket]; ok == false { + return drivers.ObjectResourcesMetadata{}, iodine.New(drivers.BucketNotFound{Bucket: bucket}, nil) + } + storedBucket := memory.storedBuckets[bucket] + if storedBucket.multiPartSession[key].uploadID != uploadID { + return drivers.ObjectResourcesMetadata{}, iodine.New(drivers.InvalidUploadID{UploadID: uploadID}, nil) + } + // TODO support PartNumberMarker and NextPartNumberMarker + objectResourcesMetadata := drivers.ObjectResourcesMetadata{} + objectResourcesMetadata.UploadID = uploadID + objectResourcesMetadata.Bucket = bucket + objectResourcesMetadata.Key = key + objectResourcesMetadata.MaxParts = 1000 + var parts []*drivers.PartMetadata + for i := 1; i <= storedBucket.multiPartSession[key].totalParts; i++ { + if len(parts) > objectResourcesMetadata.MaxParts { + sort.Sort(partNumber(parts)) + objectResourcesMetadata.IsTruncated = true + objectResourcesMetadata.Part = parts + return objectResourcesMetadata, nil + } + object, ok := storedBucket.objectMetadata[bucket+"/"+getMultipartKey(key, uploadID, i)] + if !ok { + return drivers.ObjectResourcesMetadata{}, iodine.New(errors.New("missing part: "+strconv.Itoa(i)), nil) + } + partMetadata := &drivers.PartMetadata{} + partMetadata.Size = object.Size + partMetadata.LastModified = object.Created + partMetadata.ETag = object.Md5 + partMetadata.PartNumber = i + parts = append(parts, partMetadata) + } + sort.Sort(partNumber(parts)) + objectResourcesMetadata.Part = parts + return objectResourcesMetadata, nil +} diff --git a/pkg/storage/drivers/mocks/Driver.go b/pkg/storage/drivers/mocks/Driver.go index b3fc46c7b..493c18e01 100644 --- a/pkg/storage/drivers/mocks/Driver.go +++ b/pkg/storage/drivers/mocks/Driver.go @@ -56,11 +56,10 @@ func (m *Driver) SetBucketMetadata(bucket, acl string) error { // SetGetObjectWriter is a mock func (m *Driver) SetGetObjectWriter(bucket, object string, data []byte) { m.ObjectWriterData[bucket+":"+object] = data - // println(string(m.ObjectWriterData["bucket:object"])) } // GetObject is a mock -func (m *Driver) GetObject(w io.Writer, bucket string, object string) (int64, error) { +func (m *Driver) GetObject(w io.Writer, bucket, object string) (int64, error) { ret := m.Called(w, bucket, object) r0 := ret.Get(0).(int64) r1 := ret.Error(1) @@ -74,7 +73,7 @@ func (m *Driver) GetObject(w io.Writer, bucket string, object string) (int64, er } // GetPartialObject is a mock -func (m *Driver) GetPartialObject(w io.Writer, bucket string, object string, start int64, length int64) (int64, error) { +func (m *Driver) GetPartialObject(w io.Writer, bucket, object string, start int64, length int64) (int64, error) { ret := m.Called(w, bucket, object, start, length) r0 := ret.Get(0).(int64) @@ -95,7 +94,7 @@ func (m *Driver) GetPartialObject(w io.Writer, bucket string, object string, sta } // GetObjectMetadata is a mock -func (m *Driver) GetObjectMetadata(bucket string, object string, prefix string) (drivers.ObjectMetadata, error) { +func (m *Driver) GetObjectMetadata(bucket, object, prefix string) (drivers.ObjectMetadata, error) { ret := m.Called(bucket, object, prefix) r0 := ret.Get(0).(drivers.ObjectMetadata) @@ -116,7 +115,7 @@ func (m *Driver) ListObjects(bucket string, resources drivers.BucketResourcesMet } // CreateObject is a mock -func (m *Driver) CreateObject(bucket string, key string, contentType string, md5sum string, size int64, data io.Reader) (string, error) { +func (m *Driver) CreateObject(bucket, key, contentType, md5sum string, size int64, data io.Reader) (string, error) { ret := m.Called(bucket, key, contentType, md5sum, size, data) r0 := ret.Get(0).(string) @@ -126,7 +125,7 @@ func (m *Driver) CreateObject(bucket string, key string, contentType string, md5 } // NewMultipartUpload is a mock -func (m *Driver) NewMultipartUpload(bucket string, key string, contentType string) (string, error) { +func (m *Driver) NewMultipartUpload(bucket, key, contentType string) (string, error) { ret := m.Called(bucket, key, contentType) r0 := ret.Get(0).(string) @@ -136,7 +135,7 @@ func (m *Driver) NewMultipartUpload(bucket string, key string, contentType strin } // CreateObjectPart is a mock -func (m *Driver) CreateObjectPart(bucket string, key string, uploadID string, partID int, contentType string, md5sum string, size int64, data io.Reader) (string, error) { +func (m *Driver) CreateObjectPart(bucket, key, uploadID string, partID int, contentType string, md5sum string, size int64, data io.Reader) (string, error) { ret := m.Called(bucket, key, uploadID, partID, contentType, md5sum, size, data) r0 := ret.Get(0).(string) @@ -146,7 +145,7 @@ func (m *Driver) CreateObjectPart(bucket string, key string, uploadID string, pa } // CompleteMultipartUpload is a mock -func (m *Driver) CompleteMultipartUpload(bucket string, key string, uploadID string, parts map[int]string) (string, error) { +func (m *Driver) CompleteMultipartUpload(bucket, key, uploadID string, parts map[int]string) (string, error) { ret := m.Called(bucket, key, uploadID, parts) r0 := ret.Get(0).(string) @@ -154,3 +153,13 @@ func (m *Driver) CompleteMultipartUpload(bucket string, key string, uploadID str return r0, r1 } + +// ListObjectParts is a mock +func (m *Driver) ListObjectParts(bucket, key, uploadID string) (drivers.ObjectResourcesMetadata, error) { + ret := m.Called(bucket, key, uploadID) + + r0 := ret.Get(0).(drivers.ObjectResourcesMetadata) + r1 := ret.Error(1) + + return r0, r1 +}