Enhance ListObjectsV2 API to return UserDefined metadata (#8539)

This commit is contained in:
Harshavardhana 2019-11-21 01:54:49 -08:00 committed by GitHub
parent 4da68cfcfc
commit 9565641b9b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 141 additions and 6 deletions

View File

@ -239,6 +239,37 @@ type ObjectVersion struct {
IsLatest bool IsLatest bool
} }
// StringMap is a map[string]string.
type StringMap map[string]string
// MarshalXML - StringMap marshals into XML.
func (s StringMap) MarshalXML(e *xml.Encoder, start xml.StartElement) error {
tokens := []xml.Token{start}
for key, value := range s {
t := xml.StartElement{}
t.Name = xml.Name{
Space: "",
Local: key,
}
tokens = append(tokens, t, xml.CharData(value), xml.EndElement{Name: t.Name})
}
tokens = append(tokens, xml.EndElement{
Name: start.Name,
})
for _, t := range tokens {
if err := e.EncodeToken(t); err != nil {
return err
}
}
// flush to ensure tokens are written
return e.Flush()
}
// Object container for object metadata // Object container for object metadata
type Object struct { type Object struct {
Key string Key string
@ -251,6 +282,9 @@ type Object struct {
// The class of storage used to store the object. // The class of storage used to store the object.
StorageClass string StorageClass string
// UserMetadata user-defined metadata
UserMetadata StringMap `xml:"UserMetadata,omitempty"`
} }
// CopyObjectResponse container returns ETag and LastModified of the successfully copied object // CopyObjectResponse container returns ETag and LastModified of the successfully copied object
@ -466,7 +500,7 @@ func generateListObjectsV1Response(bucket, prefix, marker, delimiter, encodingTy
} }
// generates an ListObjectsV2 response for the said bucket with other enumerated options. // generates an ListObjectsV2 response for the said bucket with other enumerated options.
func generateListObjectsV2Response(bucket, prefix, token, nextToken, startAfter, delimiter, encodingType string, fetchOwner, isTruncated bool, maxKeys int, objects []ObjectInfo, prefixes []string) ListObjectsV2Response { func generateListObjectsV2Response(bucket, prefix, token, nextToken, startAfter, delimiter, encodingType string, fetchOwner, isTruncated bool, maxKeys int, objects []ObjectInfo, prefixes []string, metadata bool) ListObjectsV2Response {
var contents []Object var contents []Object
var commonPrefixes []CommonPrefix var commonPrefixes []CommonPrefix
var owner = Owner{} var owner = Owner{}
@ -489,6 +523,17 @@ func generateListObjectsV2Response(bucket, prefix, token, nextToken, startAfter,
content.Size = object.Size content.Size = object.Size
content.StorageClass = object.StorageClass content.StorageClass = object.StorageClass
content.Owner = owner content.Owner = owner
if metadata {
content.UserMetadata = make(StringMap)
for k, v := range CleanMinioInternalMetadataKeys(object.UserDefined) {
if hasPrefix(k, ReservedMetadataPrefix) {
// Do not need to send any internal metadata
// values to client.
continue
}
content.UserMetadata[k] = v
}
}
contents = append(contents, content) contents = append(contents, content)
} }
data.Name = bucket data.Name = bucket

View File

@ -162,6 +162,8 @@ func registerAPIRouter(router *mux.Router, encryptionEnabled, allowSSEKMS bool)
bucket.Methods(http.MethodGet).HandlerFunc(collectAPIStats("listenbucketnotification", httpTraceAll(api.ListenBucketNotificationHandler))).Queries("events", "{events:.*}") bucket.Methods(http.MethodGet).HandlerFunc(collectAPIStats("listenbucketnotification", httpTraceAll(api.ListenBucketNotificationHandler))).Queries("events", "{events:.*}")
// ListMultipartUploads // ListMultipartUploads
bucket.Methods(http.MethodGet).HandlerFunc(collectAPIStats("listmultipartuploads", httpTraceAll(api.ListMultipartUploadsHandler))).Queries("uploads", "") bucket.Methods(http.MethodGet).HandlerFunc(collectAPIStats("listmultipartuploads", httpTraceAll(api.ListMultipartUploadsHandler))).Queries("uploads", "")
// ListObjectsV2M
bucket.Methods(http.MethodGet).HandlerFunc(collectAPIStats("listobjectsv2M", httpTraceAll(api.ListObjectsV2MHandler))).Queries("list-type", "2", "metadata", "true")
// ListObjectsV2 // ListObjectsV2
bucket.Methods(http.MethodGet).HandlerFunc(collectAPIStats("listobjectsv2", httpTraceAll(api.ListObjectsV2Handler))).Queries("list-type", "2") bucket.Methods(http.MethodGet).HandlerFunc(collectAPIStats("listobjectsv2", httpTraceAll(api.ListObjectsV2Handler))).Queries("list-type", "2")
// ListBucketVersions // ListBucketVersions

View File

@ -126,6 +126,90 @@ func (api objectAPIHandlers) ListBucketObjectVersionsHandler(w http.ResponseWrit
writeSuccessResponseXML(w, encodeResponse(response)) writeSuccessResponseXML(w, encodeResponse(response))
} }
// ListObjectsV2MHandler - GET Bucket (List Objects) Version 2 with metadata.
// --------------------------
// This implementation of the GET operation returns some or all (up to 10000)
// of the objects in a bucket. You can use the request parame<ters as selection
// criteria to return a subset of the objects in a bucket.
//
// NOTE: It is recommended that this API to be used for application development.
// MinIO continues to support ListObjectsV1 and V2 for supporting legacy tools.
func (api objectAPIHandlers) ListObjectsV2MHandler(w http.ResponseWriter, r *http.Request) {
ctx := newContext(r, w, "ListObjectsV2M")
defer logger.AuditLog(w, r, "ListObjectsV2M", mustGetClaimsFromToken(r))
vars := mux.Vars(r)
bucket := vars["bucket"]
objectAPI := api.ObjectAPI()
if objectAPI == nil {
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrServerNotInitialized), r.URL, guessIsBrowserReq(r))
return
}
if s3Error := checkRequestAuthType(ctx, r, policy.ListBucketAction, bucket, ""); s3Error != ErrNone {
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Error), r.URL, guessIsBrowserReq(r))
return
}
urlValues := r.URL.Query()
// Extract all the listObjectsV2 query params to their native values.
prefix, token, startAfter, delimiter, fetchOwner, maxKeys, encodingType, errCode := getListObjectsV2Args(urlValues)
if errCode != ErrNone {
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(errCode), r.URL, guessIsBrowserReq(r))
return
}
// Validate the query params before beginning to serve the request.
// fetch-owner is not validated since it is a boolean
if s3Error := validateListObjectsArgs(token, delimiter, encodingType, maxKeys); s3Error != ErrNone {
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Error), r.URL, guessIsBrowserReq(r))
return
}
listObjectsV2 := objectAPI.ListObjectsV2
// Inititate a list objects operation based on the input params.
// On success would return back ListObjectsInfo object to be
// marshaled into S3 compatible XML header.
listObjectsV2Info, err := listObjectsV2(ctx, bucket, prefix, token, delimiter, maxKeys, fetchOwner, startAfter)
if err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r))
return
}
for i := range listObjectsV2Info.Objects {
var actualSize int64
if listObjectsV2Info.Objects[i].IsCompressed() {
// Read the decompressed size from the meta.json.
actualSize = listObjectsV2Info.Objects[i].GetActualSize()
if actualSize < 0 {
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrInvalidDecompressedSize), r.URL, guessIsBrowserReq(r))
return
}
// Set the info.Size to the actualSize.
listObjectsV2Info.Objects[i].Size = actualSize
} else if crypto.IsEncrypted(listObjectsV2Info.Objects[i].UserDefined) {
listObjectsV2Info.Objects[i].ETag = getDecryptedETag(r.Header, listObjectsV2Info.Objects[i], false)
listObjectsV2Info.Objects[i].Size, err = listObjectsV2Info.Objects[i].DecryptedSize()
if err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r))
return
}
}
}
response := generateListObjectsV2Response(bucket, prefix, token,
listObjectsV2Info.NextContinuationToken, startAfter,
delimiter, encodingType, fetchOwner, listObjectsV2Info.IsTruncated,
maxKeys, listObjectsV2Info.Objects, listObjectsV2Info.Prefixes, true)
// Write success response.
writeSuccessResponseXML(w, encodeResponse(response))
}
// ListObjectsV2Handler - GET Bucket (List Objects) Version 2. // ListObjectsV2Handler - GET Bucket (List Objects) Version 2.
// -------------------------- // --------------------------
// This implementation of the GET operation returns some or all (up to 10000) // This implementation of the GET operation returns some or all (up to 10000)
@ -201,8 +285,10 @@ func (api objectAPIHandlers) ListObjectsV2Handler(w http.ResponseWriter, r *http
} }
} }
response := generateListObjectsV2Response(bucket, prefix, token, listObjectsV2Info.NextContinuationToken, startAfter, response := generateListObjectsV2Response(bucket, prefix, token,
delimiter, encodingType, fetchOwner, listObjectsV2Info.IsTruncated, maxKeys, listObjectsV2Info.Objects, listObjectsV2Info.Prefixes) listObjectsV2Info.NextContinuationToken, startAfter,
delimiter, encodingType, fetchOwner, listObjectsV2Info.IsTruncated,
maxKeys, listObjectsV2Info.Objects, listObjectsV2Info.Prefixes, false)
// Write success response. // Write success response.
writeSuccessResponseXML(w, encodeResponse(response)) writeSuccessResponseXML(w, encodeResponse(response))

View File

@ -1060,9 +1060,11 @@ func (api objectAPIHandlers) GetBucketObjectLockConfigHandler(w http.ResponseWri
configFile := path.Join(bucketConfigPrefix, bucket, bucketObjectLockEnabledConfigFile) configFile := path.Join(bucketConfigPrefix, bucket, bucketObjectLockEnabledConfigFile)
configData, err := readConfig(ctx, objectAPI, configFile) configData, err := readConfig(ctx, objectAPI, configFile)
if err != nil { if err != nil {
aerr := toAPIError(ctx, err) var aerr APIError
if err == errConfigNotFound { if err == errConfigNotFound {
aerr = errorCodes.ToAPIErr(ErrMethodNotAllowed) aerr = errorCodes.ToAPIErr(ErrMethodNotAllowed)
} else {
aerr = toAPIError(ctx, err)
} }
writeErrorResponse(ctx, w, aerr, r.URL, guessIsBrowserReq(r)) writeErrorResponse(ctx, w, aerr, r.URL, guessIsBrowserReq(r))
return return

View File

@ -102,7 +102,7 @@ func isHTTPHeaderSizeTooLarge(header http.Header) bool {
length := len(key) + len(header.Get(key)) length := len(key) + len(header.Get(key))
size += length size += length
for _, prefix := range userMetadataKeyPrefixes { for _, prefix := range userMetadataKeyPrefixes {
if strings.HasPrefix(key, prefix) { if hasPrefix(key, prefix) {
usersize += length usersize += length
break break
} }
@ -141,7 +141,7 @@ func (h reservedMetadataHandler) ServeHTTP(w http.ResponseWriter, r *http.Reques
// and must not set by clients // and must not set by clients
func containsReservedMetadata(header http.Header) bool { func containsReservedMetadata(header http.Header) bool {
for key := range header { for key := range header {
if strings.HasPrefix(key, ReservedMetadataPrefix) { if hasPrefix(key, ReservedMetadataPrefix) {
return true return true
} }
} }