mirror of
https://github.com/minio/minio.git
synced 2024-12-24 06:05:55 -05:00
objectAPI: Fix object API interface, remove unnecessary structs.
ObjectAPI changes. ``` ListObjects(bucket, prefix, marker, delimiter string, maxKeys int) (ListObjectsInfo, *probe.Error) ListMultipartUploads(bucket, objectPrefix, keyMarker, uploadIDMarker, delimiter string, maxUploads int) (ListMultipartsInfo, *probe.Error) ListObjectParts(bucket, object, uploadID string, partNumberMarker, maxParts int) (ListPartsInfo, *probe.Error) CompleteMultipartUpload(bucket string, object string, uploadID string, parts []completePart) (ObjectInfo, *probe.Error) ```
This commit is contained in:
parent
12515eabe2
commit
0479d4976b
@ -58,20 +58,20 @@ func encodeResponse(response interface{}) []byte {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Write object header
|
// Write object header
|
||||||
func setObjectHeaders(w http.ResponseWriter, objectInfo ObjectInfo, contentRange *httpRange) {
|
func setObjectHeaders(w http.ResponseWriter, objInfo ObjectInfo, contentRange *httpRange) {
|
||||||
// set common headers
|
// set common headers
|
||||||
setCommonHeaders(w)
|
setCommonHeaders(w)
|
||||||
|
|
||||||
// set object-related metadata headers
|
// set object-related metadata headers
|
||||||
lastModified := objectInfo.ModifiedTime.UTC().Format(http.TimeFormat)
|
lastModified := objInfo.ModifiedTime.UTC().Format(http.TimeFormat)
|
||||||
w.Header().Set("Last-Modified", lastModified)
|
w.Header().Set("Last-Modified", lastModified)
|
||||||
|
|
||||||
w.Header().Set("Content-Type", objectInfo.ContentType)
|
w.Header().Set("Content-Type", objInfo.ContentType)
|
||||||
if objectInfo.MD5Sum != "" {
|
if objInfo.MD5Sum != "" {
|
||||||
w.Header().Set("ETag", "\""+objectInfo.MD5Sum+"\"")
|
w.Header().Set("ETag", "\""+objInfo.MD5Sum+"\"")
|
||||||
}
|
}
|
||||||
|
|
||||||
w.Header().Set("Content-Length", strconv.FormatInt(objectInfo.Size, 10))
|
w.Header().Set("Content-Length", strconv.FormatInt(objInfo.Size, 10))
|
||||||
|
|
||||||
// for providing ranged content
|
// for providing ranged content
|
||||||
if contentRange != nil {
|
if contentRange != nil {
|
||||||
|
@ -21,7 +21,7 @@ import (
|
|||||||
"strconv"
|
"strconv"
|
||||||
)
|
)
|
||||||
|
|
||||||
// parse bucket url queries
|
// Parse bucket url queries
|
||||||
func getBucketResources(values url.Values) (prefix, marker, delimiter string, maxkeys int, encodingType string) {
|
func getBucketResources(values url.Values) (prefix, marker, delimiter string, maxkeys int, encodingType string) {
|
||||||
prefix = values.Get("prefix")
|
prefix = values.Get("prefix")
|
||||||
marker = values.Get("marker")
|
marker = values.Get("marker")
|
||||||
@ -31,27 +31,29 @@ func getBucketResources(values url.Values) (prefix, marker, delimiter string, ma
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// part bucket url queries for ?uploads
|
// Parse bucket url queries for ?uploads
|
||||||
func getBucketMultipartResources(values url.Values) (v BucketMultipartResourcesMetadata) {
|
func getBucketMultipartResources(values url.Values) (prefix, keyMarker, uploadIDMarker, delimiter string, maxUploads int, encodingType string) {
|
||||||
v.Prefix = values.Get("prefix")
|
|
||||||
v.KeyMarker = values.Get("key-marker")
|
prefix = values.Get("prefix")
|
||||||
v.MaxUploads, _ = strconv.Atoi(values.Get("max-uploads"))
|
keyMarker = values.Get("key-marker")
|
||||||
v.Delimiter = values.Get("delimiter")
|
uploadIDMarker = values.Get("upload-id-marker")
|
||||||
v.EncodingType = values.Get("encoding-type")
|
delimiter = values.Get("delimiter")
|
||||||
v.UploadIDMarker = values.Get("upload-id-marker")
|
maxUploads, _ = strconv.Atoi(values.Get("max-uploads"))
|
||||||
|
encodingType = values.Get("encoding-type")
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// parse object url queries
|
// Parse object url queries
|
||||||
func getObjectResources(values url.Values) (v ObjectResourcesMetadata) {
|
func getObjectResources(values url.Values) (uploadID string, partNumberMarker, maxParts int, encodingType string) {
|
||||||
v.UploadID = values.Get("uploadId")
|
uploadID = values.Get("uploadId")
|
||||||
v.PartNumberMarker, _ = strconv.Atoi(values.Get("part-number-marker"))
|
partNumberMarker, _ = strconv.Atoi(values.Get("part-number-marker"))
|
||||||
v.MaxParts, _ = strconv.Atoi(values.Get("max-parts"))
|
maxParts, _ = strconv.Atoi(values.Get("max-parts"))
|
||||||
v.EncodingType = values.Get("encoding-type")
|
encodingType = values.Get("encoding-type")
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// get upload id.
|
// Get upload id.
|
||||||
func getUploadID(values url.Values) (uploadID string) {
|
func getUploadID(values url.Values) (uploadID string) {
|
||||||
return getObjectResources(values).UploadID
|
uploadID, _, _, _ = getObjectResources(values)
|
||||||
|
return
|
||||||
}
|
}
|
||||||
|
@ -245,7 +245,7 @@ func generateListBucketsResponse(buckets []BucketInfo) ListBucketsResponse {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// generates an ListObjects response for the said bucket with other enumerated options.
|
// generates an ListObjects response for the said bucket with other enumerated options.
|
||||||
func generateListObjectsResponse(bucket, prefix, marker, delimiter string, maxKeys int, resp ListObjectsResult) ListObjectsResponse {
|
func generateListObjectsResponse(bucket, prefix, marker, delimiter string, maxKeys int, resp ListObjectsInfo) ListObjectsResponse {
|
||||||
var contents []Object
|
var contents []Object
|
||||||
var prefixes []CommonPrefix
|
var prefixes []CommonPrefix
|
||||||
var owner = Owner{}
|
var owner = Owner{}
|
||||||
@ -317,25 +317,25 @@ func generateCompleteMultpartUploadResponse(bucket, key, location, etag string)
|
|||||||
}
|
}
|
||||||
|
|
||||||
// generateListPartsResult
|
// generateListPartsResult
|
||||||
func generateListPartsResponse(objectMetadata ObjectResourcesMetadata) ListPartsResponse {
|
func generateListPartsResponse(partsInfo ListPartsInfo) ListPartsResponse {
|
||||||
// TODO - support EncodingType in xml decoding
|
// TODO - support EncodingType in xml decoding
|
||||||
listPartsResponse := ListPartsResponse{}
|
listPartsResponse := ListPartsResponse{}
|
||||||
listPartsResponse.Bucket = objectMetadata.Bucket
|
listPartsResponse.Bucket = partsInfo.Bucket
|
||||||
listPartsResponse.Key = objectMetadata.Object
|
listPartsResponse.Key = partsInfo.Object
|
||||||
listPartsResponse.UploadID = objectMetadata.UploadID
|
listPartsResponse.UploadID = partsInfo.UploadID
|
||||||
listPartsResponse.StorageClass = "STANDARD"
|
listPartsResponse.StorageClass = "STANDARD"
|
||||||
listPartsResponse.Initiator.ID = "minio"
|
listPartsResponse.Initiator.ID = "minio"
|
||||||
listPartsResponse.Initiator.DisplayName = "minio"
|
listPartsResponse.Initiator.DisplayName = "minio"
|
||||||
listPartsResponse.Owner.ID = "minio"
|
listPartsResponse.Owner.ID = "minio"
|
||||||
listPartsResponse.Owner.DisplayName = "minio"
|
listPartsResponse.Owner.DisplayName = "minio"
|
||||||
|
|
||||||
listPartsResponse.MaxParts = objectMetadata.MaxParts
|
listPartsResponse.MaxParts = partsInfo.MaxParts
|
||||||
listPartsResponse.PartNumberMarker = objectMetadata.PartNumberMarker
|
listPartsResponse.PartNumberMarker = partsInfo.PartNumberMarker
|
||||||
listPartsResponse.IsTruncated = objectMetadata.IsTruncated
|
listPartsResponse.IsTruncated = partsInfo.IsTruncated
|
||||||
listPartsResponse.NextPartNumberMarker = objectMetadata.NextPartNumberMarker
|
listPartsResponse.NextPartNumberMarker = partsInfo.NextPartNumberMarker
|
||||||
|
|
||||||
listPartsResponse.Parts = make([]Part, len(objectMetadata.Part))
|
listPartsResponse.Parts = make([]Part, len(partsInfo.Parts))
|
||||||
for index, part := range objectMetadata.Part {
|
for index, part := range partsInfo.Parts {
|
||||||
newPart := Part{}
|
newPart := Part{}
|
||||||
newPart.PartNumber = part.PartNumber
|
newPart.PartNumber = part.PartNumber
|
||||||
newPart.ETag = "\"" + part.ETag + "\""
|
newPart.ETag = "\"" + part.ETag + "\""
|
||||||
@ -347,21 +347,21 @@ func generateListPartsResponse(objectMetadata ObjectResourcesMetadata) ListParts
|
|||||||
}
|
}
|
||||||
|
|
||||||
// generateListMultipartUploadsResponse
|
// generateListMultipartUploadsResponse
|
||||||
func generateListMultipartUploadsResponse(bucket string, metadata BucketMultipartResourcesMetadata) ListMultipartUploadsResponse {
|
func generateListMultipartUploadsResponse(bucket string, multipartsInfo ListMultipartsInfo) ListMultipartUploadsResponse {
|
||||||
listMultipartUploadsResponse := ListMultipartUploadsResponse{}
|
listMultipartUploadsResponse := ListMultipartUploadsResponse{}
|
||||||
listMultipartUploadsResponse.Bucket = bucket
|
listMultipartUploadsResponse.Bucket = bucket
|
||||||
listMultipartUploadsResponse.Delimiter = metadata.Delimiter
|
listMultipartUploadsResponse.Delimiter = multipartsInfo.Delimiter
|
||||||
listMultipartUploadsResponse.IsTruncated = metadata.IsTruncated
|
listMultipartUploadsResponse.IsTruncated = multipartsInfo.IsTruncated
|
||||||
listMultipartUploadsResponse.EncodingType = metadata.EncodingType
|
listMultipartUploadsResponse.EncodingType = multipartsInfo.EncodingType
|
||||||
listMultipartUploadsResponse.Prefix = metadata.Prefix
|
listMultipartUploadsResponse.Prefix = multipartsInfo.Prefix
|
||||||
listMultipartUploadsResponse.KeyMarker = metadata.KeyMarker
|
listMultipartUploadsResponse.KeyMarker = multipartsInfo.KeyMarker
|
||||||
listMultipartUploadsResponse.NextKeyMarker = metadata.NextKeyMarker
|
listMultipartUploadsResponse.NextKeyMarker = multipartsInfo.NextKeyMarker
|
||||||
listMultipartUploadsResponse.MaxUploads = metadata.MaxUploads
|
listMultipartUploadsResponse.MaxUploads = multipartsInfo.MaxUploads
|
||||||
listMultipartUploadsResponse.NextUploadIDMarker = metadata.NextUploadIDMarker
|
listMultipartUploadsResponse.NextUploadIDMarker = multipartsInfo.NextUploadIDMarker
|
||||||
listMultipartUploadsResponse.UploadIDMarker = metadata.UploadIDMarker
|
listMultipartUploadsResponse.UploadIDMarker = multipartsInfo.UploadIDMarker
|
||||||
|
|
||||||
listMultipartUploadsResponse.Uploads = make([]Upload, len(metadata.Upload))
|
listMultipartUploadsResponse.Uploads = make([]Upload, len(multipartsInfo.Uploads))
|
||||||
for index, upload := range metadata.Upload {
|
for index, upload := range multipartsInfo.Uploads {
|
||||||
newUpload := Upload{}
|
newUpload := Upload{}
|
||||||
newUpload.UploadID = upload.UploadID
|
newUpload.UploadID = upload.UploadID
|
||||||
newUpload.Key = upload.Object
|
newUpload.Key = upload.Object
|
||||||
|
@ -174,16 +174,16 @@ func (api objectStorageAPI) ListMultipartUploadsHandler(w http.ResponseWriter, r
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
resources := getBucketMultipartResources(r.URL.Query())
|
prefix, keyMarker, uploadIDMarker, delimiter, maxUploads, _ := getBucketMultipartResources(r.URL.Query())
|
||||||
if resources.MaxUploads < 0 {
|
if maxUploads < 0 {
|
||||||
writeErrorResponse(w, r, ErrInvalidMaxUploads, r.URL.Path)
|
writeErrorResponse(w, r, ErrInvalidMaxUploads, r.URL.Path)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
if resources.MaxUploads == 0 {
|
if maxUploads == 0 {
|
||||||
resources.MaxUploads = maxObjectList
|
maxUploads = maxObjectList
|
||||||
}
|
}
|
||||||
|
|
||||||
resources, err := api.ObjectAPI.ListMultipartUploads(bucket, resources)
|
listMultipartsInfo, err := api.ObjectAPI.ListMultipartUploads(bucket, prefix, keyMarker, uploadIDMarker, delimiter, maxUploads)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
errorIf(err.Trace(), "ListMultipartUploads failed.", nil)
|
errorIf(err.Trace(), "ListMultipartUploads failed.", nil)
|
||||||
switch err.ToGoError().(type) {
|
switch err.ToGoError().(type) {
|
||||||
@ -195,7 +195,7 @@ func (api objectStorageAPI) ListMultipartUploadsHandler(w http.ResponseWriter, r
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
// generate response
|
// generate response
|
||||||
response := generateListMultipartUploadsResponse(bucket, resources)
|
response := generateListMultipartUploadsResponse(bucket, listMultipartsInfo)
|
||||||
encodedSuccessResponse := encodeResponse(response)
|
encodedSuccessResponse := encodeResponse(response)
|
||||||
// write headers.
|
// write headers.
|
||||||
setCommonHeaders(w)
|
setCommonHeaders(w)
|
||||||
@ -241,10 +241,10 @@ func (api objectStorageAPI) ListObjectsHandler(w http.ResponseWriter, r *http.Re
|
|||||||
maxkeys = maxObjectList
|
maxkeys = maxObjectList
|
||||||
}
|
}
|
||||||
|
|
||||||
listResp, err := api.ObjectAPI.ListObjects(bucket, prefix, marker, delimiter, maxkeys)
|
listObjectsInfo, err := api.ObjectAPI.ListObjects(bucket, prefix, marker, delimiter, maxkeys)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
// generate response
|
// generate response
|
||||||
response := generateListObjectsResponse(bucket, prefix, marker, delimiter, maxkeys, listResp)
|
response := generateListObjectsResponse(bucket, prefix, marker, delimiter, maxkeys, listObjectsInfo)
|
||||||
encodedSuccessResponse := encodeResponse(response)
|
encodedSuccessResponse := encodeResponse(response)
|
||||||
// Write headers
|
// Write headers
|
||||||
setCommonHeaders(w)
|
setCommonHeaders(w)
|
||||||
@ -306,10 +306,10 @@ func (api objectStorageAPI) ListBucketsHandler(w http.ResponseWriter, r *http.Re
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
buckets, err := api.ObjectAPI.ListBuckets()
|
bucketsInfo, err := api.ObjectAPI.ListBuckets()
|
||||||
if err == nil {
|
if err == nil {
|
||||||
// generate response
|
// generate response
|
||||||
response := generateListBucketsResponse(buckets)
|
response := generateListBucketsResponse(bucketsInfo)
|
||||||
encodedSuccessResponse := encodeResponse(response)
|
encodedSuccessResponse := encodeResponse(response)
|
||||||
// write headers
|
// write headers
|
||||||
setCommonHeaders(w)
|
setCommonHeaders(w)
|
||||||
@ -528,7 +528,7 @@ func (api objectStorageAPI) PostPolicyBucketHandler(w http.ResponseWriter, r *ht
|
|||||||
writeErrorResponse(w, r, apiErr, r.URL.Path)
|
writeErrorResponse(w, r, apiErr, r.URL.Path)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
objectInfo, err := api.ObjectAPI.PutObject(bucket, object, -1, fileBody, nil)
|
objInfo, err := api.ObjectAPI.PutObject(bucket, object, -1, fileBody, nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
errorIf(err.Trace(), "PutObject failed.", nil)
|
errorIf(err.Trace(), "PutObject failed.", nil)
|
||||||
switch err.ToGoError().(type) {
|
switch err.ToGoError().(type) {
|
||||||
@ -547,8 +547,8 @@ func (api objectStorageAPI) PostPolicyBucketHandler(w http.ResponseWriter, r *ht
|
|||||||
}
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
if objectInfo.MD5Sum != "" {
|
if objInfo.MD5Sum != "" {
|
||||||
w.Header().Set("ETag", "\""+objectInfo.MD5Sum+"\"")
|
w.Header().Set("ETag", "\""+objInfo.MD5Sum+"\"")
|
||||||
}
|
}
|
||||||
writeSuccessResponse(w, nil)
|
writeSuccessResponse(w, nil)
|
||||||
}
|
}
|
||||||
|
@ -23,15 +23,14 @@ import (
|
|||||||
|
|
||||||
var multipartsMetadataPath string
|
var multipartsMetadataPath string
|
||||||
|
|
||||||
// SetFSMultipartsMetadataPath - set custom multiparts session
|
// SetFSMultipartsMetadataPath - set custom multiparts session metadata path.
|
||||||
// metadata path.
|
|
||||||
func setFSMultipartsMetadataPath(metadataPath string) {
|
func setFSMultipartsMetadataPath(metadataPath string) {
|
||||||
multipartsMetadataPath = metadataPath
|
multipartsMetadataPath = metadataPath
|
||||||
}
|
}
|
||||||
|
|
||||||
// saveMultipartsSession - save multiparts
|
// saveMultipartsSession - save multiparts.
|
||||||
func saveMultipartsSession(multiparts Multiparts) *probe.Error {
|
func saveMultipartsSession(mparts multiparts) *probe.Error {
|
||||||
qc, err := quick.New(multiparts)
|
qc, err := quick.New(mparts)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err.Trace()
|
return err.Trace()
|
||||||
}
|
}
|
||||||
@ -41,17 +40,17 @@ func saveMultipartsSession(multiparts Multiparts) *probe.Error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// loadMultipartsSession load multipart session file
|
// loadMultipartsSession load multipart session file.
|
||||||
func loadMultipartsSession() (*Multiparts, *probe.Error) {
|
func loadMultipartsSession() (*multiparts, *probe.Error) {
|
||||||
multiparts := &Multiparts{}
|
mparts := &multiparts{}
|
||||||
multiparts.Version = "1"
|
mparts.Version = "1"
|
||||||
multiparts.ActiveSession = make(map[string]*MultipartSession)
|
mparts.ActiveSession = make(map[string]*multipartSession)
|
||||||
qc, err := quick.New(multiparts)
|
qc, err := quick.New(mparts)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err.Trace()
|
return nil, err.Trace()
|
||||||
}
|
}
|
||||||
if err := qc.Load(multipartsMetadataPath); err != nil {
|
if err := qc.Load(multipartsMetadataPath); err != nil {
|
||||||
return nil, err.Trace()
|
return nil, err.Trace()
|
||||||
}
|
}
|
||||||
return qc.Data().(*Multiparts), nil
|
return qc.Data().(*multiparts), nil
|
||||||
}
|
}
|
||||||
|
@ -29,8 +29,8 @@ import (
|
|||||||
|
|
||||||
// ListObjects - lists all objects for a given prefix, returns up to
|
// ListObjects - lists all objects for a given prefix, returns up to
|
||||||
// maxKeys number of objects per call.
|
// maxKeys number of objects per call.
|
||||||
func (fs Filesystem) ListObjects(bucket, prefix, marker, delimiter string, maxKeys int) (ListObjectsResult, *probe.Error) {
|
func (fs Filesystem) ListObjects(bucket, prefix, marker, delimiter string, maxKeys int) (ListObjectsInfo, *probe.Error) {
|
||||||
result := ListObjectsResult{}
|
result := ListObjectsInfo{}
|
||||||
var queryPrefix string
|
var queryPrefix string
|
||||||
|
|
||||||
// Input validation.
|
// Input validation.
|
||||||
@ -41,15 +41,15 @@ func (fs Filesystem) ListObjects(bucket, prefix, marker, delimiter string, maxKe
|
|||||||
bucket = getActualBucketname(fs.path, bucket) // Get the right bucket name.
|
bucket = getActualBucketname(fs.path, bucket) // Get the right bucket name.
|
||||||
bucketDir := filepath.Join(fs.path, bucket)
|
bucketDir := filepath.Join(fs.path, bucket)
|
||||||
// Verify if bucket exists.
|
// Verify if bucket exists.
|
||||||
if status, err := isDirExist(bucketDir); !status {
|
if status, e := isDirExist(bucketDir); !status {
|
||||||
if err == nil {
|
if e == nil {
|
||||||
// File exists, but its not a directory.
|
// File exists, but its not a directory.
|
||||||
return result, probe.NewError(BucketNotFound{Bucket: bucket})
|
return result, probe.NewError(BucketNotFound{Bucket: bucket})
|
||||||
} else if os.IsNotExist(err) {
|
} else if os.IsNotExist(e) {
|
||||||
// File does not exist.
|
// File does not exist.
|
||||||
return result, probe.NewError(BucketNotFound{Bucket: bucket})
|
return result, probe.NewError(BucketNotFound{Bucket: bucket})
|
||||||
} else {
|
} else {
|
||||||
return result, probe.NewError(err)
|
return result, probe.NewError(e)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if !IsValidObjectPrefix(prefix) {
|
if !IsValidObjectPrefix(prefix) {
|
||||||
@ -88,15 +88,15 @@ func (fs Filesystem) ListObjects(bucket, prefix, marker, delimiter string, maxKe
|
|||||||
// Verify if prefix exists.
|
// Verify if prefix exists.
|
||||||
prefixDir := filepath.Dir(filepath.FromSlash(prefix))
|
prefixDir := filepath.Dir(filepath.FromSlash(prefix))
|
||||||
rootDir := filepath.Join(bucketDir, prefixDir)
|
rootDir := filepath.Join(bucketDir, prefixDir)
|
||||||
_, err := isDirExist(rootDir)
|
_, e := isDirExist(rootDir)
|
||||||
if err != nil {
|
if e != nil {
|
||||||
if os.IsNotExist(err) {
|
if os.IsNotExist(e) {
|
||||||
// Prefix does not exist, not an error just respond empty
|
// Prefix does not exist, not an error just respond empty
|
||||||
// list response.
|
// list response.
|
||||||
return result, nil
|
return result, nil
|
||||||
}
|
}
|
||||||
// Rest errors should be treated as failure.
|
// Rest errors should be treated as failure.
|
||||||
return result, probe.NewError(err)
|
return result, probe.NewError(e)
|
||||||
}
|
}
|
||||||
|
|
||||||
recursive := true
|
recursive := true
|
||||||
@ -111,7 +111,7 @@ func (fs Filesystem) ListObjects(bucket, prefix, marker, delimiter string, maxKe
|
|||||||
// popListObjectCh returns nil if the call to ListObject is done for the first time.
|
// popListObjectCh returns nil if the call to ListObject is done for the first time.
|
||||||
// On further calls to ListObjects to retrive more objects within the timeout period,
|
// On further calls to ListObjects to retrive more objects within the timeout period,
|
||||||
// popListObjectCh returns the channel from which rest of the objects can be retrieved.
|
// popListObjectCh returns the channel from which rest of the objects can be retrieved.
|
||||||
objectInfoCh := fs.popListObjectCh(ListObjectParams{bucket, delimiter, marker, prefix})
|
objectInfoCh := fs.popListObjectCh(listObjectParams{bucket, delimiter, marker, prefix})
|
||||||
if objectInfoCh == nil {
|
if objectInfoCh == nil {
|
||||||
if prefix != "" {
|
if prefix != "" {
|
||||||
// queryPrefix variable is set to value of the prefix to be searched.
|
// queryPrefix variable is set to value of the prefix to be searched.
|
||||||
@ -141,7 +141,7 @@ func (fs Filesystem) ListObjects(bucket, prefix, marker, delimiter string, maxKe
|
|||||||
}
|
}
|
||||||
|
|
||||||
if objInfo.Err != nil {
|
if objInfo.Err != nil {
|
||||||
return ListObjectsResult{}, probe.NewError(objInfo.Err)
|
return ListObjectsInfo{}, probe.NewError(objInfo.Err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if strings.Contains(objInfo.Name, "$multiparts") || strings.Contains(objInfo.Name, "$tmpobject") {
|
if strings.Contains(objInfo.Name, "$multiparts") || strings.Contains(objInfo.Name, "$tmpobject") {
|
||||||
@ -171,7 +171,7 @@ func (fs Filesystem) ListObjects(bucket, prefix, marker, delimiter string, maxKe
|
|||||||
if !objectInfoCh.IsClosed() {
|
if !objectInfoCh.IsClosed() {
|
||||||
result.IsTruncated = true
|
result.IsTruncated = true
|
||||||
result.NextMarker = nextMarker
|
result.NextMarker = nextMarker
|
||||||
fs.pushListObjectCh(ListObjectParams{bucket, delimiter, nextMarker, prefix}, *objectInfoCh)
|
fs.pushListObjectCh(listObjectParams{bucket, delimiter, nextMarker, prefix}, *objectInfoCh)
|
||||||
}
|
}
|
||||||
|
|
||||||
return result, nil
|
return result, nil
|
||||||
|
@ -95,7 +95,7 @@ func TestListObjects(t *testing.T) {
|
|||||||
// Formualting the result data set to be expected from ListObjects call inside the tests,
|
// Formualting the result data set to be expected from ListObjects call inside the tests,
|
||||||
// This will be used in testCases and used for asserting the correctness of ListObjects output in the tests.
|
// This will be used in testCases and used for asserting the correctness of ListObjects output in the tests.
|
||||||
|
|
||||||
resultCases := []ListObjectsResult{
|
resultCases := []ListObjectsInfo{
|
||||||
// ListObjectsResult-0.
|
// ListObjectsResult-0.
|
||||||
// Testing for listing all objects in the bucket, (testCase 20,21,22).
|
// Testing for listing all objects in the bucket, (testCase 20,21,22).
|
||||||
{
|
{
|
||||||
@ -428,44 +428,44 @@ func TestListObjects(t *testing.T) {
|
|||||||
delimeter string
|
delimeter string
|
||||||
maxKeys int
|
maxKeys int
|
||||||
// Expected output of ListObjects.
|
// Expected output of ListObjects.
|
||||||
result ListObjectsResult
|
result ListObjectsInfo
|
||||||
err error
|
err error
|
||||||
// Flag indicating whether the test is expected to pass or not.
|
// Flag indicating whether the test is expected to pass or not.
|
||||||
shouldPass bool
|
shouldPass bool
|
||||||
}{
|
}{
|
||||||
// Test cases with invalid bucket names ( Test number 1-4 ).
|
// Test cases with invalid bucket names ( Test number 1-4 ).
|
||||||
{".test", "", "", "", 0, ListObjectsResult{}, BucketNameInvalid{Bucket: ".test"}, false},
|
{".test", "", "", "", 0, ListObjectsInfo{}, BucketNameInvalid{Bucket: ".test"}, false},
|
||||||
{"Test", "", "", "", 0, ListObjectsResult{}, BucketNameInvalid{Bucket: "Test"}, false},
|
{"Test", "", "", "", 0, ListObjectsInfo{}, BucketNameInvalid{Bucket: "Test"}, false},
|
||||||
{"---", "", "", "", 0, ListObjectsResult{}, BucketNameInvalid{Bucket: "---"}, false},
|
{"---", "", "", "", 0, ListObjectsInfo{}, BucketNameInvalid{Bucket: "---"}, false},
|
||||||
{"ad", "", "", "", 0, ListObjectsResult{}, BucketNameInvalid{Bucket: "ad"}, false},
|
{"ad", "", "", "", 0, ListObjectsInfo{}, BucketNameInvalid{Bucket: "ad"}, false},
|
||||||
// Using an existing file for bucket name, but its not a directory (5).
|
// Using an existing file for bucket name, but its not a directory (5).
|
||||||
{"simple-file.txt", "", "", "", 0, ListObjectsResult{}, BucketNotFound{Bucket: "simple-file.txt"}, false},
|
{"simple-file.txt", "", "", "", 0, ListObjectsInfo{}, BucketNotFound{Bucket: "simple-file.txt"}, false},
|
||||||
// Valid bucket names, but they donot exist (6-8).
|
// Valid bucket names, but they donot exist (6-8).
|
||||||
{"volatile-bucket-1", "", "", "", 0, ListObjectsResult{}, BucketNotFound{Bucket: "volatile-bucket-1"}, false},
|
{"volatile-bucket-1", "", "", "", 0, ListObjectsInfo{}, BucketNotFound{Bucket: "volatile-bucket-1"}, false},
|
||||||
{"volatile-bucket-2", "", "", "", 0, ListObjectsResult{}, BucketNotFound{Bucket: "volatile-bucket-2"}, false},
|
{"volatile-bucket-2", "", "", "", 0, ListObjectsInfo{}, BucketNotFound{Bucket: "volatile-bucket-2"}, false},
|
||||||
{"volatile-bucket-3", "", "", "", 0, ListObjectsResult{}, BucketNotFound{Bucket: "volatile-bucket-3"}, false},
|
{"volatile-bucket-3", "", "", "", 0, ListObjectsInfo{}, BucketNotFound{Bucket: "volatile-bucket-3"}, false},
|
||||||
// Valid, existing bucket, but sending invalid delimeter values (9-10).
|
// Valid, existing bucket, but sending invalid delimeter values (9-10).
|
||||||
// Empty string < "" > and forward slash < / > are the ony two valid arguments for delimeter.
|
// Empty string < "" > and forward slash < / > are the ony two valid arguments for delimeter.
|
||||||
{"test-bucket-list-object", "", "", "*", 0, ListObjectsResult{}, fmt.Errorf("delimiter '%s' is not supported", "*"), false},
|
{"test-bucket-list-object", "", "", "*", 0, ListObjectsInfo{}, fmt.Errorf("delimiter '%s' is not supported", "*"), false},
|
||||||
{"test-bucket-list-object", "", "", "-", 0, ListObjectsResult{}, fmt.Errorf("delimiter '%s' is not supported", "-"), false},
|
{"test-bucket-list-object", "", "", "-", 0, ListObjectsInfo{}, fmt.Errorf("delimiter '%s' is not supported", "-"), false},
|
||||||
// Marker goes through url QueryUnescape, sending inputs for which QueryUnescape would fail (11-12).
|
// Marker goes through url QueryUnescape, sending inputs for which QueryUnescape would fail (11-12).
|
||||||
// Here is how QueryUnescape behaves https://golang.org/pkg/net/url/#QueryUnescape.
|
// Here is how QueryUnescape behaves https://golang.org/pkg/net/url/#QueryUnescape.
|
||||||
// QueryUnescape is necessasry since marker is provided as URL query parameter.
|
// QueryUnescape is necessasry since marker is provided as URL query parameter.
|
||||||
{"test-bucket-list-object", "", "test%", "", 0, ListObjectsResult{}, fmt.Errorf("invalid URL escape"), false},
|
{"test-bucket-list-object", "", "test%", "", 0, ListObjectsInfo{}, fmt.Errorf("invalid URL escape"), false},
|
||||||
{"test-bucket-list-object", "", "test%A", "", 0, ListObjectsResult{}, fmt.Errorf("invalid URL escape"), false},
|
{"test-bucket-list-object", "", "test%A", "", 0, ListObjectsInfo{}, fmt.Errorf("invalid URL escape"), false},
|
||||||
// Testing for failure cases with both perfix and marker (13).
|
// Testing for failure cases with both perfix and marker (13).
|
||||||
// The prefix and marker combination to be valid it should satisy strings.HasPrefix(marker, prefix).
|
// The prefix and marker combination to be valid it should satisy strings.HasPrefix(marker, prefix).
|
||||||
{"test-bucket-list-object", "asia", "europe-object", "", 0, ListObjectsResult{}, fmt.Errorf("Invalid combination of marker '%s' and prefix '%s'", "europe-object", "asia"), false},
|
{"test-bucket-list-object", "asia", "europe-object", "", 0, ListObjectsInfo{}, fmt.Errorf("Invalid combination of marker '%s' and prefix '%s'", "europe-object", "asia"), false},
|
||||||
// Setting a non-existing directory to be prefix (14-15).
|
// Setting a non-existing directory to be prefix (14-15).
|
||||||
{"empty-bucket", "europe/france/", "", "", 1, ListObjectsResult{}, nil, true},
|
{"empty-bucket", "europe/france/", "", "", 1, ListObjectsInfo{}, nil, true},
|
||||||
{"empty-bucket", "europe/tunisia/", "", "", 1, ListObjectsResult{}, nil, true},
|
{"empty-bucket", "europe/tunisia/", "", "", 1, ListObjectsInfo{}, nil, true},
|
||||||
// Testing on empty bucket, that is, bucket without any objects in it (16).
|
// Testing on empty bucket, that is, bucket without any objects in it (16).
|
||||||
{"empty-bucket", "", "", "", 0, ListObjectsResult{}, nil, true},
|
{"empty-bucket", "", "", "", 0, ListObjectsInfo{}, nil, true},
|
||||||
// Setting maxKeys to negative value (17-18).
|
// Setting maxKeys to negative value (17-18).
|
||||||
{"empty-bucket", "", "", "", -1, ListObjectsResult{}, nil, true},
|
{"empty-bucket", "", "", "", -1, ListObjectsInfo{}, nil, true},
|
||||||
{"empty-bucket", "", "", "", 1, ListObjectsResult{}, nil, true},
|
{"empty-bucket", "", "", "", 1, ListObjectsInfo{}, nil, true},
|
||||||
// Setting maxKeys to a very large value (19).
|
// Setting maxKeys to a very large value (19).
|
||||||
{"empty-bucket", "", "", "", 1111000000000000, ListObjectsResult{}, nil, true},
|
{"empty-bucket", "", "", "", 1111000000000000, ListObjectsInfo{}, nil, true},
|
||||||
// Testing for all 7 objects in the bucket (20).
|
// Testing for all 7 objects in the bucket (20).
|
||||||
{"test-bucket-list-object", "", "", "", 9, resultCases[0], nil, true},
|
{"test-bucket-list-object", "", "", "", 9, resultCases[0], nil, true},
|
||||||
//Testing for negative value of maxKey, this should set maxKeys to listObjectsLimit (21).
|
//Testing for negative value of maxKey, this should set maxKeys to listObjectsLimit (21).
|
||||||
@ -493,7 +493,7 @@ func TestListObjects(t *testing.T) {
|
|||||||
{"test-bucket-list-object", "", "man", "", 10, resultCases[13], nil, true},
|
{"test-bucket-list-object", "", "man", "", 10, resultCases[13], nil, true},
|
||||||
// Marker being set to a value which is greater than and all object names when sorted (38).
|
// Marker being set to a value which is greater than and all object names when sorted (38).
|
||||||
// Expected to send an empty response in this case.
|
// Expected to send an empty response in this case.
|
||||||
{"test-bucket-list-object", "", "zen", "", 10, ListObjectsResult{}, nil, true},
|
{"test-bucket-list-object", "", "zen", "", 10, ListObjectsInfo{}, nil, true},
|
||||||
// Marker being set to a value which is lesser than and all object names when sorted (39).
|
// Marker being set to a value which is lesser than and all object names when sorted (39).
|
||||||
// Expected to send all the objects in the bucket in this case.
|
// Expected to send all the objects in the bucket in this case.
|
||||||
{"test-bucket-list-object", "", "Abc", "", 10, resultCases[14], nil, true},
|
{"test-bucket-list-object", "", "Abc", "", 10, resultCases[14], nil, true},
|
||||||
@ -511,13 +511,13 @@ func TestListObjects(t *testing.T) {
|
|||||||
{"test-bucket-list-object", "new", "newPrefix0", "", 2, resultCases[22], nil, true},
|
{"test-bucket-list-object", "new", "newPrefix0", "", 2, resultCases[22], nil, true},
|
||||||
// Testing with maxKeys set to 0 (48-54).
|
// Testing with maxKeys set to 0 (48-54).
|
||||||
// The parameters have to valid.
|
// The parameters have to valid.
|
||||||
{"test-bucket-list-object", "", "obj1", "", 0, ListObjectsResult{}, nil, true},
|
{"test-bucket-list-object", "", "obj1", "", 0, ListObjectsInfo{}, nil, true},
|
||||||
{"test-bucket-list-object", "", "obj0", "", 0, ListObjectsResult{}, nil, true},
|
{"test-bucket-list-object", "", "obj0", "", 0, ListObjectsInfo{}, nil, true},
|
||||||
{"test-bucket-list-object", "new", "", "", 0, ListObjectsResult{}, nil, true},
|
{"test-bucket-list-object", "new", "", "", 0, ListObjectsInfo{}, nil, true},
|
||||||
{"test-bucket-list-object", "obj", "", "", 0, ListObjectsResult{}, nil, true},
|
{"test-bucket-list-object", "obj", "", "", 0, ListObjectsInfo{}, nil, true},
|
||||||
{"test-bucket-list-object", "obj", "obj0", "", 0, ListObjectsResult{}, nil, true},
|
{"test-bucket-list-object", "obj", "obj0", "", 0, ListObjectsInfo{}, nil, true},
|
||||||
{"test-bucket-list-object", "obj", "obj1", "", 0, ListObjectsResult{}, nil, true},
|
{"test-bucket-list-object", "obj", "obj1", "", 0, ListObjectsInfo{}, nil, true},
|
||||||
{"test-bucket-list-object", "new", "newPrefix0", "", 0, ListObjectsResult{}, nil, true},
|
{"test-bucket-list-object", "new", "newPrefix0", "", 0, ListObjectsInfo{}, nil, true},
|
||||||
// Tests on hierarchical key names as prefix.
|
// Tests on hierarchical key names as prefix.
|
||||||
// Without delimteter the code should recurse into the prefix Dir.
|
// Without delimteter the code should recurse into the prefix Dir.
|
||||||
// Tests with prefix, but without delimiter (55-56).
|
// Tests with prefix, but without delimiter (55-56).
|
||||||
|
17
fs-bucket.go
17
fs-bucket.go
@ -21,7 +21,6 @@ import (
|
|||||||
"os"
|
"os"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
"strings"
|
"strings"
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/minio/minio/pkg/disk"
|
"github.com/minio/minio/pkg/disk"
|
||||||
"github.com/minio/minio/pkg/probe"
|
"github.com/minio/minio/pkg/probe"
|
||||||
@ -56,19 +55,13 @@ func (fs Filesystem) DeleteBucket(bucket string) *probe.Error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// BucketInfo - name and create date
|
|
||||||
type BucketInfo struct {
|
|
||||||
Name string
|
|
||||||
Created time.Time
|
|
||||||
}
|
|
||||||
|
|
||||||
// ListBuckets - Get service.
|
// ListBuckets - Get service.
|
||||||
func (fs Filesystem) ListBuckets() ([]BucketInfo, *probe.Error) {
|
func (fs Filesystem) ListBuckets() ([]BucketInfo, *probe.Error) {
|
||||||
files, e := ioutil.ReadDir(fs.path)
|
files, e := ioutil.ReadDir(fs.path)
|
||||||
if e != nil {
|
if e != nil {
|
||||||
return []BucketInfo{}, probe.NewError(e)
|
return []BucketInfo{}, probe.NewError(e)
|
||||||
}
|
}
|
||||||
var metadataList []BucketInfo
|
var buckets []BucketInfo
|
||||||
for _, file := range files {
|
for _, file := range files {
|
||||||
if !file.IsDir() {
|
if !file.IsDir() {
|
||||||
// If not directory, ignore all file types.
|
// If not directory, ignore all file types.
|
||||||
@ -79,15 +72,15 @@ func (fs Filesystem) ListBuckets() ([]BucketInfo, *probe.Error) {
|
|||||||
if !IsValidBucketName(dirName) {
|
if !IsValidBucketName(dirName) {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
metadata := BucketInfo{
|
bucket := BucketInfo{
|
||||||
Name: dirName,
|
Name: dirName,
|
||||||
Created: file.ModTime(),
|
Created: file.ModTime(),
|
||||||
}
|
}
|
||||||
metadataList = append(metadataList, metadata)
|
buckets = append(buckets, bucket)
|
||||||
}
|
}
|
||||||
// Remove duplicated entries.
|
// Remove duplicated entries.
|
||||||
metadataList = removeDuplicateBuckets(metadataList)
|
buckets = removeDuplicateBuckets(buckets)
|
||||||
return metadataList, nil
|
return buckets, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// removeDuplicateBuckets - remove duplicate buckets.
|
// removeDuplicateBuckets - remove duplicate buckets.
|
||||||
|
28
fs-dir.go
28
fs-dir.go
@ -79,18 +79,6 @@ func (f byName) Less(i, j int) bool {
|
|||||||
return n1 < n2
|
return n1 < n2
|
||||||
}
|
}
|
||||||
|
|
||||||
// ObjectInfo - object info.
|
|
||||||
type ObjectInfo struct {
|
|
||||||
Bucket string
|
|
||||||
Name string
|
|
||||||
ModifiedTime time.Time
|
|
||||||
ContentType string
|
|
||||||
MD5Sum string
|
|
||||||
Size int64
|
|
||||||
IsDir bool
|
|
||||||
Err error
|
|
||||||
}
|
|
||||||
|
|
||||||
// Using sort.Search() internally to jump to the file entry containing the prefix.
|
// Using sort.Search() internally to jump to the file entry containing the prefix.
|
||||||
func searchFileInfos(fileInfos []os.FileInfo, x string) int {
|
func searchFileInfos(fileInfos []os.FileInfo, x string) int {
|
||||||
processFunc := func(i int) bool {
|
processFunc := func(i int) bool {
|
||||||
@ -140,7 +128,7 @@ func readDir(scanDir, namePrefix, queryPrefix string, isFirst bool) (objInfos []
|
|||||||
if queryPrefix != "" && isFirst {
|
if queryPrefix != "" && isFirst {
|
||||||
// If control is here then there is a queryPrefix, and there are objects which satisfies the prefix.
|
// If control is here then there is a queryPrefix, and there are objects which satisfies the prefix.
|
||||||
// Since the result is sorted, the object names which satisfies query prefix would be stored one after the other.
|
// Since the result is sorted, the object names which satisfies query prefix would be stored one after the other.
|
||||||
// Push the objectInfo only if its contains the prefix.
|
// Push the ObjectInfo only if its contains the prefix.
|
||||||
// This ensures that the channel containing object Info would only has objects with the given queryPrefix.
|
// This ensures that the channel containing object Info would only has objects with the given queryPrefix.
|
||||||
if !strings.HasPrefix(name, queryPrefix) {
|
if !strings.HasPrefix(name, queryPrefix) {
|
||||||
return
|
return
|
||||||
@ -194,8 +182,8 @@ func readDir(scanDir, namePrefix, queryPrefix string, isFirst bool) (objInfos []
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// ObjectInfoChannel - object info channel.
|
// objectInfoChannel - object info channel.
|
||||||
type ObjectInfoChannel struct {
|
type objectInfoChannel struct {
|
||||||
ch <-chan ObjectInfo
|
ch <-chan ObjectInfo
|
||||||
objInfo *ObjectInfo
|
objInfo *ObjectInfo
|
||||||
closed bool
|
closed bool
|
||||||
@ -203,7 +191,7 @@ type ObjectInfoChannel struct {
|
|||||||
timedOut bool
|
timedOut bool
|
||||||
}
|
}
|
||||||
|
|
||||||
func (oic *ObjectInfoChannel) Read() (ObjectInfo, bool) {
|
func (oic *objectInfoChannel) Read() (ObjectInfo, bool) {
|
||||||
if oic.closed {
|
if oic.closed {
|
||||||
return ObjectInfo{}, false
|
return ObjectInfo{}, false
|
||||||
}
|
}
|
||||||
@ -233,7 +221,7 @@ func (oic *ObjectInfoChannel) Read() (ObjectInfo, bool) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// IsClosed - return whether channel is closed or not.
|
// IsClosed - return whether channel is closed or not.
|
||||||
func (oic ObjectInfoChannel) IsClosed() bool {
|
func (oic objectInfoChannel) IsClosed() bool {
|
||||||
if oic.objInfo != nil {
|
if oic.objInfo != nil {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
@ -242,7 +230,7 @@ func (oic ObjectInfoChannel) IsClosed() bool {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// IsTimedOut - return whether channel is closed due to timeout.
|
// IsTimedOut - return whether channel is closed due to timeout.
|
||||||
func (oic ObjectInfoChannel) IsTimedOut() bool {
|
func (oic objectInfoChannel) IsTimedOut() bool {
|
||||||
if oic.timedOut {
|
if oic.timedOut {
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
@ -261,7 +249,7 @@ func (oic ObjectInfoChannel) IsTimedOut() bool {
|
|||||||
|
|
||||||
// treeWalk - walk into 'scanDir' recursively when 'recursive' is true.
|
// treeWalk - walk into 'scanDir' recursively when 'recursive' is true.
|
||||||
// It uses 'bucketDir' to get name prefix for object name.
|
// It uses 'bucketDir' to get name prefix for object name.
|
||||||
func treeWalk(scanDir, bucketDir string, recursive bool, queryPrefix string) ObjectInfoChannel {
|
func treeWalk(scanDir, bucketDir string, recursive bool, queryPrefix string) objectInfoChannel {
|
||||||
objectInfoCh := make(chan ObjectInfo, listObjectsLimit)
|
objectInfoCh := make(chan ObjectInfo, listObjectsLimit)
|
||||||
timeoutCh := make(chan struct{}, 1)
|
timeoutCh := make(chan struct{}, 1)
|
||||||
|
|
||||||
@ -314,5 +302,5 @@ func treeWalk(scanDir, bucketDir string, recursive bool, queryPrefix string) Obj
|
|||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
return ObjectInfoChannel{ch: objectInfoCh, timeoutCh: timeoutCh}
|
return objectInfoChannel{ch: objectInfoCh, timeoutCh: timeoutCh}
|
||||||
}
|
}
|
||||||
|
137
fs-multipart.go
137
fs-multipart.go
@ -50,55 +50,57 @@ func (fs Filesystem) isValidUploadID(object, uploadID string) (ok bool) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// byObjectInfoKey is a sortable interface for UploadMetadata slice
|
// byObjectInfoKey is a sortable interface for UploadMetadata slice
|
||||||
type byUploadMetadataKey []*UploadMetadata
|
type byUploadMetadataKey []uploadMetadata
|
||||||
|
|
||||||
func (b byUploadMetadataKey) Len() int { return len(b) }
|
func (b byUploadMetadataKey) Len() int { return len(b) }
|
||||||
func (b byUploadMetadataKey) Swap(i, j int) { b[i], b[j] = b[j], b[i] }
|
func (b byUploadMetadataKey) Swap(i, j int) { b[i], b[j] = b[j], b[i] }
|
||||||
func (b byUploadMetadataKey) Less(i, j int) bool { return b[i].Object < b[j].Object }
|
func (b byUploadMetadataKey) Less(i, j int) bool { return b[i].Object < b[j].Object }
|
||||||
|
|
||||||
// ListMultipartUploads - list incomplete multipart sessions for a given BucketMultipartResourcesMetadata
|
// ListMultipartUploads - list incomplete multipart sessions for a given BucketMultipartResourcesMetadata
|
||||||
func (fs Filesystem) ListMultipartUploads(bucket string, resources BucketMultipartResourcesMetadata) (BucketMultipartResourcesMetadata, *probe.Error) {
|
func (fs Filesystem) ListMultipartUploads(bucket, objectPrefix, keyMarker, uploadIDMarker, delimiter string, maxUploads int) (ListMultipartsInfo, *probe.Error) {
|
||||||
// Input validation.
|
// Input validation.
|
||||||
if !IsValidBucketName(bucket) {
|
if !IsValidBucketName(bucket) {
|
||||||
return BucketMultipartResourcesMetadata{}, probe.NewError(BucketNameInvalid{Bucket: bucket})
|
return ListMultipartsInfo{}, probe.NewError(BucketNameInvalid{Bucket: bucket})
|
||||||
}
|
}
|
||||||
bucket = getActualBucketname(fs.path, bucket)
|
bucket = getActualBucketname(fs.path, bucket)
|
||||||
bucketPath := filepath.Join(fs.path, bucket)
|
bucketPath := filepath.Join(fs.path, bucket)
|
||||||
if _, e := os.Stat(bucketPath); e != nil {
|
if _, e := os.Stat(bucketPath); e != nil {
|
||||||
// Check bucket exists.
|
// Check bucket exists.
|
||||||
if os.IsNotExist(e) {
|
if os.IsNotExist(e) {
|
||||||
return BucketMultipartResourcesMetadata{}, probe.NewError(BucketNotFound{Bucket: bucket})
|
return ListMultipartsInfo{}, probe.NewError(BucketNotFound{Bucket: bucket})
|
||||||
}
|
}
|
||||||
return BucketMultipartResourcesMetadata{}, probe.NewError(e)
|
return ListMultipartsInfo{}, probe.NewError(e)
|
||||||
}
|
}
|
||||||
var uploads []*UploadMetadata
|
var uploads []uploadMetadata
|
||||||
|
multipartsInfo := ListMultipartsInfo{}
|
||||||
|
|
||||||
fs.rwLock.RLock()
|
fs.rwLock.RLock()
|
||||||
defer fs.rwLock.RUnlock()
|
defer fs.rwLock.RUnlock()
|
||||||
for uploadID, session := range fs.multiparts.ActiveSession {
|
for uploadID, session := range fs.multiparts.ActiveSession {
|
||||||
objectName := session.ObjectName
|
objectName := session.ObjectName
|
||||||
if strings.HasPrefix(objectName, resources.Prefix) {
|
if strings.HasPrefix(objectName, objectPrefix) {
|
||||||
if len(uploads) > resources.MaxUploads {
|
if len(uploads) > maxUploads {
|
||||||
sort.Sort(byUploadMetadataKey(uploads))
|
sort.Sort(byUploadMetadataKey(uploads))
|
||||||
resources.Upload = uploads
|
multipartsInfo.Uploads = uploads
|
||||||
resources.NextKeyMarker = session.ObjectName
|
multipartsInfo.NextKeyMarker = session.ObjectName
|
||||||
resources.NextUploadIDMarker = uploadID
|
multipartsInfo.NextUploadIDMarker = uploadID
|
||||||
resources.IsTruncated = true
|
multipartsInfo.IsTruncated = true
|
||||||
return resources, nil
|
return multipartsInfo, nil
|
||||||
}
|
}
|
||||||
// UploadIDMarker is ignored if KeyMarker is empty.
|
// uploadIDMarker is ignored if KeyMarker is empty.
|
||||||
switch {
|
switch {
|
||||||
case resources.KeyMarker != "" && resources.UploadIDMarker == "":
|
case keyMarker != "" && uploadIDMarker == "":
|
||||||
if objectName > resources.KeyMarker {
|
if objectName > keyMarker {
|
||||||
upload := new(UploadMetadata)
|
upload := uploadMetadata{}
|
||||||
upload.Object = objectName
|
upload.Object = objectName
|
||||||
upload.UploadID = uploadID
|
upload.UploadID = uploadID
|
||||||
upload.Initiated = session.Initiated
|
upload.Initiated = session.Initiated
|
||||||
uploads = append(uploads, upload)
|
uploads = append(uploads, upload)
|
||||||
}
|
}
|
||||||
case resources.KeyMarker != "" && resources.UploadIDMarker != "":
|
case keyMarker != "" && uploadIDMarker != "":
|
||||||
if session.UploadID > resources.UploadIDMarker {
|
if session.UploadID > uploadIDMarker {
|
||||||
if objectName >= resources.KeyMarker {
|
if objectName >= keyMarker {
|
||||||
upload := new(UploadMetadata)
|
upload := uploadMetadata{}
|
||||||
upload.Object = objectName
|
upload.Object = objectName
|
||||||
upload.UploadID = uploadID
|
upload.UploadID = uploadID
|
||||||
upload.Initiated = session.Initiated
|
upload.Initiated = session.Initiated
|
||||||
@ -106,7 +108,7 @@ func (fs Filesystem) ListMultipartUploads(bucket string, resources BucketMultipa
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
default:
|
default:
|
||||||
upload := new(UploadMetadata)
|
upload := uploadMetadata{}
|
||||||
upload.Object = objectName
|
upload.Object = objectName
|
||||||
upload.UploadID = uploadID
|
upload.UploadID = uploadID
|
||||||
upload.Initiated = session.Initiated
|
upload.Initiated = session.Initiated
|
||||||
@ -115,13 +117,13 @@ func (fs Filesystem) ListMultipartUploads(bucket string, resources BucketMultipa
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
sort.Sort(byUploadMetadataKey(uploads))
|
sort.Sort(byUploadMetadataKey(uploads))
|
||||||
resources.Upload = uploads
|
multipartsInfo.Uploads = uploads
|
||||||
return resources, nil
|
return multipartsInfo, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// verify if parts sent over the network do really match with what we
|
// verify if parts sent over the network do really match with what we
|
||||||
// have for the session.
|
// have for the session.
|
||||||
func doPartsMatch(parts []CompletePart, savedParts []PartMetadata) bool {
|
func doPartsMatch(parts []completePart, savedParts []partInfo) bool {
|
||||||
if parts == nil || savedParts == nil {
|
if parts == nil || savedParts == nil {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
@ -175,7 +177,7 @@ func MultiCloser(closers ...io.Closer) io.Closer {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// removeParts - remove all parts.
|
// removeParts - remove all parts.
|
||||||
func removeParts(partPathPrefix string, parts []PartMetadata) *probe.Error {
|
func removeParts(partPathPrefix string, parts []partInfo) *probe.Error {
|
||||||
for _, part := range parts {
|
for _, part := range parts {
|
||||||
// We are on purpose ignoring the return values here, since
|
// We are on purpose ignoring the return values here, since
|
||||||
// another thread would have purged these entries.
|
// another thread would have purged these entries.
|
||||||
@ -185,7 +187,7 @@ func removeParts(partPathPrefix string, parts []PartMetadata) *probe.Error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// saveParts - concantenate and save all parts.
|
// saveParts - concantenate and save all parts.
|
||||||
func saveParts(partPathPrefix string, mw io.Writer, parts []CompletePart) *probe.Error {
|
func saveParts(partPathPrefix string, mw io.Writer, parts []completePart) *probe.Error {
|
||||||
var partReaders []io.Reader
|
var partReaders []io.Reader
|
||||||
var partClosers []io.Closer
|
var partClosers []io.Closer
|
||||||
for _, part := range parts {
|
for _, part := range parts {
|
||||||
@ -274,13 +276,13 @@ func (fs Filesystem) NewMultipartUpload(bucket, object string) (string, *probe.E
|
|||||||
fs.rwLock.Lock()
|
fs.rwLock.Lock()
|
||||||
defer fs.rwLock.Unlock()
|
defer fs.rwLock.Unlock()
|
||||||
// Initialize multipart session.
|
// Initialize multipart session.
|
||||||
mpartSession := &MultipartSession{}
|
mpartSession := &multipartSession{}
|
||||||
mpartSession.TotalParts = 0
|
mpartSession.TotalParts = 0
|
||||||
mpartSession.ObjectName = object
|
mpartSession.ObjectName = object
|
||||||
mpartSession.UploadID = uploadID
|
mpartSession.UploadID = uploadID
|
||||||
mpartSession.Initiated = time.Now().UTC()
|
mpartSession.Initiated = time.Now().UTC()
|
||||||
// Multipart has maximum of 10000 parts.
|
// Multipart has maximum of 10000 parts.
|
||||||
var parts []PartMetadata
|
var parts []partInfo
|
||||||
mpartSession.Parts = parts
|
mpartSession.Parts = parts
|
||||||
|
|
||||||
fs.multiparts.ActiveSession[uploadID] = mpartSession
|
fs.multiparts.ActiveSession[uploadID] = mpartSession
|
||||||
@ -291,7 +293,7 @@ func (fs Filesystem) NewMultipartUpload(bucket, object string) (string, *probe.E
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Remove all duplicated parts based on the latest time of their upload.
|
// Remove all duplicated parts based on the latest time of their upload.
|
||||||
func removeDuplicateParts(parts []PartMetadata) []PartMetadata {
|
func removeDuplicateParts(parts []partInfo) []partInfo {
|
||||||
length := len(parts) - 1
|
length := len(parts) - 1
|
||||||
for i := 0; i < length; i++ {
|
for i := 0; i < length; i++ {
|
||||||
for j := i + 1; j <= length; j++ {
|
for j := i + 1; j <= length; j++ {
|
||||||
@ -311,7 +313,7 @@ func removeDuplicateParts(parts []PartMetadata) []PartMetadata {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// partNumber is a sortable interface for Part slice.
|
// partNumber is a sortable interface for Part slice.
|
||||||
type partNumber []PartMetadata
|
type partNumber []partInfo
|
||||||
|
|
||||||
func (a partNumber) Len() int { return len(a) }
|
func (a partNumber) Len() int { return len(a) }
|
||||||
func (a partNumber) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
|
func (a partNumber) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
|
||||||
@ -324,18 +326,12 @@ func (fs Filesystem) PutObjectPart(bucket, object, uploadID string, partID int,
|
|||||||
return "", probe.NewError(err)
|
return "", probe.NewError(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Remove 5% from total space for cumulative disk space used for
|
// Remove 5% from total space for cumulative disk space used for journalling, inodes etc.
|
||||||
// journalling, inodes etc.
|
|
||||||
availableDiskSpace := (float64(di.Free) / (float64(di.Total) - (0.05 * float64(di.Total)))) * 100
|
availableDiskSpace := (float64(di.Free) / (float64(di.Total) - (0.05 * float64(di.Total)))) * 100
|
||||||
if int64(availableDiskSpace) <= fs.minFreeDisk {
|
if int64(availableDiskSpace) <= fs.minFreeDisk {
|
||||||
return "", probe.NewError(RootPathFull{Path: fs.path})
|
return "", probe.NewError(RootPathFull{Path: fs.path})
|
||||||
}
|
}
|
||||||
|
|
||||||
// Part id cannot be negative.
|
|
||||||
if partID <= 0 {
|
|
||||||
return "", probe.NewError(errors.New("invalid part id, cannot be zero or less than zero"))
|
|
||||||
}
|
|
||||||
|
|
||||||
// Check bucket name valid.
|
// Check bucket name valid.
|
||||||
if !IsValidBucketName(bucket) {
|
if !IsValidBucketName(bucket) {
|
||||||
return "", probe.NewError(BucketNameInvalid{Bucket: bucket})
|
return "", probe.NewError(BucketNameInvalid{Bucket: bucket})
|
||||||
@ -346,6 +342,11 @@ func (fs Filesystem) PutObjectPart(bucket, object, uploadID string, partID int,
|
|||||||
return "", probe.NewError(ObjectNameInvalid{Bucket: bucket, Object: object})
|
return "", probe.NewError(ObjectNameInvalid{Bucket: bucket, Object: object})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Part id cannot be negative.
|
||||||
|
if partID <= 0 {
|
||||||
|
return "", probe.NewError(errors.New("invalid part id, cannot be zero or less than zero"))
|
||||||
|
}
|
||||||
|
|
||||||
// Verify upload is valid for the incoming object.
|
// Verify upload is valid for the incoming object.
|
||||||
if !fs.isValidUploadID(object, uploadID) {
|
if !fs.isValidUploadID(object, uploadID) {
|
||||||
return "", probe.NewError(InvalidUploadID{UploadID: uploadID})
|
return "", probe.NewError(InvalidUploadID{UploadID: uploadID})
|
||||||
@ -394,11 +395,11 @@ func (fs Filesystem) PutObjectPart(bucket, object, uploadID string, partID int,
|
|||||||
if e != nil {
|
if e != nil {
|
||||||
return "", probe.NewError(e)
|
return "", probe.NewError(e)
|
||||||
}
|
}
|
||||||
partMetadata := PartMetadata{}
|
prtInfo := partInfo{}
|
||||||
partMetadata.PartNumber = partID
|
prtInfo.PartNumber = partID
|
||||||
partMetadata.ETag = newMD5Hex
|
prtInfo.ETag = newMD5Hex
|
||||||
partMetadata.Size = fi.Size()
|
prtInfo.Size = fi.Size()
|
||||||
partMetadata.LastModified = fi.ModTime()
|
prtInfo.LastModified = fi.ModTime()
|
||||||
|
|
||||||
// Critical region requiring read lock.
|
// Critical region requiring read lock.
|
||||||
fs.rwLock.RLock()
|
fs.rwLock.RLock()
|
||||||
@ -409,10 +410,11 @@ func (fs Filesystem) PutObjectPart(bucket, object, uploadID string, partID int,
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Add all incoming parts.
|
// Add all incoming parts.
|
||||||
deserializedMultipartSession.Parts = append(deserializedMultipartSession.Parts, partMetadata)
|
deserializedMultipartSession.Parts = append(deserializedMultipartSession.Parts, prtInfo)
|
||||||
|
|
||||||
// Remove duplicate parts based on the most recent uploaded.
|
// Remove duplicate parts based on the most recent uploaded.
|
||||||
deserializedMultipartSession.Parts = removeDuplicateParts(deserializedMultipartSession.Parts)
|
deserializedMultipartSession.Parts = removeDuplicateParts(deserializedMultipartSession.Parts)
|
||||||
|
|
||||||
// Save total parts uploaded.
|
// Save total parts uploaded.
|
||||||
deserializedMultipartSession.TotalParts = len(deserializedMultipartSession.Parts)
|
deserializedMultipartSession.TotalParts = len(deserializedMultipartSession.Parts)
|
||||||
|
|
||||||
@ -431,7 +433,7 @@ func (fs Filesystem) PutObjectPart(bucket, object, uploadID string, partID int,
|
|||||||
}
|
}
|
||||||
|
|
||||||
// CompleteMultipartUpload - complete a multipart upload and persist the data
|
// CompleteMultipartUpload - complete a multipart upload and persist the data
|
||||||
func (fs Filesystem) CompleteMultipartUpload(bucket string, object string, uploadID string, parts []CompletePart) (ObjectInfo, *probe.Error) {
|
func (fs Filesystem) CompleteMultipartUpload(bucket string, object string, uploadID string, parts []completePart) (ObjectInfo, *probe.Error) {
|
||||||
// Check bucket name is valid.
|
// Check bucket name is valid.
|
||||||
if !IsValidBucketName(bucket) {
|
if !IsValidBucketName(bucket) {
|
||||||
return ObjectInfo{}, probe.NewError(BucketNameInvalid{Bucket: bucket})
|
return ObjectInfo{}, probe.NewError(BucketNameInvalid{Bucket: bucket})
|
||||||
@ -531,35 +533,32 @@ func (fs Filesystem) CompleteMultipartUpload(bucket string, object string, uploa
|
|||||||
return newObject, nil
|
return newObject, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// ListObjectParts - list parts from incomplete multipart session for a given ObjectResourcesMetadata
|
// ListObjectParts - list parts from incomplete multipart session.
|
||||||
func (fs Filesystem) ListObjectParts(bucket, object string, resources ObjectResourcesMetadata) (ObjectResourcesMetadata, *probe.Error) {
|
func (fs Filesystem) ListObjectParts(bucket, object, uploadID string, partNumberMarker, maxParts int) (ListPartsInfo, *probe.Error) {
|
||||||
// Check bucket name is valid.
|
// Check bucket name is valid.
|
||||||
if !IsValidBucketName(bucket) {
|
if !IsValidBucketName(bucket) {
|
||||||
return ObjectResourcesMetadata{}, probe.NewError(BucketNameInvalid{Bucket: bucket})
|
return ListPartsInfo{}, probe.NewError(BucketNameInvalid{Bucket: bucket})
|
||||||
}
|
}
|
||||||
|
|
||||||
// Verify object path legal.
|
// Verify object path legal.
|
||||||
if !IsValidObjectName(object) {
|
if !IsValidObjectName(object) {
|
||||||
return ObjectResourcesMetadata{}, probe.NewError(ObjectNameInvalid{Bucket: bucket, Object: object})
|
return ListPartsInfo{}, probe.NewError(ObjectNameInvalid{Bucket: bucket, Object: object})
|
||||||
}
|
}
|
||||||
|
|
||||||
// Save upload id.
|
|
||||||
uploadID := resources.UploadID
|
|
||||||
|
|
||||||
// Verify if upload id is valid for incoming object.
|
// Verify if upload id is valid for incoming object.
|
||||||
if !fs.isValidUploadID(object, uploadID) {
|
if !fs.isValidUploadID(object, uploadID) {
|
||||||
return ObjectResourcesMetadata{}, probe.NewError(InvalidUploadID{UploadID: uploadID})
|
return ListPartsInfo{}, probe.NewError(InvalidUploadID{UploadID: uploadID})
|
||||||
}
|
}
|
||||||
|
|
||||||
objectResourcesMetadata := resources
|
prtsInfo := ListPartsInfo{}
|
||||||
objectResourcesMetadata.Bucket = bucket
|
prtsInfo.Bucket = bucket
|
||||||
objectResourcesMetadata.Object = object
|
prtsInfo.Object = object
|
||||||
var startPartNumber int
|
var startPartNumber int
|
||||||
switch {
|
switch {
|
||||||
case objectResourcesMetadata.PartNumberMarker == 0:
|
case partNumberMarker == 0:
|
||||||
startPartNumber = 1
|
startPartNumber = 1
|
||||||
default:
|
default:
|
||||||
startPartNumber = objectResourcesMetadata.PartNumberMarker
|
startPartNumber = partNumberMarker
|
||||||
}
|
}
|
||||||
|
|
||||||
bucket = getActualBucketname(fs.path, bucket)
|
bucket = getActualBucketname(fs.path, bucket)
|
||||||
@ -567,9 +566,9 @@ func (fs Filesystem) ListObjectParts(bucket, object string, resources ObjectReso
|
|||||||
if _, e := os.Stat(bucketPath); e != nil {
|
if _, e := os.Stat(bucketPath); e != nil {
|
||||||
// Check bucket exists.
|
// Check bucket exists.
|
||||||
if os.IsNotExist(e) {
|
if os.IsNotExist(e) {
|
||||||
return ObjectResourcesMetadata{}, probe.NewError(BucketNotFound{Bucket: bucket})
|
return ListPartsInfo{}, probe.NewError(BucketNotFound{Bucket: bucket})
|
||||||
}
|
}
|
||||||
return ObjectResourcesMetadata{}, probe.NewError(e)
|
return ListPartsInfo{}, probe.NewError(e)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Critical region requiring read lock.
|
// Critical region requiring read lock.
|
||||||
@ -577,22 +576,22 @@ func (fs Filesystem) ListObjectParts(bucket, object string, resources ObjectReso
|
|||||||
deserializedMultipartSession, ok := fs.multiparts.ActiveSession[uploadID]
|
deserializedMultipartSession, ok := fs.multiparts.ActiveSession[uploadID]
|
||||||
fs.rwLock.RUnlock()
|
fs.rwLock.RUnlock()
|
||||||
if !ok {
|
if !ok {
|
||||||
return ObjectResourcesMetadata{}, probe.NewError(InvalidUploadID{UploadID: resources.UploadID})
|
return ListPartsInfo{}, probe.NewError(InvalidUploadID{UploadID: uploadID})
|
||||||
}
|
}
|
||||||
var parts []PartMetadata
|
var parts []partInfo
|
||||||
for i := startPartNumber; i <= deserializedMultipartSession.TotalParts; i++ {
|
for i := startPartNumber; i <= deserializedMultipartSession.TotalParts; i++ {
|
||||||
if len(parts) > objectResourcesMetadata.MaxParts {
|
if len(parts) > maxParts {
|
||||||
sort.Sort(partNumber(parts))
|
sort.Sort(partNumber(parts))
|
||||||
objectResourcesMetadata.IsTruncated = true
|
prtsInfo.IsTruncated = true
|
||||||
objectResourcesMetadata.Part = parts
|
prtsInfo.Parts = parts
|
||||||
objectResourcesMetadata.NextPartNumberMarker = i
|
prtsInfo.NextPartNumberMarker = i
|
||||||
return objectResourcesMetadata, nil
|
return prtsInfo, nil
|
||||||
}
|
}
|
||||||
parts = append(parts, deserializedMultipartSession.Parts[i-1])
|
parts = append(parts, deserializedMultipartSession.Parts[i-1])
|
||||||
}
|
}
|
||||||
sort.Sort(partNumber(parts))
|
sort.Sort(partNumber(parts))
|
||||||
objectResourcesMetadata.Part = parts
|
prtsInfo.Parts = parts
|
||||||
return objectResourcesMetadata, nil
|
return prtsInfo, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// AbortMultipartUpload - abort an incomplete multipart session
|
// AbortMultipartUpload - abort an incomplete multipart session
|
||||||
|
@ -58,7 +58,6 @@ func (fs Filesystem) GetObject(bucket, object string, startOffset int64) (io.Rea
|
|||||||
if os.IsNotExist(e) {
|
if os.IsNotExist(e) {
|
||||||
return nil, probe.NewError(BucketNotFound{Bucket: bucket})
|
return nil, probe.NewError(BucketNotFound{Bucket: bucket})
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil, probe.NewError(ObjectNotFound{Bucket: bucket, Object: object})
|
return nil, probe.NewError(ObjectNotFound{Bucket: bucket, Object: object})
|
||||||
}
|
}
|
||||||
return nil, probe.NewError(e)
|
return nil, probe.NewError(e)
|
||||||
@ -73,7 +72,7 @@ func (fs Filesystem) GetObject(bucket, object string, startOffset int64) (io.Rea
|
|||||||
return nil, probe.NewError(ObjectNotFound{Bucket: bucket, Object: object})
|
return nil, probe.NewError(ObjectNotFound{Bucket: bucket, Object: object})
|
||||||
}
|
}
|
||||||
|
|
||||||
// Seet to a starting offset.
|
// Seek to a starting offset.
|
||||||
_, e = file.Seek(startOffset, os.SEEK_SET)
|
_, e = file.Seek(startOffset, os.SEEK_SET)
|
||||||
if e != nil {
|
if e != nil {
|
||||||
// When the "handle is invalid", the file might be a directory on Windows.
|
// When the "handle is invalid", the file might be a directory on Windows.
|
||||||
@ -82,8 +81,6 @@ func (fs Filesystem) GetObject(bucket, object string, startOffset int64) (io.Rea
|
|||||||
}
|
}
|
||||||
return nil, probe.NewError(e)
|
return nil, probe.NewError(e)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Return successfully seeked file handler.
|
|
||||||
return file, nil
|
return file, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -238,19 +238,17 @@ func BenchmarkGetObject(b *testing.B) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
b.ResetTimer()
|
b.ResetTimer()
|
||||||
|
|
||||||
for i := 0; i < b.N; i++ {
|
for i := 0; i < b.N; i++ {
|
||||||
var w bytes.Buffer
|
var buffer = new(bytes.Buffer)
|
||||||
r, err := fs.GetObject("bucket", "object"+strconv.Itoa(i%10), 0)
|
r, err := fs.GetObject("bucket", "object"+strconv.Itoa(i%10), 0)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
b.Error(err)
|
b.Error(err)
|
||||||
}
|
}
|
||||||
n, e := io.Copy(&w, r)
|
if _, e := io.Copy(buffer, r); e != nil {
|
||||||
if e != nil {
|
|
||||||
b.Error(e)
|
b.Error(e)
|
||||||
}
|
}
|
||||||
if n != int64(len(text)) {
|
if buffer.Len() != len(text) {
|
||||||
b.Errorf("GetObject returned incorrect length %d (should be %d)\n", n, int64(len(text)))
|
b.Errorf("GetObject returned incorrect length %d (should be %d)\n", buffer.Len(), len(text))
|
||||||
}
|
}
|
||||||
r.Close()
|
r.Close()
|
||||||
}
|
}
|
||||||
|
62
fs.go
62
fs.go
@ -25,8 +25,8 @@ import (
|
|||||||
"github.com/minio/minio/pkg/probe"
|
"github.com/minio/minio/pkg/probe"
|
||||||
)
|
)
|
||||||
|
|
||||||
// ListObjectParams - list object params used for list object map
|
// listObjectParams - list object params used for list object map
|
||||||
type ListObjectParams struct {
|
type listObjectParams struct {
|
||||||
bucket string
|
bucket string
|
||||||
delimiter string
|
delimiter string
|
||||||
marker string
|
marker string
|
||||||
@ -38,16 +38,31 @@ type Filesystem struct {
|
|||||||
path string
|
path string
|
||||||
minFreeDisk int64
|
minFreeDisk int64
|
||||||
rwLock *sync.RWMutex
|
rwLock *sync.RWMutex
|
||||||
multiparts *Multiparts
|
multiparts *multiparts
|
||||||
listObjectMap map[ListObjectParams][]ObjectInfoChannel
|
listObjectMap map[listObjectParams][]objectInfoChannel
|
||||||
listObjectMapMutex *sync.Mutex
|
listObjectMapMutex *sync.Mutex
|
||||||
}
|
}
|
||||||
|
|
||||||
func (fs *Filesystem) pushListObjectCh(params ListObjectParams, ch ObjectInfoChannel) {
|
// MultipartSession holds active session information
|
||||||
|
type multipartSession struct {
|
||||||
|
TotalParts int
|
||||||
|
ObjectName string
|
||||||
|
UploadID string
|
||||||
|
Initiated time.Time
|
||||||
|
Parts []partInfo
|
||||||
|
}
|
||||||
|
|
||||||
|
// multiparts collection of many parts
|
||||||
|
type multiparts struct {
|
||||||
|
Version string `json:"version"`
|
||||||
|
ActiveSession map[string]*multipartSession `json:"activeSessions"`
|
||||||
|
}
|
||||||
|
|
||||||
|
func (fs *Filesystem) pushListObjectCh(params listObjectParams, ch objectInfoChannel) {
|
||||||
fs.listObjectMapMutex.Lock()
|
fs.listObjectMapMutex.Lock()
|
||||||
defer fs.listObjectMapMutex.Unlock()
|
defer fs.listObjectMapMutex.Unlock()
|
||||||
|
|
||||||
channels := []ObjectInfoChannel{ch}
|
channels := []objectInfoChannel{ch}
|
||||||
if _, ok := fs.listObjectMap[params]; ok {
|
if _, ok := fs.listObjectMap[params]; ok {
|
||||||
channels = append(fs.listObjectMap[params], ch)
|
channels = append(fs.listObjectMap[params], ch)
|
||||||
}
|
}
|
||||||
@ -55,7 +70,7 @@ func (fs *Filesystem) pushListObjectCh(params ListObjectParams, ch ObjectInfoCha
|
|||||||
fs.listObjectMap[params] = channels
|
fs.listObjectMap[params] = channels
|
||||||
}
|
}
|
||||||
|
|
||||||
func (fs *Filesystem) popListObjectCh(params ListObjectParams) *ObjectInfoChannel {
|
func (fs *Filesystem) popListObjectCh(params listObjectParams) *objectInfoChannel {
|
||||||
fs.listObjectMapMutex.Lock()
|
fs.listObjectMapMutex.Lock()
|
||||||
defer fs.listObjectMapMutex.Unlock()
|
defer fs.listObjectMapMutex.Unlock()
|
||||||
|
|
||||||
@ -80,40 +95,25 @@ func (fs *Filesystem) popListObjectCh(params ListObjectParams) *ObjectInfoChanne
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// MultipartSession holds active session information
|
|
||||||
type MultipartSession struct {
|
|
||||||
TotalParts int
|
|
||||||
ObjectName string
|
|
||||||
UploadID string
|
|
||||||
Initiated time.Time
|
|
||||||
Parts []PartMetadata
|
|
||||||
}
|
|
||||||
|
|
||||||
// Multiparts collection of many parts
|
|
||||||
type Multiparts struct {
|
|
||||||
Version string `json:"version"`
|
|
||||||
ActiveSession map[string]*MultipartSession `json:"activeSessions"`
|
|
||||||
}
|
|
||||||
|
|
||||||
// newFS instantiate a new filesystem.
|
// newFS instantiate a new filesystem.
|
||||||
func newFS(rootPath string) (ObjectAPI, *probe.Error) {
|
func newFS(rootPath string) (ObjectAPI, *probe.Error) {
|
||||||
setFSMultipartsMetadataPath(filepath.Join(rootPath, "$multiparts-session.json"))
|
setFSMultipartsMetadataPath(filepath.Join(rootPath, "$multiparts-session.json"))
|
||||||
|
|
||||||
var err *probe.Error
|
var err *probe.Error
|
||||||
// load multiparts session from disk
|
// load multiparts session from disk
|
||||||
var multiparts *Multiparts
|
var mparts *multiparts
|
||||||
multiparts, err = loadMultipartsSession()
|
mparts, err = loadMultipartsSession()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if os.IsNotExist(err.ToGoError()) {
|
if os.IsNotExist(err.ToGoError()) {
|
||||||
multiparts = &Multiparts{
|
mparts = &multiparts{
|
||||||
Version: "1",
|
Version: "1",
|
||||||
ActiveSession: make(map[string]*MultipartSession),
|
ActiveSession: make(map[string]*multipartSession),
|
||||||
}
|
}
|
||||||
if err = saveMultipartsSession(*multiparts); err != nil {
|
if err = saveMultipartsSession(*mparts); err != nil {
|
||||||
return Filesystem{}, err.Trace()
|
return nil, err.Trace()
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
return Filesystem{}, err.Trace()
|
return nil, err.Trace()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -121,14 +121,14 @@ func newFS(rootPath string) (ObjectAPI, *probe.Error) {
|
|||||||
rwLock: &sync.RWMutex{},
|
rwLock: &sync.RWMutex{},
|
||||||
}
|
}
|
||||||
fs.path = rootPath
|
fs.path = rootPath
|
||||||
fs.multiparts = multiparts
|
fs.multiparts = mparts
|
||||||
|
|
||||||
/// Defaults
|
/// Defaults
|
||||||
|
|
||||||
// Minium free disk required for i/o operations to succeed.
|
// Minium free disk required for i/o operations to succeed.
|
||||||
fs.minFreeDisk = 5
|
fs.minFreeDisk = 5
|
||||||
|
|
||||||
fs.listObjectMap = make(map[ListObjectParams][]ObjectInfoChannel)
|
fs.listObjectMap = make(map[listObjectParams][]objectInfoChannel)
|
||||||
fs.listObjectMapMutex = &sync.Mutex{}
|
fs.listObjectMapMutex = &sync.Mutex{}
|
||||||
|
|
||||||
// Return here.
|
// Return here.
|
||||||
|
@ -1,5 +1,5 @@
|
|||||||
/*
|
/*
|
||||||
* Minimalist Object Storage, (C) 2015, 2016 Minio, Inc.
|
* Minio Cloud Storage, (C) 2015, 2016 Minio, Inc.
|
||||||
*
|
*
|
||||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
* you may not use this file except in compliance with the License.
|
* you may not use this file except in compliance with the License.
|
||||||
@ -59,8 +59,7 @@ func testMultipartObjectCreation(c *check.C, create func() ObjectAPI) {
|
|||||||
uploadID, err := fs.NewMultipartUpload("bucket", "key")
|
uploadID, err := fs.NewMultipartUpload("bucket", "key")
|
||||||
c.Assert(err, check.IsNil)
|
c.Assert(err, check.IsNil)
|
||||||
|
|
||||||
completedParts := CompleteMultipartUpload{}
|
completedParts := completeMultipartUpload{}
|
||||||
//completedParts.Part = make([]CompletePart, 10)
|
|
||||||
for i := 1; i <= 10; i++ {
|
for i := 1; i <= 10; i++ {
|
||||||
randomPerm := rand.Perm(10)
|
randomPerm := rand.Perm(10)
|
||||||
randomString := ""
|
randomString := ""
|
||||||
@ -76,11 +75,11 @@ func testMultipartObjectCreation(c *check.C, create func() ObjectAPI) {
|
|||||||
calculatedMD5sum, err = fs.PutObjectPart("bucket", "key", uploadID, i, int64(len(randomString)), bytes.NewBufferString(randomString), expectedMD5Sumhex)
|
calculatedMD5sum, err = fs.PutObjectPart("bucket", "key", uploadID, i, int64(len(randomString)), bytes.NewBufferString(randomString), expectedMD5Sumhex)
|
||||||
c.Assert(err, check.IsNil)
|
c.Assert(err, check.IsNil)
|
||||||
c.Assert(calculatedMD5sum, check.Equals, expectedMD5Sumhex)
|
c.Assert(calculatedMD5sum, check.Equals, expectedMD5Sumhex)
|
||||||
completedParts.Parts = append(completedParts.Parts, CompletePart{PartNumber: i, ETag: calculatedMD5sum})
|
completedParts.Parts = append(completedParts.Parts, completePart{PartNumber: i, ETag: calculatedMD5sum})
|
||||||
}
|
}
|
||||||
objectInfo, err := fs.CompleteMultipartUpload("bucket", "key", uploadID, completedParts.Parts)
|
objInfo, err := fs.CompleteMultipartUpload("bucket", "key", uploadID, completedParts.Parts)
|
||||||
c.Assert(err, check.IsNil)
|
c.Assert(err, check.IsNil)
|
||||||
c.Assert(objectInfo.MD5Sum, check.Equals, "9b7d6f13ba00e24d0b02de92e814891b-10")
|
c.Assert(objInfo.MD5Sum, check.Equals, "9b7d6f13ba00e24d0b02de92e814891b-10")
|
||||||
}
|
}
|
||||||
|
|
||||||
func testMultipartObjectAbort(c *check.C, create func() ObjectAPI) {
|
func testMultipartObjectAbort(c *check.C, create func() ObjectAPI) {
|
||||||
@ -91,6 +90,7 @@ func testMultipartObjectAbort(c *check.C, create func() ObjectAPI) {
|
|||||||
c.Assert(err, check.IsNil)
|
c.Assert(err, check.IsNil)
|
||||||
|
|
||||||
parts := make(map[int]string)
|
parts := make(map[int]string)
|
||||||
|
metadata := make(map[string]string)
|
||||||
for i := 1; i <= 10; i++ {
|
for i := 1; i <= 10; i++ {
|
||||||
randomPerm := rand.Perm(10)
|
randomPerm := rand.Perm(10)
|
||||||
randomString := ""
|
randomString := ""
|
||||||
@ -102,6 +102,7 @@ func testMultipartObjectAbort(c *check.C, create func() ObjectAPI) {
|
|||||||
hasher.Write([]byte(randomString))
|
hasher.Write([]byte(randomString))
|
||||||
expectedMD5Sumhex := hex.EncodeToString(hasher.Sum(nil))
|
expectedMD5Sumhex := hex.EncodeToString(hasher.Sum(nil))
|
||||||
|
|
||||||
|
metadata["md5"] = expectedMD5Sumhex
|
||||||
var calculatedMD5sum string
|
var calculatedMD5sum string
|
||||||
calculatedMD5sum, err = fs.PutObjectPart("bucket", "key", uploadID, i, int64(len(randomString)), bytes.NewBufferString(randomString), expectedMD5Sumhex)
|
calculatedMD5sum, err = fs.PutObjectPart("bucket", "key", uploadID, i, int64(len(randomString)), bytes.NewBufferString(randomString), expectedMD5Sumhex)
|
||||||
c.Assert(err, check.IsNil)
|
c.Assert(err, check.IsNil)
|
||||||
@ -130,24 +131,25 @@ func testMultipleObjectCreation(c *check.C, create func() ObjectAPI) {
|
|||||||
|
|
||||||
key := "obj" + strconv.Itoa(i)
|
key := "obj" + strconv.Itoa(i)
|
||||||
objects[key] = []byte(randomString)
|
objects[key] = []byte(randomString)
|
||||||
var objectInfo ObjectInfo
|
|
||||||
metadata := make(map[string]string)
|
metadata := make(map[string]string)
|
||||||
metadata["md5Sum"] = expectedMD5Sumhex
|
metadata["md5Sum"] = expectedMD5Sumhex
|
||||||
objectInfo, err = fs.PutObject("bucket", key, int64(len(randomString)), bytes.NewBufferString(randomString), metadata)
|
objInfo, err := fs.PutObject("bucket", key, int64(len(randomString)), bytes.NewBufferString(randomString), metadata)
|
||||||
c.Assert(err, check.IsNil)
|
c.Assert(err, check.IsNil)
|
||||||
c.Assert(objectInfo.MD5Sum, check.Equals, expectedMD5Sumhex)
|
c.Assert(objInfo.MD5Sum, check.Equals, expectedMD5Sumhex)
|
||||||
}
|
}
|
||||||
|
|
||||||
for key, value := range objects {
|
for key, value := range objects {
|
||||||
var byteBuffer bytes.Buffer
|
var byteBuffer bytes.Buffer
|
||||||
r, err := fs.GetObject("bucket", key, 0)
|
r, err := fs.GetObject("bucket", key, 0)
|
||||||
c.Assert(err, check.IsNil)
|
c.Assert(err, check.IsNil)
|
||||||
io.Copy(&byteBuffer, r)
|
_, e := io.Copy(&byteBuffer, r)
|
||||||
|
c.Assert(e, check.IsNil)
|
||||||
c.Assert(byteBuffer.Bytes(), check.DeepEquals, value)
|
c.Assert(byteBuffer.Bytes(), check.DeepEquals, value)
|
||||||
|
c.Assert(r.Close(), check.IsNil)
|
||||||
|
|
||||||
metadata, err := fs.GetObjectInfo("bucket", key)
|
objInfo, err := fs.GetObjectInfo("bucket", key)
|
||||||
c.Assert(err, check.IsNil)
|
c.Assert(err, check.IsNil)
|
||||||
c.Assert(metadata.Size, check.Equals, int64(len(value)))
|
c.Assert(objInfo.Size, check.Equals, int64(len(value)))
|
||||||
r.Close()
|
r.Close()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -259,7 +261,7 @@ func testObjectOverwriteWorks(c *check.C, create func() ObjectAPI) {
|
|||||||
|
|
||||||
_, err = fs.PutObject("bucket", "object", int64(len("one")), bytes.NewBufferString("one"), nil)
|
_, err = fs.PutObject("bucket", "object", int64(len("one")), bytes.NewBufferString("one"), nil)
|
||||||
c.Assert(err, check.IsNil)
|
c.Assert(err, check.IsNil)
|
||||||
// c.Assert(md5Sum1hex, check.Equals, objectInfo.MD5Sum)
|
// c.Assert(md5Sum1hex, check.Equals, objInfo.MD5Sum)
|
||||||
|
|
||||||
_, err = fs.PutObject("bucket", "object", int64(len("three")), bytes.NewBufferString("three"), nil)
|
_, err = fs.PutObject("bucket", "object", int64(len("three")), bytes.NewBufferString("three"), nil)
|
||||||
c.Assert(err, check.IsNil)
|
c.Assert(err, check.IsNil)
|
||||||
@ -267,9 +269,10 @@ func testObjectOverwriteWorks(c *check.C, create func() ObjectAPI) {
|
|||||||
var bytesBuffer bytes.Buffer
|
var bytesBuffer bytes.Buffer
|
||||||
r, err := fs.GetObject("bucket", "object", 0)
|
r, err := fs.GetObject("bucket", "object", 0)
|
||||||
c.Assert(err, check.IsNil)
|
c.Assert(err, check.IsNil)
|
||||||
io.Copy(&bytesBuffer, r)
|
_, e := io.Copy(&bytesBuffer, r)
|
||||||
|
c.Assert(e, check.IsNil)
|
||||||
c.Assert(string(bytesBuffer.Bytes()), check.Equals, "three")
|
c.Assert(string(bytesBuffer.Bytes()), check.Equals, "three")
|
||||||
r.Close()
|
c.Assert(r.Close(), check.IsNil)
|
||||||
}
|
}
|
||||||
|
|
||||||
func testNonExistantBucketOperations(c *check.C, create func() ObjectAPI) {
|
func testNonExistantBucketOperations(c *check.C, create func() ObjectAPI) {
|
||||||
@ -297,9 +300,11 @@ func testPutObjectInSubdir(c *check.C, create func() ObjectAPI) {
|
|||||||
var bytesBuffer bytes.Buffer
|
var bytesBuffer bytes.Buffer
|
||||||
r, err := fs.GetObject("bucket", "dir1/dir2/object", 0)
|
r, err := fs.GetObject("bucket", "dir1/dir2/object", 0)
|
||||||
c.Assert(err, check.IsNil)
|
c.Assert(err, check.IsNil)
|
||||||
io.Copy(&bytesBuffer, r)
|
n, e := io.Copy(&bytesBuffer, r)
|
||||||
|
c.Assert(e, check.IsNil)
|
||||||
c.Assert(len(bytesBuffer.Bytes()), check.Equals, len("hello world"))
|
c.Assert(len(bytesBuffer.Bytes()), check.Equals, len("hello world"))
|
||||||
r.Close()
|
c.Assert(int64(len(bytesBuffer.Bytes())), check.Equals, int64(n))
|
||||||
|
c.Assert(r.Close(), check.IsNil)
|
||||||
}
|
}
|
||||||
|
|
||||||
func testListBuckets(c *check.C, create func() ObjectAPI) {
|
func testListBuckets(c *check.C, create func() ObjectAPI) {
|
||||||
@ -411,6 +416,7 @@ func testDefaultContentType(c *check.C, create func() ObjectAPI) {
|
|||||||
|
|
||||||
// Test empty
|
// Test empty
|
||||||
_, err = fs.PutObject("bucket", "one", int64(len("one")), bytes.NewBufferString("one"), nil)
|
_, err = fs.PutObject("bucket", "one", int64(len("one")), bytes.NewBufferString("one"), nil)
|
||||||
|
c.Assert(err, check.IsNil)
|
||||||
objInfo, err := fs.GetObjectInfo("bucket", "one")
|
objInfo, err := fs.GetObjectInfo("bucket", "one")
|
||||||
c.Assert(err, check.IsNil)
|
c.Assert(err, check.IsNil)
|
||||||
c.Assert(objInfo.ContentType, check.Equals, "application/octet-stream")
|
c.Assert(objInfo.ContentType, check.Equals, "application/octet-stream")
|
||||||
|
@ -15,8 +15,8 @@ type ObjectAPI interface {
|
|||||||
GetBucketInfo(bucket string) (BucketInfo, *probe.Error)
|
GetBucketInfo(bucket string) (BucketInfo, *probe.Error)
|
||||||
|
|
||||||
// Bucket query API.
|
// Bucket query API.
|
||||||
ListObjects(bucket, prefix, marker, delimiter string, maxKeys int) (ListObjectsResult, *probe.Error)
|
ListObjects(bucket, prefix, marker, delimiter string, maxKeys int) (ListObjectsInfo, *probe.Error)
|
||||||
ListMultipartUploads(bucket string, resources BucketMultipartResourcesMetadata) (BucketMultipartResourcesMetadata, *probe.Error)
|
ListMultipartUploads(bucket, objectPrefix, keyMarker, uploadIDMarker, delimiter string, maxUploads int) (ListMultipartsInfo, *probe.Error)
|
||||||
|
|
||||||
// Object resource API.
|
// Object resource API.
|
||||||
GetObject(bucket, object string, startOffset int64) (io.ReadCloser, *probe.Error)
|
GetObject(bucket, object string, startOffset int64) (io.ReadCloser, *probe.Error)
|
||||||
@ -27,7 +27,7 @@ type ObjectAPI interface {
|
|||||||
// Object query API.
|
// Object query API.
|
||||||
NewMultipartUpload(bucket, object string) (string, *probe.Error)
|
NewMultipartUpload(bucket, object string) (string, *probe.Error)
|
||||||
PutObjectPart(bucket, object, uploadID string, partID int, size int64, data io.Reader, md5Hex string) (string, *probe.Error)
|
PutObjectPart(bucket, object, uploadID string, partID int, size int64, data io.Reader, md5Hex string) (string, *probe.Error)
|
||||||
ListObjectParts(bucket, object string, resources ObjectResourcesMetadata) (ObjectResourcesMetadata, *probe.Error)
|
ListObjectParts(bucket, object, uploadID string, partNumberMarker, maxParts int) (ListPartsInfo, *probe.Error)
|
||||||
CompleteMultipartUpload(bucket string, object string, uploadID string, parts []CompletePart) (ObjectInfo, *probe.Error)
|
CompleteMultipartUpload(bucket string, object string, uploadID string, parts []completePart) (ObjectInfo, *probe.Error)
|
||||||
AbortMultipartUpload(bucket, object, uploadID string) *probe.Error
|
AbortMultipartUpload(bucket, object, uploadID string) *probe.Error
|
||||||
}
|
}
|
||||||
|
@ -1,5 +1,5 @@
|
|||||||
/*
|
/*
|
||||||
* Minio Cloud Storage, (C) 2015, 2016 Minio, Inc.
|
* Minio Cloud Storage, (C) 2016 Minio, Inc.
|
||||||
*
|
*
|
||||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
* you may not use this file except in compliance with the License.
|
* you may not use this file except in compliance with the License.
|
||||||
@ -18,16 +18,26 @@ package main
|
|||||||
|
|
||||||
import "time"
|
import "time"
|
||||||
|
|
||||||
// PartMetadata - various types of individual part resources
|
// BucketInfo - bucket name and create date
|
||||||
type PartMetadata struct {
|
type BucketInfo struct {
|
||||||
PartNumber int
|
Name string
|
||||||
LastModified time.Time
|
Created time.Time
|
||||||
ETag string
|
|
||||||
Size int64
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// ObjectResourcesMetadata - various types of object resources
|
// ObjectInfo - object info.
|
||||||
type ObjectResourcesMetadata struct {
|
type ObjectInfo struct {
|
||||||
|
Bucket string
|
||||||
|
Name string
|
||||||
|
ModifiedTime time.Time
|
||||||
|
ContentType string
|
||||||
|
MD5Sum string
|
||||||
|
Size int64
|
||||||
|
IsDir bool
|
||||||
|
Err error
|
||||||
|
}
|
||||||
|
|
||||||
|
// ListPartsInfo - various types of object resources.
|
||||||
|
type ListPartsInfo struct {
|
||||||
Bucket string
|
Bucket string
|
||||||
Object string
|
Object string
|
||||||
UploadID string
|
UploadID string
|
||||||
@ -37,20 +47,12 @@ type ObjectResourcesMetadata struct {
|
|||||||
MaxParts int
|
MaxParts int
|
||||||
IsTruncated bool
|
IsTruncated bool
|
||||||
|
|
||||||
Part []PartMetadata
|
Parts []partInfo
|
||||||
EncodingType string
|
EncodingType string
|
||||||
}
|
}
|
||||||
|
|
||||||
// UploadMetadata container capturing metadata on in progress multipart upload in a given bucket
|
// ListMultipartsInfo - various types of bucket resources for inprogress multipart uploads.
|
||||||
type UploadMetadata struct {
|
type ListMultipartsInfo struct {
|
||||||
Object string
|
|
||||||
UploadID string
|
|
||||||
StorageClass string
|
|
||||||
Initiated time.Time
|
|
||||||
}
|
|
||||||
|
|
||||||
// BucketMultipartResourcesMetadata - various types of bucket resources for inprogress multipart uploads
|
|
||||||
type BucketMultipartResourcesMetadata struct {
|
|
||||||
KeyMarker string
|
KeyMarker string
|
||||||
UploadIDMarker string
|
UploadIDMarker string
|
||||||
NextKeyMarker string
|
NextKeyMarker string
|
||||||
@ -58,34 +60,50 @@ type BucketMultipartResourcesMetadata struct {
|
|||||||
EncodingType string
|
EncodingType string
|
||||||
MaxUploads int
|
MaxUploads int
|
||||||
IsTruncated bool
|
IsTruncated bool
|
||||||
Upload []*UploadMetadata
|
Uploads []uploadMetadata
|
||||||
Prefix string
|
Prefix string
|
||||||
Delimiter string
|
Delimiter string
|
||||||
CommonPrefixes []string
|
CommonPrefixes []string
|
||||||
}
|
}
|
||||||
|
|
||||||
// ListObjectsResult - container for list object request results.
|
// ListObjectsInfo - container for list objects.
|
||||||
type ListObjectsResult struct {
|
type ListObjectsInfo struct {
|
||||||
IsTruncated bool
|
IsTruncated bool
|
||||||
NextMarker string
|
NextMarker string
|
||||||
Objects []ObjectInfo
|
Objects []ObjectInfo
|
||||||
Prefixes []string
|
Prefixes []string
|
||||||
}
|
}
|
||||||
|
|
||||||
// CompletePart - completed part container
|
// partInfo - various types of individual part resources.
|
||||||
type CompletePart struct {
|
type partInfo struct {
|
||||||
|
PartNumber int
|
||||||
|
LastModified time.Time
|
||||||
|
ETag string
|
||||||
|
Size int64
|
||||||
|
}
|
||||||
|
|
||||||
|
// uploadMetadata container capturing metadata on in progress multipart upload in a given bucket
|
||||||
|
type uploadMetadata struct {
|
||||||
|
Object string
|
||||||
|
UploadID string
|
||||||
|
StorageClass string
|
||||||
|
Initiated time.Time
|
||||||
|
}
|
||||||
|
|
||||||
|
// completePart - completed part container.
|
||||||
|
type completePart struct {
|
||||||
PartNumber int
|
PartNumber int
|
||||||
ETag string
|
ETag string
|
||||||
}
|
}
|
||||||
|
|
||||||
// completedParts is a sortable interface for Part slice
|
// completedParts is a sortable interface for Part slice
|
||||||
type completedParts []CompletePart
|
type completedParts []completePart
|
||||||
|
|
||||||
func (a completedParts) Len() int { return len(a) }
|
func (a completedParts) Len() int { return len(a) }
|
||||||
func (a completedParts) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
|
func (a completedParts) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
|
||||||
func (a completedParts) Less(i, j int) bool { return a[i].PartNumber < a[j].PartNumber }
|
func (a completedParts) Less(i, j int) bool { return a[i].PartNumber < a[j].PartNumber }
|
||||||
|
|
||||||
// CompleteMultipartUpload container for completing multipart upload
|
// completeMultipartUpload container for completing multipart upload
|
||||||
type CompleteMultipartUpload struct {
|
type completeMultipartUpload struct {
|
||||||
Parts []CompletePart `xml:"Part"`
|
Parts []completePart `xml:"Part"`
|
||||||
}
|
}
|
@ -9,7 +9,7 @@
|
|||||||
*
|
*
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implieapi.ObjectAPI.
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
* See the License for the specific language governing permissions and
|
* See the License for the specific language governing permissions and
|
||||||
* limitations under the License.
|
* limitations under the License.
|
||||||
*/
|
*/
|
||||||
@ -84,9 +84,8 @@ func (api objectStorageAPI) GetObjectHandler(w http.ResponseWriter, r *http.Requ
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
objectInfo, err := api.ObjectAPI.GetObjectInfo(bucket, object)
|
objInfo, err := api.ObjectAPI.GetObjectInfo(bucket, object)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
errorIf(err.Trace(), "GetObject failed.", nil)
|
|
||||||
switch err.ToGoError().(type) {
|
switch err.ToGoError().(type) {
|
||||||
case BucketNameInvalid:
|
case BucketNameInvalid:
|
||||||
writeErrorResponse(w, r, ErrInvalidBucketName, r.URL.Path)
|
writeErrorResponse(w, r, ErrInvalidBucketName, r.URL.Path)
|
||||||
@ -97,23 +96,14 @@ func (api objectStorageAPI) GetObjectHandler(w http.ResponseWriter, r *http.Requ
|
|||||||
case ObjectNameInvalid:
|
case ObjectNameInvalid:
|
||||||
writeErrorResponse(w, r, ErrNoSuchKey, r.URL.Path)
|
writeErrorResponse(w, r, ErrNoSuchKey, r.URL.Path)
|
||||||
default:
|
default:
|
||||||
|
errorIf(err.Trace(), "GetObjectInfo failed.", nil)
|
||||||
writeErrorResponse(w, r, ErrInternalError, r.URL.Path)
|
writeErrorResponse(w, r, ErrInternalError, r.URL.Path)
|
||||||
}
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
var hrange *httpRange
|
|
||||||
hrange, err = getRequestedRange(r.Header.Get("Range"), objectInfo.Size)
|
|
||||||
if err != nil {
|
|
||||||
writeErrorResponse(w, r, ErrInvalidRange, r.URL.Path)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// Set standard object headers.
|
|
||||||
setObjectHeaders(w, objectInfo, hrange)
|
|
||||||
|
|
||||||
// Verify 'If-Modified-Since' and 'If-Unmodified-Since'.
|
// Verify 'If-Modified-Since' and 'If-Unmodified-Since'.
|
||||||
lastModified := objectInfo.ModifiedTime
|
lastModified := objInfo.ModifiedTime
|
||||||
if checkLastModified(w, r, lastModified) {
|
if checkLastModified(w, r, lastModified) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@ -122,8 +112,12 @@ func (api objectStorageAPI) GetObjectHandler(w http.ResponseWriter, r *http.Requ
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// Set any additional requested response headers.
|
var hrange *httpRange
|
||||||
setGetRespHeaders(w, r.URL.Query())
|
hrange, err = getRequestedRange(r.Header.Get("Range"), objInfo.Size)
|
||||||
|
if err != nil {
|
||||||
|
writeErrorResponse(w, r, ErrInvalidRange, r.URL.Path)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
// Get the object.
|
// Get the object.
|
||||||
startOffset := hrange.start
|
startOffset := hrange.start
|
||||||
@ -134,6 +128,13 @@ func (api objectStorageAPI) GetObjectHandler(w http.ResponseWriter, r *http.Requ
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
defer readCloser.Close() // Close after this handler returns.
|
defer readCloser.Close() // Close after this handler returns.
|
||||||
|
|
||||||
|
// Set standard object headers.
|
||||||
|
setObjectHeaders(w, objInfo, hrange)
|
||||||
|
|
||||||
|
// Set any additional requested response headers.
|
||||||
|
setGetRespHeaders(w, r.URL.Query())
|
||||||
|
|
||||||
if hrange.length > 0 {
|
if hrange.length > 0 {
|
||||||
if _, e := io.CopyN(w, readCloser, hrange.length); e != nil {
|
if _, e := io.CopyN(w, readCloser, hrange.length); e != nil {
|
||||||
errorIf(probe.NewError(e), "Writing to client failed", nil)
|
errorIf(probe.NewError(e), "Writing to client failed", nil)
|
||||||
@ -264,7 +265,7 @@ func (api objectStorageAPI) HeadObjectHandler(w http.ResponseWriter, r *http.Req
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
objectInfo, err := api.ObjectAPI.GetObjectInfo(bucket, object)
|
objInfo, err := api.ObjectAPI.GetObjectInfo(bucket, object)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
errorIf(err.Trace(bucket, object), "GetObjectInfo failed.", nil)
|
errorIf(err.Trace(bucket, object), "GetObjectInfo failed.", nil)
|
||||||
switch err.ToGoError().(type) {
|
switch err.ToGoError().(type) {
|
||||||
@ -282,11 +283,8 @@ func (api objectStorageAPI) HeadObjectHandler(w http.ResponseWriter, r *http.Req
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// Set standard object headers.
|
|
||||||
setObjectHeaders(w, objectInfo, nil)
|
|
||||||
|
|
||||||
// Verify 'If-Modified-Since' and 'If-Unmodified-Since'.
|
// Verify 'If-Modified-Since' and 'If-Unmodified-Since'.
|
||||||
lastModified := objectInfo.ModifiedTime
|
lastModified := objInfo.ModifiedTime
|
||||||
if checkLastModified(w, r, lastModified) {
|
if checkLastModified(w, r, lastModified) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@ -296,6 +294,9 @@ func (api objectStorageAPI) HeadObjectHandler(w http.ResponseWriter, r *http.Req
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Set standard object headers.
|
||||||
|
setObjectHeaders(w, objInfo, nil)
|
||||||
|
|
||||||
// Successfull response.
|
// Successfull response.
|
||||||
w.WriteHeader(http.StatusOK)
|
w.WriteHeader(http.StatusOK)
|
||||||
}
|
}
|
||||||
@ -357,7 +358,7 @@ func (api objectStorageAPI) CopyObjectHandler(w http.ResponseWriter, r *http.Req
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
objectInfo, err := api.ObjectAPI.GetObjectInfo(sourceBucket, sourceObject)
|
objInfo, err := api.ObjectAPI.GetObjectInfo(sourceBucket, sourceObject)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
errorIf(err.Trace(), "GetObjectInfo failed.", nil)
|
errorIf(err.Trace(), "GetObjectInfo failed.", nil)
|
||||||
switch err.ToGoError().(type) {
|
switch err.ToGoError().(type) {
|
||||||
@ -378,7 +379,7 @@ func (api objectStorageAPI) CopyObjectHandler(w http.ResponseWriter, r *http.Req
|
|||||||
|
|
||||||
// Verify x-amz-copy-source-if-modified-since and
|
// Verify x-amz-copy-source-if-modified-since and
|
||||||
// x-amz-copy-source-if-unmodified-since.
|
// x-amz-copy-source-if-unmodified-since.
|
||||||
lastModified := objectInfo.ModifiedTime
|
lastModified := objInfo.ModifiedTime
|
||||||
if checkCopySourceLastModified(w, r, lastModified) {
|
if checkCopySourceLastModified(w, r, lastModified) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@ -390,15 +391,15 @@ func (api objectStorageAPI) CopyObjectHandler(w http.ResponseWriter, r *http.Req
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// maximum Upload size for object in a single CopyObject operation.
|
/// maximum Upload size for object in a single CopyObject operation.
|
||||||
if isMaxObjectSize(objectInfo.Size) {
|
if isMaxObjectSize(objInfo.Size) {
|
||||||
writeErrorResponse(w, r, ErrEntityTooLarge, objectSource)
|
writeErrorResponse(w, r, ErrEntityTooLarge, objectSource)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
var md5Bytes []byte
|
var md5Bytes []byte
|
||||||
if objectInfo.MD5Sum != "" {
|
if objInfo.MD5Sum != "" {
|
||||||
var e error
|
var e error
|
||||||
md5Bytes, e = hex.DecodeString(objectInfo.MD5Sum)
|
md5Bytes, e = hex.DecodeString(objInfo.MD5Sum)
|
||||||
if e != nil {
|
if e != nil {
|
||||||
errorIf(probe.NewError(e), "Decoding md5 failed.", nil)
|
errorIf(probe.NewError(e), "Decoding md5 failed.", nil)
|
||||||
writeErrorResponse(w, r, ErrInvalidDigest, r.URL.Path)
|
writeErrorResponse(w, r, ErrInvalidDigest, r.URL.Path)
|
||||||
@ -421,16 +422,15 @@ func (api objectStorageAPI) CopyObjectHandler(w http.ResponseWriter, r *http.Req
|
|||||||
}
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// Size of object.
|
// Size of object.
|
||||||
size := objectInfo.Size
|
size := objInfo.Size
|
||||||
|
|
||||||
// Save metadata.
|
// Save metadata.
|
||||||
metadata := make(map[string]string)
|
metadata := make(map[string]string)
|
||||||
metadata["md5Sum"] = hex.EncodeToString(md5Bytes)
|
metadata["md5Sum"] = hex.EncodeToString(md5Bytes)
|
||||||
|
|
||||||
// Create the object.
|
// Create the object.
|
||||||
objectInfo, err = api.ObjectAPI.PutObject(bucket, object, size, readCloser, metadata)
|
objInfo, err = api.ObjectAPI.PutObject(bucket, object, size, readCloser, metadata)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
errorIf(err.Trace(), "PutObject failed.", nil)
|
errorIf(err.Trace(), "PutObject failed.", nil)
|
||||||
switch err.ToGoError().(type) {
|
switch err.ToGoError().(type) {
|
||||||
@ -451,7 +451,7 @@ func (api objectStorageAPI) CopyObjectHandler(w http.ResponseWriter, r *http.Req
|
|||||||
}
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
response := generateCopyObjectResponse(objectInfo.MD5Sum, objectInfo.ModifiedTime)
|
response := generateCopyObjectResponse(objInfo.MD5Sum, objInfo.ModifiedTime)
|
||||||
encodedSuccessResponse := encodeResponse(response)
|
encodedSuccessResponse := encodeResponse(response)
|
||||||
// write headers
|
// write headers
|
||||||
setCommonHeaders(w)
|
setCommonHeaders(w)
|
||||||
@ -586,7 +586,7 @@ func (api objectStorageAPI) PutObjectHandler(w http.ResponseWriter, r *http.Requ
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
var objectInfo ObjectInfo
|
var objInfo ObjectInfo
|
||||||
switch getRequestAuthType(r) {
|
switch getRequestAuthType(r) {
|
||||||
default:
|
default:
|
||||||
// For all unknown auth types return error.
|
// For all unknown auth types return error.
|
||||||
@ -599,7 +599,7 @@ func (api objectStorageAPI) PutObjectHandler(w http.ResponseWriter, r *http.Requ
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
// Create anonymous object.
|
// Create anonymous object.
|
||||||
objectInfo, err = api.ObjectAPI.PutObject(bucket, object, size, r.Body, nil)
|
objInfo, err = api.ObjectAPI.PutObject(bucket, object, size, r.Body, nil)
|
||||||
case authTypePresigned:
|
case authTypePresigned:
|
||||||
validateRegion := true // Validate region.
|
validateRegion := true // Validate region.
|
||||||
// For presigned requests verify them right here.
|
// For presigned requests verify them right here.
|
||||||
@ -608,7 +608,7 @@ func (api objectStorageAPI) PutObjectHandler(w http.ResponseWriter, r *http.Requ
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
// Create presigned object.
|
// Create presigned object.
|
||||||
objectInfo, err = api.ObjectAPI.PutObject(bucket, object, size, r.Body, nil)
|
objInfo, err = api.ObjectAPI.PutObject(bucket, object, size, r.Body, nil)
|
||||||
case authTypeSigned:
|
case authTypeSigned:
|
||||||
// Initialize a pipe for data pipe line.
|
// Initialize a pipe for data pipe line.
|
||||||
reader, writer := io.Pipe()
|
reader, writer := io.Pipe()
|
||||||
@ -637,10 +637,10 @@ func (api objectStorageAPI) PutObjectHandler(w http.ResponseWriter, r *http.Requ
|
|||||||
|
|
||||||
// Save metadata.
|
// Save metadata.
|
||||||
metadata := make(map[string]string)
|
metadata := make(map[string]string)
|
||||||
metadata["md5Sum"] = hex.EncodeToString(md5Bytes)
|
// Make sure we hex encode here.
|
||||||
|
metadata["md5"] = hex.EncodeToString(md5Bytes)
|
||||||
// Create object.
|
// Create object.
|
||||||
objectInfo, err = api.ObjectAPI.PutObject(bucket, object, size, reader, metadata)
|
objInfo, err = api.ObjectAPI.PutObject(bucket, object, size, reader, metadata)
|
||||||
}
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
errorIf(err.Trace(), "PutObject failed.", nil)
|
errorIf(err.Trace(), "PutObject failed.", nil)
|
||||||
@ -668,8 +668,8 @@ func (api objectStorageAPI) PutObjectHandler(w http.ResponseWriter, r *http.Requ
|
|||||||
}
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
if objectInfo.MD5Sum != "" {
|
if objInfo.MD5Sum != "" {
|
||||||
w.Header().Set("ETag", "\""+objectInfo.MD5Sum+"\"")
|
w.Header().Set("ETag", "\""+objInfo.MD5Sum+"\"")
|
||||||
}
|
}
|
||||||
writeSuccessResponse(w, nil)
|
writeSuccessResponse(w, nil)
|
||||||
}
|
}
|
||||||
@ -868,7 +868,7 @@ func (api objectStorageAPI) AbortMultipartUploadHandler(w http.ResponseWriter, r
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
uploadID := getUploadID(r.URL.Query()) // Get upload id.
|
uploadID, _, _, _ := getObjectResources(r.URL.Query())
|
||||||
err := api.ObjectAPI.AbortMultipartUpload(bucket, object, uploadID)
|
err := api.ObjectAPI.AbortMultipartUpload(bucket, object, uploadID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
errorIf(err.Trace(), "AbortMutlipartUpload failed.", nil)
|
errorIf(err.Trace(), "AbortMutlipartUpload failed.", nil)
|
||||||
@ -915,20 +915,20 @@ func (api objectStorageAPI) ListObjectPartsHandler(w http.ResponseWriter, r *htt
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
objectResourcesMetadata := getObjectResources(r.URL.Query())
|
uploadID, partNumberMarker, maxParts, _ := getObjectResources(r.URL.Query())
|
||||||
if objectResourcesMetadata.PartNumberMarker < 0 {
|
if partNumberMarker < 0 {
|
||||||
writeErrorResponse(w, r, ErrInvalidPartNumberMarker, r.URL.Path)
|
writeErrorResponse(w, r, ErrInvalidPartNumberMarker, r.URL.Path)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
if objectResourcesMetadata.MaxParts < 0 {
|
if maxParts < 0 {
|
||||||
writeErrorResponse(w, r, ErrInvalidMaxParts, r.URL.Path)
|
writeErrorResponse(w, r, ErrInvalidMaxParts, r.URL.Path)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
if objectResourcesMetadata.MaxParts == 0 {
|
if maxParts == 0 {
|
||||||
objectResourcesMetadata.MaxParts = maxPartsList
|
maxParts = maxPartsList
|
||||||
}
|
}
|
||||||
|
|
||||||
objectResourcesMetadata, err := api.ObjectAPI.ListObjectParts(bucket, object, objectResourcesMetadata)
|
listPartsInfo, err := api.ObjectAPI.ListObjectParts(bucket, object, uploadID, partNumberMarker, maxParts)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
errorIf(err.Trace(), "ListObjectParts failed.", nil)
|
errorIf(err.Trace(), "ListObjectParts failed.", nil)
|
||||||
switch err.ToGoError().(type) {
|
switch err.ToGoError().(type) {
|
||||||
@ -947,7 +947,7 @@ func (api objectStorageAPI) ListObjectPartsHandler(w http.ResponseWriter, r *htt
|
|||||||
}
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
response := generateListPartsResponse(objectResourcesMetadata)
|
response := generateListPartsResponse(listPartsInfo)
|
||||||
encodedSuccessResponse := encodeResponse(response)
|
encodedSuccessResponse := encodeResponse(response)
|
||||||
// Write headers.
|
// Write headers.
|
||||||
setCommonHeaders(w)
|
setCommonHeaders(w)
|
||||||
@ -962,8 +962,10 @@ func (api objectStorageAPI) CompleteMultipartUploadHandler(w http.ResponseWriter
|
|||||||
object := vars["object"]
|
object := vars["object"]
|
||||||
|
|
||||||
// Get upload id.
|
// Get upload id.
|
||||||
uploadID := getUploadID(r.URL.Query()) // Get upload id.
|
uploadID, _, _, _ := getObjectResources(r.URL.Query())
|
||||||
|
|
||||||
|
var objInfo ObjectInfo
|
||||||
|
var err *probe.Error
|
||||||
switch getRequestAuthType(r) {
|
switch getRequestAuthType(r) {
|
||||||
default:
|
default:
|
||||||
// For all unknown auth types return error.
|
// For all unknown auth types return error.
|
||||||
@ -987,20 +989,20 @@ func (api objectStorageAPI) CompleteMultipartUploadHandler(w http.ResponseWriter
|
|||||||
writeErrorResponse(w, r, ErrInternalError, r.URL.Path)
|
writeErrorResponse(w, r, ErrInternalError, r.URL.Path)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
completeMultipartUpload := &CompleteMultipartUpload{}
|
complMultipartUpload := &completeMultipartUpload{}
|
||||||
if e = xml.Unmarshal(completeMultipartBytes, completeMultipartUpload); e != nil {
|
if e = xml.Unmarshal(completeMultipartBytes, complMultipartUpload); e != nil {
|
||||||
writeErrorResponse(w, r, ErrMalformedXML, r.URL.Path)
|
writeErrorResponse(w, r, ErrMalformedXML, r.URL.Path)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
if !sort.IsSorted(completedParts(completeMultipartUpload.Parts)) {
|
if !sort.IsSorted(completedParts(complMultipartUpload.Parts)) {
|
||||||
writeErrorResponse(w, r, ErrInvalidPartOrder, r.URL.Path)
|
writeErrorResponse(w, r, ErrInvalidPartOrder, r.URL.Path)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
// Complete parts.
|
// Complete parts.
|
||||||
completeParts := completeMultipartUpload.Parts
|
completeParts := complMultipartUpload.Parts
|
||||||
|
|
||||||
// Complete multipart upload.
|
// Complete multipart upload.
|
||||||
objectInfo, err := api.ObjectAPI.CompleteMultipartUpload(bucket, object, uploadID, completeParts)
|
objInfo, err = api.ObjectAPI.CompleteMultipartUpload(bucket, object, uploadID, completeParts)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
errorIf(err.Trace(), "CompleteMultipartUpload failed.", nil)
|
errorIf(err.Trace(), "CompleteMultipartUpload failed.", nil)
|
||||||
switch err.ToGoError().(type) {
|
switch err.ToGoError().(type) {
|
||||||
@ -1020,8 +1022,6 @@ func (api objectStorageAPI) CompleteMultipartUploadHandler(w http.ResponseWriter
|
|||||||
writeErrorResponse(w, r, ErrInvalidPartOrder, r.URL.Path)
|
writeErrorResponse(w, r, ErrInvalidPartOrder, r.URL.Path)
|
||||||
case IncompleteBody:
|
case IncompleteBody:
|
||||||
writeErrorResponse(w, r, ErrIncompleteBody, r.URL.Path)
|
writeErrorResponse(w, r, ErrIncompleteBody, r.URL.Path)
|
||||||
case MalformedXML:
|
|
||||||
writeErrorResponse(w, r, ErrMalformedXML, r.URL.Path)
|
|
||||||
default:
|
default:
|
||||||
writeErrorResponse(w, r, ErrInternalError, r.URL.Path)
|
writeErrorResponse(w, r, ErrInternalError, r.URL.Path)
|
||||||
}
|
}
|
||||||
@ -1030,7 +1030,7 @@ func (api objectStorageAPI) CompleteMultipartUploadHandler(w http.ResponseWriter
|
|||||||
// Get object location.
|
// Get object location.
|
||||||
location := getLocation(r)
|
location := getLocation(r)
|
||||||
// Generate complete multipart response.
|
// Generate complete multipart response.
|
||||||
response := generateCompleteMultpartUploadResponse(bucket, object, location, objectInfo.MD5Sum)
|
response := generateCompleteMultpartUploadResponse(bucket, object, location, objInfo.MD5Sum)
|
||||||
encodedSuccessResponse := encodeResponse(response)
|
encodedSuccessResponse := encodeResponse(response)
|
||||||
// Write headers.
|
// Write headers.
|
||||||
setCommonHeaders(w)
|
setCommonHeaders(w)
|
||||||
|
@ -293,7 +293,6 @@ func serverMain(c *cli.Context) {
|
|||||||
printListenIPs(apiServer)
|
printListenIPs(apiServer)
|
||||||
|
|
||||||
console.Println("\nTo configure Minio Client:")
|
console.Println("\nTo configure Minio Client:")
|
||||||
|
|
||||||
// Download 'mc' links.
|
// Download 'mc' links.
|
||||||
if runtime.GOOS == "windows" {
|
if runtime.GOOS == "windows" {
|
||||||
console.Println(" Download 'mc' from https://dl.minio.io/client/mc/release/" + runtime.GOOS + "-" + runtime.GOARCH + "/mc.exe")
|
console.Println(" Download 'mc' from https://dl.minio.io/client/mc/release/" + runtime.GOOS + "-" + runtime.GOARCH + "/mc.exe")
|
||||||
|
@ -881,7 +881,7 @@ func (s *MyAPISuite) TestPartialContent(c *C) {
|
|||||||
c.Assert(err, IsNil)
|
c.Assert(err, IsNil)
|
||||||
c.Assert(response.StatusCode, Equals, http.StatusOK)
|
c.Assert(response.StatusCode, Equals, http.StatusOK)
|
||||||
|
|
||||||
// prepare request
|
// Prepare request
|
||||||
request, err = s.newRequest("GET", testAPIFSCacheServer.URL+"/partial-content/bar", 0, nil)
|
request, err = s.newRequest("GET", testAPIFSCacheServer.URL+"/partial-content/bar", 0, nil)
|
||||||
c.Assert(err, IsNil)
|
c.Assert(err, IsNil)
|
||||||
request.Header.Add("Range", "bytes=6-7")
|
request.Header.Add("Range", "bytes=6-7")
|
||||||
@ -1288,8 +1288,8 @@ func (s *MyAPISuite) TestObjectMultipart(c *C) {
|
|||||||
c.Assert(response2.StatusCode, Equals, http.StatusOK)
|
c.Assert(response2.StatusCode, Equals, http.StatusOK)
|
||||||
|
|
||||||
// Complete multipart upload
|
// Complete multipart upload
|
||||||
completeUploads := &CompleteMultipartUpload{
|
completeUploads := &completeMultipartUpload{
|
||||||
Parts: []CompletePart{
|
Parts: []completePart{
|
||||||
{
|
{
|
||||||
PartNumber: 1,
|
PartNumber: 1,
|
||||||
ETag: response1.Header.Get("ETag"),
|
ETag: response1.Header.Get("ETag"),
|
||||||
|
@ -139,12 +139,12 @@ func (web *webAPI) MakeBucket(r *http.Request, args *MakeBucketArgs, reply *Gene
|
|||||||
|
|
||||||
// ListBucketsRep - list buckets response
|
// ListBucketsRep - list buckets response
|
||||||
type ListBucketsRep struct {
|
type ListBucketsRep struct {
|
||||||
Buckets []BketInfo `json:"buckets"`
|
Buckets []WebBucketInfo `json:"buckets"`
|
||||||
UIVersion string `json:"uiVersion"`
|
UIVersion string `json:"uiVersion"`
|
||||||
}
|
}
|
||||||
|
|
||||||
// BketInfo container for list buckets.
|
// WebBucketInfo container for list buckets metadata.
|
||||||
type BketInfo struct {
|
type WebBucketInfo struct {
|
||||||
// The name of the bucket.
|
// The name of the bucket.
|
||||||
Name string `json:"name"`
|
Name string `json:"name"`
|
||||||
// Date the bucket was created.
|
// Date the bucket was created.
|
||||||
@ -163,7 +163,7 @@ func (web *webAPI) ListBuckets(r *http.Request, args *GenericArgs, reply *ListBu
|
|||||||
for _, bucket := range buckets {
|
for _, bucket := range buckets {
|
||||||
// List all buckets which are not private.
|
// List all buckets which are not private.
|
||||||
if bucket.Name != path.Base(reservedBucket) {
|
if bucket.Name != path.Base(reservedBucket) {
|
||||||
reply.Buckets = append(reply.Buckets, BketInfo{
|
reply.Buckets = append(reply.Buckets, WebBucketInfo{
|
||||||
Name: bucket.Name,
|
Name: bucket.Name,
|
||||||
CreationDate: bucket.Created,
|
CreationDate: bucket.Created,
|
||||||
})
|
})
|
||||||
@ -181,12 +181,12 @@ type ListObjectsArgs struct {
|
|||||||
|
|
||||||
// ListObjectsRep - list objects response.
|
// ListObjectsRep - list objects response.
|
||||||
type ListObjectsRep struct {
|
type ListObjectsRep struct {
|
||||||
Objects []ObjInfo `json:"objects"`
|
Objects []WebObjectInfo `json:"objects"`
|
||||||
UIVersion string `json:"uiVersion"`
|
UIVersion string `json:"uiVersion"`
|
||||||
}
|
}
|
||||||
|
|
||||||
// ObjInfo container for list objects.
|
// WebObjectInfo container for list objects metadata.
|
||||||
type ObjInfo struct {
|
type WebObjectInfo struct {
|
||||||
// Name of the object
|
// Name of the object
|
||||||
Key string `json:"name"`
|
Key string `json:"name"`
|
||||||
// Date and time the object was last modified.
|
// Date and time the object was last modified.
|
||||||
@ -210,14 +210,14 @@ func (web *webAPI) ListObjects(r *http.Request, args *ListObjectsArgs, reply *Li
|
|||||||
}
|
}
|
||||||
marker = lo.NextMarker
|
marker = lo.NextMarker
|
||||||
for _, obj := range lo.Objects {
|
for _, obj := range lo.Objects {
|
||||||
reply.Objects = append(reply.Objects, ObjInfo{
|
reply.Objects = append(reply.Objects, WebObjectInfo{
|
||||||
Key: obj.Name,
|
Key: obj.Name,
|
||||||
LastModified: obj.ModifiedTime,
|
LastModified: obj.ModifiedTime,
|
||||||
Size: obj.Size,
|
Size: obj.Size,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
for _, prefix := range lo.Prefixes {
|
for _, prefix := range lo.Prefixes {
|
||||||
reply.Objects = append(reply.Objects, ObjInfo{
|
reply.Objects = append(reply.Objects, WebObjectInfo{
|
||||||
Key: prefix,
|
Key: prefix,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user