mirror of
https://github.com/minio/minio.git
synced 2025-01-11 15:03:22 -05:00
make ListMultipart/ListParts more reliable skip healing disks (#18312)
this PR also fixes old flaky tests, by properly marking disk offline-based tests.
This commit is contained in:
parent
483389f2e2
commit
c60f54e5be
@ -62,7 +62,22 @@ func (er erasureObjects) getOnlineDisks() (newDisks []StorageAPI) {
|
||||
return newDisks
|
||||
}
|
||||
|
||||
func (er erasureObjects) getLoadBalancedLocalDisks() (newDisks []StorageAPI) {
|
||||
func (er erasureObjects) getOnlineLocalDisks() (newDisks []StorageAPI) {
|
||||
disks := er.getOnlineDisks()
|
||||
|
||||
// Based on the random shuffling return back randomized disks.
|
||||
r := rand.New(rand.NewSource(time.Now().UnixNano()))
|
||||
|
||||
for _, i := range r.Perm(len(disks)) {
|
||||
if disks[i] != nil && disks[i].IsLocal() {
|
||||
newDisks = append(newDisks, disks[i])
|
||||
}
|
||||
}
|
||||
|
||||
return newDisks
|
||||
}
|
||||
|
||||
func (er erasureObjects) getLocalDisks() (newDisks []StorageAPI) {
|
||||
disks := er.getDisks()
|
||||
// Based on the random shuffling return back randomized disks.
|
||||
r := rand.New(rand.NewSource(time.Now().UnixNano()))
|
||||
|
@ -162,7 +162,7 @@ func (er erasureObjects) removeObjectPart(bucket, object, uploadID, dataDir stri
|
||||
func (er erasureObjects) cleanupStaleUploads(ctx context.Context, expiry time.Duration) {
|
||||
// run multiple cleanup's local to this server.
|
||||
var wg sync.WaitGroup
|
||||
for _, disk := range er.getLoadBalancedLocalDisks() {
|
||||
for _, disk := range er.getLocalDisks() {
|
||||
if disk != nil {
|
||||
wg.Add(1)
|
||||
go func(disk StorageAPI) {
|
||||
@ -268,11 +268,11 @@ func (er erasureObjects) ListMultipartUploads(ctx context.Context, bucket, objec
|
||||
|
||||
var uploadIDs []string
|
||||
var disk StorageAPI
|
||||
disks := er.getLoadBalancedLocalDisks()
|
||||
disks := er.getOnlineLocalDisks()
|
||||
if len(disks) == 0 {
|
||||
// using er.getLoadBalancedLocalDisks() has one side-affect where
|
||||
// using er.getOnlineLocalDisks() has one side-affect where
|
||||
// on a pooled setup all disks are remote, add a fallback
|
||||
disks = er.getDisks()
|
||||
disks = er.getOnlineDisks()
|
||||
}
|
||||
for _, disk = range disks {
|
||||
if disk == nil {
|
||||
@ -867,53 +867,64 @@ func (er erasureObjects) ListObjectParts(ctx context.Context, bucket, object, up
|
||||
}
|
||||
|
||||
var disk StorageAPI
|
||||
disks := er.getLoadBalancedLocalDisks()
|
||||
disks := er.getOnlineLocalDisks()
|
||||
if len(disks) == 0 {
|
||||
// using er.getLoadBalancedLocalDisks() has one side-affect where
|
||||
// using er.getOnlineLocalDisks() has one side-affect where
|
||||
// on a pooled setup all disks are remote, add a fallback
|
||||
disks = er.getDisks()
|
||||
disks = er.getOnlineDisks()
|
||||
}
|
||||
|
||||
ch := make(chan ReadMultipleResp) // channel is closed by ReadMultiple()
|
||||
|
||||
for _, disk = range disks {
|
||||
if disk == nil {
|
||||
continue
|
||||
}
|
||||
|
||||
if !disk.IsOnline() {
|
||||
continue
|
||||
}
|
||||
go func(disk StorageAPI) {
|
||||
disk.ReadMultiple(ctx, req, ch) // ignore error since this function only ever returns an error i
|
||||
}(disk)
|
||||
|
||||
break
|
||||
}
|
||||
|
||||
var i int
|
||||
for part := range ch {
|
||||
partN := i + partNumberMarker + 1
|
||||
i++
|
||||
g := errgroup.WithNErrs(len(req.Files)).WithConcurrency(32)
|
||||
|
||||
if part.Error != "" || !part.Exists {
|
||||
continue
|
||||
partsInfo := make([]ObjectPartInfo, len(req.Files))
|
||||
for i, file := range req.Files {
|
||||
file := file
|
||||
partN := i + start
|
||||
i := i
|
||||
|
||||
g.Go(func() error {
|
||||
buf, err := disk.ReadAll(ctx, minioMetaMultipartBucket, pathJoin(partPath, file))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
var pfi FileInfo
|
||||
_, err = pfi.UnmarshalMsg(buf)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if len(pfi.Parts) != 1 {
|
||||
return errors.New("invalid number of parts expected 1, got 0")
|
||||
}
|
||||
|
||||
if partN != pfi.Parts[0].Number {
|
||||
return fmt.Errorf("part.%d.meta has incorrect corresponding part number: expected %d, got %d", partN, partN, pfi.Parts[0].Number)
|
||||
}
|
||||
|
||||
partsInfo[i] = pfi.Parts[0]
|
||||
return nil
|
||||
}, i)
|
||||
}
|
||||
|
||||
g.Wait()
|
||||
|
||||
for _, part := range partsInfo {
|
||||
if part.Number != 0 && !part.ModTime.IsZero() {
|
||||
fi.AddObjectPart(part.Number, part.ETag, part.Size, part.ActualSize, part.ModTime, part.Index, part.Checksums)
|
||||
}
|
||||
|
||||
var pfi FileInfo
|
||||
_, err := pfi.UnmarshalMsg(part.Data)
|
||||
if err != nil {
|
||||
// Maybe crash or similar.
|
||||
logger.LogIf(ctx, err)
|
||||
continue
|
||||
}
|
||||
|
||||
partI := pfi.Parts[0]
|
||||
if partN != partI.Number {
|
||||
logger.LogIf(ctx, fmt.Errorf("part.%d.meta has incorrect corresponding part number: expected %d, got %d", i+1, i+1, partI.Number))
|
||||
continue
|
||||
}
|
||||
|
||||
// Add the current part.
|
||||
fi.AddObjectPart(partI.Number, partI.ETag, partI.Size, partI.ActualSize, partI.ModTime, partI.Index, partI.Checksums)
|
||||
}
|
||||
|
||||
// Only parts with higher part numbers will be listed.
|
||||
|
@ -335,7 +335,7 @@ func (er erasureObjects) getOnlineDisksWithHealing() (newDisks []StorageAPI, hea
|
||||
func (er erasureObjects) cleanupDeletedObjects(ctx context.Context) {
|
||||
// run multiple cleanup's local to this server.
|
||||
var wg sync.WaitGroup
|
||||
for _, disk := range er.getLoadBalancedLocalDisks() {
|
||||
for _, disk := range er.getLocalDisks() {
|
||||
if disk != nil {
|
||||
wg.Add(1)
|
||||
go func(disk StorageAPI) {
|
||||
|
@ -21,6 +21,7 @@ import (
|
||||
"bytes"
|
||||
"context"
|
||||
"fmt"
|
||||
"math/rand"
|
||||
"reflect"
|
||||
"runtime"
|
||||
"strings"
|
||||
@ -1223,8 +1224,18 @@ func testListObjectPartsDiskNotFound(obj ObjectLayer, instanceType string, disks
|
||||
t.Fatalf("%s : %s", instanceType, err.Error())
|
||||
}
|
||||
|
||||
// Remove some random disk.
|
||||
removeDiskN(disks, 1)
|
||||
z := obj.(*erasureServerPools)
|
||||
er := z.serverPools[0].sets[0]
|
||||
|
||||
erasureDisks := er.getDisks()
|
||||
ridx := rand.Intn(len(erasureDisks))
|
||||
|
||||
z.serverPools[0].erasureDisksMu.Lock()
|
||||
er.getDisks = func() []StorageAPI {
|
||||
erasureDisks[ridx] = newNaughtyDisk(erasureDisks[ridx], nil, errFaultyDisk)
|
||||
return erasureDisks
|
||||
}
|
||||
z.serverPools[0].erasureDisksMu.Unlock()
|
||||
|
||||
uploadIDs = append(uploadIDs, res.UploadID)
|
||||
|
||||
@ -1257,9 +1268,6 @@ func testListObjectPartsDiskNotFound(obj ObjectLayer, instanceType string, disks
|
||||
}
|
||||
}
|
||||
|
||||
// Remove one disk.
|
||||
removeDiskN(disks, 1)
|
||||
|
||||
partInfos := []ListPartsInfo{
|
||||
// partinfos - 0.
|
||||
{
|
||||
|
@ -1509,16 +1509,6 @@ func removeRoots(roots []string) {
|
||||
}
|
||||
}
|
||||
|
||||
// removeDiskN - removes N disks from supplied disk slice.
|
||||
func removeDiskN(disks []string, n int) {
|
||||
if n > len(disks) {
|
||||
n = len(disks)
|
||||
}
|
||||
for _, disk := range disks[:n] {
|
||||
os.RemoveAll(disk)
|
||||
}
|
||||
}
|
||||
|
||||
// creates a bucket for the tests and returns the bucket name.
|
||||
// initializes the specified API endpoints for the tests.
|
||||
// initialies the root and returns its path.
|
||||
|
Loading…
Reference in New Issue
Block a user