fix: listObjectVersions Include object in marker (#11562)

ListObjectVersions would skip past the object in the marker when version id is specified. 
Make `listPath` return the object with the marker and truncate it if not needed.

Avoid having to parse unintended objects to find a version marker.
This commit is contained in:
Klaus Post 2021-03-01 08:12:02 -08:00 committed by GitHub
parent 289b22d911
commit 10bdb78699
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 32 additions and 19 deletions

View File

@ -823,7 +823,7 @@ func (z *erasureServerPools) ListObjectVersions(ctx context.Context, bucket, pre
Bucket: bucket, Bucket: bucket,
Prefix: prefix, Prefix: prefix,
Separator: delimiter, Separator: delimiter,
Limit: maxKeys, Limit: maxKeysPlusOne(maxKeys, marker != ""),
Marker: marker, Marker: marker,
InclDeleted: true, InclDeleted: true,
AskDisks: globalAPIConfig.getListQuorum(), AskDisks: globalAPIConfig.getListQuorum(),
@ -843,6 +843,11 @@ func (z *erasureServerPools) ListObjectVersions(ctx context.Context, bucket, pre
if err != nil && err != io.EOF { if err != nil && err != io.EOF {
return loi, err return loi, err
} }
if versionMarker == "" {
// If we are not looking for a specific version skip it.
marker, _ = parseMarker(marker)
merged.forwardPast(marker)
}
objects := merged.fileInfoVersions(bucket, prefix, delimiter, versionMarker) objects := merged.fileInfoVersions(bucket, prefix, delimiter, versionMarker)
loi.IsTruncated = err == nil && len(objects) > 0 loi.IsTruncated = err == nil && len(objects) > 0
if maxKeys > 0 && len(objects) > maxKeys { if maxKeys > 0 && len(objects) > maxKeys {
@ -864,6 +869,16 @@ func (z *erasureServerPools) ListObjectVersions(ctx context.Context, bucket, pre
return loi, nil return loi, nil
} }
func maxKeysPlusOne(maxKeys int, addOne bool) int {
if maxKeys < 0 || maxKeys > maxObjectList {
maxKeys = maxObjectList
}
if addOne {
maxKeys++
}
return maxKeys
}
func (z *erasureServerPools) ListObjects(ctx context.Context, bucket, prefix, marker, delimiter string, maxKeys int) (ListObjectsInfo, error) { func (z *erasureServerPools) ListObjects(ctx context.Context, bucket, prefix, marker, delimiter string, maxKeys int) (ListObjectsInfo, error) {
var loi ListObjectsInfo var loi ListObjectsInfo
@ -871,7 +886,7 @@ func (z *erasureServerPools) ListObjects(ctx context.Context, bucket, prefix, ma
Bucket: bucket, Bucket: bucket,
Prefix: prefix, Prefix: prefix,
Separator: delimiter, Separator: delimiter,
Limit: maxKeys, Limit: maxKeysPlusOne(maxKeys, marker != ""),
Marker: marker, Marker: marker,
InclDeleted: false, InclDeleted: false,
AskDisks: globalAPIConfig.getListQuorum(), AskDisks: globalAPIConfig.getListQuorum(),
@ -880,6 +895,8 @@ func (z *erasureServerPools) ListObjects(ctx context.Context, bucket, prefix, ma
logger.LogIf(ctx, err) logger.LogIf(ctx, err)
return loi, err return loi, err
} }
marker, _ = parseMarker(marker)
merged.forwardPast(marker)
// Default is recursive, if delimiter is set then list non recursive. // Default is recursive, if delimiter is set then list non recursive.
objects := merged.fileInfos(bucket, prefix, delimiter) objects := merged.fileInfos(bucket, prefix, delimiter)

View File

@ -442,6 +442,17 @@ func (m *metaCacheEntriesSorted) forwardTo(s string) {
m.o = m.o[idx:] m.o = m.o[idx:]
} }
// forwardPast will truncate m so only entries that are after s is in the list.
func (m *metaCacheEntriesSorted) forwardPast(s string) {
if s == "" {
return
}
idx := sort.Search(len(m.o), func(i int) bool {
return m.o[i].name > s
})
m.o = m.o[idx:]
}
// merge will merge other into m. // merge will merge other into m.
// If the same entries exists in both and metadata matches only one is added, // If the same entries exists in both and metadata matches only one is added,
// otherwise the entry from m will be placed first. // otherwise the entry from m will be placed first.

View File

@ -82,11 +82,6 @@ func (z *erasureServerPools) listPath(ctx context.Context, o listPathOptions) (e
return entries, io.EOF return entries, io.EOF
} }
// Over flowing count - reset to maxObjectList.
if o.Limit < 0 || o.Limit > maxObjectList {
o.Limit = maxObjectList
}
// If delimiter is slashSeparator we must return directories of // If delimiter is slashSeparator we must return directories of
// the non-recursive scan unless explicitly requested. // the non-recursive scan unless explicitly requested.
o.IncludeDirectories = o.Separator == slashSeparator o.IncludeDirectories = o.Separator == slashSeparator

View File

@ -56,7 +56,7 @@ type listPathOptions struct {
FilterPrefix string FilterPrefix string
// Marker to resume listing. // Marker to resume listing.
// The response will be the first entry AFTER this object name. // The response will be the first entry >= this object name.
Marker string Marker string
// Limit the number of results. // Limit the number of results.
@ -157,7 +157,7 @@ func (o *listPathOptions) gatherResults(in <-chan metaCacheEntry) func() (metaCa
continue continue
} }
o.debugln("gather got:", entry.name) o.debugln("gather got:", entry.name)
if o.Marker != "" && entry.name <= o.Marker { if o.Marker != "" && entry.name < o.Marker {
o.debugln("pre marker") o.debugln("pre marker")
continue continue
} }
@ -311,16 +311,6 @@ func (r *metacacheReader) filter(o listPathOptions) (entries metaCacheEntriesSor
if err != nil { if err != nil {
return entries, err return entries, err
} }
next, err := r.peek()
if err != nil {
return entries, err
}
if next.name == o.Marker {
err := r.skip(1)
if err != nil {
return entries, err
}
}
} }
o.debugln("forwarded to ", o.Prefix, "marker:", o.Marker, "sep:", o.Separator) o.debugln("forwarded to ", o.Prefix, "marker:", o.Marker, "sep:", o.Separator)