Ignore prefix renames when dest directory is not empty (#5798)

Also make sure to not modify the underlying errors from
layers, we should return the error as is and one object
layer should translate the errors.

Fixes #5797
This commit is contained in:
Harshavardhana 2018-04-11 17:15:42 -07:00 committed by GitHub
parent 650c6ee8fb
commit 4a874dfbc1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 30 additions and 15 deletions

View File

@ -994,6 +994,7 @@ func (s *posix) RenameFile(srcVolume, srcPath, dstVolume, dstPath string) (err e
// we still need to allow overwriting an empty directory since it represents
// an object empty directory.
_, err = os.Stat(dstFilePath)
if err == nil && !isDirEmpty(dstFilePath) {
return errFileAccessDenied
}

View File

@ -721,9 +721,10 @@ func listDirSetsFactory(ctx context.Context, isLeaf isLeafFunc, treeWalkIgnoredE
// ListObjects - implements listing of objects across sets, each set is independently
// listed and subsequently merge lexically sorted inside listDirSetsFactory(). Resulting
// value through the walk channel receives the data properly lexically sorted.
func (s *xlSets) ListObjects(ctx context.Context, bucket, prefix, marker, delimiter string, maxKeys int) (result ListObjectsInfo, err error) {
func (s *xlSets) ListObjects(ctx context.Context, bucket, prefix, marker, delimiter string, maxKeys int) (ListObjectsInfo, error) {
var result ListObjectsInfo
// validate all the inputs for listObjects
if err = checkListObjsArgs(ctx, bucket, prefix, marker, delimiter, s); err != nil {
if err := checkListObjsArgs(ctx, bucket, prefix, marker, delimiter, s); err != nil {
return result, err
}
@ -762,6 +763,7 @@ func (s *xlSets) ListObjects(ctx context.Context, bucket, prefix, marker, delimi
eof = true
break
}
// For any walk error return right away.
if walkResult.err != nil {
return result, toObjectErr(walkResult.err, bucket, prefix)
@ -778,8 +780,10 @@ func (s *xlSets) ListObjects(ctx context.Context, bucket, prefix, marker, delimi
// Ignore errFileNotFound as the object might have got
// deleted in the interim period of listing and getObjectInfo(),
// ignore quorum error as it might be an entry from an outdated disk.
switch err {
case errFileNotFound, errXLReadQuorum:
if IsErrIgnored(err, []error{
errFileNotFound,
errXLReadQuorum,
}...) {
continue
}
return result, toObjectErr(err, bucket, prefix)

View File

@ -506,7 +506,7 @@ func renameXLMetadata(ctx context.Context, disks []StorageAPI, srcBucket, srcEnt
isDir := false
srcXLJSON := path.Join(srcEntry, xlMetaJSONFile)
dstXLJSON := path.Join(dstEntry, xlMetaJSONFile)
return rename(ctx, disks, srcBucket, srcXLJSON, dstBucket, dstXLJSON, isDir, quorum)
return rename(ctx, disks, srcBucket, srcXLJSON, dstBucket, dstXLJSON, isDir, quorum, []error{errFileNotFound})
}
// writeUniqueXLMetadata - writes unique `xl.json` content for each disk in order.

View File

@ -381,11 +381,11 @@ func (xl xlObjects) getObjectInfo(ctx context.Context, bucket, object string) (o
// get Quorum for this object
readQuorum, _, err := objectQuorumFromMeta(xl, metaArr, errs)
if err != nil {
return objInfo, toObjectErr(err, bucket, object)
return objInfo, err
}
if reducedErr := reduceReadQuorumErrs(ctx, errs, objectOpIgnoredErrs, readQuorum); reducedErr != nil {
return objInfo, toObjectErr(reducedErr, bucket, object)
return objInfo, reducedErr
}
// List all the file commit ids from parts metadata.
@ -432,7 +432,7 @@ func undoRename(disks []StorageAPI, srcBucket, srcEntry, dstBucket, dstEntry str
// rename - common function that renamePart and renameObject use to rename
// the respective underlying storage layer representations.
func rename(ctx context.Context, disks []StorageAPI, srcBucket, srcEntry, dstBucket, dstEntry string, isDir bool, writeQuorum int) ([]StorageAPI, error) {
func rename(ctx context.Context, disks []StorageAPI, srcBucket, srcEntry, dstBucket, dstEntry string, isDir bool, writeQuorum int, ignoredErr []error) ([]StorageAPI, error) {
// Initialize sync waitgroup.
var wg = &sync.WaitGroup{}
@ -453,10 +453,11 @@ func rename(ctx context.Context, disks []StorageAPI, srcBucket, srcEntry, dstBuc
wg.Add(1)
go func(index int, disk StorageAPI) {
defer wg.Done()
err := disk.RenameFile(srcBucket, srcEntry, dstBucket, dstEntry)
if err != nil && err != errFileNotFound {
logger.LogIf(ctx, err)
errs[index] = err
if err := disk.RenameFile(srcBucket, srcEntry, dstBucket, dstEntry); err != nil {
if !IsErrIgnored(err, ignoredErr...) {
logger.LogIf(ctx, err)
errs[index] = err
}
}
}(index, disk)
}
@ -480,7 +481,16 @@ func rename(ctx context.Context, disks []StorageAPI, srcBucket, srcEntry, dstBuc
// its proper location.
func renamePart(ctx context.Context, disks []StorageAPI, srcBucket, srcPart, dstBucket, dstPart string, quorum int) ([]StorageAPI, error) {
isDir := false
return rename(ctx, disks, srcBucket, srcPart, dstBucket, dstPart, isDir, quorum)
return rename(ctx, disks, srcBucket, srcPart, dstBucket, dstPart, isDir, quorum, []error{errFileNotFound})
}
// renameObjectDir - renames all source objects directories to destination
// object directories across all disks in parallel. Additionally if we have
// errors and do not have a readQuorum partially renamed files are renamed
// back to its proper location.
func renameObjectDir(ctx context.Context, disks []StorageAPI, srcBucket, srcObject, dstBucket, dstObject string, quorum int) ([]StorageAPI, error) {
isDir := true
return rename(ctx, disks, srcBucket, srcObject, dstBucket, dstObject, isDir, quorum, []error{errFileNotFound, errFileAccessDenied})
}
// renameObject - renames all source objects to destination object
@ -489,7 +499,7 @@ func renamePart(ctx context.Context, disks []StorageAPI, srcBucket, srcPart, dst
// its proper location.
func renameObject(ctx context.Context, disks []StorageAPI, srcBucket, srcObject, dstBucket, dstObject string, quorum int) ([]StorageAPI, error) {
isDir := true
return rename(ctx, disks, srcBucket, srcObject, dstBucket, dstObject, isDir, quorum)
return rename(ctx, disks, srcBucket, srcObject, dstBucket, dstObject, isDir, quorum, []error{errFileNotFound})
}
// PutObject - creates an object upon reading from the input stream
@ -548,7 +558,7 @@ func (xl xlObjects) putObject(ctx context.Context, bucket string, object string,
}
// Rename the successfully written temporary object to final location.
if _, err = renameObject(ctx, xl.getDisks(), minioMetaTmpBucket, tempObj, bucket, object, writeQuorum); err != nil {
if _, err = renameObjectDir(ctx, xl.getDisks(), minioMetaTmpBucket, tempObj, bucket, object, writeQuorum); err != nil {
return ObjectInfo{}, toObjectErr(err, bucket, object)
}