Added ListObjectsV2 and ListObjectsV2 Anon support to Gateway S3 and Azure. (#4547)

This commit is contained in:
Nitish Tiwari 2017-06-16 22:17:00 -07:00 committed by Harshavardhana
parent b283a2c21f
commit 58833711e0
8 changed files with 276 additions and 9 deletions

View File

@ -352,9 +352,9 @@ func generateListObjectsV1Response(bucket, prefix, marker, delimiter string, max
}
// generates an ListObjectsV2 response for the said bucket with other enumerated options.
func generateListObjectsV2Response(bucket, prefix, token, startAfter, delimiter string, fetchOwner bool, maxKeys int, resp ListObjectsInfo) ListObjectsV2Response {
func generateListObjectsV2Response(bucket, prefix, token, nextToken, startAfter, delimiter string, fetchOwner, isTruncated bool, maxKeys int, objects []ObjectInfo, prefixes []string) ListObjectsV2Response {
var contents []Object
var prefixes []CommonPrefix
var commonPrefixes []CommonPrefix
var owner = Owner{}
var data = ListObjectsV2Response{}
@ -363,7 +363,7 @@ func generateListObjectsV2Response(bucket, prefix, token, startAfter, delimiter
owner.DisplayName = globalMinioDefaultOwnerID
}
for _, object := range resp.Objects {
for _, object := range objects {
var content = Object{}
if object.Name == "" {
continue
@ -387,14 +387,14 @@ func generateListObjectsV2Response(bucket, prefix, token, startAfter, delimiter
data.Prefix = prefix
data.MaxKeys = maxKeys
data.ContinuationToken = token
data.NextContinuationToken = resp.NextMarker
data.IsTruncated = resp.IsTruncated
for _, prefix := range resp.Prefixes {
data.NextContinuationToken = nextToken
data.IsTruncated = isTruncated
for _, prefix := range prefixes {
var prefixItem = CommonPrefix{}
prefixItem.Prefix = prefix
prefixes = append(prefixes, prefixItem)
commonPrefixes = append(commonPrefixes, prefixItem)
}
data.CommonPrefixes = prefixes
data.CommonPrefixes = commonPrefixes
data.KeyCount = len(data.Contents) + len(data.CommonPrefixes)
return data
}

View File

@ -100,7 +100,7 @@ func (api objectAPIHandlers) ListObjectsV2Handler(w http.ResponseWriter, r *http
return
}
response := generateListObjectsV2Response(bucket, prefix, token, startAfter, delimiter, fetchOwner, maxKeys, listObjectsInfo)
response := generateListObjectsV2Response(bucket, prefix, token, listObjectsInfo.NextMarker, startAfter, delimiter, fetchOwner, listObjectsInfo.IsTruncated, maxKeys, listObjectsInfo.Objects, listObjectsInfo.Prefixes)
// Write success response.
writeSuccessResponseXML(w, encodeResponse(response))

View File

@ -190,3 +190,63 @@ func (a *azureObjects) AnonListObjects(bucket, prefix, marker, delimiter string,
result.Prefixes = listResp.BlobPrefixes
return result, nil
}
// AnonListObjectsV2 - List objects in V2 mode, anonymously
func (a *azureObjects) AnonListObjectsV2(bucket, prefix, continuationToken string, fetchOwner bool, delimiter string, maxKeys int) (result ListObjectsV2Info, err error) {
params := storage.ListBlobsParameters{
Prefix: prefix,
Marker: continuationToken,
Delimiter: delimiter,
MaxResults: uint(maxKeys),
}
q := azureListBlobsGetParameters(params)
q.Set("restype", "container")
q.Set("comp", "list")
url, err := url.Parse(a.client.GetBlobURL(bucket, ""))
if err != nil {
return result, azureToObjectError(traceError(err))
}
url.RawQuery = q.Encode()
resp, err := http.Get(url.String())
if err != nil {
return result, azureToObjectError(traceError(err))
}
defer resp.Body.Close()
var listResp storage.BlobListResponse
data, err := ioutil.ReadAll(resp.Body)
if err != nil {
return result, azureToObjectError(traceError(err))
}
err = xml.Unmarshal(data, &listResp)
if err != nil {
return result, azureToObjectError(traceError(err))
}
// If NextMarker is not empty, this means response is truncated and NextContinuationToken should be set
if listResp.NextMarker != "" {
result.IsTruncated = true
result.NextContinuationToken = listResp.NextMarker
}
for _, object := range listResp.Blobs {
t, e := time.Parse(time.RFC1123, object.Properties.LastModified)
if e != nil {
continue
}
result.Objects = append(result.Objects, ObjectInfo{
Bucket: bucket,
Name: object.Name,
ModTime: t,
Size: object.Properties.ContentLength,
ETag: canonicalizeETag(object.Properties.Etag),
ContentType: object.Properties.ContentType,
ContentEncoding: object.Properties.ContentEncoding,
})
}
result.Prefixes = listResp.BlobPrefixes
return result, nil
}

View File

@ -269,6 +269,41 @@ func (a *azureObjects) ListObjects(bucket, prefix, marker, delimiter string, max
return result, nil
}
// ListObjectsV2 - list all blobs in Azure bucket filtered by prefix
func (a *azureObjects) ListObjectsV2(bucket, prefix, continuationToken string, fetchOwner bool, delimiter string, maxKeys int) (result ListObjectsV2Info, err error) {
resp, err := a.client.ListBlobs(bucket, storage.ListBlobsParameters{
Prefix: prefix,
Marker: continuationToken,
Delimiter: delimiter,
MaxResults: uint(maxKeys),
})
if err != nil {
return result, azureToObjectError(traceError(err), bucket, prefix)
}
// If NextMarker is not empty, this means response is truncated and NextContinuationToken should be set
if resp.NextMarker != "" {
result.IsTruncated = true
result.NextContinuationToken = resp.NextMarker
}
for _, object := range resp.Blobs {
t, e := time.Parse(time.RFC1123, object.Properties.LastModified)
if e != nil {
continue
}
result.Objects = append(result.Objects, ObjectInfo{
Bucket: bucket,
Name: object.Name,
ModTime: t,
Size: object.Properties.ContentLength,
ETag: canonicalizeETag(object.Properties.Etag),
ContentType: object.Properties.ContentType,
ContentEncoding: object.Properties.ContentEncoding,
})
}
result.Prefixes = resp.BlobPrefixes
return result, nil
}
// GetObject - reads an object from azure. Supports additional
// parameters like offset and length which are synonymous with
// HTTP Range requests.

View File

@ -18,6 +18,7 @@ package cmd
import (
"net/http"
"net/url"
"reflect"
"testing"
@ -155,3 +156,81 @@ func TestAzureParseBlockID(t *testing.T) {
t.Fatal("Expected azureParseBlockID() to return error")
}
}
// Test azureListBlobsGetParameters()
func TestAzureListBlobsGetParameters(t *testing.T) {
// Test values set 1
expectedURLValues := url.Values{}
expectedURLValues.Set("prefix", "test")
expectedURLValues.Set("delimiter", "_")
expectedURLValues.Set("marker", "marker")
expectedURLValues.Set("include", "hello")
expectedURLValues.Set("maxresults", "20")
expectedURLValues.Set("timeout", "10")
setBlobParameters := storage.ListBlobsParameters{"test", "_", "marker", "hello", 20, 10}
// Test values set 2
expectedURLValues1 := url.Values{}
setBlobParameters1 := storage.ListBlobsParameters{"", "", "", "", 0, 0}
testCases := []struct {
name string
args storage.ListBlobsParameters
want url.Values
}{
{"TestIfValuesSet", setBlobParameters, expectedURLValues},
{"TestIfValuesNotSet", setBlobParameters1, expectedURLValues1},
}
for _, test := range testCases {
t.Run(test.name, func(t *testing.T) {
if got := azureListBlobsGetParameters(test.args); !reflect.DeepEqual(got, test.want) {
t.Errorf("azureListBlobsGetParameters() = %v, want %v", got, test.want)
}
})
}
}
func TestAnonErrToObjectErr(t *testing.T) {
testCases := []struct {
name string
statusCode int
params []string
wantErr error
}{
{"ObjectNotFound",
http.StatusNotFound,
[]string{"testBucket", "testObject"},
ObjectNotFound{Bucket: "testBucket", Object: "testObject"},
},
{"BucketNotFound",
http.StatusNotFound,
[]string{"testBucket", ""},
BucketNotFound{Bucket: "testBucket"},
},
{"ObjectNameInvalid",
http.StatusBadRequest,
[]string{"testBucket", "testObject"},
ObjectNameInvalid{Bucket: "testBucket", Object: "testObject"},
},
{"BucketNameInvalid",
http.StatusBadRequest,
[]string{"testBucket", ""},
BucketNameInvalid{Bucket: "testBucket"},
},
{"UnexpectedError",
http.StatusBadGateway,
[]string{"testBucket", "testObject"},
errUnexpected,
},
}
for _, test := range testCases {
t.Run(test.name, func(t *testing.T) {
if err := anonErrToObjectErr(test.statusCode, test.params...); !reflect.DeepEqual(err, test.wantErr) {
t.Errorf("anonErrToObjectErr() error = %v, wantErr %v", err, test.wantErr)
}
})
}
}

View File

@ -769,6 +769,87 @@ func (api gatewayAPIHandlers) ListObjectsV1Handler(w http.ResponseWriter, r *htt
return
}
response := generateListObjectsV1Response(bucket, prefix, marker, delimiter, maxKeys, listObjectsInfo)
// Write success response.
writeSuccessResponseXML(w, encodeResponse(response))
}
// ListObjectsV2Handler - GET Bucket (List Objects) Version 2.
// --------------------------
// This implementation of the GET operation returns some or all (up to 1000)
// of the objects in a bucket. You can use the request parameters as selection
// criteria to return a subset of the objects in a bucket.
//
// NOTE: It is recommended that this API to be used for application development.
// Minio continues to support ListObjectsV1 for supporting legacy tools.
func (api gatewayAPIHandlers) ListObjectsV2Handler(w http.ResponseWriter, r *http.Request) {
vars := router.Vars(r)
bucket := vars["bucket"]
objectAPI := api.ObjectAPI()
if objectAPI == nil {
writeErrorResponse(w, ErrServerNotInitialized, r.URL)
return
}
reqAuthType := getRequestAuthType(r)
switch reqAuthType {
case authTypePresignedV2, authTypeSignedV2:
// Signature V2 validation.
s3Error := isReqAuthenticatedV2(r)
if s3Error != ErrNone {
errorIf(errSignatureMismatch, dumpRequest(r))
writeErrorResponse(w, s3Error, r.URL)
return
}
case authTypeSigned, authTypePresigned:
s3Error := isReqAuthenticated(r, serverConfig.GetRegion())
if s3Error != ErrNone {
errorIf(errSignatureMismatch, dumpRequest(r))
writeErrorResponse(w, s3Error, r.URL)
return
}
case authTypeAnonymous:
// No verification needed for anonymous requests.
default:
// For all unknown auth types return error.
writeErrorResponse(w, ErrAccessDenied, r.URL)
return
}
// Extract all the listObjectsV2 query params to their native values.
prefix, token, startAfter, delimiter, fetchOwner, maxKeys, _ := getListObjectsV2Args(r.URL.Query())
// In ListObjectsV2 'continuation-token' is the marker.
marker := token
// Check if 'continuation-token' is empty.
if token == "" {
// Then we need to use 'start-after' as marker instead.
marker = startAfter
}
listObjectsV2 := objectAPI.ListObjectsV2
if reqAuthType == authTypeAnonymous {
listObjectsV2 = objectAPI.AnonListObjectsV2
}
// Validate the query params before beginning to serve the request.
// fetch-owner is not validated since it is a boolean
if s3Error := validateListObjectsArgs(prefix, marker, delimiter, maxKeys); s3Error != ErrNone {
writeErrorResponse(w, s3Error, r.URL)
return
}
// Inititate a list objects operation based on the input params.
// On success would return back ListObjectsV2Info object to be
// serialized as XML and sent as S3 compatible response body.
listObjectsV2Info, err := listObjectsV2(bucket, prefix, token, fetchOwner, delimiter, maxKeys)
if err != nil {
errorIf(err, "Unable to list objects. Args to listObjectsV2 are bucket=%s, prefix=%s, token=%s, delimiter=%s", bucket, prefix, token, delimiter)
writeErrorResponse(w, toAPIErrorCode(err), r.URL)
return
}
response := generateListObjectsV2Response(bucket, prefix, token, listObjectsV2Info.ContinuationToken, startAfter, delimiter, fetchOwner, listObjectsV2Info.IsTruncated, maxKeys, listObjectsV2Info.Objects, listObjectsV2Info.Prefixes)
// Write success response.
writeSuccessResponseXML(w, encodeResponse(response))

View File

@ -36,6 +36,8 @@ type GatewayLayer interface {
GetBucketPolicies(string) (policy.BucketAccessPolicy, error)
DeleteBucketPolicies(string) error
AnonListObjects(bucket, prefix, marker, delimiter string, maxKeys int) (result ListObjectsInfo, err error)
AnonListObjectsV2(bucket, prefix, continuationToken string, fetchOwner bool, delimiter string, maxKeys int) (result ListObjectsV2Info, err error)
ListObjectsV2(bucket, prefix, continuationToken string, fetchOwner bool, delimiter string, maxKeys int) (result ListObjectsV2Info, err error)
AnonGetBucketInfo(bucket string) (bucketInfo BucketInfo, err error)
}

View File

@ -94,6 +94,16 @@ func (l *s3Objects) AnonListObjects(bucket string, prefix string, marker string,
return fromMinioClientListBucketResult(bucket, result), nil
}
// AnonListObjectsV2 - List objects in V2 mode, anonymously
func (l *s3Objects) AnonListObjectsV2(bucket, prefix, continuationToken string, fetchOwner bool, delimiter string, maxKeys int) (ListObjectsV2Info, error) {
result, err := l.anonClient.ListObjectsV2(bucket, prefix, continuationToken, fetchOwner, delimiter, maxKeys)
if err != nil {
return ListObjectsV2Info{}, s3ToObjectError(traceError(err), bucket)
}
return fromMinioClientListBucketV2Result(bucket, result), nil
}
// AnonGetBucketInfo - Get bucket metadata anonymously.
func (l *s3Objects) AnonGetBucketInfo(bucket string) (BucketInfo, error) {
if exists, err := l.anonClient.BucketExists(bucket); err != nil {