Fix a possible race during PutObject() (#5376)

Under any concurrent removeObjects in progress
might have removed the parents of the same prefix
for which there is an ongoing putObject request.
An inconsistent situation may arise as explained
below even under sufficient locking.

PutObject is almost successful at the last stage when
a temporary file is renamed to its actual namespace
at `a/b/c/object1`. Concurrently a RemoveObject is
also in progress at the same prefix for an `a/b/c/object2`.

To create the object1 at location `a/b/c` PutObject has
to create all the parents recursively.

```
a/b/c - os.MkdirAll loops through has now created
        'a/' and 'b/' about to create 'c/'
a/b/c/object2 - at this point 'c/' and 'object2'
        are deleted about to delete b/
```

Now for os.MkdirAll loop the expected situation is
that top level parent 'a/b/' exists which it created
, such that it can create 'c/' - since removeObject
and putObject do not compete for lock due to holding
locks at different resources. removeObject proceeds
to delete parent 'b/' since 'c/' is not yet present,
once deleted 'os.MkdirAll' would receive an error as
syscall.ENOENT which would fail the putObject request.

This PR tries to address this issue by implementing
a safer/guarded approach where we would retry an operation
such as `os.MkdirAll` and `os.Rename` if both operations
observe syscall.ENOENT.

Fixes #5254
This commit is contained in:
Harshavardhana 2018-01-13 09:13:02 -08:00 committed by Nitish Tiwari
parent 0bb6247056
commit 12f67d47f1
6 changed files with 211 additions and 80 deletions

View File

@ -1,5 +1,5 @@
/* /*
* Minio Cloud Storage, (C) 2016 Minio, Inc. * Minio Cloud Storage, (C) 2016, 2017, 2018 Minio, Inc.
* *
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
@ -17,7 +17,6 @@
package cmd package cmd
import ( import (
"fmt"
"io" "io"
"os" "os"
pathutil "path" pathutil "path"
@ -91,24 +90,6 @@ func fsRemoveDir(dirPath string) (err error) {
return nil return nil
} }
// Creates a new directory, parent dir is also recursively created
// if it doesn't exist.
func fsMkdirAll(dirPath string) (err error) {
if dirPath == "" {
return errors.Trace(errInvalidArgument)
}
if err = checkPathLength(dirPath); err != nil {
return errors.Trace(err)
}
if err = os.MkdirAll(dirPath, 0777); err != nil {
return errors.Trace(err)
}
return nil
}
// Creates a new directory, parent dir should exist // Creates a new directory, parent dir should exist
// otherwise returns an error. If directory already // otherwise returns an error. If directory already
// exists returns an error. Windows long paths // exists returns an error. Windows long paths
@ -277,7 +258,7 @@ func fsCreateFile(filePath string, reader io.Reader, buf []byte, fallocSize int6
return 0, errors.Trace(err) return 0, errors.Trace(err)
} }
if err := os.MkdirAll(pathutil.Dir(filePath), 0777); err != nil { if err := mkdirAll(pathutil.Dir(filePath), 0777); err != nil {
return 0, errors.Trace(err) return 0, errors.Trace(err)
} }
@ -374,19 +355,16 @@ func fsRenameFile(sourcePath, destPath string) error {
if err := checkPathLength(destPath); err != nil { if err := checkPathLength(destPath); err != nil {
return errors.Trace(err) return errors.Trace(err)
} }
// Verify if source path exists. // Verify if source path exists.
if _, err := os.Stat((sourcePath)); err != nil { if _, err := os.Stat(sourcePath); err != nil {
return osErrToFSFileErr(err) return osErrToFSFileErr(err)
} }
if err := os.MkdirAll(pathutil.Dir(destPath), 0777); err != nil {
return errors.Trace(err) if err := renameAll(sourcePath, destPath); err != nil {
}
if err := os.Rename((sourcePath), (destPath)); err != nil {
if isSysErrCrossDevice(err) {
return errors.Trace(fmt.Errorf("%s (%s)->(%s)", errCrossDeviceLink, sourcePath, destPath))
}
return errors.Trace(err) return errors.Trace(err)
} }
return nil return nil
} }

View File

@ -28,28 +28,6 @@ import (
"github.com/minio/minio/pkg/lock" "github.com/minio/minio/pkg/lock"
) )
// Tests - fsMkdirAll()
func TestFSMkdirAll(t *testing.T) {
// create posix test setup
_, path, err := newPosixTestSetup()
if err != nil {
t.Fatalf("Unable to create posix test setup, %s", err)
}
defer os.RemoveAll(path)
if err = fsMkdirAll(""); errors.Cause(err) != errInvalidArgument {
t.Fatal("Unexpected error", err)
}
if err = fsMkdirAll(pathJoin(path, "my-obj-del-0000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000001")); errors.Cause(err) != errFileNameTooLong {
t.Fatal("Unexpected error", err)
}
if err = fsMkdirAll(pathJoin(path, "success-vol", "success-object")); err != nil {
t.Fatal("Unexpected error", err)
}
}
func TestFSRenameFile(t *testing.T) { func TestFSRenameFile(t *testing.T) {
// create posix test setup // create posix test setup
_, path, err := newPosixTestSetup() _, path, err := newPosixTestSetup()

View File

@ -665,7 +665,7 @@ func (fs fsObjects) putObject(bucket string, object string, data *hash.Reader, m
if fs.parentDirIsObject(bucket, path.Dir(object)) { if fs.parentDirIsObject(bucket, path.Dir(object)) {
return ObjectInfo{}, toObjectErr(errors.Trace(errFileAccessDenied), bucket, object) return ObjectInfo{}, toObjectErr(errors.Trace(errFileAccessDenied), bucket, object)
} }
if err = fsMkdirAll(pathJoin(fs.fsPath, bucket, object)); err != nil { if err = mkdirAll(pathJoin(fs.fsPath, bucket, object), 0777); err != nil {
return ObjectInfo{}, toObjectErr(err, bucket, object) return ObjectInfo{}, toObjectErr(err, bucket, object)
} }
var fi os.FileInfo var fi os.FileInfo

121
cmd/os-reliable.go Normal file
View File

@ -0,0 +1,121 @@
/*
* Minio Cloud Storage, (C) 2018 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cmd
import (
"fmt"
"os"
"path"
)
// Wrapper functions to os.MkdirAll, which calls reliableMkdirAll
// this is to ensure that if there is a racy parent directory
// delete in between we can simply retry the operation.
func mkdirAll(dirPath string, mode os.FileMode) (err error) {
if dirPath == "" {
return errInvalidArgument
}
if err = checkPathLength(dirPath); err != nil {
return err
}
if err = reliableMkdirAll(dirPath, mode); err != nil {
// File path cannot be verified since one of the parents is a file.
if isSysErrNotDir(err) {
return errFileAccessDenied
} else if isSysErrPathNotFound(err) {
// This is a special case should be handled only for
// windows, because windows API does not return "not a
// directory" error message. Handle this specifically here.
return errFileAccessDenied
}
}
return err
}
// Reliably retries os.MkdirAll if for some reason os.MkdirAll returns
// syscall.ENOENT (parent does not exist).
func reliableMkdirAll(dirPath string, mode os.FileMode) (err error) {
i := 0
for {
// Creates all the parent directories, with mode 0777 mkdir honors system umask.
if err = os.MkdirAll(dirPath, mode); err != nil {
// Retry only for the first retryable error.
if os.IsNotExist(err) && i == 0 {
i++
continue
}
}
break
}
return err
}
// Wrapper function to os.Rename, which calls reliableMkdirAll
// and reliableRenameAll. This is to ensure that if there is a
// racy parent directory delete in between we can simply retry
// the operation.
func renameAll(srcFilePath, dstFilePath string) (err error) {
if srcFilePath == "" || dstFilePath == "" {
return errInvalidArgument
}
if err = checkPathLength(srcFilePath); err != nil {
return err
}
if err = checkPathLength(dstFilePath); err != nil {
return err
}
if err = reliableRename(srcFilePath, dstFilePath); err != nil {
if isSysErrNotDir(err) {
return errFileAccessDenied
} else if isSysErrPathNotFound(err) {
// This is a special case should be handled only for
// windows, because windows API does not return "not a
// directory" error message. Handle this specifically here.
return errFileAccessDenied
} else if isSysErrCrossDevice(err) {
return fmt.Errorf("%s (%s)->(%s)", errCrossDeviceLink, srcFilePath, dstFilePath)
} else if os.IsNotExist(err) {
return errFileNotFound
}
}
return err
}
// Reliably retries os.RenameAll if for some reason os.RenameAll returns
// syscall.ENOENT (parent does not exist).
func reliableRename(srcFilePath, dstFilePath string) (err error) {
i := 0
for {
if err = reliableMkdirAll(path.Dir(dstFilePath), 0777); err != nil {
return err
}
// After a successful parent directory create attempt a renameAll.
if err = os.Rename(srcFilePath, dstFilePath); err != nil {
// Retry only for the first retryable error.
if os.IsNotExist(err) && i == 0 {
i++
continue
}
}
break
}
return err
}

78
cmd/os-reliable_test.go Normal file
View File

@ -0,0 +1,78 @@
/*
* Minio Cloud Storage, (C) 2018 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cmd
import (
"os"
"testing"
"github.com/minio/minio/pkg/errors"
)
// Tests - mkdirAll()
func TestOSMkdirAll(t *testing.T) {
// create posix test setup
_, path, err := newPosixTestSetup()
if err != nil {
t.Fatalf("Unable to create posix test setup, %s", err)
}
defer os.RemoveAll(path)
if err = mkdirAll("", 0777); errors.Cause(err) != errInvalidArgument {
t.Fatal("Unexpected error", err)
}
if err = mkdirAll(pathJoin(path, "my-obj-del-0000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000001"), 0777); errors.Cause(err) != errFileNameTooLong {
t.Fatal("Unexpected error", err)
}
if err = mkdirAll(pathJoin(path, "success-vol", "success-object"), 0777); err != nil {
t.Fatal("Unexpected error", err)
}
}
// Tests - renameAll()
func TestOSRenameAll(t *testing.T) {
// create posix test setup
_, path, err := newPosixTestSetup()
if err != nil {
t.Fatalf("Unable to create posix test setup, %s", err)
}
defer os.RemoveAll(path)
if err = mkdirAll(pathJoin(path, "testvolume1"), 0777); err != nil {
t.Fatal(err)
}
if err = renameAll("", "foo"); err != errInvalidArgument {
t.Fatal(err)
}
if err = renameAll("foo", ""); err != errInvalidArgument {
t.Fatal(err)
}
if err = renameAll(pathJoin(path, "testvolume1"), pathJoin(path, "testvolume2")); err != nil {
t.Fatal(err)
}
if err = renameAll(pathJoin(path, "testvolume1"), pathJoin(path, "testvolume2")); errors.Cause(err) != errFileNotFound {
t.Fatal(err)
}
if err = renameAll(pathJoin(path, "my-obj-del-0000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000001"), pathJoin(path, "testvolume2")); errors.Cause(err) != errFileNameTooLong {
t.Fatal("Unexpected error", err)
}
if err = renameAll(pathJoin(path, "testvolume1"), pathJoin(path, "my-obj-del-0000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000001")); errors.Cause(err) != errFileNameTooLong {
t.Fatal("Unexpected error", err)
}
}

View File

@ -1,5 +1,5 @@
/* /*
* Minio Cloud Storage, (C) 2016, 2017 Minio, Inc. * Minio Cloud Storage, (C) 2016, 2017, 2018 Minio, Inc.
* *
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
@ -674,14 +674,7 @@ func (s *posix) createFile(volume, path string) (f *os.File, err error) {
} else { } else {
// Create top level directories if they don't exist. // Create top level directories if they don't exist.
// with mode 0777 mkdir honors system umask. // with mode 0777 mkdir honors system umask.
if err = os.MkdirAll(slashpath.Dir(filePath), 0777); err != nil { if err = mkdirAll(slashpath.Dir(filePath), 0777); err != nil {
// File path cannot be verified since one of the parents is a file.
if isSysErrNotDir(err) {
return nil, errFileAccessDenied
} else if isSysErrPathNotFound(err) {
// Add specific case for windows.
return nil, errFileAccessDenied
}
return nil, err return nil, err
} }
} }
@ -970,25 +963,8 @@ func (s *posix) RenameFile(srcVolume, srcPath, dstVolume, dstPath string) (err e
} }
// Destination does not exist, hence proceed with the rename. // Destination does not exist, hence proceed with the rename.
} }
// Creates all the parent directories, with mode 0777 mkdir honors system umask.
if err = os.MkdirAll(slashpath.Dir(dstFilePath), 0777); err != nil { if err = renameAll(srcFilePath, dstFilePath); err != nil {
// File path cannot be verified since one of the parents is a file.
if isSysErrNotDir(err) {
return errFileAccessDenied
} else if isSysErrPathNotFound(err) {
// This is a special case should be handled only for
// windows, because windows API does not return "not a
// directory" error message. Handle this specifically here.
return errFileAccessDenied
}
return err
}
// Finally attempt a rename.
err = os.Rename((srcFilePath), (dstFilePath))
if err != nil {
if os.IsNotExist(err) {
return errFileNotFound
}
return err return err
} }