mirror of https://github.com/minio/minio.git
Support creating directories on erasure coded backend (#5443)
This PR continues from #5049 where we started supporting directories for erasure coded backend
This commit is contained in:
parent
45c35b3544
commit
3ea28e9771
22
cmd/fs-v1.go
22
cmd/fs-v1.go
|
@ -440,7 +440,7 @@ func (fs fsObjects) CopyObject(srcBucket, srcObject, dstBucket, dstObject string
|
||||||
// startOffset indicates the starting read location of the object.
|
// startOffset indicates the starting read location of the object.
|
||||||
// length indicates the total length of the object.
|
// length indicates the total length of the object.
|
||||||
func (fs fsObjects) GetObject(bucket, object string, offset int64, length int64, writer io.Writer, etag string) (err error) {
|
func (fs fsObjects) GetObject(bucket, object string, offset int64, length int64, writer io.Writer, etag string) (err error) {
|
||||||
if err = checkBucketAndObjectNamesFS(bucket, object); err != nil {
|
if err = checkGetObjArgs(bucket, object); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -572,22 +572,6 @@ func (fs fsObjects) getObjectInfo(bucket, object string) (oi ObjectInfo, e error
|
||||||
return fsMeta.ToObjectInfo(bucket, object, fi), nil
|
return fsMeta.ToObjectInfo(bucket, object, fi), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Checks bucket and object name validity, returns nil if both are valid.
|
|
||||||
func checkBucketAndObjectNamesFS(bucket, object string) error {
|
|
||||||
// Verify if bucket is valid.
|
|
||||||
if !IsValidBucketName(bucket) {
|
|
||||||
return errors.Trace(BucketNameInvalid{Bucket: bucket})
|
|
||||||
}
|
|
||||||
// Verify if object is valid.
|
|
||||||
if len(object) == 0 {
|
|
||||||
return errors.Trace(ObjectNameInvalid{Bucket: bucket, Object: object})
|
|
||||||
}
|
|
||||||
if !IsValidObjectPrefix(object) {
|
|
||||||
return errors.Trace(ObjectNameInvalid{Bucket: bucket, Object: object})
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// GetObjectInfo - reads object metadata and replies back ObjectInfo.
|
// GetObjectInfo - reads object metadata and replies back ObjectInfo.
|
||||||
func (fs fsObjects) GetObjectInfo(bucket, object string) (oi ObjectInfo, e error) {
|
func (fs fsObjects) GetObjectInfo(bucket, object string) (oi ObjectInfo, e error) {
|
||||||
// Lock the object before reading.
|
// Lock the object before reading.
|
||||||
|
@ -597,7 +581,7 @@ func (fs fsObjects) GetObjectInfo(bucket, object string) (oi ObjectInfo, e error
|
||||||
}
|
}
|
||||||
defer objectLock.RUnlock()
|
defer objectLock.RUnlock()
|
||||||
|
|
||||||
if err := checkBucketAndObjectNamesFS(bucket, object); err != nil {
|
if err := checkGetObjArgs(bucket, object); err != nil {
|
||||||
return oi, err
|
return oi, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -775,7 +759,7 @@ func (fs fsObjects) DeleteObject(bucket, object string) error {
|
||||||
}
|
}
|
||||||
defer objectLock.Unlock()
|
defer objectLock.Unlock()
|
||||||
|
|
||||||
if err := checkBucketAndObjectNamesFS(bucket, object); err != nil {
|
if err := checkDelObjArgs(bucket, object); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -17,6 +17,7 @@
|
||||||
package cmd
|
package cmd
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"path"
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
humanize "github.com/dustin/go-humanize"
|
humanize "github.com/dustin/go-humanize"
|
||||||
|
@ -221,6 +222,11 @@ func cleanupDir(storage StorageAPI, volume, dirPath string) error {
|
||||||
return errors.Trace(err)
|
return errors.Trace(err)
|
||||||
} // else on success..
|
} // else on success..
|
||||||
|
|
||||||
|
// Entry path is empty, just delete it.
|
||||||
|
if len(entries) == 0 {
|
||||||
|
return errors.Trace(storage.DeleteFile(volume, path.Clean(entryPath)))
|
||||||
|
}
|
||||||
|
|
||||||
// Recurse and delete all other entries.
|
// Recurse and delete all other entries.
|
||||||
for _, entry := range entries {
|
for _, entry := range entries {
|
||||||
if err = delFunc(pathJoin(entryPath, entry)); err != nil {
|
if err = delFunc(pathJoin(entryPath, entry)); err != nil {
|
||||||
|
|
|
@ -38,11 +38,10 @@ func checkBucketAndObjectNames(bucket, object string) error {
|
||||||
return errors.Trace(BucketNameInvalid{Bucket: bucket})
|
return errors.Trace(BucketNameInvalid{Bucket: bucket})
|
||||||
}
|
}
|
||||||
// Verify if object is valid.
|
// Verify if object is valid.
|
||||||
if !IsValidObjectName(object) {
|
if len(object) == 0 {
|
||||||
// Objects with "/" are invalid, verify to return a different error.
|
return errors.Trace(ObjectNameInvalid{Bucket: bucket, Object: object})
|
||||||
if hasSuffix(object, slashSeparator) || hasPrefix(object, slashSeparator) {
|
}
|
||||||
return errors.Trace(ObjectNotFound{Bucket: bucket, Object: object})
|
if !IsValidObjectPrefix(object) {
|
||||||
}
|
|
||||||
return errors.Trace(ObjectNameInvalid{Bucket: bucket, Object: object})
|
return errors.Trace(ObjectNameInvalid{Bucket: bucket, Object: object})
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
|
|
|
@ -21,5 +21,5 @@ package cmd
|
||||||
// isValidVolname verifies a volname name in accordance with object
|
// isValidVolname verifies a volname name in accordance with object
|
||||||
// layer requirements.
|
// layer requirements.
|
||||||
func isValidVolname(volname string) bool {
|
func isValidVolname(volname string) bool {
|
||||||
return !(len(volname) < 3 || len(volname) > 63)
|
return !(len(volname) < 3)
|
||||||
}
|
}
|
||||||
|
|
|
@ -50,6 +50,7 @@ func TestIsValidVolname(t *testing.T) {
|
||||||
{"tHIS-ENDS-WITH-UPPERCASE", true},
|
{"tHIS-ENDS-WITH-UPPERCASE", true},
|
||||||
{"ThisBeginsAndEndsWithUpperCase", true},
|
{"ThisBeginsAndEndsWithUpperCase", true},
|
||||||
{"una ñina", true},
|
{"una ñina", true},
|
||||||
|
{"lalalallalallalalalallalallalala-theString-size-is-greater-than-64", true},
|
||||||
// cases for which test should fail.
|
// cases for which test should fail.
|
||||||
// passing invalid bucket names.
|
// passing invalid bucket names.
|
||||||
{"", false},
|
{"", false},
|
||||||
|
@ -58,7 +59,6 @@ func TestIsValidVolname(t *testing.T) {
|
||||||
{"ab", false},
|
{"ab", false},
|
||||||
{"ab/", true},
|
{"ab/", true},
|
||||||
{"......", true},
|
{"......", true},
|
||||||
{"lalalallalallalalalallalallalala-theString-size-is-greater-than-64", false},
|
|
||||||
}
|
}
|
||||||
|
|
||||||
for i, testCase := range testCases {
|
for i, testCase := range testCases {
|
||||||
|
|
|
@ -25,7 +25,7 @@ import (
|
||||||
// isValidVolname verifies a volname name in accordance with object
|
// isValidVolname verifies a volname name in accordance with object
|
||||||
// layer requirements.
|
// layer requirements.
|
||||||
func isValidVolname(volname string) bool {
|
func isValidVolname(volname string) bool {
|
||||||
if len(volname) < 3 || len(volname) > 63 {
|
if len(volname) < 3 {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
// Volname shouldn't have reserved characters on windows in it.
|
// Volname shouldn't have reserved characters on windows in it.
|
||||||
|
|
21
cmd/posix.go
21
cmd/posix.go
|
@ -280,18 +280,21 @@ func (s *posix) MakeVol(volume string) (err error) {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
// Make a volume entry, with mode 0777 mkdir honors system umask.
|
|
||||||
err = os.Mkdir((volumeDir), 0777)
|
if _, err := os.Stat(volumeDir); err != nil {
|
||||||
if err != nil {
|
// Volume does not exist we proceed to create.
|
||||||
if os.IsExist(err) {
|
if os.IsNotExist(err) {
|
||||||
return errVolumeExists
|
// Make a volume entry, with mode 0777 mkdir honors system umask.
|
||||||
} else if os.IsPermission(err) {
|
err = os.MkdirAll(volumeDir, 0777)
|
||||||
|
}
|
||||||
|
if os.IsPermission(err) {
|
||||||
return errDiskAccessDenied
|
return errDiskAccessDenied
|
||||||
}
|
}
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
// Success
|
|
||||||
return nil
|
// Stat succeeds we return errVolumeExists.
|
||||||
|
return errVolumeExists
|
||||||
}
|
}
|
||||||
|
|
||||||
// ListVols - list volumes.
|
// ListVols - list volumes.
|
||||||
|
@ -381,7 +384,7 @@ func (s *posix) StatVol(volume string) (volInfo VolInfo, err error) {
|
||||||
}
|
}
|
||||||
// Stat a volume entry.
|
// Stat a volume entry.
|
||||||
var st os.FileInfo
|
var st os.FileInfo
|
||||||
st, err = os.Stat((volumeDir))
|
st, err = os.Stat(volumeDir)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if os.IsNotExist(err) {
|
if os.IsNotExist(err) {
|
||||||
return VolInfo{}, errVolumeNotFound
|
return VolInfo{}, errVolumeNotFound
|
||||||
|
|
|
@ -861,7 +861,7 @@ func TestPosixDeleteFile(t *testing.T) {
|
||||||
// TestPosix case - 4.
|
// TestPosix case - 4.
|
||||||
// TestPosix case with segment of the volume name > 255.
|
// TestPosix case with segment of the volume name > 255.
|
||||||
{
|
{
|
||||||
srcVol: "my-obj-del-0000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000001",
|
srcVol: "my",
|
||||||
srcPath: "success-file",
|
srcPath: "success-file",
|
||||||
ioErrCnt: 0,
|
ioErrCnt: 0,
|
||||||
expectedErr: errInvalidArgument,
|
expectedErr: errInvalidArgument,
|
||||||
|
|
|
@ -76,9 +76,7 @@ func verifyError(c *check, response *http.Response, code, description string, st
|
||||||
func runAllTests(suite *TestSuiteCommon, c *check) {
|
func runAllTests(suite *TestSuiteCommon, c *check) {
|
||||||
suite.SetUpSuite(c)
|
suite.SetUpSuite(c)
|
||||||
suite.TestBucketSQSNotificationWebHook(c)
|
suite.TestBucketSQSNotificationWebHook(c)
|
||||||
if suite.serverType == "XL" {
|
suite.TestObjectDir(c)
|
||||||
suite.TestObjectDir(c)
|
|
||||||
}
|
|
||||||
suite.TestBucketSQSNotificationAMQP(c)
|
suite.TestBucketSQSNotificationAMQP(c)
|
||||||
suite.TestBucketPolicy(c)
|
suite.TestBucketPolicy(c)
|
||||||
suite.TestDeleteBucket(c)
|
suite.TestDeleteBucket(c)
|
||||||
|
@ -260,7 +258,7 @@ func (s *TestSuiteCommon) TestObjectDir(c *check) {
|
||||||
response, err = client.Do(request)
|
response, err = client.Do(request)
|
||||||
|
|
||||||
c.Assert(err, nil)
|
c.Assert(err, nil)
|
||||||
c.Assert(response.StatusCode, http.StatusNotFound)
|
c.Assert(response.StatusCode, http.StatusOK)
|
||||||
|
|
||||||
request, err = newTestSignedRequest("GET", getGetObjectURL(s.endPoint, bucketName, "my-object-directory/"),
|
request, err = newTestSignedRequest("GET", getGetObjectURL(s.endPoint, bucketName, "my-object-directory/"),
|
||||||
0, nil, s.accessKey, s.secretKey, s.signer)
|
0, nil, s.accessKey, s.secretKey, s.signer)
|
||||||
|
@ -271,7 +269,7 @@ func (s *TestSuiteCommon) TestObjectDir(c *check) {
|
||||||
response, err = client.Do(request)
|
response, err = client.Do(request)
|
||||||
|
|
||||||
c.Assert(err, nil)
|
c.Assert(err, nil)
|
||||||
c.Assert(response.StatusCode, http.StatusNotFound)
|
c.Assert(response.StatusCode, http.StatusOK)
|
||||||
|
|
||||||
request, err = newTestSignedRequest("DELETE", getDeleteObjectURL(s.endPoint, bucketName, "my-object-directory/"),
|
request, err = newTestSignedRequest("DELETE", getDeleteObjectURL(s.endPoint, bucketName, "my-object-directory/"),
|
||||||
0, nil, s.accessKey, s.secretKey, s.signer)
|
0, nil, s.accessKey, s.secretKey, s.signer)
|
||||||
|
@ -638,7 +636,7 @@ func (s *TestSuiteCommon) TestDeleteObject(c *check) {
|
||||||
// assert the status of http response.
|
// assert the status of http response.
|
||||||
c.Assert(response.StatusCode, http.StatusOK)
|
c.Assert(response.StatusCode, http.StatusOK)
|
||||||
|
|
||||||
// object name was "prefix/myobject", an attempt to delelte "prefix"
|
// object name was "prefix/myobject", an attempt to delete "prefix"
|
||||||
// Should not delete "prefix/myobject"
|
// Should not delete "prefix/myobject"
|
||||||
request, err = newTestSignedRequest("DELETE", getDeleteObjectURL(s.endPoint, bucketName, "prefix"),
|
request, err = newTestSignedRequest("DELETE", getDeleteObjectURL(s.endPoint, bucketName, "prefix"),
|
||||||
0, nil, s.accessKey, s.secretKey, s.signer)
|
0, nil, s.accessKey, s.secretKey, s.signer)
|
||||||
|
|
|
@ -33,6 +33,29 @@ import (
|
||||||
// list all errors which can be ignored in object operations.
|
// list all errors which can be ignored in object operations.
|
||||||
var objectOpIgnoredErrs = append(baseIgnoredErrs, errDiskAccessDenied)
|
var objectOpIgnoredErrs = append(baseIgnoredErrs, errDiskAccessDenied)
|
||||||
|
|
||||||
|
// putObjectDir hints the bottom layer to create a new directory.
|
||||||
|
func (xl xlObjects) putObjectDir(bucket, object string, writeQuorum int) error {
|
||||||
|
var wg = &sync.WaitGroup{}
|
||||||
|
|
||||||
|
errs := make([]error, len(xl.storageDisks))
|
||||||
|
// Prepare object creation in all disks
|
||||||
|
for index, disk := range xl.storageDisks {
|
||||||
|
if disk == nil {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
wg.Add(1)
|
||||||
|
go func(index int, disk StorageAPI) {
|
||||||
|
defer wg.Done()
|
||||||
|
if err := disk.MakeVol(pathJoin(bucket, object)); err != nil && err != errVolumeExists {
|
||||||
|
errs[index] = err
|
||||||
|
}
|
||||||
|
}(index, disk)
|
||||||
|
}
|
||||||
|
wg.Wait()
|
||||||
|
|
||||||
|
return reduceWriteQuorumErrs(errs, objectOpIgnoredErrs, writeQuorum)
|
||||||
|
}
|
||||||
|
|
||||||
// prepareFile hints the bottom layer to optimize the creation of a new object
|
// prepareFile hints the bottom layer to optimize the creation of a new object
|
||||||
func (xl xlObjects) prepareFile(bucket, object string, size int64, onlineDisks []StorageAPI, blockSize int64, dataBlocks, writeQuorum int) error {
|
func (xl xlObjects) prepareFile(bucket, object string, size int64, onlineDisks []StorageAPI, blockSize int64, dataBlocks, writeQuorum int) error {
|
||||||
pErrs := make([]error, len(onlineDisks))
|
pErrs := make([]error, len(onlineDisks))
|
||||||
|
@ -204,6 +227,12 @@ func (xl xlObjects) getObject(bucket, object string, startOffset int64, length i
|
||||||
return errors.Trace(errUnexpected)
|
return errors.Trace(errUnexpected)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// If its a directory request, we return an empty body.
|
||||||
|
if hasSuffix(object, slashSeparator) {
|
||||||
|
_, err := writer.Write([]byte(""))
|
||||||
|
return toObjectErr(errors.Trace(err), bucket, object)
|
||||||
|
}
|
||||||
|
|
||||||
// Read metadata associated with the object from all disks.
|
// Read metadata associated with the object from all disks.
|
||||||
metaArr, errs := readAllXLMetadata(xl.storageDisks, bucket, object)
|
metaArr, errs := readAllXLMetadata(xl.storageDisks, bucket, object)
|
||||||
|
|
||||||
|
@ -354,6 +383,42 @@ func (xl xlObjects) getObject(bucket, object string, startOffset int64, length i
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// getObjectInfoDir - This getObjectInfo is specific to object directory lookup.
|
||||||
|
func (xl xlObjects) getObjectInfoDir(bucket, object string) (oi ObjectInfo, err error) {
|
||||||
|
var wg = &sync.WaitGroup{}
|
||||||
|
|
||||||
|
errs := make([]error, len(xl.storageDisks))
|
||||||
|
// Prepare object creation in a all disks
|
||||||
|
for index, disk := range xl.storageDisks {
|
||||||
|
if disk == nil {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
wg.Add(1)
|
||||||
|
go func(index int, disk StorageAPI) {
|
||||||
|
defer wg.Done()
|
||||||
|
if _, err := disk.StatVol(pathJoin(bucket, object)); err != nil {
|
||||||
|
// Since we are re-purposing StatVol, an object which
|
||||||
|
// is a directory if it doesn't exist should be
|
||||||
|
// returned as errFileNotFound instead, convert
|
||||||
|
// the error right here accordingly.
|
||||||
|
if err == errVolumeNotFound {
|
||||||
|
err = errFileNotFound
|
||||||
|
} else if err == errVolumeAccessDenied {
|
||||||
|
err = errFileAccessDenied
|
||||||
|
}
|
||||||
|
|
||||||
|
// Save error to reduce it later
|
||||||
|
errs[index] = err
|
||||||
|
}
|
||||||
|
}(index, disk)
|
||||||
|
}
|
||||||
|
|
||||||
|
wg.Wait()
|
||||||
|
|
||||||
|
readQuorum := len(xl.storageDisks) / 2
|
||||||
|
return dirObjectInfo(bucket, object, 0, map[string]string{}), reduceReadQuorumErrs(errs, objectOpIgnoredErrs, readQuorum)
|
||||||
|
}
|
||||||
|
|
||||||
// GetObjectInfo - reads object metadata and replies back ObjectInfo.
|
// GetObjectInfo - reads object metadata and replies back ObjectInfo.
|
||||||
func (xl xlObjects) GetObjectInfo(bucket, object string) (oi ObjectInfo, e error) {
|
func (xl xlObjects) GetObjectInfo(bucket, object string) (oi ObjectInfo, e error) {
|
||||||
// Lock the object before reading.
|
// Lock the object before reading.
|
||||||
|
@ -367,15 +432,21 @@ func (xl xlObjects) GetObjectInfo(bucket, object string) (oi ObjectInfo, e error
|
||||||
return oi, err
|
return oi, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if hasSuffix(object, slashSeparator) {
|
||||||
|
return xl.getObjectInfoDir(bucket, object)
|
||||||
|
}
|
||||||
|
|
||||||
info, err := xl.getObjectInfo(bucket, object)
|
info, err := xl.getObjectInfo(bucket, object)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return oi, toObjectErr(err, bucket, object)
|
return oi, toObjectErr(err, bucket, object)
|
||||||
}
|
}
|
||||||
|
|
||||||
return info, nil
|
return info, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// getObjectInfo - wrapper for reading object metadata and constructs ObjectInfo.
|
// getObjectInfo - wrapper for reading object metadata and constructs ObjectInfo.
|
||||||
func (xl xlObjects) getObjectInfo(bucket, object string) (objInfo ObjectInfo, err error) {
|
func (xl xlObjects) getObjectInfo(bucket, object string) (objInfo ObjectInfo, err error) {
|
||||||
|
|
||||||
// Extracts xlStat and xlMetaMap.
|
// Extracts xlStat and xlMetaMap.
|
||||||
xlStat, xlMetaMap, err := xl.readXLMetaStat(bucket, object)
|
xlStat, xlMetaMap, err := xl.readXLMetaStat(bucket, object)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -508,6 +579,26 @@ func (xl xlObjects) PutObject(bucket string, object string, data *hash.Reader, m
|
||||||
|
|
||||||
// putObject wrapper for xl PutObject
|
// putObject wrapper for xl PutObject
|
||||||
func (xl xlObjects) putObject(bucket string, object string, data *hash.Reader, metadata map[string]string) (objInfo ObjectInfo, err error) {
|
func (xl xlObjects) putObject(bucket string, object string, data *hash.Reader, metadata map[string]string) (objInfo ObjectInfo, err error) {
|
||||||
|
uniqueID := mustGetUUID()
|
||||||
|
tempObj := uniqueID
|
||||||
|
|
||||||
|
// No metadata is set, allocate a new one.
|
||||||
|
if metadata == nil {
|
||||||
|
metadata = make(map[string]string)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Get parity and data drive count based on storage class metadata
|
||||||
|
dataDrives, parityDrives := getRedundancyCount(metadata[amzStorageClass], len(xl.storageDisks))
|
||||||
|
|
||||||
|
// we now know the number of blocks this object needs for data and parity.
|
||||||
|
// writeQuorum is dataBlocks + 1
|
||||||
|
writeQuorum := dataDrives + 1
|
||||||
|
|
||||||
|
// Delete temporary object in the event of failure.
|
||||||
|
// If PutObject succeeded there would be no temporary
|
||||||
|
// object to delete.
|
||||||
|
defer xl.deleteObject(minioMetaTmpBucket, tempObj)
|
||||||
|
|
||||||
// This is a special case with size as '0' and object ends with
|
// This is a special case with size as '0' and object ends with
|
||||||
// a slash separator, we treat it like a valid operation and
|
// a slash separator, we treat it like a valid operation and
|
||||||
// return success.
|
// return success.
|
||||||
|
@ -518,6 +609,16 @@ func (xl xlObjects) putObject(bucket string, object string, data *hash.Reader, m
|
||||||
if xl.parentDirIsObject(bucket, path.Dir(object)) {
|
if xl.parentDirIsObject(bucket, path.Dir(object)) {
|
||||||
return ObjectInfo{}, toObjectErr(errors.Trace(errFileAccessDenied), bucket, object)
|
return ObjectInfo{}, toObjectErr(errors.Trace(errFileAccessDenied), bucket, object)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if err = xl.putObjectDir(minioMetaTmpBucket, tempObj, writeQuorum); err != nil {
|
||||||
|
return ObjectInfo{}, toObjectErr(errors.Trace(err), bucket, object)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Rename the successfully written temporary object to final location.
|
||||||
|
if _, err = renameObject(xl.storageDisks, minioMetaTmpBucket, tempObj, bucket, object, writeQuorum); err != nil {
|
||||||
|
return ObjectInfo{}, toObjectErr(err, bucket, object)
|
||||||
|
}
|
||||||
|
|
||||||
return dirObjectInfo(bucket, object, data.Size(), metadata), nil
|
return dirObjectInfo(bucket, object, data.Size(), metadata), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -538,14 +639,6 @@ func (xl xlObjects) putObject(bucket string, object string, data *hash.Reader, m
|
||||||
return ObjectInfo{}, toObjectErr(errors.Trace(errFileAccessDenied), bucket, object)
|
return ObjectInfo{}, toObjectErr(errors.Trace(errFileAccessDenied), bucket, object)
|
||||||
}
|
}
|
||||||
|
|
||||||
// No metadata is set, allocate a new one.
|
|
||||||
if metadata == nil {
|
|
||||||
metadata = make(map[string]string)
|
|
||||||
}
|
|
||||||
|
|
||||||
uniqueID := mustGetUUID()
|
|
||||||
tempObj := uniqueID
|
|
||||||
|
|
||||||
// Limit the reader to its provided size if specified.
|
// Limit the reader to its provided size if specified.
|
||||||
var reader io.Reader = data
|
var reader io.Reader = data
|
||||||
|
|
||||||
|
@ -569,12 +662,6 @@ func (xl xlObjects) putObject(bucket string, object string, data *hash.Reader, m
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// Get parity and data drive count based on storage class metadata
|
|
||||||
dataDrives, parityDrives := getRedundancyCount(metadata[amzStorageClass], len(xl.storageDisks))
|
|
||||||
|
|
||||||
// we now know the number of blocks this object needs for data and parity.
|
|
||||||
// writeQuorum is dataBlocks + 1
|
|
||||||
writeQuorum := dataDrives + 1
|
|
||||||
|
|
||||||
// Initialize parts metadata
|
// Initialize parts metadata
|
||||||
partsMetadata := make([]xlMetaV1, len(xl.storageDisks))
|
partsMetadata := make([]xlMetaV1, len(xl.storageDisks))
|
||||||
|
@ -589,11 +676,6 @@ func (xl xlObjects) putObject(bucket string, object string, data *hash.Reader, m
|
||||||
// Order disks according to erasure distribution
|
// Order disks according to erasure distribution
|
||||||
onlineDisks := shuffleDisks(xl.storageDisks, partsMetadata[0].Erasure.Distribution)
|
onlineDisks := shuffleDisks(xl.storageDisks, partsMetadata[0].Erasure.Distribution)
|
||||||
|
|
||||||
// Delete temporary object in the event of failure.
|
|
||||||
// If PutObject succeeded there would be no temporary
|
|
||||||
// object to delete.
|
|
||||||
defer xl.deleteObject(minioMetaTmpBucket, tempObj)
|
|
||||||
|
|
||||||
// Total size of the written object
|
// Total size of the written object
|
||||||
var sizeWritten int64
|
var sizeWritten int64
|
||||||
|
|
||||||
|
@ -741,13 +823,20 @@ func (xl xlObjects) deleteObject(bucket, object string) error {
|
||||||
// Initialize sync waitgroup.
|
// Initialize sync waitgroup.
|
||||||
var wg = &sync.WaitGroup{}
|
var wg = &sync.WaitGroup{}
|
||||||
|
|
||||||
// Read metadata associated with the object from all disks.
|
var writeQuorum int
|
||||||
metaArr, errs := readAllXLMetadata(xl.storageDisks, bucket, object)
|
var err error
|
||||||
|
// If its a directory request, no need to read metadata.
|
||||||
|
if !hasSuffix(object, slashSeparator) {
|
||||||
|
// Read metadata associated with the object from all disks.
|
||||||
|
metaArr, errs := readAllXLMetadata(xl.storageDisks, bucket, object)
|
||||||
|
|
||||||
// get Quorum for this object
|
// get Quorum for this object
|
||||||
_, writeQuorum, err := objectQuorumFromMeta(xl, metaArr, errs)
|
_, writeQuorum, err = objectQuorumFromMeta(xl, metaArr, errs)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
writeQuorum = len(xl.storageDisks)/2 + 1
|
||||||
}
|
}
|
||||||
|
|
||||||
// Initialize list of errors.
|
// Initialize list of errors.
|
||||||
|
@ -789,14 +878,20 @@ func (xl xlObjects) DeleteObject(bucket, object string) (err error) {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if hasSuffix(object, slashSeparator) {
|
||||||
|
// Delete the object on all disks.
|
||||||
|
if err = xl.deleteObject(bucket, object); err != nil {
|
||||||
|
return toObjectErr(err, bucket, object)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Validate object exists.
|
// Validate object exists.
|
||||||
if !xl.isObject(bucket, object) {
|
if !xl.isObject(bucket, object) {
|
||||||
return errors.Trace(ObjectNotFound{bucket, object})
|
return errors.Trace(ObjectNotFound{bucket, object})
|
||||||
} // else proceed to delete the object.
|
} // else proceed to delete the object.
|
||||||
|
|
||||||
// Delete the object on all disks.
|
// Delete the object on all disks.
|
||||||
err = xl.deleteObject(bucket, object)
|
if err = xl.deleteObject(bucket, object); err != nil {
|
||||||
if err != nil {
|
|
||||||
return toObjectErr(err, bucket, object)
|
return toObjectErr(err, bucket, object)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -75,8 +75,6 @@ func TestXLDeleteObjectBasic(t *testing.T) {
|
||||||
{"----", "obj", BucketNameInvalid{Bucket: "----"}},
|
{"----", "obj", BucketNameInvalid{Bucket: "----"}},
|
||||||
{"bucket", "", ObjectNameInvalid{Bucket: "bucket", Object: ""}},
|
{"bucket", "", ObjectNameInvalid{Bucket: "bucket", Object: ""}},
|
||||||
{"bucket", "doesnotexist", ObjectNotFound{Bucket: "bucket", Object: "doesnotexist"}},
|
{"bucket", "doesnotexist", ObjectNotFound{Bucket: "bucket", Object: "doesnotexist"}},
|
||||||
{"bucket", "obj/", ObjectNotFound{Bucket: "bucket", Object: "obj/"}},
|
|
||||||
{"bucket", "/obj", ObjectNotFound{Bucket: "bucket", Object: "/obj"}},
|
|
||||||
{"bucket", "obj", nil},
|
{"bucket", "obj", nil},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue