Avoid unnecessary statPart() calls in PutObjectPart (#8905)

Assume `xl.json` as the source of truth for all operations.
Harshavardhana 2020-02-04 10:04:37 +05:30 committed by GitHub
parent 278a165674
commit e9c111c8d0
2 changed files with 4 additions and 43 deletions
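
In short: PutObjectPart and ListObjectParts used to re-stat part files on disk (via statPart) just to fill in size and modification time, even though `xl.json` already records both. The hunks below delete statPart, serve those fields from the `xl.json` metadata instead, and stop persisting per-part ETags in a completed object's metadata, which the new `omitempty` tag then keeps out of the serialized JSON.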

@@ -40,7 +40,7 @@ const erasureAlgorithmKlauspost = "klauspost/reedsolomon/vandermonde"
 type ObjectPartInfo struct {
 	Number     int    `json:"number"`
 	Name       string `json:"name"`
-	ETag       string `json:"etag"`
+	ETag       string `json:"etag,omitempty"`
 	Size       int64  `json:"size"`
 	ActualSize int64  `json:"actualSize"`
 }
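
A minimal sketch (not part of this commit) of what the `omitempty` tag changes: once a part entry's ETag is left empty, as the CompleteMultipartUpload hunk further below now does, the key disappears from the serialized `xl.json` instead of being written as "etag":"".

package main

import (
	"encoding/json"
	"fmt"
)

// Local copy of the patched struct; only the JSON tags matter for this demo.
type ObjectPartInfo struct {
	Number     int    `json:"number"`
	Name       string `json:"name"`
	ETag       string `json:"etag,omitempty"`
	Size       int64  `json:"size"`
	ActualSize int64  `json:"actualSize"`
}

func main() {
	part := ObjectPartInfo{Number: 1, Name: "part.1", Size: 5242880, ActualSize: 5242880}
	out, _ := json.Marshal(part)
	// Prints {"number":1,"name":"part.1","size":5242880,"actualSize":5242880};
	// without omitempty the output would also carry "etag":"".
	fmt.Println(string(out))
}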

@@ -76,34 +76,6 @@ func (xl xlObjects) removeObjectPart(bucket, object, uploadID, partName string)
 	g.Wait()
 }
 
-// statPart - returns fileInfo structure for a successful stat on part file.
-func (xl xlObjects) statPart(ctx context.Context, bucket, object, uploadID, partName string) (fileInfo FileInfo, err error) {
-	var ignoredErrs []error
-	partNamePath := path.Join(xl.getUploadIDDir(bucket, object, uploadID), partName)
-	for _, disk := range xl.getLoadBalancedDisks() {
-		if disk == nil {
-			ignoredErrs = append(ignoredErrs, errDiskNotFound)
-			continue
-		}
-		fileInfo, err = disk.StatFile(minioMetaMultipartBucket, partNamePath)
-		if err == nil {
-			return fileInfo, nil
-		}
-		// For any reason disk was deleted or goes offline we continue to next disk.
-		if IsErrIgnored(err, objMetadataOpIgnoredErrs...) {
-			ignoredErrs = append(ignoredErrs, err)
-			continue
-		}
-		// Error is not ignored, return right here.
-		logger.LogIf(ctx, err)
-		return FileInfo{}, err
-	}
-	// If all errors were ignored, reduce to maximal occurrence
-	// based on the read quorum.
-	readQuorum := len(xl.getDisks()) / 2
-	return FileInfo{}, reduceReadQuorumErrs(ctx, ignoredErrs, nil, readQuorum)
-}
-
 // commitXLMetadata - commit `xl.json` from source prefix to destination prefix in the given slice of disks.
 func commitXLMetadata(ctx context.Context, disks []StorageAPI, srcBucket, srcPrefix, dstBucket, dstPrefix string, quorum int) ([]StorageAPI, error) {
 	srcJSONFile := path.Join(srcPrefix, xlMetaJSONFile)
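
For context on the quorum logic the deleted statPart relied on: with N disks it tolerated up to readQuorum = N/2 ignorable per-disk errors, then reduced them to the most frequent one. A simplified, self-contained stand-in for reduceReadQuorumErrs (illustrative only; names here are not the real helper's):

package main

import (
	"errors"
	"fmt"
)

// reduceToQuorum returns the most frequent error if its count meets the
// quorum, else a generic quorum error (a stand-in for errXLReadQuorum).
func reduceToQuorum(errs []error, quorum int) error {
	counts := make(map[string]int)
	var top error
	best := 0
	for _, err := range errs {
		if err == nil {
			continue
		}
		counts[err.Error()]++
		if counts[err.Error()] > best {
			best = counts[err.Error()]
			top = err
		}
	}
	if best >= quorum {
		return top
	}
	return errors.New("read quorum not met")
}

func main() {
	errNotFound := errors.New("file not found")
	errs := []error{errNotFound, errNotFound, errNotFound, errors.New("disk not found")}
	fmt.Println(reduceToQuorum(errs, 3)) // file not found
}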
@@ -446,17 +418,12 @@ func (xl xlObjects) PutObjectPart(ctx context.Context, bucket, object, uploadID
 		return pi, toObjectErr(err, minioMetaMultipartBucket, uploadIDPath)
 	}
 
-	fi, err := xl.statPart(ctx, bucket, object, uploadID, partSuffix)
-	if err != nil {
-		return pi, toObjectErr(err, minioMetaMultipartBucket, partSuffix)
-	}
-
 	// Return success.
 	return PartInfo{
 		PartNumber:   partID,
-		LastModified: fi.ModTime,
+		LastModified: xlMeta.Stat.ModTime,
 		ETag:         md5hex,
-		Size:         fi.Size,
+		Size:         xlMeta.Stat.Size,
 		ActualSize:   data.ActualSize(),
 	}, nil
 }
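
Why the stat round trip was redundant here: by this point PutObjectPart has already recorded the part in the in-memory metadata (xlMeta.AddObjectPart) and refreshed xlMeta.Stat before persisting `xl.json`, so the values statPart re-read from disk are already at hand. A hypothetical helper, assuming only the cmd package types visible in this diff, to make the lookup explicit (not part of the commit):

// partInfoFromXLMeta returns the already-recorded metadata for partID
// instead of statting the part file on disk.
func partInfoFromXLMeta(xlMeta xlMetaV1, partID int, actualSize int64) (PartInfo, bool) {
	for _, p := range xlMeta.Parts {
		if p.Number != partID {
			continue
		}
		return PartInfo{
			PartNumber:   p.Number,
			LastModified: xlMeta.Stat.ModTime, // refreshed when the part was committed
			ETag:         p.ETag,
			Size:         p.Size,
			ActualSize:   actualSize,
		}, true
	}
	return PartInfo{}, false
}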
@@ -531,15 +498,10 @@ func (xl xlObjects) ListObjectParts(ctx context.Context, bucket, object, uploadI
 	}
 	count := maxParts
 	for _, part := range parts {
-		var fi FileInfo
-		fi, err = xl.statPart(ctx, bucket, object, uploadID, part.Name)
-		if err != nil {
-			return result, toObjectErr(err, minioMetaBucket, path.Join(uploadID, part.Name))
-		}
 		result.Parts = append(result.Parts, PartInfo{
 			PartNumber:   part.Number,
 			ETag:         part.ETag,
-			LastModified: fi.ModTime,
+			LastModified: xlValidMeta.Stat.ModTime,
 			Size:         part.Size,
 		})
 		count--
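
Note the subtle semantic shift in this hunk: each listed part previously reported its own file mtime, while now every part of an upload reports the single Stat.ModTime recorded in the upload's `xl.json` (xlValidMeta). Size and ETag were already being served from the parts list, so the per-part stat bought nothing else.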
@@ -667,7 +629,6 @@ func (xl xlObjects) CompleteMultipartUpload(ctx context.Context, bucket string,
 		// Add incoming parts.
 		xlMeta.Parts[i] = ObjectPartInfo{
 			Number:     part.PartNumber,
-			ETag:       part.ETag,
 			Size:       currentXLMeta.Parts[partIdx].Size,
 			Name:       fmt.Sprintf("part.%d", part.PartNumber),
 			ActualSize: currentXLMeta.Parts[partIdx].ActualSize,
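
Why dropping the per-part ETag is safe (a reading of the diff, not stated in the commit message): the ETags recorded during PutObjectPart live in the upload's metadata and are checked against the client's CompleteMultipartUpload request; the completed object's `xl.json` no longer needs them, and the `omitempty` tag from the first hunk keeps the now-empty field out of the serialized metadata.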