From 7752b03add5dd415ae738c50396967dfe2e49b86 Mon Sep 17 00:00:00 2001
From: Poorna
Date: Mon, 13 May 2024 07:57:42 -0700
Subject: [PATCH] optimize max-keys=2 listing for spark workloads (#19725)

to return results appropriately for versioned buckets, especially when
underlying prefixes have been deleted
---
 cmd/erasure-server-pool.go | 201 ++++++++++++++++++++++---------------
 1 file changed, 120 insertions(+), 81 deletions(-)

diff --git a/cmd/erasure-server-pool.go b/cmd/erasure-server-pool.go
index 6616618c9..d178192f0 100644
--- a/cmd/erasure-server-pool.go
+++ b/cmd/erasure-server-pool.go
@@ -1460,8 +1460,7 @@ func maxKeysPlusOne(maxKeys int, addOne bool) int {
 	return maxKeys
 }
 
-func (z *erasureServerPools) listObjectsGeneric(ctx context.Context, bucket, prefix, marker, delimiter string, maxKeys int, v1 bool) (ListObjectsInfo, error) {
-	var loi ListObjectsInfo
+func (z *erasureServerPools) listObjectsGeneric(ctx context.Context, bucket, prefix, marker, delimiter string, maxKeys int, v1 bool) (loi ListObjectsInfo, err error) {
 	opts := listPathOptions{
 		V1:          v1,
 		Bucket:      bucket,
@@ -1473,7 +1472,69 @@ func (z *erasureServerPools) listObjectsGeneric(ctx context.Context, bucket, pre
 		AskDisks:    globalAPIConfig.getListQuorum(),
 	}
 	opts.setBucketMeta(ctx)
+	listFn := func(ctx context.Context, opts listPathOptions, limitTo int) (ListObjectsInfo, error) {
+		var loi ListObjectsInfo
+		merged, err := z.listPath(ctx, &opts)
+		if err != nil && err != io.EOF {
+			if !isErrBucketNotFound(err) {
+				storageLogOnceIf(ctx, err, "erasure-list-objects-path-"+bucket)
+			}
+			return loi, toObjectErr(err, bucket)
+		}
+		merged.forwardPast(opts.Marker)
+		defer merged.truncate(0) // Release when returning
+		if contextCanceled(ctx) {
+			return ListObjectsInfo{}, ctx.Err()
+		}
+
+		// Default is recursive, if delimiter is set then list non recursive.
+		objects := merged.fileInfos(bucket, prefix, delimiter)
+		loi.IsTruncated = err == nil && len(objects) > 0
+		if limitTo > 0 && len(objects) > limitTo {
+			objects = objects[:limitTo]
+			loi.IsTruncated = true
+		}
+		for _, obj := range objects {
+			if obj.IsDir && obj.ModTime.IsZero() && delimiter != "" {
+				// Only add each once.
+				// With slash delimiter we only get the directory once.
+				found := false
+				if delimiter != slashSeparator {
+					for _, p := range loi.Prefixes {
+						if found {
+							break
+						}
+						found = p == obj.Name
+					}
+				}
+				if !found {
+					loi.Prefixes = append(loi.Prefixes, obj.Name)
+				}
+			} else {
+				loi.Objects = append(loi.Objects, obj)
+			}
+		}
+		if loi.IsTruncated {
+			last := objects[len(objects)-1]
+			loi.NextMarker = last.Name
+		}
+
+		if merged.lastSkippedEntry != "" {
+			if merged.lastSkippedEntry > loi.NextMarker {
+				// An object hidden by ILM was found during listing. Since the number of entries
+				// fetched from drives is limited, set IsTruncated to true to ask the s3 client
+				// to continue listing if it wishes in order to find if there is more objects.
+				loi.IsTruncated = true
+				loi.NextMarker = merged.lastSkippedEntry
+			}
+		}
+
+		if loi.NextMarker != "" {
+			loi.NextMarker = opts.encodeMarker(loi.NextMarker)
+		}
+		return loi, nil
+	}
 
 	ri := logger.GetReqInfo(ctx)
 	hadoop := ri != nil && strings.Contains(ri.UserAgent, `Hadoop `) && strings.Contains(ri.UserAgent, "scala/")
 	matches := func() bool {
@@ -1500,7 +1561,8 @@ func (z *erasureServerPools) listObjectsGeneric(ctx context.Context, bucket, pre
 		}
 		return false
 	}
-	if hadoop && matches() && delimiter == SlashSeparator && maxKeys == 2 && marker == "" {
+
+	if hadoop && delimiter == SlashSeparator && maxKeys == 2 && marker == "" {
 		// Optimization for Spark/Hadoop workload where spark sends a garbage
 		// request of this kind
 		//
@@ -1537,26 +1599,65 @@ func (z *erasureServerPools) listObjectsGeneric(ctx context.Context, bucket, pre
 		// df.write.parquet("s3a://testbucket/parquet/")
 		// }
 		// }
-		objInfo, err := z.GetObjectInfo(ctx, bucket, path.Dir(prefix), ObjectOptions{NoLock: true})
-		if err == nil {
-			if opts.Lifecycle != nil {
-				evt := evalActionFromLifecycle(ctx, *opts.Lifecycle, opts.Retention, opts.Replication.Config, objInfo)
-				if evt.Action.Delete() {
-					globalExpiryState.enqueueByDays(objInfo, evt, lcEventSrc_s3ListObjects)
-					if !evt.Action.DeleteRestored() {
-						// Skip entry if ILM action was DeleteVersionAction or DeleteAction
-						return loi, nil
+		if matches() {
+			objInfo, err := z.GetObjectInfo(ctx, bucket, path.Dir(prefix), ObjectOptions{NoLock: true})
+			if err == nil {
+				if opts.Lifecycle != nil {
+					evt := evalActionFromLifecycle(ctx, *opts.Lifecycle, opts.Retention, opts.Replication.Config, objInfo)
+					if evt.Action.Delete() {
+						globalExpiryState.enqueueByDays(objInfo, evt, lcEventSrc_s3ListObjects)
+						if !evt.Action.DeleteRestored() {
+							// Skip entry if ILM action was DeleteVersionAction or DeleteAction
+							return loi, nil
+						}
 					}
 				}
+				return loi, nil
+			}
+			if isErrBucketNotFound(err) {
+				return loi, err
+			}
+			if contextCanceled(ctx) {
+				return ListObjectsInfo{}, ctx.Err()
+			}
+		}
+		// Hadoop makes the max-keys=2 listing call just to find if the directory is empty or not, or in the case
+		// of an object to check for object existence. For versioned buckets, MinIO's non-recursive
+		// call will report top level prefixes in deleted state, whereas spark/hadoop interpret this as non-empty
+		// and throw a 404 exception. This is especially a problem for spark jobs overwriting the same partition
+		// repeatedly. This workaround recursively lists the top 3 entries including delete markers to reflect the
+		// correct state of the directory in the list results.
+		opts.Recursive = true
+		opts.InclDeleted = true
+		opts.Limit = maxKeys + 1
+		li, err := listFn(ctx, opts, opts.Limit)
+		if err == nil {
+			switch {
+			case len(li.Objects) == 0 && len(li.Prefixes) == 0:
+				return loi, nil
+			case len(li.Objects) > 0 || len(li.Prefixes) > 0:
+				var o ObjectInfo
+				var pfx string
+				if len(li.Objects) > 0 {
+					o = li.Objects[0]
+					p := strings.TrimPrefix(o.Name, opts.Prefix)
+					if p != "" {
+						sidx := strings.Index(p, "/")
+						if sidx > 0 {
+							pfx = p[:sidx]
+						}
+					}
+				}
+				if o.DeleteMarker {
+					loi.Objects = append(loi.Objects, ObjectInfo{Bucket: bucket, IsDir: true, Name: prefix})
+					return loi, nil
+				} else if len(li.Objects) == 1 {
+					loi.Objects = append(loi.Objects, o)
+					loi.Prefixes = append(loi.Prefixes, path.Join(opts.Prefix, pfx))
+				}
 			}
 			return loi, nil
 		}
-		if isErrBucketNotFound(err) {
-			return loi, err
-		}
-		if contextCanceled(ctx) {
-			return ListObjectsInfo{}, ctx.Err()
-		}
 	}
 
 	if len(prefix) > 0 && maxKeys == 1 && marker == "" {
@@ -1589,69 +1690,7 @@ func (z *erasureServerPools) listObjectsGeneric(ctx context.Context, bucket, pre
 			return ListObjectsInfo{}, ctx.Err()
 		}
 	}
-
-	merged, err := z.listPath(ctx, &opts)
-	if err != nil && err != io.EOF {
-		if !isErrBucketNotFound(err) {
-			storageLogOnceIf(ctx, err, "erasure-list-objects-path-"+bucket)
-		}
-		return loi, toObjectErr(err, bucket)
-	}
-
-	merged.forwardPast(opts.Marker)
-	defer merged.truncate(0) // Release when returning
-
-	if contextCanceled(ctx) {
-		return ListObjectsInfo{}, ctx.Err()
-	}
-
-	// Default is recursive, if delimiter is set then list non recursive.
-	objects := merged.fileInfos(bucket, prefix, delimiter)
-	loi.IsTruncated = err == nil && len(objects) > 0
-	if maxKeys > 0 && len(objects) > maxKeys {
-		objects = objects[:maxKeys]
-		loi.IsTruncated = true
-	}
-	for _, obj := range objects {
-		if obj.IsDir && obj.ModTime.IsZero() && delimiter != "" {
-			// Only add each once.
-			// With slash delimiter we only get the directory once.
-			found := false
-			if delimiter != slashSeparator {
-				for _, p := range loi.Prefixes {
-					if found {
-						break
-					}
-					found = p == obj.Name
-				}
-			}
-			if !found {
-				loi.Prefixes = append(loi.Prefixes, obj.Name)
-			}
-		} else {
-			loi.Objects = append(loi.Objects, obj)
-		}
-	}
-	if loi.IsTruncated {
-		last := objects[len(objects)-1]
-		loi.NextMarker = last.Name
-	}
-
-	if merged.lastSkippedEntry != "" {
-		if merged.lastSkippedEntry > loi.NextMarker {
-			// An object hidden by ILM was found during listing. Since the number of entries
-			// fetched from drives is limited, set IsTruncated to true to ask the s3 client
-			// to continue listing if it wishes in order to find if there is more objects.
-			loi.IsTruncated = true
-			loi.NextMarker = merged.lastSkippedEntry
-		}
-	}
-
-	if loi.NextMarker != "" {
-		loi.NextMarker = opts.encodeMarker(loi.NextMarker)
-	}
-
-	return loi, nil
+	return listFn(ctx, opts, maxKeys)
 }
 
 func (z *erasureServerPools) ListMultipartUploads(ctx context.Context, bucket, prefix, keyMarker, uploadIDMarker, delimiter string, maxUploads int) (ListMultipartsInfo, error) {
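
Note (not part of the patch): the new max-keys=2 path derives a single common prefix from the first entry of a recursive, delete-marker-inclusive listing. The standalone Go sketch below mirrors only the strings.TrimPrefix/strings.Index step with hypothetical object names; it calls no MinIO-internal APIs and is illustrative, not the patched code itself.

// prefix_sketch.go - illustrative only; names and layout below are hypothetical.
package main

import (
	"fmt"
	"path"
	"strings"
)

func main() {
	// Assume a recursive, delete-marker-inclusive listing under "parquet/"
	// returned this live object as its first entry (hypothetical name).
	listPrefix := "parquet/"
	firstObject := "parquet/part-00000/data.snappy.parquet"

	// Derive the immediate child prefix the same way the patched code does,
	// by trimming the listing prefix and cutting at the first "/".
	var pfx string
	if p := strings.TrimPrefix(firstObject, listPrefix); p != "" {
		if sidx := strings.Index(p, "/"); sidx > 0 {
			pfx = p[:sidx]
		}
	}

	// The listing response then reports the object together with its
	// enclosing common prefix, reflecting the actual contents of the prefix.
	fmt.Println("object:", firstObject)
	fmt.Println("prefix:", path.Join(listPrefix, pfx)) // prints "parquet/part-00000"
}

When the recursive listing instead finds only a delete marker, the patched code returns the prefix itself as a directory-style entry (ObjectInfo{IsDir: true, Name: prefix}) rather than a common prefix, so the deleted partition is not misreported.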