From 4a874dfbc1304a6b0f43373c6f3bcdfab75a8ee8 Mon Sep 17 00:00:00 2001 From: Harshavardhana Date: Wed, 11 Apr 2018 17:15:42 -0700 Subject: [PATCH] Ignore prefix renames when dest directory is not empty (#5798) Also make sure to not modify the underlying errors from layers, we should return the error as is and one object layer should translate the errors. Fixes #5797 --- cmd/posix.go | 1 + cmd/xl-sets.go | 12 ++++++++---- cmd/xl-v1-metadata.go | 2 +- cmd/xl-v1-object.go | 30 ++++++++++++++++++++---------- 4 files changed, 30 insertions(+), 15 deletions(-) diff --git a/cmd/posix.go b/cmd/posix.go index 6615e7b7e..797913a64 100644 --- a/cmd/posix.go +++ b/cmd/posix.go @@ -994,6 +994,7 @@ func (s *posix) RenameFile(srcVolume, srcPath, dstVolume, dstPath string) (err e // we still need to allow overwriting an empty directory since it represents // an object empty directory. _, err = os.Stat(dstFilePath) + if err == nil && !isDirEmpty(dstFilePath) { return errFileAccessDenied } diff --git a/cmd/xl-sets.go b/cmd/xl-sets.go index a7a947f70..8600d53f6 100644 --- a/cmd/xl-sets.go +++ b/cmd/xl-sets.go @@ -721,9 +721,10 @@ func listDirSetsFactory(ctx context.Context, isLeaf isLeafFunc, treeWalkIgnoredE // ListObjects - implements listing of objects across sets, each set is independently // listed and subsequently merge lexically sorted inside listDirSetsFactory(). Resulting // value through the walk channel receives the data properly lexically sorted. -func (s *xlSets) ListObjects(ctx context.Context, bucket, prefix, marker, delimiter string, maxKeys int) (result ListObjectsInfo, err error) { +func (s *xlSets) ListObjects(ctx context.Context, bucket, prefix, marker, delimiter string, maxKeys int) (ListObjectsInfo, error) { + var result ListObjectsInfo // validate all the inputs for listObjects - if err = checkListObjsArgs(ctx, bucket, prefix, marker, delimiter, s); err != nil { + if err := checkListObjsArgs(ctx, bucket, prefix, marker, delimiter, s); err != nil { return result, err } @@ -762,6 +763,7 @@ func (s *xlSets) ListObjects(ctx context.Context, bucket, prefix, marker, delimi eof = true break } + // For any walk error return right away. if walkResult.err != nil { return result, toObjectErr(walkResult.err, bucket, prefix) @@ -778,8 +780,10 @@ func (s *xlSets) ListObjects(ctx context.Context, bucket, prefix, marker, delimi // Ignore errFileNotFound as the object might have got // deleted in the interim period of listing and getObjectInfo(), // ignore quorum error as it might be an entry from an outdated disk. - switch err { - case errFileNotFound, errXLReadQuorum: + if IsErrIgnored(err, []error{ + errFileNotFound, + errXLReadQuorum, + }...) { continue } return result, toObjectErr(err, bucket, prefix) diff --git a/cmd/xl-v1-metadata.go b/cmd/xl-v1-metadata.go index dc0722227..1ad5f1677 100644 --- a/cmd/xl-v1-metadata.go +++ b/cmd/xl-v1-metadata.go @@ -506,7 +506,7 @@ func renameXLMetadata(ctx context.Context, disks []StorageAPI, srcBucket, srcEnt isDir := false srcXLJSON := path.Join(srcEntry, xlMetaJSONFile) dstXLJSON := path.Join(dstEntry, xlMetaJSONFile) - return rename(ctx, disks, srcBucket, srcXLJSON, dstBucket, dstXLJSON, isDir, quorum) + return rename(ctx, disks, srcBucket, srcXLJSON, dstBucket, dstXLJSON, isDir, quorum, []error{errFileNotFound}) } // writeUniqueXLMetadata - writes unique `xl.json` content for each disk in order. diff --git a/cmd/xl-v1-object.go b/cmd/xl-v1-object.go index ddf3bf3d0..ac0a37fd7 100644 --- a/cmd/xl-v1-object.go +++ b/cmd/xl-v1-object.go @@ -381,11 +381,11 @@ func (xl xlObjects) getObjectInfo(ctx context.Context, bucket, object string) (o // get Quorum for this object readQuorum, _, err := objectQuorumFromMeta(xl, metaArr, errs) if err != nil { - return objInfo, toObjectErr(err, bucket, object) + return objInfo, err } if reducedErr := reduceReadQuorumErrs(ctx, errs, objectOpIgnoredErrs, readQuorum); reducedErr != nil { - return objInfo, toObjectErr(reducedErr, bucket, object) + return objInfo, reducedErr } // List all the file commit ids from parts metadata. @@ -432,7 +432,7 @@ func undoRename(disks []StorageAPI, srcBucket, srcEntry, dstBucket, dstEntry str // rename - common function that renamePart and renameObject use to rename // the respective underlying storage layer representations. -func rename(ctx context.Context, disks []StorageAPI, srcBucket, srcEntry, dstBucket, dstEntry string, isDir bool, writeQuorum int) ([]StorageAPI, error) { +func rename(ctx context.Context, disks []StorageAPI, srcBucket, srcEntry, dstBucket, dstEntry string, isDir bool, writeQuorum int, ignoredErr []error) ([]StorageAPI, error) { // Initialize sync waitgroup. var wg = &sync.WaitGroup{} @@ -453,10 +453,11 @@ func rename(ctx context.Context, disks []StorageAPI, srcBucket, srcEntry, dstBuc wg.Add(1) go func(index int, disk StorageAPI) { defer wg.Done() - err := disk.RenameFile(srcBucket, srcEntry, dstBucket, dstEntry) - if err != nil && err != errFileNotFound { - logger.LogIf(ctx, err) - errs[index] = err + if err := disk.RenameFile(srcBucket, srcEntry, dstBucket, dstEntry); err != nil { + if !IsErrIgnored(err, ignoredErr...) { + logger.LogIf(ctx, err) + errs[index] = err + } } }(index, disk) } @@ -480,7 +481,16 @@ func rename(ctx context.Context, disks []StorageAPI, srcBucket, srcEntry, dstBuc // its proper location. func renamePart(ctx context.Context, disks []StorageAPI, srcBucket, srcPart, dstBucket, dstPart string, quorum int) ([]StorageAPI, error) { isDir := false - return rename(ctx, disks, srcBucket, srcPart, dstBucket, dstPart, isDir, quorum) + return rename(ctx, disks, srcBucket, srcPart, dstBucket, dstPart, isDir, quorum, []error{errFileNotFound}) +} + +// renameObjectDir - renames all source objects directories to destination +// object directories across all disks in parallel. Additionally if we have +// errors and do not have a readQuorum partially renamed files are renamed +// back to its proper location. +func renameObjectDir(ctx context.Context, disks []StorageAPI, srcBucket, srcObject, dstBucket, dstObject string, quorum int) ([]StorageAPI, error) { + isDir := true + return rename(ctx, disks, srcBucket, srcObject, dstBucket, dstObject, isDir, quorum, []error{errFileNotFound, errFileAccessDenied}) } // renameObject - renames all source objects to destination object @@ -489,7 +499,7 @@ func renamePart(ctx context.Context, disks []StorageAPI, srcBucket, srcPart, dst // its proper location. func renameObject(ctx context.Context, disks []StorageAPI, srcBucket, srcObject, dstBucket, dstObject string, quorum int) ([]StorageAPI, error) { isDir := true - return rename(ctx, disks, srcBucket, srcObject, dstBucket, dstObject, isDir, quorum) + return rename(ctx, disks, srcBucket, srcObject, dstBucket, dstObject, isDir, quorum, []error{errFileNotFound}) } // PutObject - creates an object upon reading from the input stream @@ -548,7 +558,7 @@ func (xl xlObjects) putObject(ctx context.Context, bucket string, object string, } // Rename the successfully written temporary object to final location. - if _, err = renameObject(ctx, xl.getDisks(), minioMetaTmpBucket, tempObj, bucket, object, writeQuorum); err != nil { + if _, err = renameObjectDir(ctx, xl.getDisks(), minioMetaTmpBucket, tempObj, bucket, object, writeQuorum); err != nil { return ObjectInfo{}, toObjectErr(err, bucket, object) }