fs: Handle cases of PutObject for an existing prefix. (#1478)

This commit is contained in:
Harshavardhana 2016-05-04 12:18:40 -07:00
parent da3a53376c
commit dd417e5476
4 changed files with 25 additions and 9 deletions

View File

@ -138,14 +138,14 @@ func putObjectCommon(storage StorageAPI, bucket string, object string, size int6
if clErr := safeCloseAndRemove(fileWriter); clErr != nil {
return "", clErr
}
return "", toObjectErr(err)
return "", toObjectErr(err, bucket, object)
}
} else {
if _, err = io.Copy(multiWriter, data); err != nil {
if clErr := safeCloseAndRemove(fileWriter); clErr != nil {
return "", clErr
}
return "", err
return "", toObjectErr(err, bucket, object)
}
}
@ -169,7 +169,10 @@ func putObjectCommon(storage StorageAPI, bucket string, object string, size int6
}
err = storage.RenameFile(minioMetaBucket, tempObj, bucket, object)
if err != nil {
return "", err
if derr := storage.DeleteFile(minioMetaBucket, tempObj); derr != nil {
return "", derr
}
return "", toObjectErr(err, bucket, object)
}
// Return md5sum, successfully wrote object.

View File

@ -40,7 +40,7 @@ func toObjectErr(err error, params ...string) error {
return StorageInsufficientReadResources{}
case errWriteQuorum:
return StorageInsufficientWriteResources{}
case errIsNotRegular:
case errIsNotRegular, errFileAccessDenied:
if len(params) >= 2 {
return ObjectExistsAsPrefix{
Bucket: params[0],

View File

@ -596,12 +596,13 @@ func (s fsStorage) CreateFile(volume, path string) (writeCloser io.WriteCloser,
}).Debugf("getVolumeDir failed with %s", err)
return nil, err
}
if err := checkDiskFree(s.diskPath, s.minFreeDisk); err != nil {
if err = checkDiskFree(s.diskPath, s.minFreeDisk); err != nil {
return nil, err
}
filePath := filepath.Join(volumeDir, path)
// Verify if the file already exists and is not of regular type.
if st, err := os.Stat(filePath); err == nil {
var st os.FileInfo
if st, err = os.Stat(filePath); err == nil {
if st.IsDir() {
log.WithFields(logrus.Fields{
"diskPath": s.diskPath,
@ -610,7 +611,15 @@ func (s fsStorage) CreateFile(volume, path string) (writeCloser io.WriteCloser,
return nil, errIsNotRegular
}
}
return safe.CreateFileWithPrefix(filePath, "$tmpfile")
w, err := safe.CreateFileWithPrefix(filePath, "$tmpfile")
if err != nil {
// File path cannot be verified since one of the parents is a file.
if strings.Contains(err.Error(), "not a directory") {
return nil, errFileAccessDenied
}
return nil, err
}
return w, nil
}
// StatFile - get file info.
@ -753,6 +762,10 @@ func (s fsStorage) RenameFile(srcVolume, srcPath, dstVolume, dstPath string) err
return err
}
if err = os.MkdirAll(path.Join(dstVolumeDir, path.Dir(dstPath)), 0755); err != nil {
// File path cannot be verified since one of the parents is a file.
if strings.Contains(err.Error(), "not a directory") {
return errFileAccessDenied
}
log.Errorf("os.MkdirAll failed with %s", err)
return err
}

View File

@ -157,8 +157,8 @@ func checkPostPolicy(formValues map[string]string) APIErrorCode {
return ErrSignatureVersionNotSupported
}
/// Decoding policy
policyBytes, e := base64.StdEncoding.DecodeString(formValues["Policy"])
if e != nil {
policyBytes, err := base64.StdEncoding.DecodeString(formValues["Policy"])
if err != nil {
return ErrMalformedPOSTRequest
}
postPolicyForm, err := parsePostPolicyFormV4(string(policyBytes))