diff --git a/cmd/erasure-multipart.go b/cmd/erasure-multipart.go index d146f268b..f537ca307 100644 --- a/cmd/erasure-multipart.go +++ b/cmd/erasure-multipart.go @@ -48,33 +48,50 @@ func (er erasureObjects) getMultipartSHADir(bucket, object string) string { } // checkUploadIDExists - verify if a given uploadID exists and is valid. -func (er erasureObjects) checkUploadIDExists(ctx context.Context, bucket, object, uploadID string) (err error) { +func (er erasureObjects) checkUploadIDExists(ctx context.Context, bucket, object, uploadID string, write bool) (fi FileInfo, metArr []FileInfo, err error) { defer func() { if errors.Is(err, errFileNotFound) || errors.Is(err, errVolumeNotFound) { err = errUploadIDNotFound } }() - disks := er.getDisks() + uploadIDPath := er.getUploadIDDir(bucket, object, uploadID) + + storageDisks := er.getDisks() // Read metadata associated with the object from all disks. - metaArr, errs := readAllFileInfo(ctx, disks, minioMetaMultipartBucket, er.getUploadIDDir(bucket, object, uploadID), "", false) + partsMetadata, errs := readAllFileInfo(ctx, storageDisks, minioMetaMultipartBucket, + uploadIDPath, "", false) - readQuorum, _, err := objectQuorumFromMeta(ctx, metaArr, errs, er.defaultParityCount) + readQuorum, writeQuorum, err := objectQuorumFromMeta(ctx, partsMetadata, errs, er.defaultParityCount) if err != nil { - return err - } - - if reducedErr := reduceReadQuorumErrs(ctx, errs, objectOpIgnoredErrs, readQuorum); reducedErr != nil { - return reducedErr + return fi, nil, err } // List all online disks. - _, modTime := listOnlineDisks(disks, metaArr, errs) + _, modTime := listOnlineDisks(storageDisks, partsMetadata, errs) - // Pick latest valid metadata. - _, err = pickValidFileInfo(ctx, metaArr, modTime, readQuorum) - return err + var quorum int + if write { + reducedErr := reduceWriteQuorumErrs(ctx, errs, objectOpIgnoredErrs, writeQuorum) + if reducedErr == errErasureWriteQuorum { + return fi, nil, reducedErr + } + + quorum = writeQuorum + } else { + if reducedErr := reduceReadQuorumErrs(ctx, errs, objectOpIgnoredErrs, readQuorum); reducedErr != nil { + return fi, nil, reducedErr + } + + // Pick one from the first valid metadata. + quorum = readQuorum + } + + // Pick one from the first valid metadata. + fi, err = pickValidFileInfo(ctx, partsMetadata, modTime, quorum) + + return fi, partsMetadata, err } // Removes part.meta given by partName belonging to a mulitpart upload from minioMetaBucket @@ -581,40 +598,16 @@ func (er erasureObjects) PutObjectPart(ctx context.Context, bucket, object, uplo return pi, toObjectErr(errInvalidArgument) } - var partsMetadata []FileInfo - var errs []error uploadIDPath := er.getUploadIDDir(bucket, object, uploadID) // Validates if upload ID exists. - if err = er.checkUploadIDExists(rctx, bucket, object, uploadID); err != nil { + fi, _, err := er.checkUploadIDExists(rctx, bucket, object, uploadID, true) + if err != nil { return pi, toObjectErr(err, bucket, object, uploadID) } - storageDisks := er.getDisks() - - // Read metadata associated with the object from all disks. - partsMetadata, errs = readAllFileInfo(rctx, storageDisks, minioMetaMultipartBucket, - uploadIDPath, "", false) - - // get Quorum for this object - _, writeQuorum, err := objectQuorumFromMeta(pctx, partsMetadata, errs, er.defaultParityCount) - if err != nil { - return pi, toObjectErr(err, bucket, object) - } - - reducedErr := reduceWriteQuorumErrs(pctx, errs, objectOpIgnoredErrs, writeQuorum) - if reducedErr == errErasureWriteQuorum { - return pi, toObjectErr(reducedErr, bucket, object) - } - - // List all online disks. - onlineDisks, modTime := listOnlineDisks(storageDisks, partsMetadata, errs) - - // Pick one from the first valid metadata. - fi, err := pickValidFileInfo(pctx, partsMetadata, modTime, writeQuorum) - if err != nil { - return pi, toObjectErr(err) - } + onlineDisks := er.getDisks() + writeQuorum := fi.WriteQuorum(er.defaultWQuorum()) if cs := fi.Metadata[hash.MinIOMultipartChecksum]; cs != "" { if r.ContentCRCType().String() != cs { @@ -728,7 +721,7 @@ func (er erasureObjects) PutObjectPart(ctx context.Context, bucket, object, uplo index = opts.IndexCB() } - part := ObjectPartInfo{ + partInfo := ObjectPartInfo{ Number: partID, ETag: md5hex, Size: n, @@ -737,29 +730,29 @@ func (er erasureObjects) PutObjectPart(ctx context.Context, bucket, object, uplo Index: index, Checksums: r.ContentCRC(), } - - partMsg, err := part.MarshalMsg(nil) + fi.Parts = []ObjectPartInfo{partInfo} + partFI, err := fi.MarshalMsg(nil) if err != nil { return pi, toObjectErr(err, minioMetaMultipartBucket, partPath) } // Write part metadata to all disks. - onlineDisks, err = writeAllDisks(ctx, onlineDisks, minioMetaMultipartBucket, partPath+".meta", partMsg, writeQuorum) + onlineDisks, err = writeAllDisks(ctx, onlineDisks, minioMetaMultipartBucket, partPath+".meta", partFI, writeQuorum) if err != nil { return pi, toObjectErr(err, minioMetaMultipartBucket, partPath) } // Return success. return PartInfo{ - PartNumber: part.Number, - ETag: part.ETag, - LastModified: part.ModTime, - Size: part.Size, - ActualSize: part.ActualSize, - ChecksumCRC32: part.Checksums["CRC32"], - ChecksumCRC32C: part.Checksums["CRC32C"], - ChecksumSHA1: part.Checksums["SHA1"], - ChecksumSHA256: part.Checksums["SHA256"], + PartNumber: partInfo.Number, + ETag: partInfo.ETag, + LastModified: partInfo.ModTime, + Size: partInfo.Size, + ActualSize: partInfo.ActualSize, + ChecksumCRC32: partInfo.Checksums["CRC32"], + ChecksumCRC32C: partInfo.Checksums["CRC32C"], + ChecksumSHA1: partInfo.Checksums["SHA1"], + ChecksumSHA256: partInfo.Checksums["SHA256"], }, nil } @@ -785,36 +778,11 @@ func (er erasureObjects) GetMultipartInfo(ctx context.Context, bucket, object, u ctx = lkctx.Context() defer uploadIDLock.RUnlock(lkctx.Cancel) - if err := er.checkUploadIDExists(ctx, bucket, object, uploadID); err != nil { + fi, _, err := er.checkUploadIDExists(ctx, bucket, object, uploadID, false) + if err != nil { return result, toObjectErr(err, bucket, object, uploadID) } - uploadIDPath := er.getUploadIDDir(bucket, object, uploadID) - - storageDisks := er.getDisks() - - // Read metadata associated with the object from all disks. - partsMetadata, errs := readAllFileInfo(ctx, storageDisks, minioMetaMultipartBucket, uploadIDPath, opts.VersionID, false) - - // get Quorum for this object - readQuorum, _, err := objectQuorumFromMeta(ctx, partsMetadata, errs, er.defaultParityCount) - if err != nil { - return result, toObjectErr(err, minioMetaMultipartBucket, uploadIDPath) - } - - reducedErr := reduceWriteQuorumErrs(ctx, errs, objectOpIgnoredErrs, readQuorum) - if reducedErr == errErasureReadQuorum { - return result, toObjectErr(reducedErr, minioMetaMultipartBucket, uploadIDPath) - } - - _, modTime := listOnlineDisks(storageDisks, partsMetadata, errs) - - // Pick one from the first valid metadata. - fi, err := pickValidFileInfo(ctx, partsMetadata, modTime, readQuorum) - if err != nil { - return result, err - } - result.UserDefined = cloneMSS(fi.Metadata) return result, nil } @@ -837,36 +805,14 @@ func (er erasureObjects) ListObjectParts(ctx context.Context, bucket, object, up ctx = lkctx.Context() defer uploadIDLock.RUnlock(lkctx.Cancel) - if err := er.checkUploadIDExists(ctx, bucket, object, uploadID); err != nil { + fi, _, err := er.checkUploadIDExists(ctx, bucket, object, uploadID, false) + if err != nil { return result, toObjectErr(err, bucket, object, uploadID) } + onlineDisks := er.getDisks() uploadIDPath := er.getUploadIDDir(bucket, object, uploadID) - storageDisks := er.getDisks() - - // Read metadata associated with the object from all disks. - partsMetadata, errs := readAllFileInfo(ctx, storageDisks, minioMetaMultipartBucket, uploadIDPath, "", false) - - // get Quorum for this object - _, writeQuorum, err := objectQuorumFromMeta(ctx, partsMetadata, errs, er.defaultParityCount) - if err != nil { - return result, toObjectErr(err, minioMetaMultipartBucket, uploadIDPath) - } - - reducedErr := reduceWriteQuorumErrs(ctx, errs, objectOpIgnoredErrs, writeQuorum) - if reducedErr == errErasureWriteQuorum { - return result, toObjectErr(reducedErr, minioMetaMultipartBucket, uploadIDPath) - } - - onlineDisks, modTime := listOnlineDisks(storageDisks, partsMetadata, errs) - - // Pick one from the first valid metadata. - fi, err := pickValidFileInfo(ctx, partsMetadata, modTime, writeQuorum) - if err != nil { - return result, err - } - if maxParts == 0 { return result, nil } @@ -897,6 +843,8 @@ func (er erasureObjects) ListObjectParts(ctx context.Context, bucket, object, up req.Files = append(req.Files, fmt.Sprintf("part.%d.meta", i)) } + writeQuorum := fi.WriteQuorum(er.defaultWQuorum()) + partInfoFiles, err := readMultipleFiles(ctx, onlineDisks, req, writeQuorum) if err != nil { return result, err @@ -916,20 +864,21 @@ func (er erasureObjects) ListObjectParts(ctx context.Context, bucket, object, up return result, nil } - var partI ObjectPartInfo for i, part := range partInfoFiles { partN := i + partNumberMarker + 1 if part.Error != "" || !part.Exists { continue } - _, err := partI.UnmarshalMsg(part.Data) + var pfi FileInfo + _, err := pfi.UnmarshalMsg(part.Data) if err != nil { // Maybe crash or similar. logger.LogIf(ctx, err) continue } + partI := pfi.Parts[0] if partN != partI.Number { logger.LogIf(ctx, fmt.Errorf("part.%d.meta has incorrect corresponding part number: expected %d, got %d", i+1, i+1, partI.Number)) continue @@ -989,35 +938,14 @@ func (er erasureObjects) CompleteMultipartUpload(ctx context.Context, bucket str wctx := wlkctx.Context() defer uploadIDLock.Unlock(wlkctx.Cancel) - if err = er.checkUploadIDExists(wctx, bucket, object, uploadID); err != nil { + fi, partsMetadata, err := er.checkUploadIDExists(wctx, bucket, object, uploadID, true) + if err != nil { return oi, toObjectErr(err, bucket, object, uploadID) } uploadIDPath := er.getUploadIDDir(bucket, object, uploadID) - - storageDisks := er.getDisks() - - // Read metadata associated with the object from all disks. - partsMetadata, errs := readAllFileInfo(wctx, storageDisks, minioMetaMultipartBucket, uploadIDPath, "", false) - - // get Quorum for this object - _, writeQuorum, err := objectQuorumFromMeta(wctx, partsMetadata, errs, er.defaultParityCount) - if err != nil { - return oi, toObjectErr(err, bucket, object) - } - - reducedErr := reduceWriteQuorumErrs(wctx, errs, objectOpIgnoredErrs, writeQuorum) - if reducedErr == errErasureWriteQuorum { - return oi, toObjectErr(reducedErr, bucket, object) - } - - onlineDisks, modTime := listOnlineDisks(storageDisks, partsMetadata, errs) - - // Pick one from the first valid metadata. - fi, err := pickValidFileInfo(wctx, partsMetadata, modTime, writeQuorum) - if err != nil { - return oi, err - } + onlineDisks := er.getDisks() + writeQuorum := fi.WriteQuorum(er.defaultWQuorum()) // Read Part info for all parts partPath := pathJoin(uploadIDPath, fi.DataDir) + "/" @@ -1064,8 +992,8 @@ func (er erasureObjects) CompleteMultipartUpload(ctx context.Context, bucket str } } - var partI ObjectPartInfo - _, err := partI.UnmarshalMsg(part.Data) + var pfi FileInfo + _, err := pfi.UnmarshalMsg(part.Data) if err != nil { // Maybe crash or similar. logger.LogIf(ctx, err) @@ -1074,7 +1002,9 @@ func (er erasureObjects) CompleteMultipartUpload(ctx context.Context, bucket str } } - if partID != partI.Number { + partI := pfi.Parts[0] + partNumber := partI.Number + if partID != partNumber { logger.LogIf(ctx, fmt.Errorf("part.%d.meta has incorrect corresponding part number: expected %d, got %d", partID, partID, partI.Number)) return oi, InvalidPart{ PartNumber: partID, @@ -1307,7 +1237,7 @@ func (er erasureObjects) AbortMultipartUpload(ctx context.Context, bucket, objec defer lk.Unlock(lkctx.Cancel) // Validates if upload ID exists. - if err := er.checkUploadIDExists(ctx, bucket, object, uploadID); err != nil { + if _, _, err = er.checkUploadIDExists(ctx, bucket, object, uploadID, false); err != nil { return toObjectErr(err, bucket, object, uploadID) } diff --git a/cmd/erasure-object.go b/cmd/erasure-object.go index 07b2b3993..ac70b1750 100644 --- a/cmd/erasure-object.go +++ b/cmd/erasure-object.go @@ -683,10 +683,7 @@ func (er erasureObjects) getObjectInfoAndQuorum(ctx context.Context, bucket, obj return objInfo, er.defaultWQuorum(), toObjectErr(err, bucket, object) } - wquorum = fi.Erasure.DataBlocks - if fi.Erasure.DataBlocks == fi.Erasure.ParityBlocks { - wquorum++ - } + wquorum = fi.WriteQuorum(er.defaultWQuorum()) objInfo = fi.ToObjectInfo(bucket, object, opts.Versioned || opts.VersionSuspended) if !fi.VersionPurgeStatus().Empty() && opts.VersionID != "" { diff --git a/cmd/erasure-single-drive.go b/cmd/erasure-single-drive.go index 370f69b20..c0e5b3951 100644 --- a/cmd/erasure-single-drive.go +++ b/cmd/erasure-single-drive.go @@ -795,10 +795,7 @@ func (es *erasureSingle) getObjectInfoAndQuorum(ctx context.Context, bucket, obj return objInfo, 1, toObjectErr(err, bucket, object) } - wquorum = fi.Erasure.DataBlocks - if fi.Erasure.DataBlocks == fi.Erasure.ParityBlocks { - wquorum++ - } + wquorum = fi.WriteQuorum(1) objInfo = fi.ToObjectInfo(bucket, object, opts.Versioned || opts.VersionSuspended) if !fi.VersionPurgeStatus().Empty() && opts.VersionID != "" { diff --git a/cmd/storage-datatypes.go b/cmd/storage-datatypes.go index fe3a7b8ce..9e6190684 100644 --- a/cmd/storage-datatypes.go +++ b/cmd/storage-datatypes.go @@ -234,6 +234,26 @@ type FileInfo struct { Checksum []byte `msg:"cs,allownil"` } +// WriteQuorum returns expected write quorum for this FileInfo +func (fi FileInfo) WriteQuorum(dquorum int) int { + if fi.Deleted { + return dquorum + } + quorum := fi.Erasure.DataBlocks + if fi.Erasure.DataBlocks == fi.Erasure.ParityBlocks { + quorum++ + } + return quorum +} + +// ReadQuorum returns expected read quorum for this FileInfo +func (fi FileInfo) ReadQuorum(dquorum int) int { + if fi.Deleted { + return dquorum + } + return fi.Erasure.DataBlocks +} + // Equals checks if fi(FileInfo) matches ofi(FileInfo) func (fi FileInfo) Equals(ofi FileInfo) (ok bool) { if !fi.MetadataEquals(ofi) {