erasure pools enable faster checks for file not found (#11799)

For operations that require the object to exist make it possible to 
detect if the file isn't found in *any* pool.

This will allow these to return the error early without having to re-check.
This commit is contained in:
Klaus Post 2021-03-16 19:02:20 +01:00 committed by GitHub
parent fa94682e83
commit 771dea175c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -244,6 +244,50 @@ func (z *erasureServerPools) getServerPoolsAvailableSpace(ctx context.Context, s
return serverPools
}
// getPoolIdxExisting returns the (first) found object pool index containing an object.
// If the object exists, but the latest version is a delete marker, the index with it is still returned.
// If the object does not exist ObjectNotFound error is returned.
// If any other error is found, it is returned.
// The check is skipped if there is only one zone, and 0, nil is always returned in that case.
func (z *erasureServerPools) getPoolIdxExisting(ctx context.Context, bucket, object string) (idx int, err error) {
if z.SinglePool() {
return 0, nil
}
errs := make([]error, len(z.serverPools))
objInfos := make([]ObjectInfo, len(z.serverPools))
var wg sync.WaitGroup
for i, pool := range z.serverPools {
wg.Add(1)
go func(i int, pool *erasureSets) {
defer wg.Done()
objInfos[i], errs[i] = pool.GetObjectInfo(ctx, bucket, object, ObjectOptions{})
}(i, pool)
}
wg.Wait()
for i, err := range errs {
if err == nil {
return i, nil
}
if isErrObjectNotFound(err) {
// No object exists or its a delete marker,
// check objInfo to confirm.
if objInfos[i].DeleteMarker && objInfos[i].Name != "" {
return i, nil
}
// objInfo is not valid, truly the object doesn't
// exist proceed to next pool.
continue
}
return -1, err
}
return -1, toObjectErr(errFileNotFound, bucket, object)
}
// getPoolIdx returns the found previous object and its corresponding pool idx,
// if none are found falls back to most available space pool.
func (z *erasureServerPools) getPoolIdx(ctx context.Context, bucket, object string, size int64) (idx int, err error) {
@ -715,8 +759,7 @@ func (z *erasureServerPools) DeleteObject(ctx context.Context, bucket string, ob
return z.serverPools[0].DeleteObject(ctx, bucket, object, opts)
}
// We don't know the size here set 1GiB atleast.
idx, err := z.getPoolIdx(ctx, bucket, object, 1<<30)
idx, err := z.getPoolIdxExisting(ctx, bucket, object)
if err != nil {
return objInfo, err
}
@ -739,7 +782,11 @@ func (z *erasureServerPools) DeleteObjects(ctx context.Context, bucket string, o
origIndexMap := map[int][]int{}
if !z.SinglePool() {
for j, obj := range objects {
idx, err := z.getPoolIdx(ctx, bucket, obj.ObjectName, 1<<30)
idx, err := z.getPoolIdxExisting(ctx, bucket, obj.ObjectName)
if isErrObjectNotFound(err) {
derrs[j] = err
continue
}
if err != nil {
// Unhandled errors return right here.
for i := range derrs {
@ -1734,7 +1781,7 @@ func (z *erasureServerPools) PutObjectTags(ctx context.Context, bucket, object s
}
// We don't know the size here set 1GiB atleast.
idx, err := z.getPoolIdx(ctx, bucket, object, 1<<30)
idx, err := z.getPoolIdxExisting(ctx, bucket, object)
if err != nil {
return ObjectInfo{}, err
}
@ -1749,8 +1796,7 @@ func (z *erasureServerPools) DeleteObjectTags(ctx context.Context, bucket, objec
return z.serverPools[0].DeleteObjectTags(ctx, bucket, object, opts)
}
// We don't know the size here set 1GiB atleast.
idx, err := z.getPoolIdx(ctx, bucket, object, 1<<30)
idx, err := z.getPoolIdxExisting(ctx, bucket, object)
if err != nil {
return ObjectInfo{}, err
}
@ -1765,8 +1811,7 @@ func (z *erasureServerPools) GetObjectTags(ctx context.Context, bucket, object s
return z.serverPools[0].GetObjectTags(ctx, bucket, object, opts)
}
// We don't know the size here set 1GiB atleast.
idx, err := z.getPoolIdx(ctx, bucket, object, 1<<30)
idx, err := z.getPoolIdxExisting(ctx, bucket, object)
if err != nil {
return nil, err
}