mirror of
https://github.com/minio/minio.git
synced 2025-01-11 15:03:22 -05:00
optimize double reads by reusing results from checkUploadIDExists() (#15692)
Move to using `xl.meta` data structure to keep temporary partInfo, this allows for a future change where we move to different parts to different drives.
This commit is contained in:
parent
124544d834
commit
9e5853ecc0
@ -48,33 +48,50 @@ func (er erasureObjects) getMultipartSHADir(bucket, object string) string {
|
||||
}
|
||||
|
||||
// checkUploadIDExists - verify if a given uploadID exists and is valid.
|
||||
func (er erasureObjects) checkUploadIDExists(ctx context.Context, bucket, object, uploadID string) (err error) {
|
||||
func (er erasureObjects) checkUploadIDExists(ctx context.Context, bucket, object, uploadID string, write bool) (fi FileInfo, metArr []FileInfo, err error) {
|
||||
defer func() {
|
||||
if errors.Is(err, errFileNotFound) || errors.Is(err, errVolumeNotFound) {
|
||||
err = errUploadIDNotFound
|
||||
}
|
||||
}()
|
||||
|
||||
disks := er.getDisks()
|
||||
uploadIDPath := er.getUploadIDDir(bucket, object, uploadID)
|
||||
|
||||
storageDisks := er.getDisks()
|
||||
|
||||
// Read metadata associated with the object from all disks.
|
||||
metaArr, errs := readAllFileInfo(ctx, disks, minioMetaMultipartBucket, er.getUploadIDDir(bucket, object, uploadID), "", false)
|
||||
partsMetadata, errs := readAllFileInfo(ctx, storageDisks, minioMetaMultipartBucket,
|
||||
uploadIDPath, "", false)
|
||||
|
||||
readQuorum, _, err := objectQuorumFromMeta(ctx, metaArr, errs, er.defaultParityCount)
|
||||
readQuorum, writeQuorum, err := objectQuorumFromMeta(ctx, partsMetadata, errs, er.defaultParityCount)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if reducedErr := reduceReadQuorumErrs(ctx, errs, objectOpIgnoredErrs, readQuorum); reducedErr != nil {
|
||||
return reducedErr
|
||||
return fi, nil, err
|
||||
}
|
||||
|
||||
// List all online disks.
|
||||
_, modTime := listOnlineDisks(disks, metaArr, errs)
|
||||
_, modTime := listOnlineDisks(storageDisks, partsMetadata, errs)
|
||||
|
||||
// Pick latest valid metadata.
|
||||
_, err = pickValidFileInfo(ctx, metaArr, modTime, readQuorum)
|
||||
return err
|
||||
var quorum int
|
||||
if write {
|
||||
reducedErr := reduceWriteQuorumErrs(ctx, errs, objectOpIgnoredErrs, writeQuorum)
|
||||
if reducedErr == errErasureWriteQuorum {
|
||||
return fi, nil, reducedErr
|
||||
}
|
||||
|
||||
quorum = writeQuorum
|
||||
} else {
|
||||
if reducedErr := reduceReadQuorumErrs(ctx, errs, objectOpIgnoredErrs, readQuorum); reducedErr != nil {
|
||||
return fi, nil, reducedErr
|
||||
}
|
||||
|
||||
// Pick one from the first valid metadata.
|
||||
quorum = readQuorum
|
||||
}
|
||||
|
||||
// Pick one from the first valid metadata.
|
||||
fi, err = pickValidFileInfo(ctx, partsMetadata, modTime, quorum)
|
||||
|
||||
return fi, partsMetadata, err
|
||||
}
|
||||
|
||||
// Removes part.meta given by partName belonging to a mulitpart upload from minioMetaBucket
|
||||
@ -581,40 +598,16 @@ func (er erasureObjects) PutObjectPart(ctx context.Context, bucket, object, uplo
|
||||
return pi, toObjectErr(errInvalidArgument)
|
||||
}
|
||||
|
||||
var partsMetadata []FileInfo
|
||||
var errs []error
|
||||
uploadIDPath := er.getUploadIDDir(bucket, object, uploadID)
|
||||
|
||||
// Validates if upload ID exists.
|
||||
if err = er.checkUploadIDExists(rctx, bucket, object, uploadID); err != nil {
|
||||
fi, _, err := er.checkUploadIDExists(rctx, bucket, object, uploadID, true)
|
||||
if err != nil {
|
||||
return pi, toObjectErr(err, bucket, object, uploadID)
|
||||
}
|
||||
|
||||
storageDisks := er.getDisks()
|
||||
|
||||
// Read metadata associated with the object from all disks.
|
||||
partsMetadata, errs = readAllFileInfo(rctx, storageDisks, minioMetaMultipartBucket,
|
||||
uploadIDPath, "", false)
|
||||
|
||||
// get Quorum for this object
|
||||
_, writeQuorum, err := objectQuorumFromMeta(pctx, partsMetadata, errs, er.defaultParityCount)
|
||||
if err != nil {
|
||||
return pi, toObjectErr(err, bucket, object)
|
||||
}
|
||||
|
||||
reducedErr := reduceWriteQuorumErrs(pctx, errs, objectOpIgnoredErrs, writeQuorum)
|
||||
if reducedErr == errErasureWriteQuorum {
|
||||
return pi, toObjectErr(reducedErr, bucket, object)
|
||||
}
|
||||
|
||||
// List all online disks.
|
||||
onlineDisks, modTime := listOnlineDisks(storageDisks, partsMetadata, errs)
|
||||
|
||||
// Pick one from the first valid metadata.
|
||||
fi, err := pickValidFileInfo(pctx, partsMetadata, modTime, writeQuorum)
|
||||
if err != nil {
|
||||
return pi, toObjectErr(err)
|
||||
}
|
||||
onlineDisks := er.getDisks()
|
||||
writeQuorum := fi.WriteQuorum(er.defaultWQuorum())
|
||||
|
||||
if cs := fi.Metadata[hash.MinIOMultipartChecksum]; cs != "" {
|
||||
if r.ContentCRCType().String() != cs {
|
||||
@ -728,7 +721,7 @@ func (er erasureObjects) PutObjectPart(ctx context.Context, bucket, object, uplo
|
||||
index = opts.IndexCB()
|
||||
}
|
||||
|
||||
part := ObjectPartInfo{
|
||||
partInfo := ObjectPartInfo{
|
||||
Number: partID,
|
||||
ETag: md5hex,
|
||||
Size: n,
|
||||
@ -737,29 +730,29 @@ func (er erasureObjects) PutObjectPart(ctx context.Context, bucket, object, uplo
|
||||
Index: index,
|
||||
Checksums: r.ContentCRC(),
|
||||
}
|
||||
|
||||
partMsg, err := part.MarshalMsg(nil)
|
||||
fi.Parts = []ObjectPartInfo{partInfo}
|
||||
partFI, err := fi.MarshalMsg(nil)
|
||||
if err != nil {
|
||||
return pi, toObjectErr(err, minioMetaMultipartBucket, partPath)
|
||||
}
|
||||
|
||||
// Write part metadata to all disks.
|
||||
onlineDisks, err = writeAllDisks(ctx, onlineDisks, minioMetaMultipartBucket, partPath+".meta", partMsg, writeQuorum)
|
||||
onlineDisks, err = writeAllDisks(ctx, onlineDisks, minioMetaMultipartBucket, partPath+".meta", partFI, writeQuorum)
|
||||
if err != nil {
|
||||
return pi, toObjectErr(err, minioMetaMultipartBucket, partPath)
|
||||
}
|
||||
|
||||
// Return success.
|
||||
return PartInfo{
|
||||
PartNumber: part.Number,
|
||||
ETag: part.ETag,
|
||||
LastModified: part.ModTime,
|
||||
Size: part.Size,
|
||||
ActualSize: part.ActualSize,
|
||||
ChecksumCRC32: part.Checksums["CRC32"],
|
||||
ChecksumCRC32C: part.Checksums["CRC32C"],
|
||||
ChecksumSHA1: part.Checksums["SHA1"],
|
||||
ChecksumSHA256: part.Checksums["SHA256"],
|
||||
PartNumber: partInfo.Number,
|
||||
ETag: partInfo.ETag,
|
||||
LastModified: partInfo.ModTime,
|
||||
Size: partInfo.Size,
|
||||
ActualSize: partInfo.ActualSize,
|
||||
ChecksumCRC32: partInfo.Checksums["CRC32"],
|
||||
ChecksumCRC32C: partInfo.Checksums["CRC32C"],
|
||||
ChecksumSHA1: partInfo.Checksums["SHA1"],
|
||||
ChecksumSHA256: partInfo.Checksums["SHA256"],
|
||||
}, nil
|
||||
}
|
||||
|
||||
@ -785,36 +778,11 @@ func (er erasureObjects) GetMultipartInfo(ctx context.Context, bucket, object, u
|
||||
ctx = lkctx.Context()
|
||||
defer uploadIDLock.RUnlock(lkctx.Cancel)
|
||||
|
||||
if err := er.checkUploadIDExists(ctx, bucket, object, uploadID); err != nil {
|
||||
fi, _, err := er.checkUploadIDExists(ctx, bucket, object, uploadID, false)
|
||||
if err != nil {
|
||||
return result, toObjectErr(err, bucket, object, uploadID)
|
||||
}
|
||||
|
||||
uploadIDPath := er.getUploadIDDir(bucket, object, uploadID)
|
||||
|
||||
storageDisks := er.getDisks()
|
||||
|
||||
// Read metadata associated with the object from all disks.
|
||||
partsMetadata, errs := readAllFileInfo(ctx, storageDisks, minioMetaMultipartBucket, uploadIDPath, opts.VersionID, false)
|
||||
|
||||
// get Quorum for this object
|
||||
readQuorum, _, err := objectQuorumFromMeta(ctx, partsMetadata, errs, er.defaultParityCount)
|
||||
if err != nil {
|
||||
return result, toObjectErr(err, minioMetaMultipartBucket, uploadIDPath)
|
||||
}
|
||||
|
||||
reducedErr := reduceWriteQuorumErrs(ctx, errs, objectOpIgnoredErrs, readQuorum)
|
||||
if reducedErr == errErasureReadQuorum {
|
||||
return result, toObjectErr(reducedErr, minioMetaMultipartBucket, uploadIDPath)
|
||||
}
|
||||
|
||||
_, modTime := listOnlineDisks(storageDisks, partsMetadata, errs)
|
||||
|
||||
// Pick one from the first valid metadata.
|
||||
fi, err := pickValidFileInfo(ctx, partsMetadata, modTime, readQuorum)
|
||||
if err != nil {
|
||||
return result, err
|
||||
}
|
||||
|
||||
result.UserDefined = cloneMSS(fi.Metadata)
|
||||
return result, nil
|
||||
}
|
||||
@ -837,36 +805,14 @@ func (er erasureObjects) ListObjectParts(ctx context.Context, bucket, object, up
|
||||
ctx = lkctx.Context()
|
||||
defer uploadIDLock.RUnlock(lkctx.Cancel)
|
||||
|
||||
if err := er.checkUploadIDExists(ctx, bucket, object, uploadID); err != nil {
|
||||
fi, _, err := er.checkUploadIDExists(ctx, bucket, object, uploadID, false)
|
||||
if err != nil {
|
||||
return result, toObjectErr(err, bucket, object, uploadID)
|
||||
}
|
||||
|
||||
onlineDisks := er.getDisks()
|
||||
uploadIDPath := er.getUploadIDDir(bucket, object, uploadID)
|
||||
|
||||
storageDisks := er.getDisks()
|
||||
|
||||
// Read metadata associated with the object from all disks.
|
||||
partsMetadata, errs := readAllFileInfo(ctx, storageDisks, minioMetaMultipartBucket, uploadIDPath, "", false)
|
||||
|
||||
// get Quorum for this object
|
||||
_, writeQuorum, err := objectQuorumFromMeta(ctx, partsMetadata, errs, er.defaultParityCount)
|
||||
if err != nil {
|
||||
return result, toObjectErr(err, minioMetaMultipartBucket, uploadIDPath)
|
||||
}
|
||||
|
||||
reducedErr := reduceWriteQuorumErrs(ctx, errs, objectOpIgnoredErrs, writeQuorum)
|
||||
if reducedErr == errErasureWriteQuorum {
|
||||
return result, toObjectErr(reducedErr, minioMetaMultipartBucket, uploadIDPath)
|
||||
}
|
||||
|
||||
onlineDisks, modTime := listOnlineDisks(storageDisks, partsMetadata, errs)
|
||||
|
||||
// Pick one from the first valid metadata.
|
||||
fi, err := pickValidFileInfo(ctx, partsMetadata, modTime, writeQuorum)
|
||||
if err != nil {
|
||||
return result, err
|
||||
}
|
||||
|
||||
if maxParts == 0 {
|
||||
return result, nil
|
||||
}
|
||||
@ -897,6 +843,8 @@ func (er erasureObjects) ListObjectParts(ctx context.Context, bucket, object, up
|
||||
req.Files = append(req.Files, fmt.Sprintf("part.%d.meta", i))
|
||||
}
|
||||
|
||||
writeQuorum := fi.WriteQuorum(er.defaultWQuorum())
|
||||
|
||||
partInfoFiles, err := readMultipleFiles(ctx, onlineDisks, req, writeQuorum)
|
||||
if err != nil {
|
||||
return result, err
|
||||
@ -916,20 +864,21 @@ func (er erasureObjects) ListObjectParts(ctx context.Context, bucket, object, up
|
||||
return result, nil
|
||||
}
|
||||
|
||||
var partI ObjectPartInfo
|
||||
for i, part := range partInfoFiles {
|
||||
partN := i + partNumberMarker + 1
|
||||
if part.Error != "" || !part.Exists {
|
||||
continue
|
||||
}
|
||||
|
||||
_, err := partI.UnmarshalMsg(part.Data)
|
||||
var pfi FileInfo
|
||||
_, err := pfi.UnmarshalMsg(part.Data)
|
||||
if err != nil {
|
||||
// Maybe crash or similar.
|
||||
logger.LogIf(ctx, err)
|
||||
continue
|
||||
}
|
||||
|
||||
partI := pfi.Parts[0]
|
||||
if partN != partI.Number {
|
||||
logger.LogIf(ctx, fmt.Errorf("part.%d.meta has incorrect corresponding part number: expected %d, got %d", i+1, i+1, partI.Number))
|
||||
continue
|
||||
@ -989,35 +938,14 @@ func (er erasureObjects) CompleteMultipartUpload(ctx context.Context, bucket str
|
||||
wctx := wlkctx.Context()
|
||||
defer uploadIDLock.Unlock(wlkctx.Cancel)
|
||||
|
||||
if err = er.checkUploadIDExists(wctx, bucket, object, uploadID); err != nil {
|
||||
fi, partsMetadata, err := er.checkUploadIDExists(wctx, bucket, object, uploadID, true)
|
||||
if err != nil {
|
||||
return oi, toObjectErr(err, bucket, object, uploadID)
|
||||
}
|
||||
|
||||
uploadIDPath := er.getUploadIDDir(bucket, object, uploadID)
|
||||
|
||||
storageDisks := er.getDisks()
|
||||
|
||||
// Read metadata associated with the object from all disks.
|
||||
partsMetadata, errs := readAllFileInfo(wctx, storageDisks, minioMetaMultipartBucket, uploadIDPath, "", false)
|
||||
|
||||
// get Quorum for this object
|
||||
_, writeQuorum, err := objectQuorumFromMeta(wctx, partsMetadata, errs, er.defaultParityCount)
|
||||
if err != nil {
|
||||
return oi, toObjectErr(err, bucket, object)
|
||||
}
|
||||
|
||||
reducedErr := reduceWriteQuorumErrs(wctx, errs, objectOpIgnoredErrs, writeQuorum)
|
||||
if reducedErr == errErasureWriteQuorum {
|
||||
return oi, toObjectErr(reducedErr, bucket, object)
|
||||
}
|
||||
|
||||
onlineDisks, modTime := listOnlineDisks(storageDisks, partsMetadata, errs)
|
||||
|
||||
// Pick one from the first valid metadata.
|
||||
fi, err := pickValidFileInfo(wctx, partsMetadata, modTime, writeQuorum)
|
||||
if err != nil {
|
||||
return oi, err
|
||||
}
|
||||
onlineDisks := er.getDisks()
|
||||
writeQuorum := fi.WriteQuorum(er.defaultWQuorum())
|
||||
|
||||
// Read Part info for all parts
|
||||
partPath := pathJoin(uploadIDPath, fi.DataDir) + "/"
|
||||
@ -1064,8 +992,8 @@ func (er erasureObjects) CompleteMultipartUpload(ctx context.Context, bucket str
|
||||
}
|
||||
}
|
||||
|
||||
var partI ObjectPartInfo
|
||||
_, err := partI.UnmarshalMsg(part.Data)
|
||||
var pfi FileInfo
|
||||
_, err := pfi.UnmarshalMsg(part.Data)
|
||||
if err != nil {
|
||||
// Maybe crash or similar.
|
||||
logger.LogIf(ctx, err)
|
||||
@ -1074,7 +1002,9 @@ func (er erasureObjects) CompleteMultipartUpload(ctx context.Context, bucket str
|
||||
}
|
||||
}
|
||||
|
||||
if partID != partI.Number {
|
||||
partI := pfi.Parts[0]
|
||||
partNumber := partI.Number
|
||||
if partID != partNumber {
|
||||
logger.LogIf(ctx, fmt.Errorf("part.%d.meta has incorrect corresponding part number: expected %d, got %d", partID, partID, partI.Number))
|
||||
return oi, InvalidPart{
|
||||
PartNumber: partID,
|
||||
@ -1307,7 +1237,7 @@ func (er erasureObjects) AbortMultipartUpload(ctx context.Context, bucket, objec
|
||||
defer lk.Unlock(lkctx.Cancel)
|
||||
|
||||
// Validates if upload ID exists.
|
||||
if err := er.checkUploadIDExists(ctx, bucket, object, uploadID); err != nil {
|
||||
if _, _, err = er.checkUploadIDExists(ctx, bucket, object, uploadID, false); err != nil {
|
||||
return toObjectErr(err, bucket, object, uploadID)
|
||||
}
|
||||
|
||||
|
@ -683,10 +683,7 @@ func (er erasureObjects) getObjectInfoAndQuorum(ctx context.Context, bucket, obj
|
||||
return objInfo, er.defaultWQuorum(), toObjectErr(err, bucket, object)
|
||||
}
|
||||
|
||||
wquorum = fi.Erasure.DataBlocks
|
||||
if fi.Erasure.DataBlocks == fi.Erasure.ParityBlocks {
|
||||
wquorum++
|
||||
}
|
||||
wquorum = fi.WriteQuorum(er.defaultWQuorum())
|
||||
|
||||
objInfo = fi.ToObjectInfo(bucket, object, opts.Versioned || opts.VersionSuspended)
|
||||
if !fi.VersionPurgeStatus().Empty() && opts.VersionID != "" {
|
||||
|
@ -795,10 +795,7 @@ func (es *erasureSingle) getObjectInfoAndQuorum(ctx context.Context, bucket, obj
|
||||
return objInfo, 1, toObjectErr(err, bucket, object)
|
||||
}
|
||||
|
||||
wquorum = fi.Erasure.DataBlocks
|
||||
if fi.Erasure.DataBlocks == fi.Erasure.ParityBlocks {
|
||||
wquorum++
|
||||
}
|
||||
wquorum = fi.WriteQuorum(1)
|
||||
|
||||
objInfo = fi.ToObjectInfo(bucket, object, opts.Versioned || opts.VersionSuspended)
|
||||
if !fi.VersionPurgeStatus().Empty() && opts.VersionID != "" {
|
||||
|
@ -234,6 +234,26 @@ type FileInfo struct {
|
||||
Checksum []byte `msg:"cs,allownil"`
|
||||
}
|
||||
|
||||
// WriteQuorum returns expected write quorum for this FileInfo
|
||||
func (fi FileInfo) WriteQuorum(dquorum int) int {
|
||||
if fi.Deleted {
|
||||
return dquorum
|
||||
}
|
||||
quorum := fi.Erasure.DataBlocks
|
||||
if fi.Erasure.DataBlocks == fi.Erasure.ParityBlocks {
|
||||
quorum++
|
||||
}
|
||||
return quorum
|
||||
}
|
||||
|
||||
// ReadQuorum returns expected read quorum for this FileInfo
|
||||
func (fi FileInfo) ReadQuorum(dquorum int) int {
|
||||
if fi.Deleted {
|
||||
return dquorum
|
||||
}
|
||||
return fi.Erasure.DataBlocks
|
||||
}
|
||||
|
||||
// Equals checks if fi(FileInfo) matches ofi(FileInfo)
|
||||
func (fi FileInfo) Equals(ofi FileInfo) (ok bool) {
|
||||
if !fi.MetadataEquals(ofi) {
|
||||
|
Loading…
Reference in New Issue
Block a user