mirror of
https://github.com/minio/minio.git
synced 2025-04-01 02:03:42 -04:00
optimize listObjects to list only from 3 random disks (#9184)
This commit is contained in:
parent
cfc9cfd84a
commit
da04cb91ce
@ -887,8 +887,7 @@ func leastEntry(entryChs []FileInfoCh, entries []FileInfo, entriesValid []bool)
|
|||||||
}
|
}
|
||||||
|
|
||||||
// mergeEntriesCh - merges FileInfo channel to entries upto maxKeys.
|
// mergeEntriesCh - merges FileInfo channel to entries upto maxKeys.
|
||||||
// If partialQuorumOnly is set only objects that does not have full quorum is returned.
|
func mergeEntriesCh(entryChs []FileInfoCh, maxKeys int, ndisks int) (entries FilesInfo) {
|
||||||
func mergeEntriesCh(entryChs []FileInfoCh, maxKeys int, totalDrives int, partialQuorumOnly bool) (entries FilesInfo) {
|
|
||||||
var i = 0
|
var i = 0
|
||||||
entriesInfos := make([]FileInfo, len(entryChs))
|
entriesInfos := make([]FileInfo, len(entryChs))
|
||||||
entriesValid := make([]bool, len(entryChs))
|
entriesValid := make([]bool, len(entryChs))
|
||||||
@ -899,27 +898,11 @@ func mergeEntriesCh(entryChs []FileInfoCh, maxKeys int, totalDrives int, partial
|
|||||||
break
|
break
|
||||||
}
|
}
|
||||||
|
|
||||||
rquorum := fi.Quorum
|
if quorumCount < ndisks-1 {
|
||||||
// Quorum is zero for all directories.
|
// Skip entries which are not found on upto ndisks.
|
||||||
if rquorum == 0 {
|
continue
|
||||||
// Choose N/2 quoroum for directory entries.
|
|
||||||
rquorum = totalDrives / 2
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if partialQuorumOnly {
|
|
||||||
// When healing is enabled, we should
|
|
||||||
// list only objects which need healing.
|
|
||||||
if quorumCount == totalDrives {
|
|
||||||
// Skip good entries.
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
// Regular listing, we skip entries not in quorum.
|
|
||||||
if quorumCount < rquorum {
|
|
||||||
// Skip entries which do not have quorum.
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
}
|
|
||||||
entries.Files = append(entries.Files, fi)
|
entries.Files = append(entries.Files, fi)
|
||||||
i++
|
i++
|
||||||
if i == maxKeys {
|
if i == maxKeys {
|
||||||
@ -951,11 +934,19 @@ func isTruncated(entryChs []FileInfoCh, entries []FileInfo, entriesValid []bool)
|
|||||||
return isTruncated
|
return isTruncated
|
||||||
}
|
}
|
||||||
|
|
||||||
// Starts a walk channel across all disks and returns a slice.
|
|
||||||
func (s *xlSets) startMergeWalks(ctx context.Context, bucket, prefix, marker string, recursive bool, endWalkCh <-chan struct{}) []FileInfoCh {
|
func (s *xlSets) startMergeWalks(ctx context.Context, bucket, prefix, marker string, recursive bool, endWalkCh <-chan struct{}) []FileInfoCh {
|
||||||
|
return s.startMergeWalksN(ctx, bucket, prefix, marker, recursive, endWalkCh, -1)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Starts a walk channel across all disks and returns a slice of
|
||||||
|
// FileInfo channels which can be read from.
|
||||||
|
func (s *xlSets) startMergeWalksN(ctx context.Context, bucket, prefix, marker string, recursive bool, endWalkCh <-chan struct{}, ndisks int) []FileInfoCh {
|
||||||
var entryChs []FileInfoCh
|
var entryChs []FileInfoCh
|
||||||
|
var success int
|
||||||
for _, set := range s.sets {
|
for _, set := range s.sets {
|
||||||
for _, disk := range set.getDisks() {
|
// Reset for the next erasure set.
|
||||||
|
success = ndisks
|
||||||
|
for _, disk := range set.getLoadBalancedDisks() {
|
||||||
if disk == nil {
|
if disk == nil {
|
||||||
// Disk can be offline
|
// Disk can be offline
|
||||||
continue
|
continue
|
||||||
@ -968,6 +959,10 @@ func (s *xlSets) startMergeWalks(ctx context.Context, bucket, prefix, marker str
|
|||||||
entryChs = append(entryChs, FileInfoCh{
|
entryChs = append(entryChs, FileInfoCh{
|
||||||
Ch: entryCh,
|
Ch: entryCh,
|
||||||
})
|
})
|
||||||
|
success--
|
||||||
|
if success == 0 {
|
||||||
|
break
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return entryChs
|
return entryChs
|
||||||
@ -977,7 +972,8 @@ func (s *xlSets) listObjectsNonSlash(ctx context.Context, bucket, prefix, marker
|
|||||||
endWalkCh := make(chan struct{})
|
endWalkCh := make(chan struct{})
|
||||||
defer close(endWalkCh)
|
defer close(endWalkCh)
|
||||||
|
|
||||||
entryChs := s.startMergeWalks(context.Background(), bucket, prefix, "", true, endWalkCh)
|
const ndisks = 3
|
||||||
|
entryChs := s.startMergeWalksN(context.Background(), bucket, prefix, "", true, endWalkCh, ndisks)
|
||||||
|
|
||||||
var objInfos []ObjectInfo
|
var objInfos []ObjectInfo
|
||||||
var eof bool
|
var eof bool
|
||||||
@ -989,18 +985,15 @@ func (s *xlSets) listObjectsNonSlash(ctx context.Context, bucket, prefix, marker
|
|||||||
if len(objInfos) == maxKeys {
|
if len(objInfos) == maxKeys {
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
|
|
||||||
result, quorumCount, ok := leastEntry(entryChs, entries, entriesValid)
|
result, quorumCount, ok := leastEntry(entryChs, entries, entriesValid)
|
||||||
if !ok {
|
if !ok {
|
||||||
eof = true
|
eof = true
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
rquorum := result.Quorum
|
|
||||||
// Quorum is zero for all directories.
|
if quorumCount < ndisks-1 {
|
||||||
if rquorum == 0 {
|
// Skip entries which are not found on upto ndisks.
|
||||||
// Choose N/2 quorum for directory entries.
|
|
||||||
rquorum = s.drivesPerSet / 2
|
|
||||||
}
|
|
||||||
if quorumCount < rquorum {
|
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1080,7 +1073,7 @@ func (s *xlSets) listObjectsNonSlash(ctx context.Context, bucket, prefix, marker
|
|||||||
// walked and merged at this layer. Resulting value through the merge process sends
|
// walked and merged at this layer. Resulting value through the merge process sends
|
||||||
// the data in lexically sorted order.
|
// the data in lexically sorted order.
|
||||||
// If partialQuorumOnly is set only objects that does not have full quorum is returned.
|
// If partialQuorumOnly is set only objects that does not have full quorum is returned.
|
||||||
func (s *xlSets) listObjects(ctx context.Context, bucket, prefix, marker, delimiter string, maxKeys int, partialQuorumOnly bool) (loi ListObjectsInfo, err error) {
|
func (s *xlSets) listObjects(ctx context.Context, bucket, prefix, marker, delimiter string, maxKeys int) (loi ListObjectsInfo, err error) {
|
||||||
if err = checkListObjsArgs(ctx, bucket, prefix, marker, s); err != nil {
|
if err = checkListObjsArgs(ctx, bucket, prefix, marker, s); err != nil {
|
||||||
return loi, err
|
return loi, err
|
||||||
}
|
}
|
||||||
@ -1123,13 +1116,16 @@ func (s *xlSets) listObjects(ctx context.Context, bucket, prefix, marker, delimi
|
|||||||
recursive = false
|
recursive = false
|
||||||
}
|
}
|
||||||
|
|
||||||
|
const ndisks = 3
|
||||||
|
|
||||||
entryChs, endWalkCh := s.pool.Release(listParams{bucket: bucket, recursive: recursive, marker: marker, prefix: prefix})
|
entryChs, endWalkCh := s.pool.Release(listParams{bucket: bucket, recursive: recursive, marker: marker, prefix: prefix})
|
||||||
if entryChs == nil {
|
if entryChs == nil {
|
||||||
endWalkCh = make(chan struct{})
|
endWalkCh = make(chan struct{})
|
||||||
entryChs = s.startMergeWalks(context.Background(), bucket, prefix, marker, recursive, endWalkCh)
|
// start file tree walk across at most randomly 3 disks in a set.
|
||||||
|
entryChs = s.startMergeWalksN(context.Background(), bucket, prefix, marker, recursive, endWalkCh, ndisks)
|
||||||
}
|
}
|
||||||
|
|
||||||
entries := mergeEntriesCh(entryChs, maxKeys, s.drivesPerSet, partialQuorumOnly)
|
entries := mergeEntriesCh(entryChs, maxKeys, ndisks)
|
||||||
if len(entries.Files) == 0 {
|
if len(entries.Files) == 0 {
|
||||||
return loi, nil
|
return loi, nil
|
||||||
}
|
}
|
||||||
@ -1157,7 +1153,7 @@ func (s *xlSets) listObjects(ctx context.Context, bucket, prefix, marker, delimi
|
|||||||
// walked and merged at this layer. Resulting value through the merge process sends
|
// walked and merged at this layer. Resulting value through the merge process sends
|
||||||
// the data in lexically sorted order.
|
// the data in lexically sorted order.
|
||||||
func (s *xlSets) ListObjects(ctx context.Context, bucket, prefix, marker, delimiter string, maxKeys int) (loi ListObjectsInfo, err error) {
|
func (s *xlSets) ListObjects(ctx context.Context, bucket, prefix, marker, delimiter string, maxKeys int) (loi ListObjectsInfo, err error) {
|
||||||
return s.listObjects(ctx, bucket, prefix, marker, delimiter, maxKeys, false)
|
return s.listObjects(ctx, bucket, prefix, marker, delimiter, maxKeys)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *xlSets) ListMultipartUploads(ctx context.Context, bucket, prefix, keyMarker, uploadIDMarker, delimiter string, maxUploads int) (result ListMultipartsInfo, err error) {
|
func (s *xlSets) ListMultipartUploads(ctx context.Context, bucket, prefix, keyMarker, uploadIDMarker, delimiter string, maxUploads int) (result ListMultipartsInfo, err error) {
|
||||||
|
@ -592,20 +592,16 @@ func (z *xlZones) listObjectsNonSlash(ctx context.Context, bucket, prefix, marke
|
|||||||
endWalkCh := make(chan struct{})
|
endWalkCh := make(chan struct{})
|
||||||
defer close(endWalkCh)
|
defer close(endWalkCh)
|
||||||
|
|
||||||
|
const ndisks = 3
|
||||||
for _, zone := range z.zones {
|
for _, zone := range z.zones {
|
||||||
zonesEntryChs = append(zonesEntryChs,
|
zonesEntryChs = append(zonesEntryChs,
|
||||||
zone.startMergeWalks(ctx, bucket, prefix, "", true, endWalkCh))
|
zone.startMergeWalksN(ctx, bucket, prefix, "", true, endWalkCh, ndisks))
|
||||||
}
|
}
|
||||||
|
|
||||||
var objInfos []ObjectInfo
|
var objInfos []ObjectInfo
|
||||||
var eof bool
|
var eof bool
|
||||||
var prevPrefix string
|
var prevPrefix string
|
||||||
|
|
||||||
var zoneDrivesPerSet []int
|
|
||||||
for _, zone := range z.zones {
|
|
||||||
zoneDrivesPerSet = append(zoneDrivesPerSet, zone.drivesPerSet)
|
|
||||||
}
|
|
||||||
|
|
||||||
var zonesEntriesInfos [][]FileInfo
|
var zonesEntriesInfos [][]FileInfo
|
||||||
var zonesEntriesValid [][]bool
|
var zonesEntriesValid [][]bool
|
||||||
for _, entryChs := range zonesEntryChs {
|
for _, entryChs := range zonesEntryChs {
|
||||||
@ -617,18 +613,15 @@ func (z *xlZones) listObjectsNonSlash(ctx context.Context, bucket, prefix, marke
|
|||||||
if len(objInfos) == maxKeys {
|
if len(objInfos) == maxKeys {
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
result, quorumCount, zoneIndex, ok := leastEntryZone(zonesEntryChs, zonesEntriesInfos, zonesEntriesValid)
|
|
||||||
|
result, quorumCount, _, ok := leastEntryZone(zonesEntryChs, zonesEntriesInfos, zonesEntriesValid)
|
||||||
if !ok {
|
if !ok {
|
||||||
eof = true
|
eof = true
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
rquorum := result.Quorum
|
|
||||||
// Quorum is zero for all directories.
|
if quorumCount < ndisks-1 {
|
||||||
if rquorum == 0 {
|
// Skip entries which are not found on upto ndisks.
|
||||||
// Choose N/2 quorum for directory entries.
|
|
||||||
rquorum = zoneDrivesPerSet[zoneIndex] / 2
|
|
||||||
}
|
|
||||||
if quorumCount < rquorum {
|
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -704,7 +697,7 @@ func (z *xlZones) listObjectsNonSlash(ctx context.Context, bucket, prefix, marke
|
|||||||
return result, nil
|
return result, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (z *xlZones) listObjects(ctx context.Context, bucket, prefix, marker, delimiter string, maxKeys int, heal bool) (ListObjectsInfo, error) {
|
func (z *xlZones) listObjects(ctx context.Context, bucket, prefix, marker, delimiter string, maxKeys int) (ListObjectsInfo, error) {
|
||||||
loi := ListObjectsInfo{}
|
loi := ListObjectsInfo{}
|
||||||
|
|
||||||
if err := checkListObjsArgs(ctx, bucket, prefix, marker, z); err != nil {
|
if err := checkListObjsArgs(ctx, bucket, prefix, marker, z); err != nil {
|
||||||
@ -752,22 +745,18 @@ func (z *xlZones) listObjects(ctx context.Context, bucket, prefix, marker, delim
|
|||||||
var zonesEntryChs [][]FileInfoCh
|
var zonesEntryChs [][]FileInfoCh
|
||||||
var zonesEndWalkCh []chan struct{}
|
var zonesEndWalkCh []chan struct{}
|
||||||
|
|
||||||
|
const ndisks = 3
|
||||||
for _, zone := range z.zones {
|
for _, zone := range z.zones {
|
||||||
entryChs, endWalkCh := zone.pool.Release(listParams{bucket, recursive, marker, prefix})
|
entryChs, endWalkCh := zone.pool.Release(listParams{bucket, recursive, marker, prefix})
|
||||||
if entryChs == nil {
|
if entryChs == nil {
|
||||||
endWalkCh = make(chan struct{})
|
endWalkCh = make(chan struct{})
|
||||||
entryChs = zone.startMergeWalks(ctx, bucket, prefix, marker, recursive, endWalkCh)
|
entryChs = zone.startMergeWalksN(ctx, bucket, prefix, marker, recursive, endWalkCh, ndisks)
|
||||||
}
|
}
|
||||||
zonesEntryChs = append(zonesEntryChs, entryChs)
|
zonesEntryChs = append(zonesEntryChs, entryChs)
|
||||||
zonesEndWalkCh = append(zonesEndWalkCh, endWalkCh)
|
zonesEndWalkCh = append(zonesEndWalkCh, endWalkCh)
|
||||||
}
|
}
|
||||||
|
|
||||||
var zoneDrivesPerSet []int
|
entries := mergeZonesEntriesCh(zonesEntryChs, maxKeys, ndisks)
|
||||||
for _, zone := range z.zones {
|
|
||||||
zoneDrivesPerSet = append(zoneDrivesPerSet, zone.drivesPerSet)
|
|
||||||
}
|
|
||||||
|
|
||||||
entries := mergeZonesEntriesCh(zonesEntryChs, maxKeys, zoneDrivesPerSet, heal)
|
|
||||||
if len(entries.Files) == 0 {
|
if len(entries.Files) == 0 {
|
||||||
return loi, nil
|
return loi, nil
|
||||||
}
|
}
|
||||||
@ -873,7 +862,7 @@ func leastEntryZone(zoneEntryChs [][]FileInfoCh, zoneEntries [][]FileInfo, zoneE
|
|||||||
}
|
}
|
||||||
|
|
||||||
// mergeZonesEntriesCh - merges FileInfo channel to entries upto maxKeys.
|
// mergeZonesEntriesCh - merges FileInfo channel to entries upto maxKeys.
|
||||||
func mergeZonesEntriesCh(zonesEntryChs [][]FileInfoCh, maxKeys int, zoneDrives []int, heal bool) (entries FilesInfo) {
|
func mergeZonesEntriesCh(zonesEntryChs [][]FileInfoCh, maxKeys int, ndisks int) (entries FilesInfo) {
|
||||||
var i = 0
|
var i = 0
|
||||||
var zonesEntriesInfos [][]FileInfo
|
var zonesEntriesInfos [][]FileInfo
|
||||||
var zonesEntriesValid [][]bool
|
var zonesEntriesValid [][]bool
|
||||||
@ -882,32 +871,17 @@ func mergeZonesEntriesCh(zonesEntryChs [][]FileInfoCh, maxKeys int, zoneDrives [
|
|||||||
zonesEntriesValid = append(zonesEntriesValid, make([]bool, len(entryChs)))
|
zonesEntriesValid = append(zonesEntriesValid, make([]bool, len(entryChs)))
|
||||||
}
|
}
|
||||||
for {
|
for {
|
||||||
fi, quorumCount, zoneIndex, valid := leastEntryZone(zonesEntryChs, zonesEntriesInfos, zonesEntriesValid)
|
fi, quorumCount, _, ok := leastEntryZone(zonesEntryChs, zonesEntriesInfos, zonesEntriesValid)
|
||||||
if !valid {
|
if !ok {
|
||||||
// We have reached EOF across all entryChs, break the loop.
|
// We have reached EOF across all entryChs, break the loop.
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
rquorum := fi.Quorum
|
|
||||||
// Quorum is zero for all directories.
|
if quorumCount < ndisks-1 {
|
||||||
if rquorum == 0 {
|
// Skip entries which are not found on upto ndisks.
|
||||||
// Choose N/2 quoroum for directory entries.
|
continue
|
||||||
rquorum = zoneDrives[zoneIndex] / 2
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if heal {
|
|
||||||
// When healing is enabled, we should
|
|
||||||
// list only objects which need healing.
|
|
||||||
if quorumCount == zoneDrives[zoneIndex] {
|
|
||||||
// Skip good entries.
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
// Regular listing, we skip entries not in quorum.
|
|
||||||
if quorumCount < rquorum {
|
|
||||||
// Skip entries which do not have quorum.
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
}
|
|
||||||
entries.Files = append(entries.Files, fi)
|
entries.Files = append(entries.Files, fi)
|
||||||
i++
|
i++
|
||||||
if i == maxKeys {
|
if i == maxKeys {
|
||||||
@ -953,7 +927,7 @@ func (z *xlZones) ListObjects(ctx context.Context, bucket, prefix, marker, delim
|
|||||||
return z.zones[0].ListObjects(ctx, bucket, prefix, marker, delimiter, maxKeys)
|
return z.zones[0].ListObjects(ctx, bucket, prefix, marker, delimiter, maxKeys)
|
||||||
}
|
}
|
||||||
|
|
||||||
return z.listObjects(ctx, bucket, prefix, marker, delimiter, maxKeys, false)
|
return z.listObjects(ctx, bucket, prefix, marker, delimiter, maxKeys)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (z *xlZones) ListMultipartUploads(ctx context.Context, bucket, prefix, keyMarker, uploadIDMarker, delimiter string, maxUploads int) (ListMultipartsInfo, error) {
|
func (z *xlZones) ListMultipartUploads(ctx context.Context, bucket, prefix, keyMarker, uploadIDMarker, delimiter string, maxUploads int) (ListMultipartsInfo, error) {
|
||||||
|
Loading…
x
Reference in New Issue
Block a user