mirror of
https://github.com/minio/minio.git
synced 2025-01-24 05:03:16 -05:00
fix: walk missing entries with opts.Marker set (#19661)
'opts.Marker` is causing many missed entries if used since results are returned unsorted. Also since pools are serialized. Switch to do fully concurrent listing and merging across pools to return sorted entries. Returning errors on listings is impossible with the current API, so document that. Return an error at once if no drives are found instead of just returning an empty listing and no error.
This commit is contained in:
parent
1526e7ece3
commit
4afb59e63f
@ -2034,178 +2034,185 @@ func (z *erasureServerPools) HealBucket(ctx context.Context, bucket string, opts
|
||||
}
|
||||
|
||||
// Walk a bucket, optionally prefix recursively, until we have returned
|
||||
// all the content to objectInfo channel, it is callers responsibility
|
||||
// to allocate a receive channel for ObjectInfo, upon any unhandled
|
||||
// error walker returns error. Optionally if context.Done() is received
|
||||
// then Walk() stops the walker.
|
||||
// all the contents of the provided bucket+prefix.
|
||||
// TODO: Note that most errors will result in a truncated listing.
|
||||
func (z *erasureServerPools) Walk(ctx context.Context, bucket, prefix string, results chan<- ObjectInfo, opts WalkOptions) error {
|
||||
if err := checkListObjsArgs(ctx, bucket, prefix, ""); err != nil {
|
||||
// Upon error close the channel.
|
||||
xioutil.SafeClose(results)
|
||||
return err
|
||||
}
|
||||
|
||||
vcfg, _ := globalBucketVersioningSys.Get(bucket)
|
||||
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
var entries []chan metaCacheEntry
|
||||
|
||||
for poolIdx, erasureSet := range z.serverPools {
|
||||
for setIdx, set := range erasureSet.sets {
|
||||
set := set
|
||||
listOut := make(chan metaCacheEntry, 1)
|
||||
entries = append(entries, listOut)
|
||||
disks, infos, _ := set.getOnlineDisksWithHealingAndInfo(true)
|
||||
if len(disks) == 0 {
|
||||
xioutil.SafeClose(results)
|
||||
cancel()
|
||||
return fmt.Errorf("Walk: no online disks found in pool %d, set %d", setIdx, poolIdx)
|
||||
}
|
||||
go func() {
|
||||
defer xioutil.SafeClose(listOut)
|
||||
send := func(e metaCacheEntry) {
|
||||
if e.isDir() {
|
||||
// Ignore directories.
|
||||
return
|
||||
}
|
||||
select {
|
||||
case listOut <- e:
|
||||
case <-ctx.Done():
|
||||
}
|
||||
}
|
||||
|
||||
askDisks := getListQuorum(opts.AskDisks, set.setDriveCount)
|
||||
if askDisks == -1 {
|
||||
newDisks := getQuorumDisks(disks, infos, (len(disks)+1)/2)
|
||||
if newDisks != nil {
|
||||
// If we found disks signature in quorum, we proceed to list
|
||||
// from a single drive, shuffling of the drives is subsequently.
|
||||
disks = newDisks
|
||||
askDisks = 1
|
||||
} else {
|
||||
// If we did not find suitable disks, perform strict quorum listing
|
||||
// as no disk agrees on quorum anymore.
|
||||
askDisks = getListQuorum("strict", set.setDriveCount)
|
||||
}
|
||||
}
|
||||
|
||||
// Special case: ask all disks if the drive count is 4
|
||||
if set.setDriveCount == 4 || askDisks > len(disks) {
|
||||
askDisks = len(disks) // use all available drives
|
||||
}
|
||||
|
||||
var fallbackDisks []StorageAPI
|
||||
if askDisks > 0 && len(disks) > askDisks {
|
||||
rand.Shuffle(len(disks), func(i, j int) {
|
||||
disks[i], disks[j] = disks[j], disks[i]
|
||||
})
|
||||
fallbackDisks = disks[askDisks:]
|
||||
disks = disks[:askDisks]
|
||||
}
|
||||
|
||||
requestedVersions := 0
|
||||
if opts.LatestOnly {
|
||||
requestedVersions = 1
|
||||
}
|
||||
|
||||
// However many we ask, versions must exist on ~50%
|
||||
listingQuorum := (askDisks + 1) / 2
|
||||
|
||||
// How to resolve partial results.
|
||||
resolver := metadataResolutionParams{
|
||||
dirQuorum: listingQuorum,
|
||||
objQuorum: listingQuorum,
|
||||
bucket: bucket,
|
||||
requestedVersions: requestedVersions,
|
||||
}
|
||||
|
||||
path := baseDirFromPrefix(prefix)
|
||||
filterPrefix := strings.Trim(strings.TrimPrefix(prefix, path), slashSeparator)
|
||||
if path == prefix {
|
||||
filterPrefix = ""
|
||||
}
|
||||
|
||||
lopts := listPathRawOptions{
|
||||
disks: disks,
|
||||
fallbackDisks: fallbackDisks,
|
||||
bucket: bucket,
|
||||
path: path,
|
||||
filterPrefix: filterPrefix,
|
||||
recursive: true,
|
||||
forwardTo: opts.Marker,
|
||||
minDisks: listingQuorum,
|
||||
reportNotFound: false,
|
||||
agreed: send,
|
||||
partial: func(entries metaCacheEntries, _ []error) {
|
||||
entry, ok := entries.resolve(&resolver)
|
||||
if ok {
|
||||
send(*entry)
|
||||
}
|
||||
},
|
||||
finished: nil,
|
||||
}
|
||||
|
||||
if err := listPathRaw(ctx, lopts); err != nil {
|
||||
storageLogIf(ctx, fmt.Errorf("listPathRaw returned %w: opts(%#v)", err, lopts))
|
||||
cancel()
|
||||
return
|
||||
}
|
||||
}()
|
||||
}
|
||||
}
|
||||
|
||||
// Convert and filter merged entries.
|
||||
merged := make(chan metaCacheEntry, 100)
|
||||
vcfg, _ := globalBucketVersioningSys.Get(bucket)
|
||||
go func() {
|
||||
defer cancel()
|
||||
defer xioutil.SafeClose(results)
|
||||
|
||||
for _, erasureSet := range z.serverPools {
|
||||
var wg sync.WaitGroup
|
||||
for _, set := range erasureSet.sets {
|
||||
set := set
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
|
||||
disks, infos, _ := set.getOnlineDisksWithHealingAndInfo(true)
|
||||
if len(disks) == 0 {
|
||||
cancel()
|
||||
return
|
||||
}
|
||||
|
||||
send := func(objInfo ObjectInfo) bool {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return false
|
||||
case results <- objInfo:
|
||||
return true
|
||||
}
|
||||
}
|
||||
|
||||
askDisks := getListQuorum(opts.AskDisks, set.setDriveCount)
|
||||
if askDisks == -1 {
|
||||
newDisks := getQuorumDisks(disks, infos, (len(disks)+1)/2)
|
||||
if newDisks != nil {
|
||||
// If we found disks signature in quorum, we proceed to list
|
||||
// from a single drive, shuffling of the drives is subsequently.
|
||||
disks = newDisks
|
||||
askDisks = 1
|
||||
} else {
|
||||
// If we did not find suitable disks, perform strict quorum listing
|
||||
// as no disk agrees on quorum anymore.
|
||||
askDisks = getListQuorum("strict", set.setDriveCount)
|
||||
}
|
||||
}
|
||||
|
||||
// Special case: ask all disks if the drive count is 4
|
||||
if set.setDriveCount == 4 || askDisks > len(disks) {
|
||||
askDisks = len(disks) // use all available drives
|
||||
}
|
||||
|
||||
var fallbackDisks []StorageAPI
|
||||
if askDisks > 0 && len(disks) > askDisks {
|
||||
rand.Shuffle(len(disks), func(i, j int) {
|
||||
disks[i], disks[j] = disks[j], disks[i]
|
||||
})
|
||||
fallbackDisks = disks[askDisks:]
|
||||
disks = disks[:askDisks]
|
||||
}
|
||||
|
||||
requestedVersions := 0
|
||||
if opts.LatestOnly {
|
||||
requestedVersions = 1
|
||||
}
|
||||
loadEntry := func(entry metaCacheEntry) {
|
||||
if entry.isDir() {
|
||||
send := func(oi ObjectInfo) bool {
|
||||
select {
|
||||
case results <- oi:
|
||||
return true
|
||||
case <-ctx.Done():
|
||||
return false
|
||||
}
|
||||
}
|
||||
for entry := range merged {
|
||||
if opts.LatestOnly {
|
||||
fi, err := entry.fileInfo(bucket)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
if opts.Filter != nil {
|
||||
if opts.Filter(fi) {
|
||||
if !send(fi.ToObjectInfo(bucket, fi.Name, vcfg != nil && vcfg.Versioned(fi.Name))) {
|
||||
return
|
||||
}
|
||||
|
||||
if opts.LatestOnly {
|
||||
fi, err := entry.fileInfo(bucket)
|
||||
if err != nil {
|
||||
cancel()
|
||||
return
|
||||
}
|
||||
if opts.Filter != nil {
|
||||
if opts.Filter(fi) {
|
||||
if !send(fi.ToObjectInfo(bucket, fi.Name, vcfg != nil && vcfg.Versioned(fi.Name))) {
|
||||
return
|
||||
}
|
||||
}
|
||||
} else {
|
||||
if !send(fi.ToObjectInfo(bucket, fi.Name, vcfg != nil && vcfg.Versioned(fi.Name))) {
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
} else {
|
||||
fivs, err := entry.fileInfoVersions(bucket)
|
||||
if err != nil {
|
||||
cancel()
|
||||
return
|
||||
}
|
||||
|
||||
// Note: entry.fileInfoVersions returns versions sorted in reverse chronological order based on ModTime
|
||||
if opts.VersionsSort == WalkVersionsSortAsc {
|
||||
versionsSorter(fivs.Versions).reverse()
|
||||
}
|
||||
|
||||
for _, version := range fivs.Versions {
|
||||
if opts.Filter != nil {
|
||||
if opts.Filter(version) {
|
||||
if !send(version.ToObjectInfo(bucket, version.Name, vcfg != nil && vcfg.Versioned(version.Name))) {
|
||||
return
|
||||
}
|
||||
}
|
||||
} else {
|
||||
if !send(version.ToObjectInfo(bucket, version.Name, vcfg != nil && vcfg.Versioned(version.Name))) {
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// However many we ask, versions must exist on ~50%
|
||||
listingQuorum := (askDisks + 1) / 2
|
||||
|
||||
// How to resolve partial results.
|
||||
resolver := metadataResolutionParams{
|
||||
dirQuorum: listingQuorum,
|
||||
objQuorum: listingQuorum,
|
||||
bucket: bucket,
|
||||
requestedVersions: requestedVersions,
|
||||
}
|
||||
|
||||
path := baseDirFromPrefix(prefix)
|
||||
filterPrefix := strings.Trim(strings.TrimPrefix(prefix, path), slashSeparator)
|
||||
if path == prefix {
|
||||
filterPrefix = ""
|
||||
}
|
||||
|
||||
lopts := listPathRawOptions{
|
||||
disks: disks,
|
||||
fallbackDisks: fallbackDisks,
|
||||
bucket: bucket,
|
||||
path: path,
|
||||
filterPrefix: filterPrefix,
|
||||
recursive: true,
|
||||
forwardTo: opts.Marker,
|
||||
minDisks: 1,
|
||||
reportNotFound: false,
|
||||
agreed: loadEntry,
|
||||
partial: func(entries metaCacheEntries, _ []error) {
|
||||
entry, ok := entries.resolve(&resolver)
|
||||
if ok {
|
||||
loadEntry(*entry)
|
||||
}
|
||||
},
|
||||
finished: nil,
|
||||
}
|
||||
|
||||
if err := listPathRaw(ctx, lopts); err != nil {
|
||||
storageLogIf(ctx, fmt.Errorf("listPathRaw returned %w: opts(%#v)", err, lopts))
|
||||
cancel()
|
||||
} else {
|
||||
if !send(fi.ToObjectInfo(bucket, fi.Name, vcfg != nil && vcfg.Versioned(fi.Name))) {
|
||||
return
|
||||
}
|
||||
}()
|
||||
}
|
||||
continue
|
||||
}
|
||||
fivs, err := entry.fileInfoVersions(bucket)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
// Note: entry.fileInfoVersions returns versions sorted in reverse chronological order based on ModTime
|
||||
if opts.VersionsSort == WalkVersionsSortAsc {
|
||||
versionsSorter(fivs.Versions).reverse()
|
||||
}
|
||||
|
||||
for _, version := range fivs.Versions {
|
||||
if opts.Filter != nil {
|
||||
if opts.Filter(version) {
|
||||
if !send(version.ToObjectInfo(bucket, version.Name, vcfg != nil && vcfg.Versioned(version.Name))) {
|
||||
return
|
||||
}
|
||||
}
|
||||
} else {
|
||||
if !send(version.ToObjectInfo(bucket, version.Name, vcfg != nil && vcfg.Versioned(version.Name))) {
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
wg.Wait()
|
||||
}
|
||||
}()
|
||||
go func() {
|
||||
// Merge all entries from all disks.
|
||||
// We leave quorum at 1, since entries are already resolved to have the desired quorum.
|
||||
// mergeEntryChannels will close 'merged' channel upon completion or cancellation.
|
||||
storageLogIf(ctx, mergeEntryChannels(ctx, entries, merged, 1))
|
||||
}()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
@ -217,6 +217,27 @@ func prepareErasure(ctx context.Context, nDisks int) (ObjectLayer, []string, err
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
// Wait up to 10 seconds for disks to come online.
|
||||
pools := obj.(*erasureServerPools)
|
||||
t := time.Now()
|
||||
for _, pool := range pools.serverPools {
|
||||
for _, sets := range pool.erasureDisks {
|
||||
for _, s := range sets {
|
||||
if !s.IsLocal() {
|
||||
for {
|
||||
if s.IsOnline() {
|
||||
break
|
||||
}
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
if time.Since(t) > 10*time.Second {
|
||||
return nil, nil, errors.New("timeout waiting for disk to come online")
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return obj, fsDirs, nil
|
||||
}
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user