mirror of
https://github.com/minio/minio.git
synced 2025-01-24 13:13:16 -05:00
e5d378931d
with missing nextMarker with delimiter based listing, top level prefixes beyond 4500 or max-keys value wouldn't be sent back for client to ask for the next batch. reproduced at a customer deployment, create prefixes as shown below ``` for year in $(seq 2017 2020) do for month in {01..12} do for day in {01..31} do mc -q cp file myminio/testbucket/dir/day_id=$year-$month-$day/; done done done ``` Then perform ``` aws s3api --profile minio --endpoint-url http://localhost:9000 list-objects \ --bucket testbucket --prefix dir/ --delimiter / --max-keys 1000 ``` You shall see missing NextMarker, this would disallow listing beyond max-keys requested and also disallow beyond 4500 (maxKeyObjectList) prefixes being listed because client wouldn't know the NextMarker available. This PR addresses this situation properly by making the implementation more spec compatible. i.e NextMarker in-fact can be either an object, a prefix with delimiter depending on the input operation. This issue was introduced after the list caching changes and has been present for a while.
616 lines
15 KiB
Go
616 lines
15 KiB
Go
/*
|
|
* MinIO Cloud Storage, (C) 2020 MinIO, Inc.
|
|
*
|
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
|
* you may not use this file except in compliance with the License.
|
|
* You may obtain a copy of the License at
|
|
*
|
|
* http://www.apache.org/licenses/LICENSE-2.0
|
|
*
|
|
* Unless required by applicable law or agreed to in writing, software
|
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
* See the License for the specific language governing permissions and
|
|
* limitations under the License.
|
|
*/
|
|
|
|
package cmd
|
|
|
|
import (
|
|
"bytes"
|
|
"io"
|
|
"os"
|
|
"sort"
|
|
"strings"
|
|
)
|
|
|
|
// metaCacheEntry is an object or a directory within an unknown bucket.
|
|
type metaCacheEntry struct {
|
|
// name is the full name of the object including prefixes
|
|
name string
|
|
// Metadata. If none is present it is not an object but only a prefix.
|
|
// Entries without metadata will only be present in non-recursive scans.
|
|
metadata []byte
|
|
|
|
// cached contains the metadata if decoded.
|
|
cached *FileInfo
|
|
}
|
|
|
|
// isDir returns if the entry is representing a prefix directory.
|
|
func (e metaCacheEntry) isDir() bool {
|
|
return len(e.metadata) == 0
|
|
}
|
|
|
|
// isObject returns if the entry is representing an object.
|
|
func (e metaCacheEntry) isObject() bool {
|
|
return len(e.metadata) > 0
|
|
}
|
|
|
|
// hasPrefix returns whether an entry has a specific prefix
|
|
func (e metaCacheEntry) hasPrefix(s string) bool {
|
|
return strings.HasPrefix(e.name, s)
|
|
}
|
|
|
|
// likelyMatches returns if the entries match by comparing name and metadata length.
|
|
func (e *metaCacheEntry) likelyMatches(other *metaCacheEntry) bool {
|
|
// This should reject 99%
|
|
if len(e.metadata) != len(other.metadata) || e.name != other.name {
|
|
return false
|
|
}
|
|
return true
|
|
}
|
|
|
|
// matches returns if the entries match by comparing their latest version fileinfo.
|
|
func (e *metaCacheEntry) matches(other *metaCacheEntry, bucket string) bool {
|
|
if e == nil && other == nil {
|
|
return true
|
|
}
|
|
if e == nil || other == nil {
|
|
return false
|
|
}
|
|
|
|
// This should reject 99%
|
|
if len(e.metadata) != len(other.metadata) || e.name != other.name {
|
|
return false
|
|
}
|
|
eFi, eErr := e.fileInfo(bucket)
|
|
oFi, oErr := e.fileInfo(bucket)
|
|
if eErr != nil || oErr != nil {
|
|
return eErr == oErr
|
|
}
|
|
return eFi.ModTime.Equal(oFi.ModTime) && eFi.Size == oFi.Size && eFi.VersionID == oFi.VersionID
|
|
}
|
|
|
|
// isInDir returns whether the entry is in the dir when considering the separator.
|
|
func (e metaCacheEntry) isInDir(dir, separator string) bool {
|
|
if len(dir) == 0 {
|
|
// Root
|
|
idx := strings.Index(e.name, separator)
|
|
return idx == -1 || idx == len(e.name)-len(separator)
|
|
}
|
|
ext := strings.TrimPrefix(e.name, dir)
|
|
if len(ext) != len(e.name) {
|
|
idx := strings.Index(ext, separator)
|
|
// If separator is not found or is last entry, ok.
|
|
return idx == -1 || idx == len(ext)-len(separator)
|
|
}
|
|
return false
|
|
}
|
|
|
|
// isLatestDeletemarker returns whether the latest version is a delete marker.
|
|
// If metadata is NOT versioned false will always be returned.
|
|
// If v2 and UNABLE to load metadata true will be returned.
|
|
func (e *metaCacheEntry) isLatestDeletemarker() bool {
|
|
if e.cached != nil {
|
|
return e.cached.Deleted
|
|
}
|
|
if !isXL2V1Format(e.metadata) {
|
|
return false
|
|
}
|
|
var xlMeta xlMetaV2
|
|
if err := xlMeta.Load(e.metadata); err != nil || len(xlMeta.Versions) == 0 {
|
|
return true
|
|
}
|
|
return xlMeta.Versions[len(xlMeta.Versions)-1].Type == DeleteType
|
|
}
|
|
|
|
// fileInfo returns the decoded metadata.
|
|
// If entry is a directory it is returned as that.
|
|
// If versioned the latest version will be returned.
|
|
func (e *metaCacheEntry) fileInfo(bucket string) (*FileInfo, error) {
|
|
if e.isDir() {
|
|
return &FileInfo{
|
|
Volume: bucket,
|
|
Name: e.name,
|
|
Mode: uint32(os.ModeDir),
|
|
}, nil
|
|
}
|
|
if e.cached == nil {
|
|
fi, err := getFileInfo(e.metadata, bucket, e.name, "")
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
e.cached = &fi
|
|
}
|
|
return e.cached, nil
|
|
}
|
|
|
|
// fileInfoVersions returns the metadata as FileInfoVersions.
|
|
// If entry is a directory it is returned as that.
|
|
func (e *metaCacheEntry) fileInfoVersions(bucket string) (FileInfoVersions, error) {
|
|
if e.isDir() {
|
|
return FileInfoVersions{
|
|
Volume: bucket,
|
|
Name: e.name,
|
|
Versions: []FileInfo{
|
|
{
|
|
Volume: bucket,
|
|
Name: e.name,
|
|
Mode: uint32(os.ModeDir),
|
|
},
|
|
},
|
|
}, nil
|
|
}
|
|
return getFileInfoVersions(e.metadata, bucket, e.name)
|
|
}
|
|
|
|
// metaCacheEntries is a slice of metacache entries.
|
|
type metaCacheEntries []metaCacheEntry
|
|
|
|
// less function for sorting.
|
|
func (m metaCacheEntries) less(i, j int) bool {
|
|
return m[i].name < m[j].name
|
|
}
|
|
|
|
// sort entries by name.
|
|
// m is sorted and a sorted metadata object is returned.
|
|
// Changes to m will also be reflected in the returned object.
|
|
func (m metaCacheEntries) sort() metaCacheEntriesSorted {
|
|
if m.isSorted() {
|
|
return metaCacheEntriesSorted{o: m}
|
|
}
|
|
sort.Slice(m, m.less)
|
|
return metaCacheEntriesSorted{o: m}
|
|
}
|
|
|
|
// isSorted returns whether the objects are sorted.
|
|
// This is usually orders of magnitude faster than actually sorting.
|
|
func (m metaCacheEntries) isSorted() bool {
|
|
return sort.SliceIsSorted(m, m.less)
|
|
}
|
|
|
|
// shallowClone will create a shallow clone of the array objects,
|
|
// but object metadata will not be cloned.
|
|
func (m metaCacheEntries) shallowClone() metaCacheEntries {
|
|
dst := make(metaCacheEntries, len(m))
|
|
copy(dst, m)
|
|
return dst
|
|
}
|
|
|
|
type metadataResolutionParams struct {
|
|
dirQuorum int // Number if disks needed for a directory to 'exist'.
|
|
objQuorum int // Number of disks needed for an object to 'exist'.
|
|
bucket string // Name of the bucket. Used for generating cached fileinfo.
|
|
}
|
|
|
|
func (m metaCacheEntries) resolve(r *metadataResolutionParams) (selected *metaCacheEntry, ok bool) {
|
|
if len(m) == 0 {
|
|
return nil, false
|
|
}
|
|
|
|
dirExists := 0
|
|
objExists := 0
|
|
var selFIV *FileInfo
|
|
for i := range m {
|
|
entry := &m[i]
|
|
if entry.name == "" {
|
|
continue
|
|
}
|
|
if entry.isDir() {
|
|
dirExists++
|
|
selected = entry
|
|
continue
|
|
}
|
|
|
|
// Get new entry metadata
|
|
objExists++
|
|
fiv, err := entry.fileInfo(r.bucket)
|
|
if err != nil {
|
|
continue
|
|
}
|
|
if selFIV == nil {
|
|
selected = entry
|
|
selFIV = fiv
|
|
continue
|
|
}
|
|
|
|
if selected.matches(entry, r.bucket) {
|
|
continue
|
|
}
|
|
|
|
// Select latest modtime.
|
|
if fiv.ModTime.After(selFIV.ModTime) {
|
|
selected = entry
|
|
selFIV = fiv
|
|
continue
|
|
}
|
|
}
|
|
// If directory, we need quorum.
|
|
if dirExists > 0 && dirExists < r.dirQuorum {
|
|
return nil, false
|
|
}
|
|
if objExists < r.objQuorum {
|
|
return nil, false
|
|
}
|
|
// Take the latest selected.
|
|
return selected, selected != nil
|
|
}
|
|
|
|
// firstFound returns the first found and the number of set entries.
|
|
func (m metaCacheEntries) firstFound() (first *metaCacheEntry, n int) {
|
|
for _, entry := range m {
|
|
if entry.name != "" {
|
|
n++
|
|
if first == nil {
|
|
first = &entry
|
|
}
|
|
}
|
|
}
|
|
return first, n
|
|
}
|
|
|
|
// names will return all names in order.
|
|
// Since this allocates it should not be used in critical functions.
|
|
func (m metaCacheEntries) names() []string {
|
|
res := make([]string, 0, len(m))
|
|
for _, obj := range m {
|
|
res = append(res, obj.name)
|
|
}
|
|
return res
|
|
}
|
|
|
|
// metaCacheEntriesSorted contains metacache entries that are sorted.
|
|
type metaCacheEntriesSorted struct {
|
|
o metaCacheEntries
|
|
// list id is not serialized
|
|
listID string
|
|
}
|
|
|
|
// writeTo will write all objects to the provided output.
|
|
func (m metaCacheEntriesSorted) writeTo(writer io.Writer) error {
|
|
w := newMetacacheWriter(writer, 1<<20)
|
|
if err := w.write(m.o...); err != nil {
|
|
w.Close()
|
|
return err
|
|
}
|
|
return w.Close()
|
|
}
|
|
|
|
// shallowClone will create a shallow clone of the array objects,
|
|
// but object metadata will not be cloned.
|
|
func (m metaCacheEntriesSorted) shallowClone() metaCacheEntriesSorted {
|
|
// We have value receiver so we already have a copy.
|
|
m.o = m.o.shallowClone()
|
|
return m
|
|
}
|
|
|
|
// iterate the entries in order.
|
|
// If the iterator function returns iterating stops.
|
|
func (m *metaCacheEntriesSorted) iterate(fn func(entry metaCacheEntry) (cont bool)) {
|
|
if m == nil {
|
|
return
|
|
}
|
|
for _, o := range m.o {
|
|
if !fn(o) {
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
// fileInfoVersions converts the metadata to FileInfoVersions where possible.
|
|
// Metadata that cannot be decoded is skipped.
|
|
func (m *metaCacheEntriesSorted) fileInfoVersions(bucket, prefix, delimiter, afterV string) (versions []ObjectInfo) {
|
|
versions = make([]ObjectInfo, 0, m.len())
|
|
prevPrefix := ""
|
|
for _, entry := range m.o {
|
|
if entry.isObject() {
|
|
if delimiter != "" {
|
|
idx := strings.Index(strings.TrimPrefix(entry.name, prefix), delimiter)
|
|
if idx >= 0 {
|
|
idx = len(prefix) + idx + len(delimiter)
|
|
currPrefix := entry.name[:idx]
|
|
if currPrefix == prevPrefix {
|
|
continue
|
|
}
|
|
prevPrefix = currPrefix
|
|
versions = append(versions, ObjectInfo{
|
|
IsDir: true,
|
|
Bucket: bucket,
|
|
Name: currPrefix,
|
|
})
|
|
continue
|
|
}
|
|
}
|
|
|
|
fiv, err := entry.fileInfoVersions(bucket)
|
|
if afterV != "" {
|
|
// Forward first entry to specified version
|
|
fiv.forwardPastVersion(afterV)
|
|
afterV = ""
|
|
}
|
|
if err == nil {
|
|
for _, version := range fiv.Versions {
|
|
versions = append(versions, version.ToObjectInfo(bucket, entry.name))
|
|
}
|
|
}
|
|
continue
|
|
}
|
|
|
|
if entry.isDir() {
|
|
if delimiter == "" {
|
|
continue
|
|
}
|
|
idx := strings.Index(strings.TrimPrefix(entry.name, prefix), delimiter)
|
|
if idx < 0 {
|
|
continue
|
|
}
|
|
idx = len(prefix) + idx + len(delimiter)
|
|
currPrefix := entry.name[:idx]
|
|
if currPrefix == prevPrefix {
|
|
continue
|
|
}
|
|
prevPrefix = currPrefix
|
|
versions = append(versions, ObjectInfo{
|
|
IsDir: true,
|
|
Bucket: bucket,
|
|
Name: currPrefix,
|
|
})
|
|
}
|
|
}
|
|
|
|
return versions
|
|
}
|
|
|
|
// fileInfoVersions converts the metadata to FileInfoVersions where possible.
|
|
// Metadata that cannot be decoded is skipped.
|
|
func (m *metaCacheEntriesSorted) fileInfos(bucket, prefix, delimiter string) (objects []ObjectInfo) {
|
|
objects = make([]ObjectInfo, 0, m.len())
|
|
prevPrefix := ""
|
|
for _, entry := range m.o {
|
|
if entry.isObject() {
|
|
if delimiter != "" {
|
|
idx := strings.Index(strings.TrimPrefix(entry.name, prefix), delimiter)
|
|
if idx >= 0 {
|
|
idx = len(prefix) + idx + len(delimiter)
|
|
currPrefix := entry.name[:idx]
|
|
if currPrefix == prevPrefix {
|
|
continue
|
|
}
|
|
prevPrefix = currPrefix
|
|
objects = append(objects, ObjectInfo{
|
|
IsDir: true,
|
|
Bucket: bucket,
|
|
Name: currPrefix,
|
|
})
|
|
continue
|
|
}
|
|
}
|
|
|
|
fi, err := entry.fileInfo(bucket)
|
|
if err == nil {
|
|
objects = append(objects, fi.ToObjectInfo(bucket, entry.name))
|
|
}
|
|
continue
|
|
}
|
|
if entry.isDir() {
|
|
if delimiter == "" {
|
|
continue
|
|
}
|
|
idx := strings.Index(strings.TrimPrefix(entry.name, prefix), delimiter)
|
|
if idx < 0 {
|
|
continue
|
|
}
|
|
idx = len(prefix) + idx + len(delimiter)
|
|
currPrefix := entry.name[:idx]
|
|
if currPrefix == prevPrefix {
|
|
continue
|
|
}
|
|
prevPrefix = currPrefix
|
|
objects = append(objects, ObjectInfo{
|
|
IsDir: true,
|
|
Bucket: bucket,
|
|
Name: currPrefix,
|
|
})
|
|
}
|
|
}
|
|
|
|
return objects
|
|
}
|
|
|
|
// forwardTo will truncate m so only entries that are s or after is in the list.
|
|
func (m *metaCacheEntriesSorted) forwardTo(s string) {
|
|
if s == "" {
|
|
return
|
|
}
|
|
idx := sort.Search(len(m.o), func(i int) bool {
|
|
return m.o[i].name >= s
|
|
})
|
|
m.o = m.o[idx:]
|
|
}
|
|
|
|
// merge will merge other into m.
|
|
// If the same entries exists in both and metadata matches only one is added,
|
|
// otherwise the entry from m will be placed first.
|
|
// Operation time is expected to be O(n+m).
|
|
func (m *metaCacheEntriesSorted) merge(other metaCacheEntriesSorted, limit int) {
|
|
merged := make(metaCacheEntries, 0, m.len()+other.len())
|
|
a := m.entries()
|
|
b := other.entries()
|
|
for len(a) > 0 && len(b) > 0 {
|
|
if a[0].name == b[0].name && bytes.Equal(a[0].metadata, b[0].metadata) {
|
|
// Same, discard one.
|
|
merged = append(merged, a[0])
|
|
a = a[1:]
|
|
b = b[1:]
|
|
} else if a[0].name < b[0].name {
|
|
merged = append(merged, a[0])
|
|
a = a[1:]
|
|
} else {
|
|
merged = append(merged, b[0])
|
|
b = b[1:]
|
|
}
|
|
if limit > 0 && len(merged) >= limit {
|
|
break
|
|
}
|
|
}
|
|
// Append anything left.
|
|
if limit < 0 || len(merged) < limit {
|
|
merged = append(merged, a...)
|
|
merged = append(merged, b...)
|
|
}
|
|
m.o = merged
|
|
}
|
|
|
|
// filter allows selective filtering with the provided function.
|
|
func (m *metaCacheEntriesSorted) filter(fn func(entry *metaCacheEntry) bool) {
|
|
dst := m.o[:0]
|
|
for _, o := range m.o {
|
|
if fn(&o) {
|
|
dst = append(dst, o)
|
|
}
|
|
}
|
|
m.o = dst
|
|
}
|
|
|
|
// filterPrefix will filter m to only contain entries with the specified prefix.
|
|
func (m *metaCacheEntriesSorted) filterPrefix(s string) {
|
|
if s == "" {
|
|
return
|
|
}
|
|
m.forwardTo(s)
|
|
for i, o := range m.o {
|
|
if !o.hasPrefix(s) {
|
|
m.o = m.o[:i]
|
|
break
|
|
}
|
|
}
|
|
}
|
|
|
|
// filterObjectsOnly will remove prefix directories.
|
|
// Order is preserved, but the underlying slice is modified.
|
|
func (m *metaCacheEntriesSorted) filterObjectsOnly() {
|
|
dst := m.o[:0]
|
|
for _, o := range m.o {
|
|
if !o.isDir() {
|
|
dst = append(dst, o)
|
|
}
|
|
}
|
|
m.o = dst
|
|
}
|
|
|
|
// filterPrefixesOnly will remove objects.
|
|
// Order is preserved, but the underlying slice is modified.
|
|
func (m *metaCacheEntriesSorted) filterPrefixesOnly() {
|
|
dst := m.o[:0]
|
|
for _, o := range m.o {
|
|
if o.isDir() {
|
|
dst = append(dst, o)
|
|
}
|
|
}
|
|
m.o = dst
|
|
}
|
|
|
|
// filterRecursiveEntries will keep entries only with the prefix that doesn't contain separator.
|
|
// This can be used to remove recursive listings.
|
|
// To return root elements only set prefix to an empty string.
|
|
// Order is preserved, but the underlying slice is modified.
|
|
func (m *metaCacheEntriesSorted) filterRecursiveEntries(prefix, separator string) {
|
|
dst := m.o[:0]
|
|
if prefix != "" {
|
|
m.forwardTo(prefix)
|
|
for _, o := range m.o {
|
|
ext := strings.TrimPrefix(o.name, prefix)
|
|
if len(ext) != len(o.name) {
|
|
if !strings.Contains(ext, separator) {
|
|
dst = append(dst, o)
|
|
}
|
|
}
|
|
}
|
|
} else {
|
|
// No prefix, simpler
|
|
for _, o := range m.o {
|
|
if !strings.Contains(o.name, separator) {
|
|
dst = append(dst, o)
|
|
}
|
|
}
|
|
}
|
|
m.o = dst
|
|
}
|
|
|
|
// truncate the number of entries to maximum n.
|
|
func (m *metaCacheEntriesSorted) truncate(n int) {
|
|
if m == nil {
|
|
return
|
|
}
|
|
if len(m.o) > n {
|
|
m.o = m.o[:n]
|
|
}
|
|
}
|
|
|
|
// len returns the number of objects and prefix dirs in m.
|
|
func (m *metaCacheEntriesSorted) len() int {
|
|
if m == nil {
|
|
return 0
|
|
}
|
|
return len(m.o)
|
|
}
|
|
|
|
// entries returns the underlying objects as is currently represented.
|
|
func (m *metaCacheEntriesSorted) entries() metaCacheEntries {
|
|
if m == nil {
|
|
return nil
|
|
}
|
|
return m.o
|
|
}
|
|
|
|
// deduplicate entries in the list.
|
|
// If compareMeta is set it will be used to resolve conflicts.
|
|
// The function should return whether the existing entry should be replaced with other.
|
|
// If no compareMeta is provided duplicates may be left.
|
|
// This is indicated by the returned boolean.
|
|
func (m *metaCacheEntriesSorted) deduplicate(compareMeta func(existing, other *metaCacheEntry) (replace bool)) (dupesLeft bool) {
|
|
dst := m.o[:0]
|
|
for j := range m.o {
|
|
found := false
|
|
obj := &m.o[j]
|
|
for i := len(dst) - 1; i >= 0; i++ {
|
|
existing := &dst[i]
|
|
if existing.name != obj.name {
|
|
break
|
|
}
|
|
|
|
// Use given resolution function first if any.
|
|
if compareMeta != nil {
|
|
if compareMeta(existing, obj) {
|
|
dst[i] = *obj
|
|
}
|
|
found = true
|
|
break
|
|
}
|
|
if obj.likelyMatches(existing) {
|
|
found = true
|
|
break
|
|
}
|
|
|
|
// Matches, move on.
|
|
dupesLeft = true
|
|
continue
|
|
}
|
|
if !found {
|
|
dst = append(dst, *obj)
|
|
}
|
|
}
|
|
m.o = dst
|
|
return dupesLeft
|
|
}
|