Merge pull request #1073 from harshavardhana/createobject

fs: Fail createObject with appropriate message.
This commit is contained in:
Harshavardhana 2016-02-03 21:56:44 -08:00
commit a1c6e4055b
4 changed files with 59 additions and 31 deletions

View File

@ -74,6 +74,7 @@ const (
SignatureVersionNotSupported
BucketNotEmpty
RootPathFull
ObjectExistsAsPrefix
)
// APIError code to Error structure map
@ -238,6 +239,11 @@ var errorCodeResponse = map[int]APIError{
Description: "Root path has reached its minimum free disk threshold. Please delete few objects to proceed.",
HTTPStatusCode: http.StatusInternalServerError,
},
ObjectExistsAsPrefix: {
Code: "ObjectExistsAsPrefix",
Description: "An object already exists as your prefix, choose a different prefix to proceed.",
HTTPStatusCode: http.StatusConflict,
},
}
// errorCodeError provides errorCode to Error. It returns empty if the code provided is unknown

View File

@ -170,6 +170,8 @@ func (api CloudStorageAPI) PutObjectHandler(w http.ResponseWriter, req *http.Req
writeErrorResponse(w, req, EntityTooLarge, req.URL.Path)
case fs.InvalidDigest:
writeErrorResponse(w, req, InvalidDigest, req.URL.Path)
case fs.ObjectExistsAsPrefix:
writeErrorResponse(w, req, ObjectExistsAsPrefix, req.URL.Path)
default:
writeErrorResponse(w, req, InternalError, req.URL.Path)
}

View File

@ -102,6 +102,16 @@ func (e ObjectNotFound) Error() string {
return "Object not found: " + e.Bucket + "#" + e.Object
}
// ObjectExistsAsPrefix object already exists with a requested prefix.
type ObjectExistsAsPrefix struct {
Bucket string
Prefix string
}
func (e ObjectExistsAsPrefix) Error() string {
return "Object exists on : " + e.Bucket + " as prefix " + e.Prefix
}
// ObjectCorrupted object found to be corrupted
type ObjectCorrupted struct {
Object string

View File

@ -192,9 +192,9 @@ func (fs Filesystem) CreateObject(bucket, object, expectedMD5Sum string, size in
fs.lock.Lock()
defer fs.lock.Unlock()
di, err := disk.GetInfo(fs.path)
if err != nil {
return ObjectMetadata{}, probe.NewError(err)
di, e := disk.GetInfo(fs.path)
if e != nil {
return ObjectMetadata{}, probe.NewError(e)
}
// Remove 5% from total space for cumulative disk space used for journalling, inodes etc.
@ -210,7 +210,7 @@ func (fs Filesystem) CreateObject(bucket, object, expectedMD5Sum string, size in
bucket = fs.denormalizeBucket(bucket)
bucketPath := filepath.Join(fs.path, bucket)
if _, e := os.Stat(bucketPath); e != nil {
if _, e = os.Stat(bucketPath); e != nil {
if os.IsNotExist(e) {
return ObjectMetadata{}, probe.NewError(BucketNotFound{Bucket: bucket})
}
@ -225,8 +225,8 @@ func (fs Filesystem) CreateObject(bucket, object, expectedMD5Sum string, size in
objectPath := filepath.Join(bucketPath, object)
if strings.TrimSpace(expectedMD5Sum) != "" {
var expectedMD5SumBytes []byte
expectedMD5SumBytes, err = base64.StdEncoding.DecodeString(strings.TrimSpace(expectedMD5Sum))
if err != nil {
expectedMD5SumBytes, e = base64.StdEncoding.DecodeString(strings.TrimSpace(expectedMD5Sum))
if e != nil {
// pro-actively close the connection
return ObjectMetadata{}, probe.NewError(InvalidDigest{Md5: expectedMD5Sum})
}
@ -234,9 +234,19 @@ func (fs Filesystem) CreateObject(bucket, object, expectedMD5Sum string, size in
}
// write object
file, err := atomic.FileCreateWithPrefix(objectPath, "")
if err != nil {
return ObjectMetadata{}, probe.NewError(err)
file, e := atomic.FileCreateWithPrefix(objectPath, "")
if e != nil {
switch e := e.(type) {
case *os.PathError:
if e.Op == "mkdir" {
if strings.Contains(e.Error(), "not a directory") {
return ObjectMetadata{}, probe.NewError(ObjectExistsAsPrefix{Bucket: bucket, Prefix: object})
}
}
return ObjectMetadata{}, probe.NewError(e)
default:
return ObjectMetadata{}, probe.NewError(e)
}
}
h := md5.New()
@ -244,33 +254,33 @@ func (fs Filesystem) CreateObject(bucket, object, expectedMD5Sum string, size in
mw := io.MultiWriter(file, h, sh)
if size > 0 {
_, err = io.CopyN(mw, data, size)
if err != nil {
_, e = io.CopyN(mw, data, size)
if e != nil {
file.CloseAndPurge()
return ObjectMetadata{}, probe.NewError(err)
return ObjectMetadata{}, probe.NewError(e)
}
} else {
_, err = io.Copy(mw, data)
if err != nil {
_, e = io.Copy(mw, data)
if e != nil {
file.CloseAndPurge()
return ObjectMetadata{}, probe.NewError(err)
return ObjectMetadata{}, probe.NewError(e)
}
}
md5Sum := hex.EncodeToString(h.Sum(nil))
// Verify if the written object is equal to what is expected, only if it is requested as such
if strings.TrimSpace(expectedMD5Sum) != "" {
if err := isMD5SumEqual(strings.TrimSpace(expectedMD5Sum), md5Sum); err != nil {
if e := isMD5SumEqual(strings.TrimSpace(expectedMD5Sum), md5Sum); e != nil {
file.CloseAndPurge()
return ObjectMetadata{}, probe.NewError(BadDigest{Md5: expectedMD5Sum, Bucket: bucket, Object: object})
}
}
sha256Sum := hex.EncodeToString(sh.Sum(nil))
if signature != nil {
ok, perr := signature.DoesSignatureMatch(sha256Sum)
if perr != nil {
ok, err := signature.DoesSignatureMatch(sha256Sum)
if err != nil {
file.CloseAndPurge()
return ObjectMetadata{}, perr.Trace()
return ObjectMetadata{}, err.Trace()
}
if !ok {
file.CloseAndPurge()
@ -279,9 +289,9 @@ func (fs Filesystem) CreateObject(bucket, object, expectedMD5Sum string, size in
}
file.Close()
st, err := os.Stat(objectPath)
if err != nil {
return ObjectMetadata{}, probe.NewError(err)
st, e := os.Stat(objectPath)
if e != nil {
return ObjectMetadata{}, probe.NewError(e)
}
contentType := "application/octet-stream"
if objectExt := filepath.Ext(objectPath); objectExt != "" {
@ -303,28 +313,28 @@ func deleteObjectPath(basePath, deletePath, bucket, object string) *probe.Error
return nil
}
fi, err := os.Stat(deletePath)
if err != nil {
if os.IsNotExist(err) {
fi, e := os.Stat(deletePath)
if e != nil {
if os.IsNotExist(e) {
return probe.NewError(ObjectNotFound{Bucket: bucket, Object: object})
}
return probe.NewError(err)
return probe.NewError(e)
}
if fi.IsDir() {
empty, err := isDirEmpty(deletePath)
if err != nil {
return err.Trace()
return err.Trace(deletePath)
}
if !empty {
return nil
}
}
if err := os.Remove(deletePath); err != nil {
return probe.NewError(err)
if e := os.Remove(deletePath); e != nil {
return probe.NewError(e)
}
if err := deleteObjectPath(basePath, filepath.Dir(deletePath), bucket, object); err != nil {
return err.Trace()
return err.Trace(basePath, deletePath, bucket, object)
}
return nil
}
@ -367,7 +377,7 @@ func (fs Filesystem) DeleteObject(bucket, object string) *probe.Error {
return probe.NewError(ObjectNotFound{Bucket: bucket, Object: object})
}
if err != nil {
return err.Trace()
return err.Trace(bucketPath, objectPath, bucket, object)
}
return nil
}