fs: Verify if parent is an object before i/o. (#4304)

PutObject() needs to verify and fail.

Fixes #4301
This commit is contained in:
Harshavardhana 2017-05-09 17:46:46 -07:00 committed by GitHub
parent 298b470f69
commit fa3f6d75b6
7 changed files with 108 additions and 23 deletions

View File

@ -174,7 +174,7 @@ func fsStatFile(statFile string) (os.FileInfo, error) {
return nil, traceError(err) return nil, traceError(err)
} }
if fi.IsDir() { if fi.IsDir() {
return nil, traceError(errFileNotFound) return nil, traceError(errFileAccessDenied)
} }
return fi, nil return fi, nil
} }

View File

@ -134,7 +134,7 @@ func TestFSStats(t *testing.T) {
srcFSPath: path, srcFSPath: path,
srcVol: "success-vol", srcVol: "success-vol",
srcPath: "path", srcPath: "path",
expectedErr: errFileNotFound, expectedErr: errFileAccessDenied,
}, },
// Test case - 6. // Test case - 6.
// Test case with src path segment > 255. // Test case with src path segment > 255.
@ -167,7 +167,8 @@ func TestFSStats(t *testing.T) {
for i, testCase := range testCases { for i, testCase := range testCases {
if testCase.srcPath != "" { if testCase.srcPath != "" {
if _, err := fsStatFile(pathJoin(testCase.srcFSPath, testCase.srcVol, testCase.srcPath)); errorCause(err) != testCase.expectedErr { if _, err := fsStatFile(pathJoin(testCase.srcFSPath, testCase.srcVol,
testCase.srcPath)); errorCause(err) != testCase.expectedErr {
t.Fatalf("TestPosix case %d: Expected: \"%s\", got: \"%s\"", i+1, testCase.expectedErr, err) t.Fatalf("TestPosix case %d: Expected: \"%s\", got: \"%s\"", i+1, testCase.expectedErr, err)
} }
} else { } else {

View File

@ -706,6 +706,11 @@ func (fs fsObjects) CompleteMultipartUpload(bucket string, object string, upload
return ObjectInfo{}, err return ObjectInfo{}, err
} }
// Check if an object is present as one of the parent dir.
if fs.parentDirIsObject(bucket, pathutil.Dir(object)) {
return ObjectInfo{}, toObjectErr(traceError(errFileAccessDenied), bucket, object)
}
if _, err := fs.statBucketDir(bucket); err != nil { if _, err := fs.statBucketDir(bucket); err != nil {
return ObjectInfo{}, toObjectErr(err, bucket) return ObjectInfo{}, toObjectErr(err, bucket)
} }

View File

@ -25,6 +25,7 @@ import (
"io/ioutil" "io/ioutil"
"os" "os"
"os/signal" "os/signal"
"path"
"path/filepath" "path/filepath"
"sort" "sort"
"syscall" "syscall"
@ -473,7 +474,6 @@ func (fs fsObjects) CopyObject(srcBucket, srcObject, dstBucket, dstObject string
// startOffset indicates the starting read location of the object. // startOffset indicates the starting read location of the object.
// length indicates the total length of the object. // length indicates the total length of the object.
func (fs fsObjects) GetObject(bucket, object string, offset int64, length int64, writer io.Writer) (err error) { func (fs fsObjects) GetObject(bucket, object string, offset int64, length int64, writer io.Writer) (err error) {
// This is a special case with object whose name ends with
if err = checkGetObjArgs(bucket, object); err != nil { if err = checkGetObjArgs(bucket, object); err != nil {
return err return err
} }
@ -580,6 +580,25 @@ func (fs fsObjects) GetObjectInfo(bucket, object string) (ObjectInfo, error) {
return fs.getObjectInfo(bucket, object) return fs.getObjectInfo(bucket, object)
} }
// This function does the following check, suppose
// object is "a/b/c/d", stat makes sure that objects ""a/b/c""
// "a/b" and "a" do not exist.
func (fs fsObjects) parentDirIsObject(bucket, parent string) bool {
var isParentDirObject func(string) bool
isParentDirObject = func(p string) bool {
if p == "." {
return false
}
if _, err := fsStatFile(pathJoin(fs.fsPath, bucket, p)); err == nil {
// If there is already a file at prefix "p" return error.
return true
}
// Check if there is a file as one of the parent paths.
return isParentDirObject(path.Dir(p))
}
return isParentDirObject(parent)
}
// PutObject - creates an object upon reading from the input stream // PutObject - creates an object upon reading from the input stream
// until EOF, writes data directly to configured filesystem path. // until EOF, writes data directly to configured filesystem path.
// Additionally writes `fs.json` which carries the necessary metadata // Additionally writes `fs.json` which carries the necessary metadata
@ -590,6 +609,10 @@ func (fs fsObjects) PutObject(bucket string, object string, size int64, data io.
// a slash separator, we treat it like a valid operation and // a slash separator, we treat it like a valid operation and
// return success. // return success.
if isObjectDir(object, size) { if isObjectDir(object, size) {
// Check if an object is present as one of the parent dir.
if fs.parentDirIsObject(bucket, path.Dir(object)) {
return ObjectInfo{}, toObjectErr(traceError(errFileAccessDenied), bucket, object)
}
return dirObjectInfo(bucket, object, size, metadata), nil return dirObjectInfo(bucket, object, size, metadata), nil
} }
@ -597,6 +620,11 @@ func (fs fsObjects) PutObject(bucket string, object string, size int64, data io.
return ObjectInfo{}, err return ObjectInfo{}, err
} }
// Check if an object is present as one of the parent dir.
if fs.parentDirIsObject(bucket, path.Dir(object)) {
return ObjectInfo{}, toObjectErr(traceError(errFileAccessDenied), bucket, object)
}
if _, err = fs.statBucketDir(bucket); err != nil { if _, err = fs.statBucketDir(bucket); err != nil {
return ObjectInfo{}, toObjectErr(err, bucket) return ObjectInfo{}, toObjectErr(err, bucket)
} }

View File

@ -523,6 +523,54 @@ func TestFSGetBucketInfo(t *testing.T) {
} }
} }
func TestFSPutObject(t *testing.T) {
// Prepare for tests
disk := filepath.Join(globalTestTmpDir, "minio-"+nextSuffix())
defer removeAll(disk)
obj := initFSObjects(disk, t)
bucketName := "bucket"
objectName := "1/2/3/4/object"
if err := obj.MakeBucket(bucketName); err != nil {
t.Fatal(err)
}
sha256sum := ""
_, err := obj.PutObject(bucketName, objectName, int64(len("abcd")), bytes.NewReader([]byte("abcd")), nil, sha256sum)
if err != nil {
t.Fatal(err)
}
_, err = obj.PutObject(bucketName, objectName+"/1", int64(len("abcd")), bytes.NewReader([]byte("abcd")), nil, sha256sum)
if err == nil {
t.Fatal("Unexpected should fail here, backned corruption occurred")
}
if nerr, ok := errorCause(err).(PrefixAccessDenied); !ok {
t.Fatalf("Expected PrefixAccessDenied, got %#v", err)
} else {
if nerr.Bucket != "bucket" {
t.Fatalf("Expected 'bucket', got %s", nerr.Bucket)
}
if nerr.Object != "1/2/3/4/object/1" {
t.Fatalf("Expected '1/2/3/4/object/1', got %s", nerr.Object)
}
}
_, err = obj.PutObject(bucketName, objectName+"/1/", 0, bytes.NewReader([]byte("")), nil, sha256sum)
if err == nil {
t.Fatal("Unexpected should fail here, backned corruption occurred")
}
if nerr, ok := errorCause(err).(PrefixAccessDenied); !ok {
t.Fatalf("Expected PrefixAccessDenied, got %#v", err)
} else {
if nerr.Bucket != "bucket" {
t.Fatalf("Expected 'bucket', got %s", nerr.Bucket)
}
if nerr.Object != "1/2/3/4/object/1/" {
t.Fatalf("Expected '1/2/3/4/object/1/', got %s", nerr.Object)
}
}
}
// TestFSDeleteObject - test fs.DeleteObject() with healthy and corrupted disks // TestFSDeleteObject - test fs.DeleteObject() with healthy and corrupted disks
func TestFSDeleteObject(t *testing.T) { func TestFSDeleteObject(t *testing.T) {
// Prepare for tests // Prepare for tests

View File

@ -69,8 +69,8 @@ func testGetObjectInfo(obj ObjectLayer, instanceType string, t TestErrHandler) {
{"test-getobjectinfo", "Antartica", ObjectInfo{}, ObjectNotFound{Bucket: "test-getobjectinfo", Object: "Antartica"}, false}, {"test-getobjectinfo", "Antartica", ObjectInfo{}, ObjectNotFound{Bucket: "test-getobjectinfo", Object: "Antartica"}, false},
{"test-getobjectinfo", "Asia/myfile", ObjectInfo{}, ObjectNotFound{Bucket: "test-getobjectinfo", Object: "Asia/myfile"}, false}, {"test-getobjectinfo", "Asia/myfile", ObjectInfo{}, ObjectNotFound{Bucket: "test-getobjectinfo", Object: "Asia/myfile"}, false},
// Test case with existing bucket but object name set to a directory (Test number 12). // Test case with existing bucket but object name set to a directory (Test number 12).
{"test-getobjectinfo", "Asia", ObjectInfo{}, ObjectNotFound{Bucket: "test-getobjectinfo", Object: "Asia"}, false}, {"test-getobjectinfo", "Asia/", ObjectInfo{}, ObjectNotFound{Bucket: "test-getobjectinfo", Object: "Asia/"}, false},
// Valid case with existing object (Test number 13). // Valid case with existing object (Test number 14).
{"test-getobjectinfo", "Asia/asiapics.jpg", resultCases[0], nil, true}, {"test-getobjectinfo", "Asia/asiapics.jpg", resultCases[0], nil, true},
} }
for i, testCase := range testCases { for i, testCase := range testCases {

View File

@ -749,23 +749,26 @@ func testGetDirectoryReturnsObjectNotFound(obj ObjectLayer, instanceType string,
c.Fatalf("%s: <ERROR> %s", instanceType, err) c.Fatalf("%s: <ERROR> %s", instanceType, err)
} }
for i, objName := range []string{"dir1", "dir1/", "dir1/dir3", "dir1/dir3/"} { testCases := []struct {
_, err = obj.GetObjectInfo(bucketName, objName) dir string
if isErrObjectNotFound(err) { err error
err = errorCause(err) }{
err1 := err.(ObjectNotFound) {
if err1.Bucket != bucketName { dir: "dir1/",
c.Errorf("Test %d, %s: Expected the bucket name in the error message to be `%s`, but instead found `%s`", err: ObjectNotFound{Bucket: bucketName, Object: "dir1/"},
i+1, instanceType, bucketName, err1.Bucket) },
} {
if err1.Object != objName { dir: "dir1/dir3/",
c.Errorf("Test %d, %s: Expected the object name in the error message to be `%s`, but instead found `%s`", err: ObjectNotFound{Bucket: bucketName, Object: "dir1/dir3/"},
i+1, instanceType, objName, err1.Object) },
} }
} else {
if err.Error() != "ObjectNotFound" { for i, testCase := range testCases {
c.Errorf("Test %d, %s: Expected the error message to be `%s`, but instead found `%s`", i+1, instanceType, _, expectedErr := obj.GetObjectInfo(bucketName, testCase.dir)
"ObjectNotFound", err.Error()) if expectedErr != nil {
expectedErr = errorCause(expectedErr)
if expectedErr.Error() != testCase.err.Error() {
c.Errorf("Test %d, %s: Expected error %s, got %s", i+1, instanceType, testCase.err, expectedErr)
} }
} }
} }