diff --git a/cmd/erasure-object.go b/cmd/erasure-object.go index a76163e7a..233fef93a 100644 --- a/cmd/erasure-object.go +++ b/cmd/erasure-object.go @@ -457,6 +457,35 @@ func (er erasureObjects) getObjectInfo(ctx context.Context, bucket, object strin return objInfo, nil } +// getObjectInfoAndQuroum - wrapper for reading object metadata and constructs ObjectInfo, additionally returns write quorum for the object. +func (er erasureObjects) getObjectInfoAndQuorum(ctx context.Context, bucket, object string, opts ObjectOptions) (objInfo ObjectInfo, wquorum int, err error) { + fi, _, _, err := er.getObjectFileInfo(ctx, bucket, object, opts, false) + if err != nil { + return objInfo, getWriteQuorum(len(er.getDisks())), toObjectErr(err, bucket, object) + } + + wquorum = fi.Erasure.DataBlocks + if fi.Erasure.DataBlocks == fi.Erasure.ParityBlocks { + wquorum++ + } + + objInfo = fi.ToObjectInfo(bucket, object) + if !fi.VersionPurgeStatus.Empty() && opts.VersionID != "" { + // Make sure to return object info to provide extra information. + return objInfo, wquorum, toObjectErr(errMethodNotAllowed, bucket, object) + } + + if fi.Deleted { + if opts.VersionID == "" || opts.DeleteMarker { + return objInfo, wquorum, toObjectErr(errFileNotFound, bucket, object) + } + // Make sure to return object info to provide extra information. + return objInfo, wquorum, toObjectErr(errMethodNotAllowed, bucket, object) + } + + return objInfo, wquorum, nil +} + func undoRename(disks []StorageAPI, srcBucket, srcEntry, dstBucket, dstEntry string, isDir bool, errs []error) { // Undo rename object on disks where RenameFile succeeded. @@ -1224,7 +1253,7 @@ func (er erasureObjects) DeleteObject(ctx context.Context, bucket, object string versionFound := true objInfo = ObjectInfo{VersionID: opts.VersionID} // version id needed in Delete API response. - goi, gerr := er.getObjectInfo(ctx, bucket, object, opts) + goi, writeQuorum, gerr := er.getObjectInfoAndQuorum(ctx, bucket, object, opts) if gerr != nil && goi.Name == "" { switch gerr.(type) { case InsufficientReadQuorum: @@ -1265,7 +1294,7 @@ func (er erasureObjects) DeleteObject(ctx context.Context, bucket, object string defer NSUpdated(bucket, object) storageDisks := er.getDisks() - writeQuorum := len(storageDisks)/2 + 1 + var markDelete bool // Determine whether to mark object deleted for replication if goi.VersionID != "" { @@ -1591,7 +1620,13 @@ func (er erasureObjects) TransitionObject(ctx context.Context, bucket, object st eventName := event.ObjectTransitionComplete storageDisks := er.getDisks() - writeQuorum := len(storageDisks)/2 + 1 + // we now know the number of blocks this object needs for data and parity. + // writeQuorum is dataBlocks + 1 + writeQuorum := fi.Erasure.DataBlocks + if fi.Erasure.DataBlocks == fi.Erasure.ParityBlocks { + writeQuorum++ + } + if err = er.deleteObjectVersion(ctx, bucket, object, writeQuorum, fi, false); err != nil { eventName = event.ObjectTransitionFailed }