mirror of
https://github.com/minio/minio.git
synced 2025-02-04 02:15:59 -05:00
fix: optimize listMultipartUploads to serve via local disks (#18034)
and remove unused getLoadBalancedDisks()
This commit is contained in:
parent
7b92687397
commit
1647fc7edc
@ -74,62 +74,6 @@ func (er erasureObjects) getLoadBalancedLocalDisks() (newDisks []StorageAPI) {
|
|||||||
return newDisks
|
return newDisks
|
||||||
}
|
}
|
||||||
|
|
||||||
// getLoadBalancedDisks - fetches load balanced (sufficiently randomized) disk slice.
|
|
||||||
// ensures to skip disks if they are not healing and online.
|
|
||||||
func (er erasureObjects) getLoadBalancedDisks(optimized bool) []StorageAPI {
|
|
||||||
disks := er.getDisks()
|
|
||||||
|
|
||||||
r := rand.New(rand.NewSource(time.Now().UnixNano()))
|
|
||||||
if !optimized {
|
|
||||||
var newDisks []StorageAPI
|
|
||||||
for _, i := range r.Perm(len(disks)) {
|
|
||||||
newDisks = append(newDisks, disks[i])
|
|
||||||
}
|
|
||||||
return newDisks
|
|
||||||
}
|
|
||||||
|
|
||||||
var wg sync.WaitGroup
|
|
||||||
var mu sync.Mutex
|
|
||||||
newDisks := map[uint64][]StorageAPI{}
|
|
||||||
// Based on the random shuffling return back randomized disks.
|
|
||||||
for _, i := range r.Perm(len(disks)) {
|
|
||||||
i := i
|
|
||||||
wg.Add(1)
|
|
||||||
go func() {
|
|
||||||
defer wg.Done()
|
|
||||||
if disks[i] == nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
di, err := disks[i].DiskInfo(context.Background(), false)
|
|
||||||
if err != nil || di.Healing {
|
|
||||||
// - Do not consume disks which are not reachable
|
|
||||||
// unformatted or simply not accessible for some reason.
|
|
||||||
//
|
|
||||||
// - Do not consume disks which are being healed
|
|
||||||
//
|
|
||||||
// - Future: skip busy disks
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
mu.Lock()
|
|
||||||
// Capture disks usage wise upto resolution of MiB
|
|
||||||
newDisks[di.Used/1024/1024] = append(newDisks[di.Used/1024/1024], disks[i])
|
|
||||||
mu.Unlock()
|
|
||||||
}()
|
|
||||||
}
|
|
||||||
wg.Wait()
|
|
||||||
|
|
||||||
var max uint64
|
|
||||||
for k := range newDisks {
|
|
||||||
if k > max {
|
|
||||||
max = k
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Return disks which have maximum disk usage common.
|
|
||||||
return newDisks[max]
|
|
||||||
}
|
|
||||||
|
|
||||||
// readMultipleFiles Reads raw data from all specified files from all disks.
|
// readMultipleFiles Reads raw data from all specified files from all disks.
|
||||||
func readMultipleFiles(ctx context.Context, disks []StorageAPI, req ReadMultipleReq, readQuorum int) ([]ReadMultipleResp, error) {
|
func readMultipleFiles(ctx context.Context, disks []StorageAPI, req ReadMultipleReq, readQuorum int) ([]ReadMultipleResp, error) {
|
||||||
resps := make([]chan ReadMultipleResp, len(disks))
|
resps := make([]chan ReadMultipleResp, len(disks))
|
||||||
|
@ -269,7 +269,13 @@ func (er erasureObjects) ListMultipartUploads(ctx context.Context, bucket, objec
|
|||||||
|
|
||||||
var uploadIDs []string
|
var uploadIDs []string
|
||||||
var disk StorageAPI
|
var disk StorageAPI
|
||||||
for _, disk = range er.getLoadBalancedDisks(true) {
|
disks := er.getLoadBalancedLocalDisks()
|
||||||
|
if len(disks) == 0 {
|
||||||
|
// using er.getLoadBalancedLocalDisks() has one side-affect where
|
||||||
|
// on a pooled setup all disks are remote, add a fallback
|
||||||
|
disks = er.getOnlineDisks()
|
||||||
|
}
|
||||||
|
for _, disk = range disks {
|
||||||
uploadIDs, err = disk.ListDir(ctx, minioMetaMultipartBucket, er.getMultipartSHADir(bucket, object), -1)
|
uploadIDs, err = disk.ListDir(ctx, minioMetaMultipartBucket, er.getMultipartSHADir(bucket, object), -1)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if errors.Is(err, errDiskNotFound) {
|
if errors.Is(err, errDiskNotFound) {
|
||||||
|
Loading…
x
Reference in New Issue
Block a user