fix: hide prefixes for Hadoop properly (#19821)

This commit is contained in:
Harshavardhana 2024-05-28 15:53:15 -07:00 committed by GitHub
parent 2f64d5f77e
commit 64baedf5a4
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -1535,7 +1535,7 @@ func (z *erasureServerPools) listObjectsGeneric(ctx context.Context, bucket, pre
return loi, nil return loi, nil
} }
ri := logger.GetReqInfo(ctx) ri := logger.GetReqInfo(ctx)
hadoop := ri != nil && strings.Contains(ri.UserAgent, `Hadoop `) && strings.Contains(ri.UserAgent, "scala/") hadoop := ri != nil && strings.Contains(ri.UserAgent, "Hadoop ") && strings.Contains(ri.UserAgent, "scala/")
matches := func() bool { matches := func() bool {
if prefix == "" { if prefix == "" {
return false return false
@ -1600,7 +1600,7 @@ func (z *erasureServerPools) listObjectsGeneric(ctx context.Context, bucket, pre
// } // }
if matches() { if matches() {
objInfo, err := z.GetObjectInfo(ctx, bucket, path.Dir(prefix), ObjectOptions{NoLock: true}) objInfo, err := z.GetObjectInfo(ctx, bucket, path.Dir(prefix), ObjectOptions{NoLock: true})
if err == nil { if err == nil || objInfo.IsLatest && objInfo.DeleteMarker {
if opts.Lifecycle != nil { if opts.Lifecycle != nil {
evt := evalActionFromLifecycle(ctx, *opts.Lifecycle, opts.Retention, opts.Replication.Config, objInfo) evt := evalActionFromLifecycle(ctx, *opts.Lifecycle, opts.Retention, opts.Replication.Config, objInfo)
if evt.Action.Delete() { if evt.Action.Delete() {
@ -1626,41 +1626,29 @@ func (z *erasureServerPools) listObjectsGeneric(ctx context.Context, bucket, pre
// and throw a 404 exception. This is especially a problem for spark jobs overwriting the same partition // and throw a 404 exception. This is especially a problem for spark jobs overwriting the same partition
// repeatedly. This workaround recursively lists the top 3 entries including delete markers to reflect the // repeatedly. This workaround recursively lists the top 3 entries including delete markers to reflect the
// correct state of the directory in the list results. // correct state of the directory in the list results.
opts.Recursive = true if strings.HasSuffix(opts.Prefix, SlashSeparator) {
opts.InclDeleted = true li, err := listFn(ctx, opts, maxKeys)
opts.Limit = maxKeys + 1 if err != nil {
li, err := listFn(ctx, opts, opts.Limit) return loi, err
if err == nil {
switch {
case len(li.Objects) == 0 && len(li.Prefixes) == 0:
return loi, nil
case len(li.Objects) > 0 || len(li.Prefixes) > 0:
var o ObjectInfo
var pfx string
if len(li.Objects) > 0 {
o = li.Objects[0]
p := strings.TrimPrefix(o.Name, opts.Prefix)
if p != "" {
sidx := strings.Index(p, "/")
if sidx > 0 {
pfx = p[:sidx]
}
}
}
switch {
case o.DeleteMarker:
loi.Objects = append(loi.Objects, ObjectInfo{Bucket: bucket, IsDir: true, Name: prefix})
return loi, nil
case len(li.Objects) >= 1:
loi.Objects = append(loi.Objects, o)
if pfx != "" {
loi.Prefixes = append(loi.Prefixes, path.Join(opts.Prefix, pfx))
}
case len(li.Prefixes) > 0:
loi.Prefixes = append(loi.Prefixes, li.Prefixes...)
}
} }
return loi, nil if len(li.Objects) == 0 {
prefixes := li.Prefixes[:0]
for _, prefix := range li.Prefixes {
objInfo, _ := z.GetObjectInfo(ctx, bucket, pathJoin(prefix, "_SUCCESS"), ObjectOptions{NoLock: true})
if objInfo.IsLatest && objInfo.DeleteMarker {
continue
}
prefixes = append(prefixes, prefix)
}
if len(prefixes) > 0 {
objInfo, _ := z.GetObjectInfo(ctx, bucket, pathJoin(opts.Prefix, "_SUCCESS"), ObjectOptions{NoLock: true})
if objInfo.IsLatest && objInfo.DeleteMarker {
return loi, nil
}
}
li.Prefixes = prefixes
}
return li, nil
} }
} }