diff --git a/cmd/erasure-server-pool.go b/cmd/erasure-server-pool.go index 1c79671dd..1ed9b32ae 100644 --- a/cmd/erasure-server-pool.go +++ b/cmd/erasure-server-pool.go @@ -244,6 +244,50 @@ func (z *erasureServerPools) getServerPoolsAvailableSpace(ctx context.Context, s return serverPools } +// getPoolIdxExisting returns the (first) found object pool index containing an object. +// If the object exists, but the latest version is a delete marker, the index with it is still returned. +// If the object does not exist ObjectNotFound error is returned. +// If any other error is found, it is returned. +// The check is skipped if there is only one zone, and 0, nil is always returned in that case. +func (z *erasureServerPools) getPoolIdxExisting(ctx context.Context, bucket, object string) (idx int, err error) { + if z.SinglePool() { + return 0, nil + } + + errs := make([]error, len(z.serverPools)) + objInfos := make([]ObjectInfo, len(z.serverPools)) + + var wg sync.WaitGroup + for i, pool := range z.serverPools { + wg.Add(1) + go func(i int, pool *erasureSets) { + defer wg.Done() + objInfos[i], errs[i] = pool.GetObjectInfo(ctx, bucket, object, ObjectOptions{}) + }(i, pool) + } + wg.Wait() + + for i, err := range errs { + if err == nil { + return i, nil + } + if isErrObjectNotFound(err) { + // No object exists or its a delete marker, + // check objInfo to confirm. + if objInfos[i].DeleteMarker && objInfos[i].Name != "" { + return i, nil + } + + // objInfo is not valid, truly the object doesn't + // exist proceed to next pool. + continue + } + return -1, err + } + + return -1, toObjectErr(errFileNotFound, bucket, object) +} + // getPoolIdx returns the found previous object and its corresponding pool idx, // if none are found falls back to most available space pool. func (z *erasureServerPools) getPoolIdx(ctx context.Context, bucket, object string, size int64) (idx int, err error) { @@ -715,8 +759,7 @@ func (z *erasureServerPools) DeleteObject(ctx context.Context, bucket string, ob return z.serverPools[0].DeleteObject(ctx, bucket, object, opts) } - // We don't know the size here set 1GiB atleast. - idx, err := z.getPoolIdx(ctx, bucket, object, 1<<30) + idx, err := z.getPoolIdxExisting(ctx, bucket, object) if err != nil { return objInfo, err } @@ -739,7 +782,11 @@ func (z *erasureServerPools) DeleteObjects(ctx context.Context, bucket string, o origIndexMap := map[int][]int{} if !z.SinglePool() { for j, obj := range objects { - idx, err := z.getPoolIdx(ctx, bucket, obj.ObjectName, 1<<30) + idx, err := z.getPoolIdxExisting(ctx, bucket, obj.ObjectName) + if isErrObjectNotFound(err) { + derrs[j] = err + continue + } if err != nil { // Unhandled errors return right here. for i := range derrs { @@ -1734,7 +1781,7 @@ func (z *erasureServerPools) PutObjectTags(ctx context.Context, bucket, object s } // We don't know the size here set 1GiB atleast. - idx, err := z.getPoolIdx(ctx, bucket, object, 1<<30) + idx, err := z.getPoolIdxExisting(ctx, bucket, object) if err != nil { return ObjectInfo{}, err } @@ -1749,8 +1796,7 @@ func (z *erasureServerPools) DeleteObjectTags(ctx context.Context, bucket, objec return z.serverPools[0].DeleteObjectTags(ctx, bucket, object, opts) } - // We don't know the size here set 1GiB atleast. - idx, err := z.getPoolIdx(ctx, bucket, object, 1<<30) + idx, err := z.getPoolIdxExisting(ctx, bucket, object) if err != nil { return ObjectInfo{}, err } @@ -1765,8 +1811,7 @@ func (z *erasureServerPools) GetObjectTags(ctx context.Context, bucket, object s return z.serverPools[0].GetObjectTags(ctx, bucket, object, opts) } - // We don't know the size here set 1GiB atleast. - idx, err := z.getPoolIdx(ctx, bucket, object, 1<<30) + idx, err := z.getPoolIdxExisting(ctx, bucket, object) if err != nil { return nil, err }