Commit 39331b6b4e: In a multipart upload scenario, disks going down and coming back up can leave certain parts missing on the disk/server that went down. This is a valid case, since these blocks can be missing and should be healed through the heal operation. But we are not supposed to fail prematurely, since we still have enough data on the other disks to satisfy read quorum. This fix relaxes the previous assumption and fixes a major corruption issue reproduced by @vadmeste. Fixes #2976
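A minimal sketch of the relaxed read-quorum idea described above (illustrative only; the helper name and quorum value below are hypothetical and not part of the MinIO codebase):

package main

import (
	"errors"
	"fmt"
)

// errReadQuorum stands in for the real read-quorum sentinel error.
var errReadQuorum = errors.New("read quorum not met")

// canReadPart reports whether a part that is present on only `online` disks
// can still be served: missing copies are tolerated (and healed later) as
// long as at least readQuorum disks hold the part.
func canReadPart(online, readQuorum int) error {
	if online < readQuorum {
		return errReadQuorum // genuinely unrecoverable, fail the read.
	}
	return nil // enough copies online; the rest are candidates for healing.
}

func main() {
	// Assume a hypothetical 16-disk setup with a read quorum of 8.
	fmt.Println(canReadPart(9, 8)) // <nil> - a few missing parts are not fatal.
	fmt.Println(canReadPart(7, 8)) // read quorum not met.
}
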
/*
 * Minio Cloud Storage, (C) 2016 Minio, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cmd

import "sync"

// HealBucket - heals a bucket if it doesn't exist on one of the disks.
func (xl xlObjects) HealBucket(bucket string) error {
	// Verify if bucket is valid.
	if !IsValidBucketName(bucket) {
		return traceError(BucketNameInvalid{Bucket: bucket})
	}

	// Verify if bucket exists.
	if !xl.isBucketExist(bucket) {
		return traceError(BucketNotFound{Bucket: bucket})
	}

	// Heal bucket - create buckets on disks where it does not exist.

	// Get a random ID for lock instrumentation.
	opsID := getOpsID()

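	// Hold an exclusive lock on the bucket namespace for the duration of the
	// heal so that concurrent bucket operations do not race with MakeVol below.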
	nsMutex.Lock(bucket, "", opsID)
	defer nsMutex.Unlock(bucket, "", opsID)

	// Initialize sync waitgroup.
	var wg = &sync.WaitGroup{}

	// Initialize list of errors.
	var dErrs = make([]error, len(xl.storageDisks))

	// Make a volume entry on all underlying storage disks.
	for index, disk := range xl.storageDisks {
		if disk == nil {
			dErrs[index] = traceError(errDiskNotFound)
			continue
		}
		wg.Add(1)
		// Make a volume inside a go-routine.
		go func(index int, disk StorageAPI) {
			defer wg.Done()
			if _, err := disk.StatVol(bucket); err != nil {
				if err != errVolumeNotFound {
					dErrs[index] = traceError(err)
					return
				}
				if err = disk.MakeVol(bucket); err != nil {
					dErrs[index] = traceError(err)
				}
			}
		}(index, disk)
	}

	// Wait for all make vol to finish.
	wg.Wait()

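	// Healing a bucket must itself meet write quorum; if too few disks could
	// create the volume, roll back rather than leave the bucket half made.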
	// Do we have write quorum?
	if !isDiskQuorum(dErrs, xl.writeQuorum) {
		// Purge successfully created buckets if we don't have writeQuorum.
		xl.undoMakeBucket(bucket)
		return toObjectErr(traceError(errXLWriteQuorum), bucket)
	}

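	// MakeVol failures from disks that are offline, faulty or inaccessible are
	// treated as ignorable here; any other error is reported as a failure.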
	// Verify if we have any other errors which should be returned as a failure.
	if reducedErr := reduceErrs(dErrs, []error{
		errDiskNotFound,
		errFaultyDisk,
		errDiskAccessDenied,
	}); reducedErr != nil {
		return toObjectErr(reducedErr, bucket)
	}
	return nil
}

// HealObject heals a given object for all its missing entries.
// FIXME: If an object was deleted and one disk was down, and later the disk
// comes back up again, heal on the object should delete it.
func (xl xlObjects) HealObject(bucket, object string) error {
	// Verify if bucket is valid.
	if !IsValidBucketName(bucket) {
		return traceError(BucketNameInvalid{Bucket: bucket})
	}

	// Verify if object is valid.
	if !IsValidObjectName(object) {
		return traceError(ObjectNameInvalid{Bucket: bucket, Object: object})
	}

	// Get a random ID for lock instrumentation.
	opsID := getOpsID()

	// Lock the object before healing.
	nsMutex.RLock(bucket, object, opsID)
	defer nsMutex.RUnlock(bucket, object, opsID)

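	// Read `xl.json` from every disk; the per-disk errors below tell us which
	// disks hold a consistent copy of the object and which ones need healing.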
	partsMetadata, errs := readAllXLMetadata(xl.storageDisks, bucket, object)
	if err := reduceErrs(errs, nil); err != nil {
		return toObjectErr(err, bucket, object)
	}

	if !xlShouldHeal(partsMetadata, errs) {
		// There is nothing to heal.
		return nil
	}

	// List of disks having the latest version of the object.
	latestDisks, modTime := listOnlineDisks(xl.storageDisks, partsMetadata, errs)
	// List of disks having an outdated version of the object or missing the object.
	outDatedDisks := outDatedDisks(xl.storageDisks, partsMetadata, errs)
	// Latest xlMetaV1 for reference.
	latestMeta := pickValidXLMeta(partsMetadata, modTime)

	for index, disk := range outDatedDisks {
		// Before healing outdated disks, we need to remove xl.json
		// and part files from "bucket/object/" so that
		// rename(".minio.sys", "tmp/tmpuuid/", "bucket", "object/") succeeds.
		if disk == nil {
			// Not an outdated disk.
			continue
		}
		if errs[index] != nil {
			// If there was an error (most likely errFileNotFound), there is
			// nothing to clean up on this disk.
			continue
		}
		// An outdated object with the same name exists and needs to be deleted.
		outDatedMeta := partsMetadata[index]
		// Delete all the parts.
		for partIndex := 0; partIndex < len(outDatedMeta.Parts); partIndex++ {
			err := disk.DeleteFile(bucket, pathJoin(object, outDatedMeta.Parts[partIndex].Name))
			if err != nil {
				return traceError(err)
			}
		}
		// Delete the xl.json file.
		err := disk.DeleteFile(bucket, pathJoin(object, xlMetaJSONFile))
		if err != nil {
			return traceError(err)
		}
	}

	// Reorder so that we have data disks first and parity disks next.
	latestDisks = getOrderedDisks(latestMeta.Erasure.Distribution, latestDisks)
	outDatedDisks = getOrderedDisks(latestMeta.Erasure.Distribution, outDatedDisks)
	partsMetadata = getOrderedPartsMetadata(latestMeta.Erasure.Distribution, partsMetadata)

	// We write at a temporary location and then rename to the final location.
	tmpID := getUUID()

	// Checksums of the part files. checkSumInfos[index] will contain checksums
	// of all the part files in outDatedDisks[index].
	checkSumInfos := make([][]checkSumInfo, len(outDatedDisks))

	// Heal each part. erasureHealFile() will write the healed part to
	// .minio/tmp/uuid/ which needs to be renamed later to the final location.
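	// The disks holding the latest copy serve as the erasure sources and the
	// outdated disks as the destinations for the reconstructed data.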
	for partIndex := 0; partIndex < len(latestMeta.Parts); partIndex++ {
		partName := latestMeta.Parts[partIndex].Name
		partSize := latestMeta.Parts[partIndex].Size
		erasure := latestMeta.Erasure
		sumInfo := latestMeta.Erasure.GetCheckSumInfo(partName)
		// Heal the part file.
		checkSums, err := erasureHealFile(latestDisks, outDatedDisks,
			bucket, pathJoin(object, partName),
			minioMetaBucket, pathJoin(tmpMetaPrefix, tmpID, partName),
			partSize, erasure.BlockSize, erasure.DataBlocks, erasure.ParityBlocks, sumInfo.Algorithm)
		if err != nil {
			return err
		}
		for index, sum := range checkSums {
			if outDatedDisks[index] == nil {
				continue
			}
			checkSumInfos[index] = append(checkSumInfos[index], checkSumInfo{partName, sumInfo.Algorithm, sum})
		}
	}

	// xl.json should be written to all the healed disks.
	for index, disk := range outDatedDisks {
		if disk == nil {
			continue
		}
		partsMetadata[index] = latestMeta
		partsMetadata[index].Erasure.Checksum = checkSumInfos[index]
	}

	// Generate and write `xl.json` reconstructed from the other disks.
	err := writeUniqueXLMetadata(outDatedDisks, minioMetaBucket, pathJoin(tmpMetaPrefix, tmpID), partsMetadata, diskCount(outDatedDisks))
	if err != nil {
		return toObjectErr(err, bucket, object)
	}

	// Rename from the tmp location to the actual location.
	for _, disk := range outDatedDisks {
		if disk == nil {
			continue
		}
		// Remove any lingering partial data from the current namespace.
		err = disk.DeleteFile(bucket, retainSlash(object))
		if err != nil && err != errFileNotFound {
			return traceError(err)
		}
		// Attempt a rename now from healed data to the final location.
		err = disk.RenameFile(minioMetaBucket, retainSlash(pathJoin(tmpMetaPrefix, tmpID)), bucket, retainSlash(object))
		if err != nil {
			return traceError(err)
		}
	}
	return nil
}