fix: getPoolIdx decouple from top level options (#11512)

top-level options shouldn't be passed down for
GetObjectInfo() while verifying the objects in
different pools, this is to make sure that
we always get the value from the pool where
the object exists.
This commit is contained in:
Harshavardhana 2021-02-10 11:45:02 -08:00 committed by GitHub
parent 682482459d
commit cbf4bb62e0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 19 additions and 25 deletions

View File

@ -714,9 +714,7 @@ func replicateObject(ctx context.Context, objInfo ObjectInfo, objectAPI ObjectLa
}
// This lower level implementation is necessary to avoid write locks from CopyObject.
poolIdx, err := z.getPoolIdx(ctx, bucket, object, ObjectOptions{
VersionID: objInfo.VersionID,
}, objInfo.Size)
poolIdx, err := z.getPoolIdx(ctx, bucket, object, objInfo.Size)
if err != nil {
logger.LogIf(ctx, fmt.Errorf("Unable to update replication metadata for %s/%s(%s): %w", bucket, objInfo.Name, objInfo.VersionID, err))
} else {

View File

@ -460,6 +460,7 @@ func (er erasureObjects) getObjectInfo(ctx context.Context, bucket, object strin
// Make sure to return object info to provide extra information.
return objInfo, toObjectErr(errMethodNotAllowed, bucket, object)
}
if fi.Deleted {
if opts.VersionID == "" || opts.DeleteMarker {
return objInfo, toObjectErr(errFileNotFound, bucket, object)

View File

@ -246,31 +246,26 @@ func (z *erasureServerPools) getServerPoolsAvailableSpace(ctx context.Context, s
// getPoolIdx returns the found previous object and its corresponding pool idx,
// if none are found falls back to most available space pool.
func (z *erasureServerPools) getPoolIdx(ctx context.Context, bucket, object string, opts ObjectOptions, size int64) (idx int, err error) {
func (z *erasureServerPools) getPoolIdx(ctx context.Context, bucket, object string, size int64) (idx int, err error) {
if z.SinglePool() {
return 0, nil
}
for i, pool := range z.serverPools {
objInfo, err := pool.GetObjectInfo(ctx, bucket, object, opts)
switch err.(type) {
case VersionNotFound:
// VersionId not found, versionId was specified
case ObjectNotFound:
// VersionId was not specified but found delete marker or no versions exist.
case MethodNotAllowed:
// VersionId was specified but found delete marker
default:
// All other unhandled errors return right here.
if err != nil {
objInfo, err := pool.GetObjectInfo(ctx, bucket, object, ObjectOptions{})
if err != nil && !isErrObjectNotFound(err) {
return -1, err
}
if isErrObjectNotFound(err) {
// No object exists or its a delete marker,
// check objInfo to confirm.
if objInfo.DeleteMarker && objInfo.Name != "" {
return i, nil
}
// delete marker not specified means no versions
// exist continue to next pool.
if !objInfo.DeleteMarker && err != nil {
// objInfo is not valid, truly the object doesn't
// exist proceed to next pool.
continue
}
// Success case and when DeleteMarker is true return.
// object exists at this pool.
return i, nil
}
@ -604,7 +599,7 @@ func (z *erasureServerPools) PutObject(ctx context.Context, bucket string, objec
return z.serverPools[0].PutObject(ctx, bucket, object, data, opts)
}
idx, err := z.getPoolIdx(ctx, bucket, object, opts, data.Size())
idx, err := z.getPoolIdx(ctx, bucket, object, data.Size())
if err != nil {
return ObjectInfo{}, err
}
@ -625,7 +620,7 @@ func (z *erasureServerPools) DeleteObject(ctx context.Context, bucket string, ob
}
// We don't know the size here set 1GiB atleast.
idx, err := z.getPoolIdx(ctx, bucket, object, opts, 1<<30)
idx, err := z.getPoolIdx(ctx, bucket, object, 1<<30)
if err != nil {
return objInfo, err
}
@ -676,7 +671,7 @@ func (z *erasureServerPools) CopyObject(ctx context.Context, srcBucket, srcObjec
cpSrcDstSame := isStringEqual(pathJoin(srcBucket, srcObject), pathJoin(dstBucket, dstObject))
poolIdx, err := z.getPoolIdx(ctx, dstBucket, dstObject, dstOpts, srcInfo.Size)
poolIdx, err := z.getPoolIdx(ctx, dstBucket, dstObject, srcInfo.Size)
if err != nil {
return objInfo, err
}
@ -860,7 +855,7 @@ func (z *erasureServerPools) NewMultipartUpload(ctx context.Context, bucket, obj
}
// We don't know the exact size, so we ask for at least 1GiB file.
idx, err := z.getPoolIdx(ctx, bucket, object, opts, 1<<30)
idx, err := z.getPoolIdx(ctx, bucket, object, 1<<30)
if err != nil {
return "", err
}