Do not delete parts in multipart if 0 bytes (#6855)

This can create inconsistencies i.e Parts might have
lesser number of parts than ChecksumInfos. This will
result in object to be not readable.

This PR also allows for deleting previously created
corrupted objects.
This commit is contained in:
Harshavardhana 2018-11-26 13:20:21 -08:00 committed by Dee Koder
parent dba61867e8
commit 12a6523fb2
2 changed files with 43 additions and 24 deletions

View File

@ -341,8 +341,10 @@ func (xl xlObjects) PutObjectPart(ctx context.Context, bucket, object, uploadID
// Delete the temporary object part. If PutObjectPart succeeds there would be nothing to delete.
defer xl.deleteObject(ctx, minioMetaTmpBucket, tmpPart, writeQuorum, false)
if data.Size() > 0 || data.Size() == -1 {
if pErr := xl.prepareFile(ctx, minioMetaTmpBucket, tmpPartPath, data.Size(), onlineDisks, xlMeta.Erasure.BlockSize, xlMeta.Erasure.DataBlocks, writeQuorum); pErr != nil {
if data.Size() >= 0 {
if pErr := xl.prepareFile(ctx, minioMetaTmpBucket, tmpPartPath, data.Size(),
onlineDisks, xlMeta.Erasure.BlockSize, xlMeta.Erasure.DataBlocks, writeQuorum); pErr != nil {
return pi, toObjectErr(pErr, bucket, object)
}
@ -369,6 +371,7 @@ func (xl xlObjects) PutObjectPart(ctx context.Context, bucket, object, uploadID
if len(buffer) > int(xlMeta.Erasure.BlockSize) {
buffer = buffer[:xlMeta.Erasure.BlockSize]
}
writers := make([]*bitrotWriter, len(onlineDisks))
for i, disk := range onlineDisks {
if disk == nil {
@ -376,6 +379,7 @@ func (xl xlObjects) PutObjectPart(ctx context.Context, bucket, object, uploadID
}
writers[i] = newBitrotWriter(disk, minioMetaTmpBucket, tmpPartPath, DefaultBitrotAlgorithm)
}
n, err := erasure.Encode(ctx, data, writers, buffer, erasure.dataBlocks+1)
if err != nil {
return pi, toObjectErr(err, bucket, object)
@ -674,13 +678,6 @@ func (xl xlObjects) CompleteMultipartUpload(ctx context.Context, bucket string,
}
}
// Last part could have been uploaded as 0bytes, do not need
// to save it in final `xl.json`.
if (i == len(parts)-1) && currentXLMeta.Parts[partIdx].Size == 0 {
xlMeta.Parts = xlMeta.Parts[:i] // Skip the part.
continue
}
// Save for total object size.
objectSize += currentXLMeta.Parts[partIdx].Size

View File

@ -450,12 +450,17 @@ func (xl xlObjects) GetObjectInfo(ctx context.Context, bucket, object string, op
func (xl xlObjects) isObjectCorrupted(metaArr []xlMetaV1, errs []error) (validMeta xlMetaV1, ok bool) {
// We can consider an object data not reliable
// when xl.json is not found in read quorum disks.
var notFoundXLJSON int
var notFoundXLJSON, corruptedXLJSON int
for _, readErr := range errs {
if readErr == errFileNotFound {
notFoundXLJSON++
}
}
for _, readErr := range errs {
if readErr == errCorruptedFormat {
corruptedXLJSON++
}
}
for _, m := range metaArr {
if !m.IsValid() {
@ -466,24 +471,35 @@ func (xl xlObjects) isObjectCorrupted(metaArr []xlMetaV1, errs []error) (validMe
}
// Return if the object is indeed corrupted.
return validMeta, len(xl.getDisks())-notFoundXLJSON < validMeta.Erasure.DataBlocks
return validMeta, len(xl.getDisks())-notFoundXLJSON < validMeta.Erasure.DataBlocks || len(xl.getDisks()) == corruptedXLJSON
}
const xlCorruptedSuffix = ".CORRUPTED"
// Renames the corrupted object and makes it visible.
func renameCorruptedObject(ctx context.Context, bucket, object string, validMeta xlMetaV1, disks []StorageAPI, errs []error) {
func (xl xlObjects) renameCorruptedObject(ctx context.Context, bucket, object string, validMeta xlMetaV1, disks []StorageAPI, errs []error) {
// if errs returned are corrupted
if validMeta.Erasure.DataBlocks == 0 {
validMeta = newXLMetaV1(object, len(disks)/2, len(disks)/2)
}
writeQuorum := validMeta.Erasure.DataBlocks + 1
// Move all existing objects into corrupted suffix.
rename(ctx, disks, bucket, object, bucket, object+xlCorruptedSuffix, true, writeQuorum, []error{errFileNotFound})
oldObj := mustGetUUID()
rename(ctx, disks, bucket, object, minioMetaTmpBucket, oldObj, true, writeQuorum, []error{errFileNotFound})
// Delete temporary object in the event of failure.
// If PutObject succeeded there would be no temporary
// object to delete.
defer xl.deleteObject(ctx, minioMetaTmpBucket, oldObj, writeQuorum, false)
tempObj := mustGetUUID()
// Get all the disks which do not have the file.
var cdisks = make([]StorageAPI, len(disks))
for i, merr := range errs {
if merr == errFileNotFound {
if merr == errFileNotFound || merr == errCorruptedFormat {
cdisks[i] = disks[i]
}
}
@ -497,18 +513,24 @@ func renameCorruptedObject(ctx context.Context, bucket, object string, validMeta
disk.AppendFile(minioMetaTmpBucket, pathJoin(tempObj, "part.1"), []byte{})
// Write algorithm hash for empty part file.
alg := validMeta.Erasure.Checksums[0].Algorithm.New()
alg.Write([]byte{})
var algorithm = DefaultBitrotAlgorithm
h := algorithm.New()
h.Write([]byte{})
// Update the checksums and part info.
validMeta.Erasure.Checksums[0] = ChecksumInfo{
Name: validMeta.Erasure.Checksums[0].Name,
Algorithm: validMeta.Erasure.Checksums[0].Algorithm,
Hash: alg.Sum(nil),
validMeta.Erasure.Checksums = []ChecksumInfo{
{
Name: "part.1",
Algorithm: algorithm,
Hash: h.Sum(nil),
},
}
validMeta.Parts[0] = objectPartInfo{
validMeta.Parts = []objectPartInfo{
{
Number: 1,
Name: "part.1",
},
}
// Write the `xl.json` with the newly calculated metadata.
@ -530,7 +552,7 @@ func (xl xlObjects) getObjectInfo(ctx context.Context, bucket, object string) (o
// Having read quorum means we have xl.json in at least N/2 disks.
if !strings.HasSuffix(object, xlCorruptedSuffix) {
if validMeta, ok := xl.isObjectCorrupted(metaArr, errs); ok {
renameCorruptedObject(ctx, bucket, object, validMeta, disks, errs)
xl.renameCorruptedObject(ctx, bucket, object, validMeta, disks, errs)
// Return err file not found since we renamed now the corrupted object
return objInfo, errFileNotFound
}