diff --git a/cmd/erasure-multipart.go b/cmd/erasure-multipart.go
index 3e6ce6e0f..24b5eb7cf 100644
--- a/cmd/erasure-multipart.go
+++ b/cmd/erasure-multipart.go
@@ -39,6 +39,7 @@ import (
 	"github.com/minio/minio/internal/logger"
 	"github.com/minio/pkg/mimedb"
 	"github.com/minio/pkg/sync/errgroup"
+	uatomic "go.uber.org/atomic"
 )
 
 func (er erasureObjects) getUploadIDDir(bucket, object, uploadID string) string {
@@ -371,29 +372,61 @@ func (er erasureObjects) newMultipartUpload(ctx context.Context, bucket string,
 		userDefined["etag"] = opts.PreserveETag
 	}
 	onlineDisks := er.getDisks()
+
+	// Get parity and data drive count based on storage class metadata
 	parityDrives := globalStorageClass.GetParityForSC(userDefined[xhttp.AmzStorageClass])
 	if parityDrives < 0 {
 		parityDrives = er.defaultParityCount
 	}
 
+	// If we have offline disks upgrade the number of erasure codes for this object.
 	parityOrig := parityDrives
+
+	atomicParityDrives := uatomic.NewInt64(0)
+	atomicOfflineDrives := uatomic.NewInt64(0)
+
+	// Start with current parityDrives
+	atomicParityDrives.Store(int64(parityDrives))
+
+	var wg sync.WaitGroup
 	for _, disk := range onlineDisks {
-		if parityDrives >= len(onlineDisks)/2 {
-			parityDrives = len(onlineDisks) / 2
-			break
-		}
 		if disk == nil {
-			parityDrives++
+			atomicParityDrives.Inc()
+			atomicOfflineDrives.Inc()
 			continue
 		}
-		di, err := disk.DiskInfo(ctx)
-		if err != nil || di.ID == "" {
-			parityDrives++
+		if !disk.IsOnline() {
+			atomicParityDrives.Inc()
+			atomicOfflineDrives.Inc()
+			continue
 		}
+		wg.Add(1)
+		go func(disk StorageAPI) {
+			defer wg.Done()
+			di, err := disk.DiskInfo(ctx)
+			if err != nil || di.ID == "" {
+				atomicOfflineDrives.Inc()
+				atomicParityDrives.Inc()
+			}
+		}(disk)
+	}
+	wg.Wait()
+
+	if int(atomicOfflineDrives.Load()) > len(onlineDisks)/2 {
+		// if offline drives are more than 50% of the drives
+		// we have no quorum, we shouldn't proceed just
+		// fail at that point.
+		return nil, toObjectErr(errErasureWriteQuorum, bucket, object)
+	}
+
+	parityDrives = int(atomicParityDrives.Load())
+	if parityDrives >= len(onlineDisks)/2 {
+		parityDrives = len(onlineDisks) / 2
 	}
 	if parityOrig != parityDrives {
 		userDefined[minIOErasureUpgraded] = strconv.Itoa(parityOrig) + "->" + strconv.Itoa(parityDrives)
 	}
+
 	dataDrives := len(onlineDisks) - parityDrives
 
 	// we now know the number of blocks this object needs for data and parity.
diff --git a/cmd/erasure-object.go b/cmd/erasure-object.go
index ae5648c59..78925a367 100644
--- a/cmd/erasure-object.go
+++ b/cmd/erasure-object.go
@@ -1001,6 +1001,8 @@ func healObjectVersionsDisparity(bucket string, entry metaCacheEntry) error {
 func (er erasureObjects) putObject(ctx context.Context, bucket string, object string, r *PutObjReader, opts ObjectOptions) (objInfo ObjectInfo, err error) {
 	auditObjectErasureSet(ctx, object, &er)
 
+	data := r.Reader
+
 	if opts.CheckPrecondFn != nil {
 		obj, err := er.getObjectInfo(ctx, bucket, object, opts)
 		if err != nil && !isErrVersionNotFound(err) && !isErrObjectNotFound(err) {
@@ -1011,7 +1013,11 @@ func (er erasureObjects) putObject(ctx context.Context, bucket string, object st
 		}
 	}
 
-	data := r.Reader
+	// Validate input data size and it can never be less than -1.
+	if data.Size() < -1 {
+		logger.LogIf(ctx, errInvalidArgument, logger.Application)
+		return ObjectInfo{}, toObjectErr(errInvalidArgument)
+	}
 
 	userDefined := cloneMSS(opts.UserDefined)
 
@@ -1029,6 +1035,8 @@ func (er erasureObjects) putObject(ctx context.Context, bucket string, object st
 	parityOrig := parityDrives
 
 	atomicParityDrives := uatomic.NewInt64(0)
+	atomicOfflineDrives := uatomic.NewInt64(0)
+
 	// Start with current parityDrives
 	atomicParityDrives.Store(int64(parityDrives))
 
@@ -1036,10 +1044,12 @@ func (er erasureObjects) putObject(ctx context.Context, bucket string, object st
 	for _, disk := range storageDisks {
 		if disk == nil {
 			atomicParityDrives.Inc()
+			atomicOfflineDrives.Inc()
 			continue
 		}
 		if !disk.IsOnline() {
 			atomicParityDrives.Inc()
+			atomicOfflineDrives.Inc()
 			continue
 		}
 		wg.Add(1)
@@ -1047,12 +1057,20 @@ func (er erasureObjects) putObject(ctx context.Context, bucket string, object st
 			defer wg.Done()
 			di, err := disk.DiskInfo(ctx)
 			if err != nil || di.ID == "" {
+				atomicOfflineDrives.Inc()
 				atomicParityDrives.Inc()
 			}
 		}(disk)
 	}
 	wg.Wait()
 
+	if int(atomicOfflineDrives.Load()) >= (len(storageDisks)+1)/2 {
+		// if offline drives are more than 50% of the drives
+		// we have no quorum, we shouldn't proceed just
+		// fail at that point.
+		return ObjectInfo{}, toObjectErr(errErasureWriteQuorum, bucket, object)
+	}
+
 	parityDrives = int(atomicParityDrives.Load())
 	if parityDrives >= len(storageDisks)/2 {
 		parityDrives = len(storageDisks) / 2
@@ -1070,12 +1088,6 @@ func (er erasureObjects) putObject(ctx context.Context, bucket string, object st
 		writeQuorum++
 	}
 
-	// Validate input data size and it can never be less than zero.
-	if data.Size() < -1 {
-		logger.LogIf(ctx, errInvalidArgument, logger.Application)
-		return ObjectInfo{}, toObjectErr(errInvalidArgument)
-	}
-
 	// Initialize parts metadata
 	partsMetadata := make([]FileInfo, len(storageDisks))
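
For readers outside the MinIO codebase, the sketch below distills the pattern both hunks now share: probe every drive concurrently, bump the parity count through go.uber.org/atomic counters for each nil, offline, or unhealthy drive, and refuse to proceed once the offline count crosses half the set. This is a minimal illustration under stated assumptions, not MinIO code: driveChecker, Check, and upgradeParity are hypothetical stand-ins for StorageAPI, DiskInfo, and the inlined logic in the patch; only sync and go.uber.org/atomic are real dependencies.

package parityupgrade

import (
	"context"
	"fmt"
	"sync"

	uatomic "go.uber.org/atomic"
)

// driveChecker is a hypothetical stand-in for MinIO's StorageAPI, used only
// to keep this sketch self-contained.
type driveChecker interface {
	IsOnline() bool
	Check(ctx context.Context) error // stand-in for DiskInfo
}

// upgradeParity probes all drives concurrently. Each missing or unhealthy
// drive raises the parity count by one; if more than half of the drives are
// offline, it fails instead of writing without quorum.
func upgradeParity(ctx context.Context, drives []driveChecker, parity int) (int, error) {
	atomicParity := uatomic.NewInt64(int64(parity))
	atomicOffline := uatomic.NewInt64(0)

	var wg sync.WaitGroup
	for _, d := range drives {
		if d == nil || !d.IsOnline() {
			// Known-bad drive: count it without spawning a goroutine.
			atomicParity.Inc()
			atomicOffline.Inc()
			continue
		}
		wg.Add(1)
		go func(d driveChecker) {
			defer wg.Done()
			if err := d.Check(ctx); err != nil {
				atomicOffline.Inc()
				atomicParity.Inc()
			}
		}(d)
	}
	wg.Wait()

	if int(atomicOffline.Load()) > len(drives)/2 {
		return 0, fmt.Errorf("%d of %d drives offline, write quorum lost",
			atomicOffline.Load(), len(drives))
	}

	// Never let parity exceed half of the drive count.
	parity = int(atomicParity.Load())
	if parity >= len(drives)/2 {
		parity = len(drives) / 2
	}
	return parity, nil
}

Compared with the sequential loop the multipart hunk removes, each health check runs in its own goroutine, so the slowest single drive rather than the sum of all drives bounds the added latency of newMultipartUpload and putObject.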