xl: Respect min. space by checking PrepareFile err (#3867)

It was possible to upload a big file which overcomes the minimal
disk space limit in XL, PrepareFile was actually checking for disk
space but we weren't checking its returned error. This patch fixes
this behavior.
This commit is contained in:
Anis Elleuch 2017-03-07 23:48:56 +01:00 committed by Harshavardhana
parent 79e0b9e69a
commit a2eae54d11
3 changed files with 27 additions and 13 deletions

View File

@ -179,11 +179,11 @@ func testObjectAPIPutObject(obj ObjectLayer, instanceType string, t TestErrHandl
// Wrapper for calling PutObject tests for both XL multiple disks case
// when quorum is not available.
func TestObjectAPIPutObjectDiskNotFound(t *testing.T) {
ExecObjectLayerDiskAlteredTest(t, testObjectAPIPutObjectDiskNotFOund)
ExecObjectLayerDiskAlteredTest(t, testObjectAPIPutObjectDiskNotFound)
}
// Tests validate correctness of PutObject.
func testObjectAPIPutObjectDiskNotFOund(obj ObjectLayer, instanceType string, disks []string, t *testing.T) {
func testObjectAPIPutObjectDiskNotFound(obj ObjectLayer, instanceType string, disks []string, t *testing.T) {
// Generating cases for which the PutObject fails.
bucket := "minio-bucket"
object := "minio-object"

View File

@ -648,11 +648,9 @@ func (xl xlObjects) PutObjectPart(bucket, object, uploadID string, partID int, s
defer xl.deleteObject(minioMetaTmpBucket, tmpPart)
if size > 0 {
for _, disk := range onlineDisks {
if disk != nil {
actualSize := xl.sizeOnDisk(size, xlMeta.Erasure.BlockSize, xlMeta.Erasure.DataBlocks)
disk.PrepareFile(minioMetaTmpBucket, tmpPartPath, actualSize)
}
if pErr := xl.prepareFile(minioMetaTmpBucket, tmpPartPath, size, onlineDisks, xlMeta.Erasure.BlockSize, xlMeta.Erasure.DataBlocks); err != nil {
return PartInfo{}, toObjectErr(pErr, bucket, object)
}
}

View File

@ -36,6 +36,25 @@ import (
// list all errors which can be ignored in object operations.
var objectOpIgnoredErrs = append(baseIgnoredErrs, errDiskAccessDenied)
// prepareFile hints the bottom layer to optimize the creation of a new object
func (xl xlObjects) prepareFile(bucket, object string, size int64, onlineDisks []StorageAPI, blockSize int64, dataBlocks int) error {
pErrs := make([]error, len(onlineDisks))
// Calculate the real size of the part in one disk.
actualSize := xl.sizeOnDisk(size, blockSize, dataBlocks)
// Prepare object creation in a all disks
for index, disk := range onlineDisks {
if disk != nil {
if err := disk.PrepareFile(bucket, object, actualSize); err != nil {
// Save error to reduce it later
pErrs[index] = err
// Ignore later access to disk which generated the error
onlineDisks[index] = nil
}
}
}
return reduceWriteQuorumErrs(pErrs, objectOpIgnoredErrs, xl.writeQuorum)
}
/// Object Operations
// CopyObject - copy object source object to destination object.
@ -555,12 +574,9 @@ func (xl xlObjects) PutObject(bucket string, object string, size int64, data io.
// Hint the filesystem to pre-allocate one continuous large block.
// This is only an optimization.
if curPartSize > 0 {
// Calculate the real size of the part in the disk.
actualSize := xl.sizeOnDisk(curPartSize, xlMeta.Erasure.BlockSize, xlMeta.Erasure.DataBlocks)
for _, disk := range onlineDisks {
if disk != nil {
disk.PrepareFile(minioMetaTmpBucket, tempErasureObj, actualSize)
}
pErr := xl.prepareFile(minioMetaTmpBucket, tempErasureObj, curPartSize, onlineDisks, xlMeta.Erasure.BlockSize, xlMeta.Erasure.DataBlocks)
if pErr != nil {
return ObjectInfo{}, toObjectErr(pErr, bucket, object)
}
}