mirror of
https://github.com/minio/minio.git
synced 2025-04-21 02:54:24 -04:00
fix: optimize parentDirIsObject by moving isObject to storage layer (#11291)
For objects with `N` prefix depth, this PR reduces `N` such network operations by converting `CheckFile` into a single bulk operation. Reduction in chattiness here would allow disks to be utilized more cleanly, while maintaining the same functionality along with one extra volume check stat() call is removed. Update tests to test multiple sets scenario
This commit is contained in:
parent
3d9000d5b5
commit
3ca6330661
@ -19,7 +19,6 @@ package cmd
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"sort"
|
"sort"
|
||||||
"strings"
|
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
"github.com/minio/minio/pkg/sync/errgroup"
|
"github.com/minio/minio/pkg/sync/errgroup"
|
||||||
@ -204,32 +203,12 @@ func (er erasureObjects) getLoadBalancedDisks(optimized bool) []StorageAPI {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// This function does the following check, suppose
|
// This function does the following check, suppose
|
||||||
// object is "a/b/c/d", stat makes sure that objects ""a/b/c""
|
// object is "a/b/c/d", stat makes sure that objects
|
||||||
// "a/b" and "a" do not exist.
|
// - "a/b/c"
|
||||||
|
// - "a/b"
|
||||||
|
// - "a"
|
||||||
|
// do not exist on the namespace.
|
||||||
func (er erasureObjects) parentDirIsObject(ctx context.Context, bucket, parent string) bool {
|
func (er erasureObjects) parentDirIsObject(ctx context.Context, bucket, parent string) bool {
|
||||||
path := ""
|
|
||||||
segments := strings.Split(parent, slashSeparator)
|
|
||||||
for _, s := range segments {
|
|
||||||
if s == "" {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
path += s
|
|
||||||
isObject, pathNotExist := er.isObject(ctx, bucket, path)
|
|
||||||
if pathNotExist {
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
if isObject {
|
|
||||||
// If there is already a file at prefix "p", return true.
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
path += slashSeparator
|
|
||||||
}
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
|
|
||||||
// isObject - returns `true` if the prefix is an object i.e if
|
|
||||||
// `xl.meta` exists at the leaf, false otherwise.
|
|
||||||
func (er erasureObjects) isObject(ctx context.Context, bucket, prefix string) (ok, pathDoesNotExist bool) {
|
|
||||||
storageDisks := er.getDisks()
|
storageDisks := er.getDisks()
|
||||||
|
|
||||||
g := errgroup.WithNErrs(len(storageDisks))
|
g := errgroup.WithNErrs(len(storageDisks))
|
||||||
@ -241,7 +220,7 @@ func (er erasureObjects) isObject(ctx context.Context, bucket, prefix string) (o
|
|||||||
return errDiskNotFound
|
return errDiskNotFound
|
||||||
}
|
}
|
||||||
// Check if 'prefix' is an object on this 'disk', else continue the check the next disk
|
// Check if 'prefix' is an object on this 'disk', else continue the check the next disk
|
||||||
return storageDisks[index].CheckFile(ctx, bucket, prefix)
|
return storageDisks[index].CheckFile(ctx, bucket, parent)
|
||||||
}, index)
|
}, index)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -251,6 +230,5 @@ func (er erasureObjects) isObject(ctx context.Context, bucket, prefix string) (o
|
|||||||
// ignored if necessary.
|
// ignored if necessary.
|
||||||
readQuorum := getReadQuorum(len(storageDisks))
|
readQuorum := getReadQuorum(len(storageDisks))
|
||||||
|
|
||||||
err := reduceReadQuorumErrs(ctx, g.Wait(), objectOpIgnoredErrs, readQuorum)
|
return reduceReadQuorumErrs(ctx, g.Wait(), objectOpIgnoredErrs, readQuorum) == nil
|
||||||
return err == nil, err == errPathNotFound
|
|
||||||
}
|
}
|
||||||
|
@ -28,7 +28,7 @@ func TestErasureParentDirIsObject(t *testing.T) {
|
|||||||
ctx, cancel := context.WithCancel(context.Background())
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
defer cancel()
|
defer cancel()
|
||||||
|
|
||||||
obj, fsDisks, err := prepareErasure16(ctx)
|
obj, fsDisks, err := prepareErasureSets32(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("Unable to initialize 'Erasure' object layer.")
|
t.Fatalf("Unable to initialize 'Erasure' object layer.")
|
||||||
}
|
}
|
||||||
@ -45,59 +45,38 @@ func TestErasureParentDirIsObject(t *testing.T) {
|
|||||||
if err = obj.MakeBucketWithLocation(GlobalContext, bucketName, BucketOptions{}); err != nil {
|
if err = obj.MakeBucketWithLocation(GlobalContext, bucketName, BucketOptions{}); err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
objectContent := "12345"
|
objectContent := "12345"
|
||||||
objInfo, err := obj.PutObject(GlobalContext, bucketName, objectName,
|
_, err = obj.PutObject(GlobalContext, bucketName, objectName,
|
||||||
mustGetPutObjReader(t, bytes.NewReader([]byte(objectContent)), int64(len(objectContent)), "", ""), ObjectOptions{})
|
mustGetPutObjReader(t, bytes.NewReader([]byte(objectContent)), int64(len(objectContent)), "", ""), ObjectOptions{})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
if objInfo.Name != objectName {
|
|
||||||
t.Fatalf("Unexpected object name returned got %s, expected %s", objInfo.Name, objectName)
|
|
||||||
}
|
|
||||||
|
|
||||||
z := obj.(*erasureServerPools)
|
|
||||||
xl := z.serverPools[0].sets[0]
|
|
||||||
testCases := []struct {
|
testCases := []struct {
|
||||||
parentIsObject bool
|
expectedErr bool
|
||||||
objectName string
|
objectName string
|
||||||
}{
|
}{
|
||||||
// parentIsObject is true if object is available.
|
|
||||||
{
|
{
|
||||||
parentIsObject: true,
|
expectedErr: true,
|
||||||
objectName: objectName,
|
objectName: pathJoin(objectName, "parent-is-object"),
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
parentIsObject: false,
|
expectedErr: false,
|
||||||
objectName: "new-dir",
|
objectName: pathJoin("no-parent", "object"),
|
||||||
},
|
|
||||||
{
|
|
||||||
parentIsObject: false,
|
|
||||||
objectName: "",
|
|
||||||
},
|
|
||||||
{
|
|
||||||
parentIsObject: false,
|
|
||||||
objectName: ".",
|
|
||||||
},
|
|
||||||
// Should not cause infinite loop.
|
|
||||||
{
|
|
||||||
parentIsObject: false,
|
|
||||||
objectName: SlashSeparator,
|
|
||||||
},
|
|
||||||
{
|
|
||||||
parentIsObject: false,
|
|
||||||
objectName: "\\",
|
|
||||||
},
|
|
||||||
// Should not cause infinite loop with double forward slash.
|
|
||||||
{
|
|
||||||
parentIsObject: false,
|
|
||||||
objectName: "//",
|
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
for i, testCase := range testCases {
|
for _, testCase := range testCases {
|
||||||
gotValue := xl.parentDirIsObject(GlobalContext, bucketName, testCase.objectName)
|
t.Run("", func(t *testing.T) {
|
||||||
if testCase.parentIsObject != gotValue {
|
_, err = obj.PutObject(GlobalContext, bucketName, testCase.objectName,
|
||||||
t.Errorf("Test %d: Unexpected value returned got %t, expected %t", i+1, gotValue, testCase.parentIsObject)
|
mustGetPutObjReader(t, bytes.NewReader([]byte(objectContent)), int64(len(objectContent)), "", ""), ObjectOptions{})
|
||||||
}
|
if testCase.expectedErr && err == nil {
|
||||||
|
t.Error("Expected error but got nil")
|
||||||
|
}
|
||||||
|
if !testCase.expectedErr && err != nil {
|
||||||
|
t.Errorf("Expected nil but got %v", err)
|
||||||
|
}
|
||||||
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -774,6 +774,9 @@ func (s *erasureSets) GetObject(ctx context.Context, bucket, object string, star
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (s *erasureSets) parentDirIsObject(ctx context.Context, bucket, parent string) bool {
|
func (s *erasureSets) parentDirIsObject(ctx context.Context, bucket, parent string) bool {
|
||||||
|
if parent == "." {
|
||||||
|
return false
|
||||||
|
}
|
||||||
return s.getHashedSet(parent).parentDirIsObject(ctx, bucket, parent)
|
return s.getHashedSet(parent).parentDirIsObject(ctx, bucket, parent)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1778,6 +1778,12 @@ func (s *xlStorage) CheckParts(ctx context.Context, volume string, path string,
|
|||||||
}
|
}
|
||||||
|
|
||||||
// CheckFile check if path has necessary metadata.
|
// CheckFile check if path has necessary metadata.
|
||||||
|
// This function does the following check, suppose
|
||||||
|
// you are creating a metadata file at "a/b/c/d/xl.meta",
|
||||||
|
// makes sure that there is no `xl.meta` at
|
||||||
|
// - "a/b/c/"
|
||||||
|
// - "a/b/"
|
||||||
|
// - "a/"
|
||||||
func (s *xlStorage) CheckFile(ctx context.Context, volume string, path string) error {
|
func (s *xlStorage) CheckFile(ctx context.Context, volume string, path string) error {
|
||||||
atomic.AddInt32(&s.activeIOCount, 1)
|
atomic.AddInt32(&s.activeIOCount, 1)
|
||||||
defer func() {
|
defer func() {
|
||||||
@ -1789,44 +1795,47 @@ func (s *xlStorage) CheckFile(ctx context.Context, volume string, path string) e
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
// Stat a volume entry.
|
var checkFile func(p string) error
|
||||||
_, err = os.Stat(pathJoin(volumeDir, path))
|
checkFile = func(p string) error {
|
||||||
if err != nil {
|
if p == "." || p == SlashSeparator {
|
||||||
if osIsNotExist(err) {
|
|
||||||
return errPathNotFound
|
return errPathNotFound
|
||||||
}
|
}
|
||||||
|
|
||||||
|
filePath := pathJoin(volumeDir, p, xlStorageFormatFile)
|
||||||
|
if err := checkPathLength(filePath); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
filePath := pathJoin(volumeDir, path, xlStorageFormatFile)
|
st, _ := os.Lstat(filePath)
|
||||||
if err = checkPathLength(filePath); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
filePathOld := pathJoin(volumeDir, path, xlStorageFormatFileV1)
|
|
||||||
if err = checkPathLength(filePathOld); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
st, err := os.Stat(filePath)
|
|
||||||
if err != nil && !osIsNotExist(err) {
|
|
||||||
return osErrToFileErr(err)
|
|
||||||
}
|
|
||||||
if st == nil {
|
if st == nil {
|
||||||
st, err = os.Stat(filePathOld)
|
if s.formatLegacy {
|
||||||
if err != nil {
|
filePathOld := pathJoin(volumeDir, p, xlStorageFormatFileV1)
|
||||||
return osErrToFileErr(err)
|
if err := checkPathLength(filePathOld); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
st, _ = os.Lstat(filePathOld)
|
||||||
|
if st == nil {
|
||||||
|
return errPathNotFound
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// If its a directory its not a regular file.
|
if st != nil {
|
||||||
if st.Mode().IsDir() || st.Size() == 0 {
|
if !st.Mode().IsRegular() {
|
||||||
|
// not a regular file return error.
|
||||||
return errFileNotFound
|
return errFileNotFound
|
||||||
}
|
}
|
||||||
|
// Success fully found
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
return checkFile(slashpath.Dir(p))
|
||||||
|
}
|
||||||
|
|
||||||
|
return checkFile(path)
|
||||||
|
}
|
||||||
|
|
||||||
// deleteFile deletes a file or a directory if its empty unless recursive
|
// deleteFile deletes a file or a directory if its empty unless recursive
|
||||||
// is set to true. If the target is successfully deleted, it will recursively
|
// is set to true. If the target is successfully deleted, it will recursively
|
||||||
// move up the tree, deleting empty parent directories until it finds one
|
// move up the tree, deleting empty parent directories until it finds one
|
||||||
@ -1837,8 +1846,8 @@ func deleteFile(basePath, deletePath string, recursive bool) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
isObjectDir := HasSuffix(deletePath, SlashSeparator)
|
isObjectDir := HasSuffix(deletePath, SlashSeparator)
|
||||||
basePath = filepath.Clean(basePath)
|
basePath = slashpath.Clean(basePath)
|
||||||
deletePath = filepath.Clean(deletePath)
|
deletePath = slashpath.Clean(deletePath)
|
||||||
if !strings.HasPrefix(deletePath, basePath) || deletePath == basePath {
|
if !strings.HasPrefix(deletePath, basePath) || deletePath == basePath {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
@ -1872,7 +1881,7 @@ func deleteFile(basePath, deletePath string, recursive bool) error {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
deletePath = filepath.Dir(deletePath)
|
deletePath = slashpath.Dir(deletePath)
|
||||||
|
|
||||||
// Delete parent directory obviously not recursively. Errors for
|
// Delete parent directory obviously not recursively. Errors for
|
||||||
// parent directories shouldn't trickle down.
|
// parent directories shouldn't trickle down.
|
||||||
|
@ -1657,6 +1657,10 @@ func TestXLStorageCheckFile(t *testing.T) {
|
|||||||
t.Fatalf("Unable to create file, %s", err)
|
t.Fatalf("Unable to create file, %s", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if err := xlStorage.MakeVol(context.Background(), "success-vol/path/to/"+xlStorageFormatFile); err != nil {
|
||||||
|
t.Fatalf("Unable to create path, %s", err)
|
||||||
|
}
|
||||||
|
|
||||||
testCases := []struct {
|
testCases := []struct {
|
||||||
srcVol string
|
srcVol string
|
||||||
srcPath string
|
srcPath string
|
||||||
@ -1695,7 +1699,7 @@ func TestXLStorageCheckFile(t *testing.T) {
|
|||||||
{
|
{
|
||||||
srcVol: "success-vol",
|
srcVol: "success-vol",
|
||||||
srcPath: "path",
|
srcPath: "path",
|
||||||
expectedErr: errFileNotFound,
|
expectedErr: errPathNotFound,
|
||||||
},
|
},
|
||||||
// TestXLStorage case - 6.
|
// TestXLStorage case - 6.
|
||||||
// TestXLStorage case with non existent volume.
|
// TestXLStorage case with non existent volume.
|
||||||
@ -1704,6 +1708,13 @@ func TestXLStorageCheckFile(t *testing.T) {
|
|||||||
srcPath: "success-file",
|
srcPath: "success-file",
|
||||||
expectedErr: errPathNotFound,
|
expectedErr: errPathNotFound,
|
||||||
},
|
},
|
||||||
|
// TestXLStorage case - 7.
|
||||||
|
// TestXLStorage case with file with directory.
|
||||||
|
{
|
||||||
|
srcVol: "success-vol",
|
||||||
|
srcPath: "path/to",
|
||||||
|
expectedErr: errFileNotFound,
|
||||||
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
for i, testCase := range testCases {
|
for i, testCase := range testCases {
|
||||||
|
Loading…
x
Reference in New Issue
Block a user