feat: Small object optimization read data in single bulk call (#11207)

This commit is contained in:
Harshavardhana
2021-01-03 11:27:57 -08:00
committed by GitHub
parent c9d502e6fa
commit c4131c2798
25 changed files with 439 additions and 495 deletions

View File

@@ -58,7 +58,7 @@ func (er erasureObjects) CopyObject(ctx context.Context, srcBucket, srcObject, d
// Read metadata associated with the object from all disks.
storageDisks := er.getDisks()
metaArr, errs := readAllFileInfo(ctx, storageDisks, srcBucket, srcObject, srcOpts.VersionID)
metaArr, errs := readAllFileInfo(ctx, storageDisks, srcBucket, srcObject, srcOpts.VersionID, false)
// get Quorum for this object
readQuorum, writeQuorum, err := objectQuorumFromMeta(ctx, er, metaArr, errs)
@@ -157,7 +157,7 @@ func (er erasureObjects) GetObjectNInfo(ctx context.Context, bucket, object stri
unlockOnDefer = true
}
fi, metaArr, onlineDisks, err := er.getObjectFileInfo(ctx, bucket, object, opts)
fi, metaArr, onlineDisks, err := er.getObjectFileInfo(ctx, bucket, object, true, opts)
if err != nil {
return nil, toObjectErr(err, bucket, object)
}
@@ -298,7 +298,7 @@ func (er erasureObjects) getObjectWithFileInfo(ctx context.Context, bucket, obje
}
checksumInfo := metaArr[index].Erasure.GetChecksumInfo(partNumber)
partPath := pathJoin(object, metaArr[index].DataDir, fmt.Sprintf("part.%d", partNumber))
readers[index] = newBitrotReader(disk, bucket, partPath, tillOffset,
readers[index] = newBitrotReader(disk, metaArr[index].Data, bucket, partPath, tillOffset,
checksumInfo.Algorithm, checksumInfo.Hash, erasure.ShardSize())
// Prefer local disks
@@ -337,7 +337,7 @@ func (er erasureObjects) getObjectWithFileInfo(ctx context.Context, bucket, obje
// getObject wrapper for erasure GetObject
func (er erasureObjects) getObject(ctx context.Context, bucket, object string, startOffset, length int64, writer io.Writer, opts ObjectOptions) error {
fi, metaArr, onlineDisks, err := er.getObjectFileInfo(ctx, bucket, object, opts)
fi, metaArr, onlineDisks, err := er.getObjectFileInfo(ctx, bucket, object, true, opts)
if err != nil {
return toObjectErr(err, bucket, object)
}
@@ -364,11 +364,11 @@ func (er erasureObjects) GetObjectInfo(ctx context.Context, bucket, object strin
return er.getObjectInfo(ctx, bucket, object, opts)
}
func (er erasureObjects) getObjectFileInfo(ctx context.Context, bucket, object string, opts ObjectOptions) (fi FileInfo, metaArr []FileInfo, onlineDisks []StorageAPI, err error) {
func (er erasureObjects) getObjectFileInfo(ctx context.Context, bucket, object string, readData bool, opts ObjectOptions) (fi FileInfo, metaArr []FileInfo, onlineDisks []StorageAPI, err error) {
disks := er.getDisks()
// Read metadata associated with the object from all disks.
metaArr, errs := readAllFileInfo(ctx, disks, bucket, object, opts.VersionID)
metaArr, errs := readAllFileInfo(ctx, disks, bucket, object, opts.VersionID, readData)
readQuorum, _, err := objectQuorumFromMeta(ctx, er, metaArr, errs)
if err != nil {
@@ -410,7 +410,7 @@ func (er erasureObjects) getObjectFileInfo(ctx context.Context, bucket, object s
// getObjectInfo - wrapper for reading object metadata and constructs ObjectInfo.
func (er erasureObjects) getObjectInfo(ctx context.Context, bucket, object string, opts ObjectOptions) (objInfo ObjectInfo, err error) {
fi, _, _, err := er.getObjectFileInfo(ctx, bucket, object, opts)
fi, _, _, err := er.getObjectFileInfo(ctx, bucket, object, false, opts)
if err != nil {
return objInfo, toObjectErr(err, bucket, object)
}
@@ -1073,7 +1073,7 @@ func (er erasureObjects) PutObjectTags(ctx context.Context, bucket, object strin
disks := er.getDisks()
// Read metadata associated with the object from all disks.
metaArr, errs := readAllFileInfo(ctx, disks, bucket, object, opts.VersionID)
metaArr, errs := readAllFileInfo(ctx, disks, bucket, object, opts.VersionID, false)
readQuorum, writeQuorum, err := objectQuorumFromMeta(ctx, er, metaArr, errs)
if err != nil {
@@ -1134,7 +1134,7 @@ func (er erasureObjects) updateObjectMeta(ctx context.Context, bucket, object st
disks := er.getDisks()
// Read metadata associated with the object from all disks.
metaArr, errs := readAllFileInfo(ctx, disks, bucket, object, opts.VersionID)
metaArr, errs := readAllFileInfo(ctx, disks, bucket, object, opts.VersionID, false)
readQuorum, writeQuorum, err := objectQuorumFromMeta(ctx, er, metaArr, errs)
if err != nil {