mirror of
https://github.com/minio/minio.git
synced 2025-03-26 15:30:57 -04:00
optimize max-keys=2 listing for spark workloads (#19725)
to return results appropriately for versioned buckets, especially when underlying prefixes have been deleted
This commit is contained in:
parent
01bfc78535
commit
7752b03add
@ -1460,8 +1460,7 @@ func maxKeysPlusOne(maxKeys int, addOne bool) int {
|
||||
return maxKeys
|
||||
}
|
||||
|
||||
func (z *erasureServerPools) listObjectsGeneric(ctx context.Context, bucket, prefix, marker, delimiter string, maxKeys int, v1 bool) (ListObjectsInfo, error) {
|
||||
var loi ListObjectsInfo
|
||||
func (z *erasureServerPools) listObjectsGeneric(ctx context.Context, bucket, prefix, marker, delimiter string, maxKeys int, v1 bool) (loi ListObjectsInfo, err error) {
|
||||
opts := listPathOptions{
|
||||
V1: v1,
|
||||
Bucket: bucket,
|
||||
@ -1473,7 +1472,69 @@ func (z *erasureServerPools) listObjectsGeneric(ctx context.Context, bucket, pre
|
||||
AskDisks: globalAPIConfig.getListQuorum(),
|
||||
}
|
||||
opts.setBucketMeta(ctx)
|
||||
listFn := func(ctx context.Context, opts listPathOptions, limitTo int) (ListObjectsInfo, error) {
|
||||
var loi ListObjectsInfo
|
||||
merged, err := z.listPath(ctx, &opts)
|
||||
if err != nil && err != io.EOF {
|
||||
if !isErrBucketNotFound(err) {
|
||||
storageLogOnceIf(ctx, err, "erasure-list-objects-path-"+bucket)
|
||||
}
|
||||
return loi, toObjectErr(err, bucket)
|
||||
}
|
||||
merged.forwardPast(opts.Marker)
|
||||
defer merged.truncate(0) // Release when returning
|
||||
|
||||
if contextCanceled(ctx) {
|
||||
return ListObjectsInfo{}, ctx.Err()
|
||||
}
|
||||
|
||||
// Default is recursive, if delimiter is set then list non recursive.
|
||||
objects := merged.fileInfos(bucket, prefix, delimiter)
|
||||
loi.IsTruncated = err == nil && len(objects) > 0
|
||||
if limitTo > 0 && len(objects) > limitTo {
|
||||
objects = objects[:limitTo]
|
||||
loi.IsTruncated = true
|
||||
}
|
||||
for _, obj := range objects {
|
||||
if obj.IsDir && obj.ModTime.IsZero() && delimiter != "" {
|
||||
// Only add each once.
|
||||
// With slash delimiter we only get the directory once.
|
||||
found := false
|
||||
if delimiter != slashSeparator {
|
||||
for _, p := range loi.Prefixes {
|
||||
if found {
|
||||
break
|
||||
}
|
||||
found = p == obj.Name
|
||||
}
|
||||
}
|
||||
if !found {
|
||||
loi.Prefixes = append(loi.Prefixes, obj.Name)
|
||||
}
|
||||
} else {
|
||||
loi.Objects = append(loi.Objects, obj)
|
||||
}
|
||||
}
|
||||
if loi.IsTruncated {
|
||||
last := objects[len(objects)-1]
|
||||
loi.NextMarker = last.Name
|
||||
}
|
||||
|
||||
if merged.lastSkippedEntry != "" {
|
||||
if merged.lastSkippedEntry > loi.NextMarker {
|
||||
// An object hidden by ILM was found during listing. Since the number of entries
|
||||
// fetched from drives is limited, set IsTruncated to true to ask the s3 client
|
||||
// to continue listing if it wishes in order to find if there is more objects.
|
||||
loi.IsTruncated = true
|
||||
loi.NextMarker = merged.lastSkippedEntry
|
||||
}
|
||||
}
|
||||
|
||||
if loi.NextMarker != "" {
|
||||
loi.NextMarker = opts.encodeMarker(loi.NextMarker)
|
||||
}
|
||||
return loi, nil
|
||||
}
|
||||
ri := logger.GetReqInfo(ctx)
|
||||
hadoop := ri != nil && strings.Contains(ri.UserAgent, `Hadoop `) && strings.Contains(ri.UserAgent, "scala/")
|
||||
matches := func() bool {
|
||||
@ -1500,7 +1561,8 @@ func (z *erasureServerPools) listObjectsGeneric(ctx context.Context, bucket, pre
|
||||
}
|
||||
return false
|
||||
}
|
||||
if hadoop && matches() && delimiter == SlashSeparator && maxKeys == 2 && marker == "" {
|
||||
|
||||
if hadoop && delimiter == SlashSeparator && maxKeys == 2 && marker == "" {
|
||||
// Optimization for Spark/Hadoop workload where spark sends a garbage
|
||||
// request of this kind
|
||||
//
|
||||
@ -1537,26 +1599,65 @@ func (z *erasureServerPools) listObjectsGeneric(ctx context.Context, bucket, pre
|
||||
// df.write.parquet("s3a://testbucket/parquet/")
|
||||
// }
|
||||
// }
|
||||
objInfo, err := z.GetObjectInfo(ctx, bucket, path.Dir(prefix), ObjectOptions{NoLock: true})
|
||||
if err == nil {
|
||||
if opts.Lifecycle != nil {
|
||||
evt := evalActionFromLifecycle(ctx, *opts.Lifecycle, opts.Retention, opts.Replication.Config, objInfo)
|
||||
if evt.Action.Delete() {
|
||||
globalExpiryState.enqueueByDays(objInfo, evt, lcEventSrc_s3ListObjects)
|
||||
if !evt.Action.DeleteRestored() {
|
||||
// Skip entry if ILM action was DeleteVersionAction or DeleteAction
|
||||
return loi, nil
|
||||
if matches() {
|
||||
objInfo, err := z.GetObjectInfo(ctx, bucket, path.Dir(prefix), ObjectOptions{NoLock: true})
|
||||
if err == nil {
|
||||
if opts.Lifecycle != nil {
|
||||
evt := evalActionFromLifecycle(ctx, *opts.Lifecycle, opts.Retention, opts.Replication.Config, objInfo)
|
||||
if evt.Action.Delete() {
|
||||
globalExpiryState.enqueueByDays(objInfo, evt, lcEventSrc_s3ListObjects)
|
||||
if !evt.Action.DeleteRestored() {
|
||||
// Skip entry if ILM action was DeleteVersionAction or DeleteAction
|
||||
return loi, nil
|
||||
}
|
||||
}
|
||||
}
|
||||
return loi, nil
|
||||
}
|
||||
if isErrBucketNotFound(err) {
|
||||
return loi, err
|
||||
}
|
||||
if contextCanceled(ctx) {
|
||||
return ListObjectsInfo{}, ctx.Err()
|
||||
}
|
||||
}
|
||||
// Hadoop makes the max-keys=2 listing call just to find if the directory is empty or not, or in the case
|
||||
// of an object to check for object existence. For versioned buckets, MinIO's non-recursive
|
||||
// call will report top level prefixes in deleted state, whereas spark/hadoop interpret this as non-empty
|
||||
// and throw a 404 exception. This is especially a problem for spark jobs overwriting the same partition
|
||||
// repeatedly. This workaround recursively lists the top 3 entries including delete markers to reflect the
|
||||
// correct state of the directory in the list results.
|
||||
opts.Recursive = true
|
||||
opts.InclDeleted = true
|
||||
opts.Limit = maxKeys + 1
|
||||
li, err := listFn(ctx, opts, opts.Limit)
|
||||
if err == nil {
|
||||
switch {
|
||||
case len(li.Objects) == 0 && len(li.Prefixes) == 0:
|
||||
return loi, nil
|
||||
case len(li.Objects) > 0 || len(li.Prefixes) > 0:
|
||||
var o ObjectInfo
|
||||
var pfx string
|
||||
if len(li.Objects) > 0 {
|
||||
o = li.Objects[0]
|
||||
p := strings.TrimPrefix(o.Name, opts.Prefix)
|
||||
if p != "" {
|
||||
sidx := strings.Index(p, "/")
|
||||
if sidx > 0 {
|
||||
pfx = p[:sidx]
|
||||
}
|
||||
}
|
||||
}
|
||||
if o.DeleteMarker {
|
||||
loi.Objects = append(loi.Objects, ObjectInfo{Bucket: bucket, IsDir: true, Name: prefix})
|
||||
return loi, nil
|
||||
} else if len(li.Objects) == 1 {
|
||||
loi.Objects = append(loi.Objects, o)
|
||||
loi.Prefixes = append(loi.Prefixes, path.Join(opts.Prefix, pfx))
|
||||
}
|
||||
}
|
||||
return loi, nil
|
||||
}
|
||||
if isErrBucketNotFound(err) {
|
||||
return loi, err
|
||||
}
|
||||
if contextCanceled(ctx) {
|
||||
return ListObjectsInfo{}, ctx.Err()
|
||||
}
|
||||
}
|
||||
|
||||
if len(prefix) > 0 && maxKeys == 1 && marker == "" {
|
||||
@ -1589,69 +1690,7 @@ func (z *erasureServerPools) listObjectsGeneric(ctx context.Context, bucket, pre
|
||||
return ListObjectsInfo{}, ctx.Err()
|
||||
}
|
||||
}
|
||||
|
||||
merged, err := z.listPath(ctx, &opts)
|
||||
if err != nil && err != io.EOF {
|
||||
if !isErrBucketNotFound(err) {
|
||||
storageLogOnceIf(ctx, err, "erasure-list-objects-path-"+bucket)
|
||||
}
|
||||
return loi, toObjectErr(err, bucket)
|
||||
}
|
||||
|
||||
merged.forwardPast(opts.Marker)
|
||||
defer merged.truncate(0) // Release when returning
|
||||
|
||||
if contextCanceled(ctx) {
|
||||
return ListObjectsInfo{}, ctx.Err()
|
||||
}
|
||||
|
||||
// Default is recursive, if delimiter is set then list non recursive.
|
||||
objects := merged.fileInfos(bucket, prefix, delimiter)
|
||||
loi.IsTruncated = err == nil && len(objects) > 0
|
||||
if maxKeys > 0 && len(objects) > maxKeys {
|
||||
objects = objects[:maxKeys]
|
||||
loi.IsTruncated = true
|
||||
}
|
||||
for _, obj := range objects {
|
||||
if obj.IsDir && obj.ModTime.IsZero() && delimiter != "" {
|
||||
// Only add each once.
|
||||
// With slash delimiter we only get the directory once.
|
||||
found := false
|
||||
if delimiter != slashSeparator {
|
||||
for _, p := range loi.Prefixes {
|
||||
if found {
|
||||
break
|
||||
}
|
||||
found = p == obj.Name
|
||||
}
|
||||
}
|
||||
if !found {
|
||||
loi.Prefixes = append(loi.Prefixes, obj.Name)
|
||||
}
|
||||
} else {
|
||||
loi.Objects = append(loi.Objects, obj)
|
||||
}
|
||||
}
|
||||
if loi.IsTruncated {
|
||||
last := objects[len(objects)-1]
|
||||
loi.NextMarker = last.Name
|
||||
}
|
||||
|
||||
if merged.lastSkippedEntry != "" {
|
||||
if merged.lastSkippedEntry > loi.NextMarker {
|
||||
// An object hidden by ILM was found during listing. Since the number of entries
|
||||
// fetched from drives is limited, set IsTruncated to true to ask the s3 client
|
||||
// to continue listing if it wishes in order to find if there is more objects.
|
||||
loi.IsTruncated = true
|
||||
loi.NextMarker = merged.lastSkippedEntry
|
||||
}
|
||||
}
|
||||
|
||||
if loi.NextMarker != "" {
|
||||
loi.NextMarker = opts.encodeMarker(loi.NextMarker)
|
||||
}
|
||||
|
||||
return loi, nil
|
||||
return listFn(ctx, opts, maxKeys)
|
||||
}
|
||||
|
||||
func (z *erasureServerPools) ListMultipartUploads(ctx context.Context, bucket, prefix, keyMarker, uploadIDMarker, delimiter string, maxUploads int) (ListMultipartsInfo, error) {
|
||||
|
Loading…
x
Reference in New Issue
Block a user