remove checkBucketExist check entirely to avoid fan-out calls (#18917)

Each Put, List, Multipart operations heavily rely on making
GetBucketInfo() call to verify if bucket exists or not on
a regular basis. This has a large performance cost when there
are tons of servers involved.

We did optimize this part by vectorizing the bucket calls,
however its not enough, beyond 100 nodes and this becomes
fairly visible in terms of performance.
This commit is contained in:
Harshavardhana
2024-01-30 12:43:25 -08:00
committed by GitHub
parent a669946357
commit 80ca120088
38 changed files with 408 additions and 291 deletions

View File

@@ -61,7 +61,7 @@ func (er erasureObjects) getMultipartSHADir(bucket, object string) string {
// checkUploadIDExists - verify if a given uploadID exists and is valid.
func (er erasureObjects) checkUploadIDExists(ctx context.Context, bucket, object, uploadID string, write bool) (fi FileInfo, metArr []FileInfo, err error) {
defer func() {
if errors.Is(err, errFileNotFound) || errors.Is(err, errVolumeNotFound) {
if errors.Is(err, errFileNotFound) {
err = errUploadIDNotFound
}
}()
@@ -71,7 +71,7 @@ func (er erasureObjects) checkUploadIDExists(ctx context.Context, bucket, object
storageDisks := er.getDisks()
// Read metadata associated with the object from all disks.
partsMetadata, errs := readAllFileInfo(ctx, storageDisks, minioMetaMultipartBucket,
partsMetadata, errs := readAllFileInfo(ctx, storageDisks, bucket, minioMetaMultipartBucket,
uploadIDPath, "", false, false)
readQuorum, writeQuorum, err := objectQuorumFromMeta(ctx, partsMetadata, errs, er.defaultParityCount)
@@ -87,15 +87,14 @@ func (er erasureObjects) checkUploadIDExists(ctx context.Context, bucket, object
// List all online disks.
_, modTime, etag := listOnlineDisks(storageDisks, partsMetadata, errs, quorum)
var reducedErr error
if write {
reducedErr := reduceWriteQuorumErrs(ctx, errs, objectOpIgnoredErrs, writeQuorum)
if errors.Is(reducedErr, errErasureWriteQuorum) {
return fi, nil, reducedErr
}
reducedErr = reduceWriteQuorumErrs(ctx, errs, objectOpIgnoredErrs, writeQuorum)
} else {
if reducedErr := reduceReadQuorumErrs(ctx, errs, objectOpIgnoredErrs, readQuorum); reducedErr != nil {
return fi, nil, reducedErr
}
reducedErr = reduceReadQuorumErrs(ctx, errs, objectOpIgnoredErrs, readQuorum)
}
if reducedErr != nil {
return fi, nil, reducedErr
}
// Pick one from the first valid metadata.
@@ -200,7 +199,7 @@ func (er erasureObjects) cleanupStaleUploadsOnDisk(ctx context.Context, disk Sto
readDirFn(pathJoin(diskPath, minioMetaMultipartBucket), func(shaDir string, typ os.FileMode) error {
readDirFn(pathJoin(diskPath, minioMetaMultipartBucket, shaDir), func(uploadIDDir string, typ os.FileMode) error {
uploadIDPath := pathJoin(shaDir, uploadIDDir)
fi, err := disk.ReadVersion(ctx, minioMetaMultipartBucket, uploadIDPath, "", ReadOptions{})
fi, err := disk.ReadVersion(ctx, "", minioMetaMultipartBucket, uploadIDPath, "", ReadOptions{})
if err != nil {
return nil
}
@@ -281,15 +280,14 @@ func (er erasureObjects) ListMultipartUploads(ctx context.Context, bucket, objec
if !disk.IsOnline() {
continue
}
uploadIDs, err = disk.ListDir(ctx, minioMetaMultipartBucket, er.getMultipartSHADir(bucket, object), -1)
uploadIDs, err = disk.ListDir(ctx, bucket, minioMetaMultipartBucket, er.getMultipartSHADir(bucket, object), -1)
if err != nil {
if errors.Is(err, errDiskNotFound) {
continue
}
if errors.Is(err, errFileNotFound) || errors.Is(err, errVolumeNotFound) {
if errors.Is(err, errFileNotFound) {
return result, nil
}
logger.LogIf(ctx, err)
return result, toObjectErr(err, bucket, object)
}
break
@@ -486,8 +484,8 @@ func (er erasureObjects) newMultipartUpload(ctx context.Context, bucket string,
uploadIDPath := er.getUploadIDDir(bucket, object, uploadUUID)
// Write updated `xl.meta` to all disks.
if _, err := writeUniqueFileInfo(ctx, onlineDisks, minioMetaMultipartBucket, uploadIDPath, partsMetadata, writeQuorum); err != nil {
return nil, toObjectErr(err, minioMetaMultipartBucket, uploadIDPath)
if _, err := writeUniqueFileInfo(ctx, onlineDisks, bucket, minioMetaMultipartBucket, uploadIDPath, partsMetadata, writeQuorum); err != nil {
return nil, toObjectErr(err, bucket, object)
}
return &NewMultipartUploadResult{
UploadID: uploadID,
@@ -582,6 +580,13 @@ func writeAllDisks(ctx context.Context, disks []StorageAPI, dstBucket, dstEntry
func (er erasureObjects) PutObjectPart(ctx context.Context, bucket, object, uploadID string, partID int, r *PutObjReader, opts ObjectOptions) (pi PartInfo, err error) {
auditObjectErasureSet(ctx, object, &er)
data := r.Reader
// Validate input data size and it can never be less than zero.
if data.Size() < -1 {
logger.LogIf(ctx, errInvalidArgument, logger.Application)
return pi, toObjectErr(errInvalidArgument)
}
// Read lock for upload id.
// Only held while reading the upload metadata.
uploadIDRLock := er.NewNSLock(bucket, pathJoin(object, uploadID))
@@ -596,16 +601,12 @@ func (er erasureObjects) PutObjectPart(ctx context.Context, bucket, object, uplo
// Validates if upload ID exists.
fi, _, err := er.checkUploadIDExists(rctx, bucket, object, uploadID, true)
if err != nil {
if errors.Is(err, errVolumeNotFound) {
return pi, toObjectErr(err, bucket)
}
return pi, toObjectErr(err, bucket, object, uploadID)
}
data := r.Reader
// Validate input data size and it can never be less than zero.
if data.Size() < -1 {
logger.LogIf(rctx, errInvalidArgument, logger.Application)
return pi, toObjectErr(errInvalidArgument)
}
// Write lock for this part ID, only hold it if we are planning to read from the
// streamto avoid any concurrent updates.
//
@@ -681,7 +682,7 @@ func (er erasureObjects) PutObjectPart(ctx context.Context, bucket, object, uplo
if disk == nil {
continue
}
writers[i] = newBitrotWriter(disk, minioMetaTmpBucket, tmpPartPath, erasure.ShardFileSize(data.Size()), DefaultBitrotAlgorithm, erasure.ShardSize())
writers[i] = newBitrotWriter(disk, bucket, minioMetaTmpBucket, tmpPartPath, erasure.ShardFileSize(data.Size()), DefaultBitrotAlgorithm, erasure.ShardSize())
}
toEncode := io.Reader(data)
@@ -794,6 +795,9 @@ func (er erasureObjects) GetMultipartInfo(ctx context.Context, bucket, object, u
fi, _, err := er.checkUploadIDExists(ctx, bucket, object, uploadID, false)
if err != nil {
if errors.Is(err, errVolumeNotFound) {
return result, toObjectErr(err, bucket)
}
return result, toObjectErr(err, bucket, object, uploadID)
}
@@ -979,6 +983,9 @@ func (er erasureObjects) CompleteMultipartUpload(ctx context.Context, bucket str
fi, partsMetadata, err := er.checkUploadIDExists(ctx, bucket, object, uploadID, true)
if err != nil {
if errors.Is(err, errVolumeNotFound) {
return oi, toObjectErr(err, bucket)
}
return oi, toObjectErr(err, bucket, object, uploadID)
}
@@ -1337,6 +1344,9 @@ func (er erasureObjects) AbortMultipartUpload(ctx context.Context, bucket, objec
// Validates if upload ID exists.
if _, _, err = er.checkUploadIDExists(ctx, bucket, object, uploadID, false); err != nil {
if errors.Is(err, errVolumeNotFound) {
return toObjectErr(err, bucket)
}
return toObjectErr(err, bucket, object, uploadID)
}