From c60f54e5be7302d82d0d8fc404c056fea4e2bf4e Mon Sep 17 00:00:00 2001
From: Harshavardhana
Date: Tue, 24 Oct 2023 23:33:25 -0700
Subject: [PATCH] make ListMultipart/ListParts more reliable, skip healing
 disks (#18312)

This PR also fixes old flaky tests by properly marking disks offline in
the tests that depend on them.
---
 cmd/erasure-common.go            | 17 ++++++-
 cmd/erasure-multipart.go         | 81 ++++++++++++++++++--------------
 cmd/erasure.go                   |  2 +-
 cmd/object-api-multipart_test.go | 18 +++++--
 cmd/test-utils_test.go           | 10 ----
 5 files changed, 76 insertions(+), 52 deletions(-)

diff --git a/cmd/erasure-common.go b/cmd/erasure-common.go
index 9afb85a94..796183584 100644
--- a/cmd/erasure-common.go
+++ b/cmd/erasure-common.go
@@ -62,7 +62,22 @@ func (er erasureObjects) getOnlineDisks() (newDisks []StorageAPI) {
 	return newDisks
 }
 
-func (er erasureObjects) getLoadBalancedLocalDisks() (newDisks []StorageAPI) {
+func (er erasureObjects) getOnlineLocalDisks() (newDisks []StorageAPI) {
+	disks := er.getOnlineDisks()
+
+	// Based on the random shuffling return back randomized disks.
+	r := rand.New(rand.NewSource(time.Now().UnixNano()))
+
+	for _, i := range r.Perm(len(disks)) {
+		if disks[i] != nil && disks[i].IsLocal() {
+			newDisks = append(newDisks, disks[i])
+		}
+	}
+
+	return newDisks
+}
+
+func (er erasureObjects) getLocalDisks() (newDisks []StorageAPI) {
 	disks := er.getDisks()
 	// Based on the random shuffling return back randomized disks.
 	r := rand.New(rand.NewSource(time.Now().UnixNano()))
diff --git a/cmd/erasure-multipart.go b/cmd/erasure-multipart.go
index 2dd7da21e..885636579 100644
--- a/cmd/erasure-multipart.go
+++ b/cmd/erasure-multipart.go
@@ -162,7 +162,7 @@ func (er erasureObjects) removeObjectPart(bucket, object, uploadID, dataDir stri
 func (er erasureObjects) cleanupStaleUploads(ctx context.Context, expiry time.Duration) {
 	// run multiple cleanup's local to this server.
 	var wg sync.WaitGroup
-	for _, disk := range er.getLoadBalancedLocalDisks() {
+	for _, disk := range er.getLocalDisks() {
 		if disk != nil {
 			wg.Add(1)
 			go func(disk StorageAPI) {
@@ -268,11 +268,11 @@ func (er erasureObjects) ListMultipartUploads(ctx context.Context, bucket, objec
 	var uploadIDs []string
 	var disk StorageAPI
-	disks := er.getLoadBalancedLocalDisks()
+	disks := er.getOnlineLocalDisks()
 	if len(disks) == 0 {
-		// using er.getLoadBalancedLocalDisks() has one side-affect where
+		// using er.getOnlineLocalDisks() has one side-effect where
 		// on a pooled setup all disks are remote, add a fallback
-		disks = er.getDisks()
+		disks = er.getOnlineDisks()
 	}
 	for _, disk = range disks {
 		if disk == nil {
@@ -867,53 +867,64 @@ func (er erasureObjects) ListObjectParts(ctx context.Context, bucket, object, up
 	}
 
 	var disk StorageAPI
-	disks := er.getLoadBalancedLocalDisks()
+	disks := er.getOnlineLocalDisks()
 	if len(disks) == 0 {
-		// using er.getLoadBalancedLocalDisks() has one side-affect where
+		// using er.getOnlineLocalDisks() has one side-effect where
 		// on a pooled setup all disks are remote, add a fallback
-		disks = er.getDisks()
+		disks = er.getOnlineDisks()
 	}
 
-	ch := make(chan ReadMultipleResp) // channel is closed by ReadMultiple()
-
 	for _, disk = range disks {
 		if disk == nil {
 			continue
 		}
+
 		if !disk.IsOnline() {
 			continue
 		}
-		go func(disk StorageAPI) {
-			disk.ReadMultiple(ctx, req, ch) // ignore error since this function only ever returns an error i
-		}(disk)
+		break
 	}
 
-	var i int
-	for part := range ch {
-		partN := i + partNumberMarker + 1
-		i++
+	g := errgroup.WithNErrs(len(req.Files)).WithConcurrency(32)
 
-		if part.Error != "" || !part.Exists {
-			continue
+	partsInfo := make([]ObjectPartInfo, len(req.Files))
+	for i, file := range req.Files {
+		file := file
+		partN := i + start
+		i := i
+
+		g.Go(func() error {
+			buf, err := disk.ReadAll(ctx, minioMetaMultipartBucket, pathJoin(partPath, file))
+			if err != nil {
+				return err
+			}
+
+			var pfi FileInfo
+			_, err = pfi.UnmarshalMsg(buf)
+			if err != nil {
+				return err
+			}
+
+			if len(pfi.Parts) != 1 {
+				return fmt.Errorf("invalid number of parts expected 1, got %d", len(pfi.Parts))
+			}
+
+			if partN != pfi.Parts[0].Number {
+				return fmt.Errorf("part.%d.meta has incorrect corresponding part number: expected %d, got %d", partN, partN, pfi.Parts[0].Number)
+			}
+
+			partsInfo[i] = pfi.Parts[0]
+			return nil
+		}, i)
+	}
+
+	g.Wait()
+
+	for _, part := range partsInfo {
+		if part.Number != 0 && !part.ModTime.IsZero() {
+			fi.AddObjectPart(part.Number, part.ETag, part.Size, part.ActualSize, part.ModTime, part.Index, part.Checksums)
 		}
-
-		var pfi FileInfo
-		_, err := pfi.UnmarshalMsg(part.Data)
-		if err != nil {
-			// Maybe crash or similar.
-			logger.LogIf(ctx, err)
-			continue
-		}
-
-		partI := pfi.Parts[0]
-		if partN != partI.Number {
-			logger.LogIf(ctx, fmt.Errorf("part.%d.meta has incorrect corresponding part number: expected %d, got %d", i+1, i+1, partI.Number))
-			continue
-		}
-
-		// Add the current part.
-		fi.AddObjectPart(partI.Number, partI.ETag, partI.Size, partI.ActualSize, partI.ModTime, partI.Index, partI.Checksums)
 	}
 
 	// Only parts with higher part numbers will be listed.
diff --git a/cmd/erasure.go b/cmd/erasure.go
index 34103acb5..84d7c8b10 100644
--- a/cmd/erasure.go
+++ b/cmd/erasure.go
@@ -335,7 +335,7 @@ func (er erasureObjects) getOnlineDisksWithHealing() (newDisks []StorageAPI, hea
 func (er erasureObjects) cleanupDeletedObjects(ctx context.Context) {
 	// run multiple cleanup's local to this server.
 	var wg sync.WaitGroup
-	for _, disk := range er.getLoadBalancedLocalDisks() {
+	for _, disk := range er.getLocalDisks() {
 		if disk != nil {
 			wg.Add(1)
 			go func(disk StorageAPI) {
diff --git a/cmd/object-api-multipart_test.go b/cmd/object-api-multipart_test.go
index c79436eb1..c89032298 100644
--- a/cmd/object-api-multipart_test.go
+++ b/cmd/object-api-multipart_test.go
@@ -21,6 +21,7 @@ import (
 	"bytes"
 	"context"
 	"fmt"
+	"math/rand"
 	"reflect"
 	"runtime"
 	"strings"
@@ -1223,8 +1224,18 @@ func testListObjectPartsDiskNotFound(obj ObjectLayer, instanceType string, disks
 		t.Fatalf("%s : %s", instanceType, err.Error())
 	}
 
-	// Remove some random disk.
-	removeDiskN(disks, 1)
+	z := obj.(*erasureServerPools)
+	er := z.serverPools[0].sets[0]
+
+	erasureDisks := er.getDisks()
+	ridx := rand.Intn(len(erasureDisks))
+
+	z.serverPools[0].erasureDisksMu.Lock()
+	er.getDisks = func() []StorageAPI {
+		erasureDisks[ridx] = newNaughtyDisk(erasureDisks[ridx], nil, errFaultyDisk)
+		return erasureDisks
+	}
+	z.serverPools[0].erasureDisksMu.Unlock()
 
 	uploadIDs = append(uploadIDs, res.UploadID)
 
@@ -1257,9 +1268,6 @@ func testListObjectPartsDiskNotFound(obj ObjectLayer, instanceType string, disks
 		}
 	}
 
-	// Remove one disk.
-	removeDiskN(disks, 1)
-
 	partInfos := []ListPartsInfo{
 		// partinfos - 0.
 		{
diff --git a/cmd/test-utils_test.go b/cmd/test-utils_test.go
index 6c8072d0b..3a9a41df4 100644
--- a/cmd/test-utils_test.go
+++ b/cmd/test-utils_test.go
@@ -1509,16 +1509,6 @@ func removeRoots(roots []string) {
 	}
 }
 
-// removeDiskN - removes N disks from supplied disk slice.
-func removeDiskN(disks []string, n int) {
-	if n > len(disks) {
-		n = len(disks)
-	}
-	for _, disk := range disks[:n] {
-		os.RemoveAll(disk)
-	}
-}
-
 // creates a bucket for the tests and returns the bucket name.
 // initializes the specified API endpoints for the tests.
 // initialies the root and returns its path.
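
Note on the disk-selection change above: listing now prefers shuffled online
local disks and falls back to online disks pool-wide, then takes the first
usable one, so healing (offline) disks are never consulted. A minimal
self-contained sketch of that pattern follows; disk, onlineLocalDisks, and
pickListingDisk are simplified stand-ins, not MinIO's actual StorageAPI or
helpers.

package main

import (
	"fmt"
	"math/rand"
)

// disk is a simplified stand-in for MinIO's StorageAPI.
type disk struct {
	name   string
	local  bool
	online bool
}

func (d *disk) IsLocal() bool  { return d.local }
func (d *disk) IsOnline() bool { return d.online }

// onlineLocalDisks mirrors getOnlineLocalDisks: shuffle, then keep only
// non-nil disks that are both online and local.
func onlineLocalDisks(disks []*disk) (out []*disk) {
	for _, i := range rand.Perm(len(disks)) {
		if d := disks[i]; d != nil && d.IsOnline() && d.IsLocal() {
			out = append(out, d)
		}
	}
	return out
}

// onlineDisks is the pool-wide fallback used when every local disk is
// remote or offline, mirroring getOnlineDisks.
func onlineDisks(disks []*disk) (out []*disk) {
	for _, i := range rand.Perm(len(disks)) {
		if d := disks[i]; d != nil && d.IsOnline() {
			out = append(out, d)
		}
	}
	return out
}

// pickListingDisk follows the patched selection loop: the first usable
// online disk wins; healing/offline disks are never picked.
func pickListingDisk(all []*disk) *disk {
	disks := onlineLocalDisks(all)
	if len(disks) == 0 {
		disks = onlineDisks(all) // pooled setup: all local disks may be remote
	}
	for _, d := range disks {
		if d == nil || !d.IsOnline() {
			continue
		}
		return d
	}
	return nil
}

func main() {
	all := []*disk{
		{name: "d1", local: true, online: false}, // healing/offline: skipped
		{name: "d2", local: false, online: true}, // remote: only used as fallback
		{name: "d3", local: true, online: true},
	}
	fmt.Println(pickListingDisk(all).name) // always d3 here
}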
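
The new ListObjectParts body replaces the ReadMultiple channel with a bounded
fan-out: one goroutine per part.N.meta file, at most 32 in flight, results
written into an index-aligned slice so no ordering or locking is needed, and
unreadable parts are simply skipped. A sketch of the same pattern, assuming
golang.org/x/sync/errgroup as a stand-in for MinIO's internal errgroup
(WithNErrs(...).WithConcurrency(32)) and a hypothetical readMeta callback:

package main

import (
	"context"
	"fmt"

	"golang.org/x/sync/errgroup"
)

type partInfo struct {
	Number int
	Size   int64
}

// readParts fans out one goroutine per metadata file, capped at 32,
// and collects results into an index-aligned slice.
func readParts(ctx context.Context, files []string,
	readMeta func(context.Context, string) (partInfo, error),
) []partInfo {
	g, ctx := errgroup.WithContext(ctx)
	g.SetLimit(32) // same bound as WithConcurrency(32) in the patch

	results := make([]partInfo, len(files))
	for i, file := range files {
		i, file := i, file // per-iteration copies for the closure (pre-Go 1.22)
		g.Go(func() error {
			pi, err := readMeta(ctx, file)
			if err != nil {
				return nil // an unreadable part is skipped, not fatal to the listing
			}
			results[i] = pi
			return nil
		})
	}
	_ = g.Wait() // per-part errors were swallowed above

	// Keep only slots that were filled, mirroring the part.Number != 0
	// filter before fi.AddObjectPart in the patch.
	kept := results[:0]
	for _, pi := range results {
		if pi.Number != 0 {
			kept = append(kept, pi)
		}
	}
	return kept
}

func main() {
	files := []string{"part.1.meta", "part.2.meta", "part.3.meta"}
	parts := readParts(context.Background(), files,
		func(_ context.Context, f string) (partInfo, error) {
			if f == "part.2.meta" { // pretend this one is missing
				return partInfo{}, fmt.Errorf("file not found: %s", f)
			}
			return partInfo{Number: int(f[5] - '0'), Size: 5 << 20}, nil // f[5] is the digit in part.N.meta
		})
	fmt.Println(len(parts), "parts readable") // prints: 2 parts readable
}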
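
The test change swaps file deletion (the removed removeDiskN) for fault
injection: getDisks is overridden so one disk is wrapped in a naughty disk
that fails every call with errFaultyDisk, which is deterministic and leaves
the backing files intact. A tiny illustration of that wrapping idea, with a
stand-in storage interface rather than MinIO's StorageAPI:

package main

import (
	"errors"
	"fmt"
)

var errFaultyDisk = errors.New("disk is faulty")

// storage is a tiny stand-in for MinIO's StorageAPI.
type storage interface {
	ReadAll(path string) ([]byte, error)
}

type memDisk map[string][]byte

func (m memDisk) ReadAll(path string) ([]byte, error) {
	b, ok := m[path]
	if !ok {
		return nil, errors.New("file not found")
	}
	return b, nil
}

// naughtyDisk wraps a working disk and fails every call, the same idea
// the test applies via newNaughtyDisk(..., errFaultyDisk): the data is
// untouched, only the disk's behavior changes.
type naughtyDisk struct{ inner storage }

func (n naughtyDisk) ReadAll(string) ([]byte, error) { return nil, errFaultyDisk }

func main() {
	var d storage = memDisk{"part.1.meta": []byte("...")}
	d = naughtyDisk{inner: d} // inject the fault, as the test's getDisks override does
	_, err := d.ReadAll("part.1.meta")
	fmt.Println(err) // prints: disk is faulty
}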