fix: invalid quorum calculation in TransitionObject (#13125)

Quorum calculation should be based on the
existing metadata, custom quorum calculation
can lead to unreadable content.
This commit is contained in:
Harshavardhana 2021-09-01 08:57:42 -07:00 committed by GitHub
parent f89d0f68d0
commit 03b7bebc96
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -457,6 +457,35 @@ func (er erasureObjects) getObjectInfo(ctx context.Context, bucket, object strin
return objInfo, nil return objInfo, nil
} }
// getObjectInfoAndQuroum - wrapper for reading object metadata and constructs ObjectInfo, additionally returns write quorum for the object.
func (er erasureObjects) getObjectInfoAndQuorum(ctx context.Context, bucket, object string, opts ObjectOptions) (objInfo ObjectInfo, wquorum int, err error) {
fi, _, _, err := er.getObjectFileInfo(ctx, bucket, object, opts, false)
if err != nil {
return objInfo, getWriteQuorum(len(er.getDisks())), toObjectErr(err, bucket, object)
}
wquorum = fi.Erasure.DataBlocks
if fi.Erasure.DataBlocks == fi.Erasure.ParityBlocks {
wquorum++
}
objInfo = fi.ToObjectInfo(bucket, object)
if !fi.VersionPurgeStatus.Empty() && opts.VersionID != "" {
// Make sure to return object info to provide extra information.
return objInfo, wquorum, toObjectErr(errMethodNotAllowed, bucket, object)
}
if fi.Deleted {
if opts.VersionID == "" || opts.DeleteMarker {
return objInfo, wquorum, toObjectErr(errFileNotFound, bucket, object)
}
// Make sure to return object info to provide extra information.
return objInfo, wquorum, toObjectErr(errMethodNotAllowed, bucket, object)
}
return objInfo, wquorum, nil
}
func undoRename(disks []StorageAPI, srcBucket, srcEntry, dstBucket, dstEntry string, isDir bool, errs []error) { func undoRename(disks []StorageAPI, srcBucket, srcEntry, dstBucket, dstEntry string, isDir bool, errs []error) {
// Undo rename object on disks where RenameFile succeeded. // Undo rename object on disks where RenameFile succeeded.
@ -1224,7 +1253,7 @@ func (er erasureObjects) DeleteObject(ctx context.Context, bucket, object string
versionFound := true versionFound := true
objInfo = ObjectInfo{VersionID: opts.VersionID} // version id needed in Delete API response. objInfo = ObjectInfo{VersionID: opts.VersionID} // version id needed in Delete API response.
goi, gerr := er.getObjectInfo(ctx, bucket, object, opts) goi, writeQuorum, gerr := er.getObjectInfoAndQuorum(ctx, bucket, object, opts)
if gerr != nil && goi.Name == "" { if gerr != nil && goi.Name == "" {
switch gerr.(type) { switch gerr.(type) {
case InsufficientReadQuorum: case InsufficientReadQuorum:
@ -1265,7 +1294,7 @@ func (er erasureObjects) DeleteObject(ctx context.Context, bucket, object string
defer NSUpdated(bucket, object) defer NSUpdated(bucket, object)
storageDisks := er.getDisks() storageDisks := er.getDisks()
writeQuorum := len(storageDisks)/2 + 1
var markDelete bool var markDelete bool
// Determine whether to mark object deleted for replication // Determine whether to mark object deleted for replication
if goi.VersionID != "" { if goi.VersionID != "" {
@ -1591,7 +1620,13 @@ func (er erasureObjects) TransitionObject(ctx context.Context, bucket, object st
eventName := event.ObjectTransitionComplete eventName := event.ObjectTransitionComplete
storageDisks := er.getDisks() storageDisks := er.getDisks()
writeQuorum := len(storageDisks)/2 + 1 // we now know the number of blocks this object needs for data and parity.
// writeQuorum is dataBlocks + 1
writeQuorum := fi.Erasure.DataBlocks
if fi.Erasure.DataBlocks == fi.Erasure.ParityBlocks {
writeQuorum++
}
if err = er.deleteObjectVersion(ctx, bucket, object, writeQuorum, fi, false); err != nil { if err = er.deleteObjectVersion(ctx, bucket, object, writeQuorum, fi, false); err != nil {
eventName = event.ObjectTransitionFailed eventName = event.ObjectTransitionFailed
} }