gcs: Use Pager to iterate results in ListObjectsV1/V2 (#6162)

Fixes #6052
This commit is contained in:
wd256 2018-07-19 02:19:16 +10:00 committed by Nitish Tiwari
parent 6c93c60424
commit 3ec4738955
1 changed files with 103 additions and 99 deletions

View File

@ -554,16 +554,16 @@ func isGCSMarker(marker string) bool {
// ListObjects - lists all blobs in GCS bucket filtered by prefix
func (l *gcsGateway) ListObjects(ctx context.Context, bucket string, prefix string, marker string, delimiter string, maxKeys int) (minio.ListObjectsInfo, error) {
if maxKeys == 0 {
return minio.ListObjectsInfo{}, nil
}
it := l.client.Bucket(bucket).Objects(l.ctx, &storage.Query{
Delimiter: delimiter,
Prefix: prefix,
Versions: false,
})
isTruncated := false
nextMarker := ""
prefixes := []string{}
// To accommodate S3-compatible applications using
// ListObjectsV1 to use object keys as markers to control the
// listing of objects, we use the following encoding scheme to
@ -574,45 +574,40 @@ func (l *gcsGateway) ListObjects(ctx context.Context, bucket string, prefix stri
// prefixing "{minio}" to the GCS continuation token,
// e.g, "{minio}CgRvYmoz"
//
// - Application supplied markers are used as-is to list
// object keys that appear after it in the lexicographical order.
// - Application supplied markers are transformed to a
// GCS continuation token.
// If application is using GCS continuation token we should
// strip the gcsTokenPrefix we added.
gcsMarker := isGCSMarker(marker)
if gcsMarker {
it.PageInfo().Token = strings.TrimPrefix(marker, gcsTokenPrefix)
token := ""
if marker != "" {
if isGCSMarker(marker) {
token = strings.TrimPrefix(marker, gcsTokenPrefix)
} else {
token = toGCSPageToken(marker)
}
}
nextMarker := ""
it.PageInfo().MaxSize = maxKeys
var prefixes []string
var objects []minio.ObjectInfo
var nextPageToken string
var err error
objects := []minio.ObjectInfo{}
pager := iterator.NewPager(it, maxKeys, token)
for {
if len(objects) >= maxKeys {
// check if there is one next object and
// if that one next object is our hidden
// metadata folder, then just break
// otherwise we've truncated the output
attrs, _ := it.Next()
if attrs != nil && attrs.Prefix == minio.GatewayMinioSysTmp {
break
}
isTruncated = true
break
}
attrs, err := it.Next()
if err == iterator.Done {
break
}
gcsObjects := make([]*storage.ObjectAttrs, 0)
nextPageToken, err = pager.NextPage(&gcsObjects)
if err != nil {
logger.LogIf(ctx, err)
return minio.ListObjectsInfo{}, gcsToObjectError(err, bucket, prefix)
}
nextMarker = toGCSPageToken(attrs.Name)
for _, attrs := range gcsObjects {
// Due to minio.GatewayMinioSysTmp keys being skipped, the number of objects + prefixes
// returned may not total maxKeys. This behavior is compatible with the S3 spec which
// allows the response to include less keys than maxKeys.
if attrs.Prefix == minio.GatewayMinioSysTmp {
// We don't return our metadata prefix.
continue
@ -626,31 +621,39 @@ func (l *gcsGateway) ListObjects(ctx context.Context, bucket string, prefix stri
continue
}
}
if attrs.Prefix != "" {
prefixes = append(prefixes, attrs.Prefix)
continue
}
if !gcsMarker && attrs.Name <= marker {
// if user supplied a marker don't append
// objects until we reach marker (and skip it).
continue
} else {
objects = append(objects, fromGCSAttrsToObjectInfo(attrs))
}
objects = append(objects, minio.ObjectInfo{
Name: attrs.Name,
Bucket: attrs.Bucket,
ModTime: attrs.Updated,
Size: attrs.Size,
ETag: minio.ToS3ETag(fmt.Sprintf("%d", attrs.CRC32C)),
UserDefined: attrs.Metadata,
ContentType: attrs.ContentType,
ContentEncoding: attrs.ContentEncoding,
})
// The NextMarker property should only be set in the response if a delimiter is used
if delimiter != "" {
if attrs.Prefix > nextMarker {
nextMarker = attrs.Prefix
} else if attrs.Name > nextMarker {
nextMarker = attrs.Name
}
}
}
// Exit the loop if at least one item can be returned from
// the current page or there are no more pages available
if nextPageToken == "" || len(prefixes)+len(objects) > 0 {
break
}
}
if nextPageToken == "" {
nextMarker = ""
} else if nextMarker != "" {
nextMarker = gcsTokenPrefix + toGCSPageToken(nextMarker)
}
return minio.ListObjectsInfo{
IsTruncated: isTruncated,
NextMarker: gcsTokenPrefix + nextMarker,
IsTruncated: nextPageToken != "",
NextMarker: nextMarker,
Prefixes: prefixes,
Objects: objects,
}, nil
@ -658,8 +661,8 @@ func (l *gcsGateway) ListObjects(ctx context.Context, bucket string, prefix stri
// ListObjectsV2 - lists all blobs in GCS bucket filtered by prefix
func (l *gcsGateway) ListObjectsV2(ctx context.Context, bucket, prefix, continuationToken, delimiter string, maxKeys int, fetchOwner bool, startAfter string) (minio.ListObjectsV2Info, error) {
if continuationToken == "" && startAfter != "" {
continuationToken = toGCSPageToken(startAfter)
if maxKeys == 0 {
return minio.ListObjectsV2Info{ContinuationToken: continuationToken}, nil
}
it := l.client.Bucket(bucket).Objects(l.ctx, &storage.Query{
@ -668,35 +671,30 @@ func (l *gcsGateway) ListObjectsV2(ctx context.Context, bucket, prefix, continua
Versions: false,
})
isTruncated := false
if continuationToken != "" {
// If client sends continuationToken, set it
it.PageInfo().Token = continuationToken
} else {
// else set the continuationToken to return
continuationToken = it.PageInfo().Token
if continuationToken != "" {
// If GCS SDK sets continuationToken, it means there are more than maxKeys in the current page
// and the response will be truncated
isTruncated = true
}
token := continuationToken
if token == "" && startAfter != "" {
token = toGCSPageToken(startAfter)
}
var prefixes []string
var objects []minio.ObjectInfo
var nextPageToken string
var err error
for keyCount := 0; keyCount < maxKeys; keyCount++ {
attrs, err := it.Next()
if err == iterator.Done {
break
}
pager := iterator.NewPager(it, maxKeys, token)
for {
gcsObjects := make([]*storage.ObjectAttrs, 0)
nextPageToken, err = pager.NextPage(&gcsObjects)
if err != nil {
logger.LogIf(ctx, err)
return minio.ListObjectsV2Info{}, gcsToObjectError(err, bucket, prefix)
}
for _, attrs := range gcsObjects {
// Due to minio.GatewayMinioSysTmp keys being skipped, the number of objects + prefixes
// returned may not total maxKeys. This behavior is compatible with the S3 spec which
// allows the response to include less keys than maxKeys.
if attrs.Prefix == minio.GatewayMinioSysTmp {
// We don't return our metadata prefix.
continue
@ -713,16 +711,22 @@ func (l *gcsGateway) ListObjectsV2(ctx context.Context, bucket, prefix, continua
if attrs.Prefix != "" {
prefixes = append(prefixes, attrs.Prefix)
continue
} else {
objects = append(objects, fromGCSAttrsToObjectInfo(attrs))
}
}
objects = append(objects, fromGCSAttrsToObjectInfo(attrs))
// Exit the loop if at least one item can be returned from
// the current page or there are no more pages available
if nextPageToken == "" || len(prefixes)+len(objects) > 0 {
break
}
}
return minio.ListObjectsV2Info{
IsTruncated: isTruncated,
IsTruncated: nextPageToken != "",
ContinuationToken: continuationToken,
NextContinuationToken: continuationToken,
NextContinuationToken: nextPageToken,
Prefixes: prefixes,
Objects: objects,
}, nil