mirror of https://github.com/minio/minio.git
Merge pull request #597 from harshavardhana/pr_out_an_attempt_to_implement_listmultipartuploads_
This commit is contained in:
commit
5f381a8bee
|
@ -61,6 +61,56 @@ func (server *minioAPI) isValidOp(w http.ResponseWriter, req *http.Request, acce
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// GET Bucket (List Multipart uploads)
|
||||||
|
// -------------------------
|
||||||
|
// This operation lists in-progress multipart uploads. An in-progress
|
||||||
|
// multipart upload is a multipart upload that has been initiated,
|
||||||
|
// using the Initiate Multipart Upload request, but has not yet been completed or aborted.
|
||||||
|
// This operation returns at most 1,000 multipart uploads in the response.
|
||||||
|
//
|
||||||
|
func (server *minioAPI) listMultipartUploadsHandler(w http.ResponseWriter, req *http.Request) {
|
||||||
|
acceptsContentType := getContentType(req)
|
||||||
|
// verify if bucket allows this operation
|
||||||
|
if !server.isValidOp(w, req, acceptsContentType) {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
resources := getBucketMultipartResources(req.URL.Query())
|
||||||
|
if resources.MaxUploads == 0 {
|
||||||
|
resources.MaxUploads = maxObjectList
|
||||||
|
}
|
||||||
|
|
||||||
|
vars := mux.Vars(req)
|
||||||
|
bucket := vars["bucket"]
|
||||||
|
|
||||||
|
resources, err := server.driver.ListMultipartUploads(bucket, resources)
|
||||||
|
switch err := iodine.ToError(err).(type) {
|
||||||
|
case nil: // success
|
||||||
|
{
|
||||||
|
// generate response
|
||||||
|
response := generateListMultipartUploadsResult(bucket, resources)
|
||||||
|
encodedSuccessResponse := encodeSuccessResponse(response, acceptsContentType)
|
||||||
|
// write headers
|
||||||
|
setCommonHeaders(w, getContentTypeString(acceptsContentType))
|
||||||
|
// set content-length to the size of the body
|
||||||
|
w.Header().Set("Content-Length", strconv.Itoa(len(encodedSuccessResponse)))
|
||||||
|
w.WriteHeader(http.StatusOK)
|
||||||
|
// write body
|
||||||
|
w.Write(encodedSuccessResponse)
|
||||||
|
}
|
||||||
|
case drivers.ObjectNotFound:
|
||||||
|
{
|
||||||
|
writeErrorResponse(w, req, NoSuchKey, acceptsContentType, req.URL.Path)
|
||||||
|
}
|
||||||
|
default:
|
||||||
|
{
|
||||||
|
log.Error.Println(iodine.New(err, nil))
|
||||||
|
writeErrorResponse(w, req, InternalError, acceptsContentType, req.URL.Path)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
// GET Bucket (List Objects)
|
// GET Bucket (List Objects)
|
||||||
// -------------------------
|
// -------------------------
|
||||||
// This implementation of the GET operation returns some or all (up to 1000)
|
// This implementation of the GET operation returns some or all (up to 1000)
|
||||||
|
@ -74,6 +124,11 @@ func (server *minioAPI) listObjectsHandler(w http.ResponseWriter, req *http.Requ
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if isRequestUploads(req.URL.Query()) {
|
||||||
|
server.listMultipartUploadsHandler(w, req)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
resources := getBucketResources(req.URL.Query())
|
resources := getBucketResources(req.URL.Query())
|
||||||
if resources.Maxkeys == 0 {
|
if resources.Maxkeys == 0 {
|
||||||
resources.Maxkeys = maxObjectList
|
resources.Maxkeys = maxObjectList
|
||||||
|
|
|
@ -16,9 +16,7 @@
|
||||||
|
|
||||||
package api
|
package api
|
||||||
|
|
||||||
import (
|
import "encoding/xml"
|
||||||
"encoding/xml"
|
|
||||||
)
|
|
||||||
|
|
||||||
// Limit number of objects in a given response
|
// Limit number of objects in a given response
|
||||||
const (
|
const (
|
||||||
|
@ -78,6 +76,24 @@ type ListPartsResponse struct {
|
||||||
Part []*Part
|
Part []*Part
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ListMultipartUploadsResponse - format for list multipart uploads response
|
||||||
|
type ListMultipartUploadsResponse struct {
|
||||||
|
XMLName xml.Name `xml:"http://doc.s3.amazonaws.com/2006-03-01 ListMultipartUploadsResult" json:"-"`
|
||||||
|
|
||||||
|
Bucket string
|
||||||
|
KeyMarker string
|
||||||
|
UploadIDMarker string `xml:"UploadIdMarker"`
|
||||||
|
NextKeyMarker string
|
||||||
|
NextUploadIDMarker string `xml:"NextUploadIdMarker"`
|
||||||
|
EncodingType string
|
||||||
|
MaxUploads int
|
||||||
|
IsTruncated bool
|
||||||
|
Upload []*Upload
|
||||||
|
Prefix string
|
||||||
|
Delimiter string
|
||||||
|
CommonPrefixes []*CommonPrefix
|
||||||
|
}
|
||||||
|
|
||||||
// ListBucketsResponse - format for list buckets response
|
// ListBucketsResponse - format for list buckets response
|
||||||
type ListBucketsResponse struct {
|
type ListBucketsResponse struct {
|
||||||
XMLName xml.Name `xml:"http://doc.s3.amazonaws.com/2006-03-01 ListAllMyBucketsResult" json:"-"`
|
XMLName xml.Name `xml:"http://doc.s3.amazonaws.com/2006-03-01 ListAllMyBucketsResult" json:"-"`
|
||||||
|
@ -88,6 +104,16 @@ type ListBucketsResponse struct {
|
||||||
Owner Owner
|
Owner Owner
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Upload container for in progress multipart upload
|
||||||
|
type Upload struct {
|
||||||
|
Key string
|
||||||
|
UploadID string `xml:"UploadId"`
|
||||||
|
Initiator Initiator
|
||||||
|
Owner Owner
|
||||||
|
StorageClass string
|
||||||
|
Initiated string
|
||||||
|
}
|
||||||
|
|
||||||
// CommonPrefix container for prefix response in ListObjectsResponse
|
// CommonPrefix container for prefix response in ListObjectsResponse
|
||||||
type CommonPrefix struct {
|
type CommonPrefix struct {
|
||||||
Prefix string
|
Prefix string
|
||||||
|
|
|
@ -131,13 +131,6 @@ func generateCompleteMultpartUploadResult(bucket, key, location, etag string) Co
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// partNumber
|
|
||||||
type partNumber []*Part
|
|
||||||
|
|
||||||
func (b partNumber) Len() int { return len(b) }
|
|
||||||
func (b partNumber) Swap(i, j int) { b[i], b[j] = b[j], b[i] }
|
|
||||||
func (b partNumber) Less(i, j int) bool { return b[i].PartNumber < b[j].PartNumber }
|
|
||||||
|
|
||||||
// generateListPartsResult
|
// generateListPartsResult
|
||||||
func generateListPartsResult(objectMetadata drivers.ObjectResourcesMetadata) ListPartsResponse {
|
func generateListPartsResult(objectMetadata drivers.ObjectResourcesMetadata) ListPartsResponse {
|
||||||
// TODO - support EncodingType in xml decoding
|
// TODO - support EncodingType in xml decoding
|
||||||
|
@ -168,6 +161,31 @@ func generateListPartsResult(objectMetadata drivers.ObjectResourcesMetadata) Lis
|
||||||
return listPartsResponse
|
return listPartsResponse
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// generateListMultipartUploadsResult
|
||||||
|
func generateListMultipartUploadsResult(bucket string, metadata drivers.BucketMultipartResourcesMetadata) ListMultipartUploadsResponse {
|
||||||
|
listMultipartUploadsResponse := ListMultipartUploadsResponse{}
|
||||||
|
listMultipartUploadsResponse.Bucket = bucket
|
||||||
|
listMultipartUploadsResponse.Delimiter = metadata.Delimiter
|
||||||
|
listMultipartUploadsResponse.IsTruncated = metadata.IsTruncated
|
||||||
|
listMultipartUploadsResponse.EncodingType = metadata.EncodingType
|
||||||
|
listMultipartUploadsResponse.Prefix = metadata.Prefix
|
||||||
|
listMultipartUploadsResponse.KeyMarker = metadata.KeyMarker
|
||||||
|
listMultipartUploadsResponse.NextKeyMarker = metadata.NextKeyMarker
|
||||||
|
listMultipartUploadsResponse.MaxUploads = metadata.MaxUploads
|
||||||
|
listMultipartUploadsResponse.NextUploadIDMarker = metadata.NextUploadIDMarker
|
||||||
|
listMultipartUploadsResponse.UploadIDMarker = metadata.UploadIDMarker
|
||||||
|
|
||||||
|
listMultipartUploadsResponse.Upload = make([]*Upload, len(metadata.Upload))
|
||||||
|
for _, upload := range metadata.Upload {
|
||||||
|
newUpload := &Upload{}
|
||||||
|
newUpload.UploadID = upload.UploadID
|
||||||
|
newUpload.Key = upload.Key
|
||||||
|
newUpload.Initiated = upload.Initiated.Format(iso8601Format)
|
||||||
|
listMultipartUploadsResponse.Upload = append(listMultipartUploadsResponse.Upload, newUpload)
|
||||||
|
}
|
||||||
|
return listMultipartUploadsResponse
|
||||||
|
}
|
||||||
|
|
||||||
// writeSuccessResponse write success headers
|
// writeSuccessResponse write success headers
|
||||||
func writeSuccessResponse(w http.ResponseWriter, acceptsContentType contentType) {
|
func writeSuccessResponse(w http.ResponseWriter, acceptsContentType contentType) {
|
||||||
setCommonHeaders(w, getContentTypeString(acceptsContentType))
|
setCommonHeaders(w, getContentTypeString(acceptsContentType))
|
||||||
|
|
|
@ -33,6 +33,17 @@ func getBucketResources(values url.Values) (v drivers.BucketResourcesMetadata) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// part bucket url queries for ?uploads
|
||||||
|
func getBucketMultipartResources(values url.Values) (v drivers.BucketMultipartResourcesMetadata) {
|
||||||
|
v.Prefix = values.Get("prefix")
|
||||||
|
v.KeyMarker = values.Get("key-marker")
|
||||||
|
v.MaxUploads, _ = strconv.Atoi(values.Get("max-uploads"))
|
||||||
|
v.Delimiter = values.Get("delimiter")
|
||||||
|
v.EncodingType = values.Get("encoding-type")
|
||||||
|
v.UploadIDMarker = values.Get("upload-id-marker")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
// parse object url queries
|
// parse object url queries
|
||||||
func getObjectResources(values url.Values) (v drivers.ObjectResourcesMetadata) {
|
func getObjectResources(values url.Values) (v drivers.ObjectResourcesMetadata) {
|
||||||
v.UploadID = values.Get("uploadId")
|
v.UploadID = values.Get("uploadId")
|
||||||
|
@ -42,15 +53,14 @@ func getObjectResources(values url.Values) (v drivers.ObjectResourcesMetadata) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// check if req query values have acl
|
// check if req quere values carry uploads resource
|
||||||
func isRequestBucketACL(values url.Values) bool {
|
func isRequestUploads(values url.Values) bool {
|
||||||
for key := range values {
|
_, ok := values["uploads"]
|
||||||
switch true {
|
return ok
|
||||||
case key == "acl":
|
}
|
||||||
return true
|
|
||||||
default:
|
// check if req query values carry acl resource
|
||||||
return false
|
func isRequestBucketACL(values url.Values) bool {
|
||||||
}
|
_, ok := values["acl"]
|
||||||
}
|
return ok
|
||||||
return false
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -400,6 +400,10 @@ func (d donutDriver) CreateObject(bucketName, objectName, contentType, expectedM
|
||||||
return calculatedMD5Sum, nil
|
return calculatedMD5Sum, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (d donutDriver) ListMultipartUploads(bucket string, resources drivers.BucketMultipartResourcesMetadata) (drivers.BucketMultipartResourcesMetadata, error) {
|
||||||
|
return drivers.BucketMultipartResourcesMetadata{}, iodine.New(errors.New("Not Implemented"), nil)
|
||||||
|
}
|
||||||
|
|
||||||
func (d donutDriver) NewMultipartUpload(bucket, key, contentType string) (string, error) {
|
func (d donutDriver) NewMultipartUpload(bucket, key, contentType string) (string, error) {
|
||||||
return "", iodine.New(errors.New("Not Implemented"), nil)
|
return "", iodine.New(errors.New("Not Implemented"), nil)
|
||||||
}
|
}
|
||||||
|
|
|
@ -40,6 +40,7 @@ type Driver interface {
|
||||||
CreateObject(bucket, key, contentType, md5sum string, size int64, data io.Reader) (string, error)
|
CreateObject(bucket, key, contentType, md5sum string, size int64, data io.Reader) (string, error)
|
||||||
|
|
||||||
// Object Multipart Operations
|
// Object Multipart Operations
|
||||||
|
ListMultipartUploads(bucket string, resources BucketMultipartResourcesMetadata) (BucketMultipartResourcesMetadata, error)
|
||||||
NewMultipartUpload(bucket, key, contentType string) (string, error)
|
NewMultipartUpload(bucket, key, contentType string) (string, error)
|
||||||
AbortMultipartUpload(bucket, key, UploadID string) error
|
AbortMultipartUpload(bucket, key, UploadID string) error
|
||||||
CreateObjectPart(bucket, key, uploadID string, partID int, contentType string, md5sum string, size int64, data io.Reader) (string, error)
|
CreateObjectPart(bucket, key, uploadID string, partID int, contentType string, md5sum string, size int64, data io.Reader) (string, error)
|
||||||
|
@ -115,18 +116,10 @@ type PartMetadata struct {
|
||||||
|
|
||||||
// ObjectResourcesMetadata - various types of object resources
|
// ObjectResourcesMetadata - various types of object resources
|
||||||
type ObjectResourcesMetadata struct {
|
type ObjectResourcesMetadata struct {
|
||||||
Bucket string
|
Bucket string
|
||||||
EncodingType string
|
EncodingType string
|
||||||
Key string
|
Key string
|
||||||
UploadID string
|
UploadID string
|
||||||
Initiator struct {
|
|
||||||
ID string
|
|
||||||
DisplayName string
|
|
||||||
}
|
|
||||||
Owner struct {
|
|
||||||
ID string
|
|
||||||
DisplayName string
|
|
||||||
}
|
|
||||||
StorageClass string
|
StorageClass string
|
||||||
PartNumberMarker int
|
PartNumberMarker int
|
||||||
NextPartNumberMarker int
|
NextPartNumberMarker int
|
||||||
|
@ -136,6 +129,29 @@ type ObjectResourcesMetadata struct {
|
||||||
Part []*PartMetadata
|
Part []*PartMetadata
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// UploadMetadata container capturing metadata on in progress multipart upload in a given bucket
|
||||||
|
type UploadMetadata struct {
|
||||||
|
Key string
|
||||||
|
UploadID string
|
||||||
|
StorageClass string
|
||||||
|
Initiated time.Time
|
||||||
|
}
|
||||||
|
|
||||||
|
// BucketMultipartResourcesMetadata - various types of bucket resources for inprogress multipart uploads
|
||||||
|
type BucketMultipartResourcesMetadata struct {
|
||||||
|
KeyMarker string
|
||||||
|
UploadIDMarker string
|
||||||
|
NextKeyMarker string
|
||||||
|
NextUploadIDMarker string
|
||||||
|
EncodingType string
|
||||||
|
MaxUploads int
|
||||||
|
IsTruncated bool
|
||||||
|
Upload []*UploadMetadata
|
||||||
|
Prefix string
|
||||||
|
Delimiter string
|
||||||
|
CommonPrefixes []string
|
||||||
|
}
|
||||||
|
|
||||||
// BucketResourcesMetadata - various types of bucket resources
|
// BucketResourcesMetadata - various types of bucket resources
|
||||||
type BucketResourcesMetadata struct {
|
type BucketResourcesMetadata struct {
|
||||||
Prefix string
|
Prefix string
|
||||||
|
|
|
@ -55,6 +55,7 @@ type storedBucket struct {
|
||||||
type multiPartSession struct {
|
type multiPartSession struct {
|
||||||
totalParts int
|
totalParts int
|
||||||
uploadID string
|
uploadID string
|
||||||
|
initiated time.Time
|
||||||
}
|
}
|
||||||
|
|
||||||
const (
|
const (
|
||||||
|
@ -424,7 +425,6 @@ func (memory *memoryDriver) ListObjects(bucket string, resources drivers.BucketR
|
||||||
keys, resources = memory.listObjects(keys, key, resources)
|
keys, resources = memory.listObjects(keys, key, resources)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// Marker logic - TODO in-efficient right now fix it
|
|
||||||
var newKeys []string
|
var newKeys []string
|
||||||
switch {
|
switch {
|
||||||
case resources.Marker != "":
|
case resources.Marker != "":
|
||||||
|
@ -543,6 +543,7 @@ func (memory *memoryDriver) NewMultipartUpload(bucket, key, contentType string)
|
||||||
storedBucket.multiPartSession = make(map[string]multiPartSession)
|
storedBucket.multiPartSession = make(map[string]multiPartSession)
|
||||||
storedBucket.multiPartSession[key] = multiPartSession{
|
storedBucket.multiPartSession[key] = multiPartSession{
|
||||||
uploadID: uploadID,
|
uploadID: uploadID,
|
||||||
|
initiated: time.Now(),
|
||||||
totalParts: 0,
|
totalParts: 0,
|
||||||
}
|
}
|
||||||
memory.storedBuckets[bucket] = storedBucket
|
memory.storedBuckets[bucket] = storedBucket
|
||||||
|
@ -675,6 +676,45 @@ func (memory *memoryDriver) CompleteMultipartUpload(bucket, key, uploadID string
|
||||||
return etag, nil
|
return etag, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// byKey is a sortable interface for UploadMetadata slice
|
||||||
|
type byKey []*drivers.UploadMetadata
|
||||||
|
|
||||||
|
func (a byKey) Len() int { return len(a) }
|
||||||
|
func (a byKey) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
|
||||||
|
func (a byKey) Less(i, j int) bool { return a[i].Key < a[j].Key }
|
||||||
|
|
||||||
|
func (memory *memoryDriver) ListMultipartUploads(bucket string, resources drivers.BucketMultipartResourcesMetadata) (drivers.BucketMultipartResourcesMetadata, error) {
|
||||||
|
// TODO handle delimiter, prefix, uploadIDMarker
|
||||||
|
memory.lock.RLock()
|
||||||
|
defer memory.lock.RUnlock()
|
||||||
|
if _, ok := memory.storedBuckets[bucket]; ok == false {
|
||||||
|
return drivers.BucketMultipartResourcesMetadata{}, iodine.New(drivers.BucketNotFound{Bucket: bucket}, nil)
|
||||||
|
}
|
||||||
|
storedBucket := memory.storedBuckets[bucket]
|
||||||
|
var uploads []*drivers.UploadMetadata
|
||||||
|
|
||||||
|
for key, session := range storedBucket.multiPartSession {
|
||||||
|
if len(uploads) > resources.MaxUploads {
|
||||||
|
sort.Sort(byKey(uploads))
|
||||||
|
resources.Upload = uploads
|
||||||
|
resources.NextKeyMarker = key
|
||||||
|
resources.NextUploadIDMarker = session.uploadID
|
||||||
|
resources.IsTruncated = true
|
||||||
|
return resources, nil
|
||||||
|
}
|
||||||
|
if key > resources.KeyMarker {
|
||||||
|
upload := new(drivers.UploadMetadata)
|
||||||
|
upload.Key = key
|
||||||
|
upload.UploadID = session.uploadID
|
||||||
|
upload.Initiated = session.initiated
|
||||||
|
uploads = append(uploads, upload)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
sort.Sort(byKey(uploads))
|
||||||
|
resources.Upload = uploads
|
||||||
|
return resources, nil
|
||||||
|
}
|
||||||
|
|
||||||
// partNumber is a sortable interface for Part slice
|
// partNumber is a sortable interface for Part slice
|
||||||
type partNumber []*drivers.PartMetadata
|
type partNumber []*drivers.PartMetadata
|
||||||
|
|
||||||
|
@ -690,6 +730,9 @@ func (memory *memoryDriver) ListObjectParts(bucket, key string, resources driver
|
||||||
return drivers.ObjectResourcesMetadata{}, iodine.New(drivers.BucketNotFound{Bucket: bucket}, nil)
|
return drivers.ObjectResourcesMetadata{}, iodine.New(drivers.BucketNotFound{Bucket: bucket}, nil)
|
||||||
}
|
}
|
||||||
storedBucket := memory.storedBuckets[bucket]
|
storedBucket := memory.storedBuckets[bucket]
|
||||||
|
if _, ok := storedBucket.multiPartSession[key]; ok == false {
|
||||||
|
return drivers.ObjectResourcesMetadata{}, iodine.New(drivers.ObjectNotFound{Bucket: bucket, Object: key}, nil)
|
||||||
|
}
|
||||||
if storedBucket.multiPartSession[key].uploadID != resources.UploadID {
|
if storedBucket.multiPartSession[key].uploadID != resources.UploadID {
|
||||||
return drivers.ObjectResourcesMetadata{}, iodine.New(drivers.InvalidUploadID{UploadID: resources.UploadID}, nil)
|
return drivers.ObjectResourcesMetadata{}, iodine.New(drivers.InvalidUploadID{UploadID: resources.UploadID}, nil)
|
||||||
}
|
}
|
||||||
|
|
|
@ -167,6 +167,16 @@ func (m *Driver) ListObjectParts(bucket, key string, resources drivers.ObjectRes
|
||||||
return r0, r1
|
return r0, r1
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ListMultipartUploads is a mock
|
||||||
|
func (m *Driver) ListMultipartUploads(bucket string, resources drivers.BucketMultipartResourcesMetadata) (drivers.BucketMultipartResourcesMetadata, error) {
|
||||||
|
ret := m.Called(bucket, resources)
|
||||||
|
|
||||||
|
r0 := ret.Get(0).(drivers.BucketMultipartResourcesMetadata)
|
||||||
|
r1 := ret.Error(1)
|
||||||
|
|
||||||
|
return r0, r1
|
||||||
|
}
|
||||||
|
|
||||||
// AbortMultipartUpload is a mock
|
// AbortMultipartUpload is a mock
|
||||||
func (m *Driver) AbortMultipartUpload(bucket, key, uploadID string) error {
|
func (m *Driver) AbortMultipartUpload(bucket, key, uploadID string) error {
|
||||||
ret := m.Called(bucket, key, uploadID)
|
ret := m.Called(bucket, key, uploadID)
|
||||||
|
|
Loading…
Reference in New Issue