parentDirIsObject() to return quickly with inexistant parent (#11204)

Rewrite parentIsObject() function. Currently if a client uploads
a/b/c/d, we always check if c, b, a are actual objects or not.

The new code will check with the reverse order and quickly quit if 
the segment doesn't exist.

So if a, b, c in 'a/b/c' does not exist in the first place, then returns
false quickly.
This commit is contained in:
Anis Elleuch 2021-01-02 21:01:29 +01:00 committed by GitHub
parent 677e80c0f8
commit c9d502e6fa
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 30 additions and 15 deletions

View File

@ -18,8 +18,8 @@ package cmd
import ( import (
"context" "context"
"path"
"sort" "sort"
"strings"
"sync" "sync"
"github.com/minio/minio/pkg/sync/errgroup" "github.com/minio/minio/pkg/sync/errgroup"
@ -207,24 +207,29 @@ func (er erasureObjects) getLoadBalancedDisks(optimized bool) []StorageAPI {
// object is "a/b/c/d", stat makes sure that objects ""a/b/c"" // object is "a/b/c/d", stat makes sure that objects ""a/b/c""
// "a/b" and "a" do not exist. // "a/b" and "a" do not exist.
func (er erasureObjects) parentDirIsObject(ctx context.Context, bucket, parent string) bool { func (er erasureObjects) parentDirIsObject(ctx context.Context, bucket, parent string) bool {
var isParentDirObject func(string) bool path := ""
isParentDirObject = func(p string) bool { segments := strings.Split(parent, slashSeparator)
if p == "." || p == SlashSeparator { for _, s := range segments {
if s == "" {
break
}
path += s
isObject, pathNotExist := er.isObject(ctx, bucket, path)
if pathNotExist {
return false return false
} }
if er.isObject(ctx, bucket, p) { if isObject {
// If there is already a file at prefix "p", return true. // If there is already a file at prefix "p", return true.
return true return true
} }
// Check if there is a file as one of the parent paths. path += slashSeparator
return isParentDirObject(path.Dir(p))
} }
return isParentDirObject(parent) return false
} }
// isObject - returns `true` if the prefix is an object i.e if // isObject - returns `true` if the prefix is an object i.e if
// `xl.meta` exists at the leaf, false otherwise. // `xl.meta` exists at the leaf, false otherwise.
func (er erasureObjects) isObject(ctx context.Context, bucket, prefix string) (ok bool) { func (er erasureObjects) isObject(ctx context.Context, bucket, prefix string) (ok, pathDoesNotExist bool) {
storageDisks := er.getDisks() storageDisks := er.getDisks()
g := errgroup.WithNErrs(len(storageDisks)) g := errgroup.WithNErrs(len(storageDisks))
@ -246,5 +251,6 @@ func (er erasureObjects) isObject(ctx context.Context, bucket, prefix string) (o
// ignored if necessary. // ignored if necessary.
readQuorum := getReadQuorum(len(storageDisks)) readQuorum := getReadQuorum(len(storageDisks))
return reduceReadQuorumErrs(ctx, g.Wait(), objectOpIgnoredErrs, readQuorum) == nil err := reduceReadQuorumErrs(ctx, g.Wait(), objectOpIgnoredErrs, readQuorum)
return err == nil, err == errPathNotFound
} }

View File

@ -66,6 +66,10 @@ func TestErasureParentDirIsObject(t *testing.T) {
parentIsObject: true, parentIsObject: true,
objectName: objectName, objectName: objectName,
}, },
{
parentIsObject: false,
objectName: "new-dir",
},
{ {
parentIsObject: false, parentIsObject: false,
objectName: "", objectName: "",

View File

@ -67,6 +67,9 @@ var errVolumeExists = StorageErr("volume already exists")
// errIsNotRegular - not of regular file type. // errIsNotRegular - not of regular file type.
var errIsNotRegular = StorageErr("not of regular file type") var errIsNotRegular = StorageErr("not of regular file type")
// errPathNotFound - cannot find the path.
var errPathNotFound = StorageErr("path not found")
// errVolumeNotFound - cannot find the volume. // errVolumeNotFound - cannot find the volume.
var errVolumeNotFound = StorageErr("volume not found") var errVolumeNotFound = StorageErr("volume not found")

View File

@ -82,6 +82,8 @@ func toStorageErr(err error) error {
return errFileNameTooLong return errFileNameTooLong
case errFileAccessDenied.Error(): case errFileAccessDenied.Error():
return errFileAccessDenied return errFileAccessDenied
case errPathNotFound.Error():
return errPathNotFound
case errIsNotRegular.Error(): case errIsNotRegular.Error():
return errIsNotRegular return errIsNotRegular
case errVolumeNotEmpty.Error(): case errVolumeNotEmpty.Error():

View File

@ -1757,10 +1757,10 @@ func (s *xlStorage) CheckFile(ctx context.Context, volume string, path string) e
} }
// Stat a volume entry. // Stat a volume entry.
_, err = os.Stat(volumeDir) _, err = os.Stat(pathJoin(volumeDir, path))
if err != nil { if err != nil {
if osIsNotExist(err) { if osIsNotExist(err) {
return errVolumeNotFound return errPathNotFound
} }
return err return err
} }

View File

@ -1596,14 +1596,14 @@ func TestXLStorageCheckFile(t *testing.T) {
{ {
srcVol: "success-vol", srcVol: "success-vol",
srcPath: "nonexistent-file", srcPath: "nonexistent-file",
expectedErr: errFileNotFound, expectedErr: errPathNotFound,
}, },
// TestXLStorage case - 4. // TestXLStorage case - 4.
// TestXLStorage case with non-existent file path. // TestXLStorage case with non-existent file path.
{ {
srcVol: "success-vol", srcVol: "success-vol",
srcPath: "path/2/success-file", srcPath: "path/2/success-file",
expectedErr: errFileNotFound, expectedErr: errPathNotFound,
}, },
// TestXLStorage case - 5. // TestXLStorage case - 5.
// TestXLStorage case with path being a directory. // TestXLStorage case with path being a directory.
@ -1617,7 +1617,7 @@ func TestXLStorageCheckFile(t *testing.T) {
{ {
srcVol: "non-existent-vol", srcVol: "non-existent-vol",
srcPath: "success-file", srcPath: "success-file",
expectedErr: errVolumeNotFound, expectedErr: errPathNotFound,
}, },
} }