diff --git a/cmd/erasure-multipart.go b/cmd/erasure-multipart.go index 75d79a028..9db82f8ff 100644 --- a/cmd/erasure-multipart.go +++ b/cmd/erasure-multipart.go @@ -397,16 +397,27 @@ func (er erasureObjects) CopyObjectPart(ctx context.Context, srcBucket, srcObjec // // Implements S3 compatible Upload Part API. func (er erasureObjects) PutObjectPart(ctx context.Context, bucket, object, uploadID string, partID int, r *PutObjReader, opts ObjectOptions) (pi PartInfo, err error) { - uploadIDLock := er.NewNSLock(bucket, pathJoin(object, uploadID)) - rlkctx, err := uploadIDLock.GetRLock(ctx, globalOperationTimeout) + // Write lock for this part ID. + // Held throughout the operation. + partIDLock := er.NewNSLock(bucket, pathJoin(object, uploadID, strconv.Itoa(partID))) + plkctx, err := partIDLock.GetLock(ctx, globalOperationTimeout) + if err != nil { + return PartInfo{}, err + } + pctx := plkctx.Context() + defer partIDLock.Unlock(plkctx.Cancel) + + // Read lock for upload id. + // Only held while reading the upload metadata. + uploadIDRLock := er.NewNSLock(bucket, pathJoin(object, uploadID)) + rlkctx, err := uploadIDRLock.GetRLock(ctx, globalOperationTimeout) if err != nil { return PartInfo{}, err } rctx := rlkctx.Context() - readLocked := true defer func() { - if readLocked { - uploadIDLock.RUnlock(rlkctx.Cancel) + if uploadIDRLock != nil { + uploadIDRLock.RUnlock(rlkctx.Cancel) } }() @@ -432,13 +443,17 @@ func (er erasureObjects) PutObjectPart(ctx context.Context, bucket, object, uplo partsMetadata, errs = readAllFileInfo(rctx, storageDisks, minioMetaMultipartBucket, uploadIDPath, "", false) + // Unlock upload id locks before, so others can get it. + uploadIDRLock.RUnlock(rlkctx.Cancel) + uploadIDRLock = nil + // get Quorum for this object - _, writeQuorum, err := objectQuorumFromMeta(rctx, partsMetadata, errs, er.defaultParityCount) + _, writeQuorum, err := objectQuorumFromMeta(pctx, partsMetadata, errs, er.defaultParityCount) if err != nil { return pi, toObjectErr(err, bucket, object) } - reducedErr := reduceWriteQuorumErrs(rctx, errs, objectOpIgnoredErrs, writeQuorum) + reducedErr := reduceWriteQuorumErrs(pctx, errs, objectOpIgnoredErrs, writeQuorum) if reducedErr == errErasureWriteQuorum { return pi, toObjectErr(reducedErr, bucket, object) } @@ -447,7 +462,7 @@ func (er erasureObjects) PutObjectPart(ctx context.Context, bucket, object, uplo onlineDisks, modTime, dataDir := listOnlineDisks(storageDisks, partsMetadata, errs) // Pick one from the first valid metadata. - fi, err := pickValidFileInfo(rctx, partsMetadata, modTime, dataDir, writeQuorum) + fi, err := pickValidFileInfo(pctx, partsMetadata, modTime, dataDir, writeQuorum) if err != nil { return pi, err } @@ -469,7 +484,7 @@ func (er erasureObjects) PutObjectPart(ctx context.Context, bucket, object, uplo } }() - erasure, err := NewErasure(rctx, fi.Erasure.DataBlocks, fi.Erasure.ParityBlocks, fi.Erasure.BlockSize) + erasure, err := NewErasure(pctx, fi.Erasure.DataBlocks, fi.Erasure.ParityBlocks, fi.Erasure.BlockSize) if err != nil { return pi, toObjectErr(err, bucket, object) } @@ -505,7 +520,7 @@ func (er erasureObjects) PutObjectPart(ctx context.Context, bucket, object, uplo writers[i] = newBitrotWriter(disk, minioMetaTmpBucket, tmpPartPath, erasure.ShardFileSize(data.Size()), DefaultBitrotAlgorithm, erasure.ShardSize()) } - n, err := erasure.Encode(rctx, data, writers, buffer, writeQuorum) + n, err := erasure.Encode(pctx, data, writers, buffer, writeQuorum) closeBitrotWriters(writers) if err != nil { return pi, toObjectErr(err, bucket, object) @@ -523,16 +538,14 @@ func (er erasureObjects) PutObjectPart(ctx context.Context, bucket, object, uplo } } - // Unlock here before acquiring write locks all concurrent - // PutObjectParts would serialize here updating `xl.meta` - uploadIDLock.RUnlock(rlkctx.Cancel) - readLocked = false - wlkctx, err := uploadIDLock.GetLock(ctx, globalOperationTimeout) + // Acquire write lock to update metadata. + uploadIDWLock := er.NewNSLock(bucket, pathJoin(object, uploadID)) + wlkctx, err := uploadIDWLock.GetLock(pctx, globalOperationTimeout) if err != nil { return PartInfo{}, err } wctx := wlkctx.Context() - defer uploadIDLock.Unlock(wlkctx.Cancel) + defer uploadIDWLock.Unlock(wlkctx.Cancel) // Validates if upload ID exists. if err = er.checkUploadIDExists(wctx, bucket, object, uploadID); err != nil {