xl: heal bucket should validate if bucket exists first. (#2953)

Fixes #2944
This commit is contained in:
Harshavardhana 2016-10-17 02:10:23 -07:00 committed by GitHub
parent ea406754a6
commit fee3f99a6e
4 changed files with 322 additions and 297 deletions

135
cmd/xl-v1-healing-common.go Normal file
View File

@ -0,0 +1,135 @@
/*
* Minio Cloud Storage, (C) 2016 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cmd
import "time"
// commonTime returns a maximally occurring time from a list of time.
func commonTime(modTimes []time.Time) (modTime time.Time) {
var maxima int // Counter for remembering max occurrence of elements.
timeOccurenceMap := make(map[time.Time]int)
// Ignore the uuid sentinel and count the rest.
for _, time := range modTimes {
if time == timeSentinel {
continue
}
timeOccurenceMap[time]++
}
// Find the common cardinality from previously collected
// occurrences of elements.
for time, count := range timeOccurenceMap {
if count > maxima {
maxima = count
modTime = time
}
}
// Return the collected common uuid.
return modTime
}
// Beginning of unix time is treated as sentinel value here.
var timeSentinel = time.Unix(0, 0).UTC()
// Boot modTimes up to disk count, setting the value to time sentinel.
func bootModtimes(diskCount int) []time.Time {
modTimes := make([]time.Time, diskCount)
// Boots up all the modtimes.
for i := range modTimes {
modTimes[i] = timeSentinel
}
return modTimes
}
// Extracts list of times from xlMetaV1 slice and returns, skips
// slice elements which have errors. As a special error
// errFileNotFound is treated as a initial good condition.
func listObjectModtimes(partsMetadata []xlMetaV1, errs []error) (modTimes []time.Time) {
modTimes = bootModtimes(len(partsMetadata))
// Set a new time value, specifically set when
// error == errFileNotFound (this is needed when this is a
// fresh PutObject).
timeNow := time.Now().UTC()
for index, metadata := range partsMetadata {
if errs[index] == nil {
// Once the file is found, save the uuid saved on disk.
modTimes[index] = metadata.Stat.ModTime
} else if errs[index] == errFileNotFound {
// Once the file is not found then the epoch is current time.
modTimes[index] = timeNow
}
}
return modTimes
}
// Returns slice of online disks needed.
// - slice returing readable disks.
// - modTime of the Object
func listOnlineDisks(disks []StorageAPI, partsMetadata []xlMetaV1, errs []error) (onlineDisks []StorageAPI, modTime time.Time) {
onlineDisks = make([]StorageAPI, len(disks))
// List all the file commit ids from parts metadata.
modTimes := listObjectModtimes(partsMetadata, errs)
// Reduce list of UUIDs to a single common value.
modTime = commonTime(modTimes)
// Create a new online disks slice, which have common uuid.
for index, t := range modTimes {
if t == modTime {
onlineDisks[index] = disks[index]
} else {
onlineDisks[index] = nil
}
}
return onlineDisks, modTime
}
// Return disks with the outdated or missing object.
func outDatedDisks(disks []StorageAPI, partsMetadata []xlMetaV1, errs []error) (outDatedDisks []StorageAPI) {
outDatedDisks = make([]StorageAPI, len(disks))
latestDisks, _ := listOnlineDisks(disks, partsMetadata, errs)
for index, disk := range latestDisks {
if errorCause(errs[index]) == errFileNotFound {
outDatedDisks[index] = disks[index]
continue
}
if errs[index] != nil {
continue
}
if disk == nil {
outDatedDisks[index] = disks[index]
}
}
return outDatedDisks
}
// Returns if the object should be healed.
func xlShouldHeal(partsMetadata []xlMetaV1, errs []error) bool {
modTime := commonTime(listObjectModtimes(partsMetadata, errs))
for index := range partsMetadata {
if errs[index] == errDiskNotFound {
continue
}
if errs[index] != nil {
return true
}
if modTime != partsMetadata[index].Stat.ModTime {
return true
}
}
return false
}

View File

@ -16,120 +16,211 @@
package cmd
import "time"
import "sync"
// commonTime returns a maximally occurring time from a list of time.
func commonTime(modTimes []time.Time) (modTime time.Time) {
var maxima int // Counter for remembering max occurrence of elements.
timeOccurenceMap := make(map[time.Time]int)
// Ignore the uuid sentinel and count the rest.
for _, time := range modTimes {
if time == timeSentinel {
continue
}
timeOccurenceMap[time]++
// Heals a bucket if it doesn't exist on one of the disks.
func (xl xlObjects) HealBucket(bucket string) error {
// Verify if bucket is valid.
if !IsValidBucketName(bucket) {
return traceError(BucketNameInvalid{Bucket: bucket})
}
// Find the common cardinality from previously collected
// occurrences of elements.
for time, count := range timeOccurenceMap {
if count > maxima {
maxima = count
modTime = time
}
// Verify if bucket exists.
if !xl.isBucketExist(bucket) {
return traceError(BucketNotFound{Bucket: bucket})
}
// Return the collected common uuid.
return modTime
}
// Beginning of unix time is treated as sentinel value here.
var timeSentinel = time.Unix(0, 0).UTC()
// Heal bucket - create buckets on disks where it does not exist.
// Boot modTimes up to disk count, setting the value to time sentinel.
func bootModtimes(diskCount int) []time.Time {
modTimes := make([]time.Time, diskCount)
// Boots up all the modtimes.
for i := range modTimes {
modTimes[i] = timeSentinel
}
return modTimes
}
// get a random ID for lock instrumentation.
opsID := getOpsID()
// Extracts list of times from xlMetaV1 slice and returns, skips
// slice elements which have errors. As a special error
// errFileNotFound is treated as a initial good condition.
func listObjectModtimes(partsMetadata []xlMetaV1, errs []error) (modTimes []time.Time) {
modTimes = bootModtimes(len(partsMetadata))
// Set a new time value, specifically set when
// error == errFileNotFound (this is needed when this is a
// fresh PutObject).
timeNow := time.Now().UTC()
for index, metadata := range partsMetadata {
if errs[index] == nil {
// Once the file is found, save the uuid saved on disk.
modTimes[index] = metadata.Stat.ModTime
} else if errs[index] == errFileNotFound {
// Once the file is not found then the epoch is current time.
modTimes[index] = timeNow
}
}
return modTimes
}
nsMutex.Lock(bucket, "", opsID)
defer nsMutex.Unlock(bucket, "", opsID)
// Returns slice of online disks needed.
// - slice returing readable disks.
// - modTime of the Object
func listOnlineDisks(disks []StorageAPI, partsMetadata []xlMetaV1, errs []error) (onlineDisks []StorageAPI, modTime time.Time) {
onlineDisks = make([]StorageAPI, len(disks))
// Initialize sync waitgroup.
var wg = &sync.WaitGroup{}
// List all the file commit ids from parts metadata.
modTimes := listObjectModtimes(partsMetadata, errs)
// Initialize list of errors.
var dErrs = make([]error, len(xl.storageDisks))
// Reduce list of UUIDs to a single common value.
modTime = commonTime(modTimes)
// Create a new online disks slice, which have common uuid.
for index, t := range modTimes {
if t == modTime {
onlineDisks[index] = disks[index]
} else {
onlineDisks[index] = nil
}
}
return onlineDisks, modTime
}
// Return disks with the outdated or missing object.
func outDatedDisks(disks []StorageAPI, partsMetadata []xlMetaV1, errs []error) (outDatedDisks []StorageAPI) {
outDatedDisks = make([]StorageAPI, len(disks))
latestDisks, _ := listOnlineDisks(disks, partsMetadata, errs)
for index, disk := range latestDisks {
if errorCause(errs[index]) == errFileNotFound {
outDatedDisks[index] = disks[index]
continue
}
if errs[index] != nil {
continue
}
// Make a volume entry on all underlying storage disks.
for index, disk := range xl.storageDisks {
if disk == nil {
outDatedDisks[index] = disks[index]
dErrs[index] = traceError(errDiskNotFound)
continue
}
wg.Add(1)
// Make a volume inside a go-routine.
go func(index int, disk StorageAPI) {
defer wg.Done()
if _, err := disk.StatVol(bucket); err != nil {
if err != errVolumeNotFound {
dErrs[index] = traceError(err)
return
}
if err = disk.MakeVol(bucket); err != nil {
dErrs[index] = traceError(err)
}
}
}(index, disk)
}
return outDatedDisks
// Wait for all make vol to finish.
wg.Wait()
// Do we have write quorum?.
if !isDiskQuorum(dErrs, xl.writeQuorum) {
// Purge successfully created buckets if we don't have writeQuorum.
xl.undoMakeBucket(bucket)
return toObjectErr(traceError(errXLWriteQuorum), bucket)
}
// Verify we have any other errors which should be returned as failure.
if reducedErr := reduceErrs(dErrs, []error{
errDiskNotFound,
errFaultyDisk,
errDiskAccessDenied,
}); reducedErr != nil {
return toObjectErr(reducedErr, bucket)
}
return nil
}
// Returns if the object should be healed.
func xlShouldHeal(partsMetadata []xlMetaV1, errs []error) bool {
modTime := commonTime(listObjectModtimes(partsMetadata, errs))
for index := range partsMetadata {
if errs[index] == errDiskNotFound {
// HealObject heals a given object for all its missing entries.
// FIXME: If an object object was deleted and one disk was down, and later the disk comes back
// up again, heal on the object should delete it.
func (xl xlObjects) HealObject(bucket, object string) error {
// Verify if bucket is valid.
if !IsValidBucketName(bucket) {
return traceError(BucketNameInvalid{Bucket: bucket})
}
// Verify if object is valid.
if !IsValidObjectName(object) {
return traceError(ObjectNameInvalid{Bucket: bucket, Object: object})
}
// get a random ID for lock instrumentation.
opsID := getOpsID()
// Lock the object before healing.
nsMutex.RLock(bucket, object, opsID)
defer nsMutex.RUnlock(bucket, object, opsID)
partsMetadata, errs := readAllXLMetadata(xl.storageDisks, bucket, object)
if err := reduceErrs(errs, nil); err != nil {
return toObjectErr(err, bucket, object)
}
if !xlShouldHeal(partsMetadata, errs) {
// There is nothing to heal.
return nil
}
// List of disks having latest version of the object.
latestDisks, modTime := listOnlineDisks(xl.storageDisks, partsMetadata, errs)
// List of disks having outdated version of the object or missing object.
outDatedDisks := outDatedDisks(xl.storageDisks, partsMetadata, errs)
// Latest xlMetaV1 for reference.
latestMeta := pickValidXLMeta(partsMetadata, modTime)
for index, disk := range outDatedDisks {
// Before healing outdated disks, we need to remove xl.json
// and part files from "bucket/object/" so that
// rename(".minio.sys", "tmp/tmpuuid/", "bucket", "object/") succeeds.
if disk == nil {
// Not an outdated disk.
continue
}
if errs[index] != nil {
return true
// If there was an error (most likely errFileNotFound)
continue
}
if modTime != partsMetadata[index].Stat.ModTime {
return true
// Outdated object with the same name exists that needs to be deleted.
outDatedMeta := partsMetadata[index]
// Delete all the parts.
for partIndex := 0; partIndex < len(outDatedMeta.Parts); partIndex++ {
err := disk.DeleteFile(bucket, pathJoin(object, outDatedMeta.Parts[partIndex].Name))
if err != nil {
return traceError(err)
}
}
// Delete xl.json file.
err := disk.DeleteFile(bucket, pathJoin(object, xlMetaJSONFile))
if err != nil {
return traceError(err)
}
}
return false
// Reorder so that we have data disks first and parity disks next.
latestDisks = getOrderedDisks(latestMeta.Erasure.Distribution, latestDisks)
outDatedDisks = getOrderedDisks(latestMeta.Erasure.Distribution, outDatedDisks)
partsMetadata = getOrderedPartsMetadata(latestMeta.Erasure.Distribution, partsMetadata)
// We write at temporary location and then rename to fianal location.
tmpID := getUUID()
// Checksum of the part files. checkSumInfos[index] will contain checksums
// of all the part files in the outDatedDisks[index]
checkSumInfos := make([][]checkSumInfo, len(outDatedDisks))
// Heal each part. erasureHealFile() will write the healed part to
// .minio/tmp/uuid/ which needs to be renamed later to the final location.
for partIndex := 0; partIndex < len(latestMeta.Parts); partIndex++ {
partName := latestMeta.Parts[partIndex].Name
partSize := latestMeta.Parts[partIndex].Size
erasure := latestMeta.Erasure
sumInfo, err := latestMeta.Erasure.GetCheckSumInfo(partName)
if err != nil {
return err
}
// Heal the part file.
checkSums, err := erasureHealFile(latestDisks, outDatedDisks,
bucket, pathJoin(object, partName),
minioMetaBucket, pathJoin(tmpMetaPrefix, tmpID, partName),
partSize, erasure.BlockSize, erasure.DataBlocks, erasure.ParityBlocks, sumInfo.Algorithm)
if err != nil {
return err
}
for index, sum := range checkSums {
if outDatedDisks[index] == nil {
continue
}
checkSumInfos[index] = append(checkSumInfos[index], checkSumInfo{partName, sumInfo.Algorithm, sum})
}
}
// xl.json should be written to all the healed disks.
for index, disk := range outDatedDisks {
if disk == nil {
continue
}
partsMetadata[index] = latestMeta
partsMetadata[index].Erasure.Checksum = checkSumInfos[index]
}
// Generate and write `xl.json` generated from other disks.
err := writeUniqueXLMetadata(outDatedDisks, minioMetaBucket, pathJoin(tmpMetaPrefix, tmpID), partsMetadata, diskCount(outDatedDisks))
if err != nil {
return toObjectErr(err, bucket, object)
}
// Rename from tmp location to the actual location.
for _, disk := range outDatedDisks {
if disk == nil {
continue
}
// Remove any lingering partial data from current namespace.
err = disk.DeleteFile(bucket, retainSlash(object))
if err != nil && err != errFileNotFound {
return traceError(err)
}
// Attempt a rename now from healed data to final location.
err = disk.RenameFile(minioMetaBucket, retainSlash(pathJoin(tmpMetaPrefix, tmpID)), bucket, retainSlash(object))
if err != nil {
return traceError(err)
}
}
return nil
}

View File

@ -218,207 +218,6 @@ func (xl xlObjects) GetObject(bucket, object string, startOffset int64, length i
return nil
}
func (xl xlObjects) HealBucket(bucket string) error {
// Verify if bucket is valid.
if !IsValidBucketName(bucket) {
return traceError(BucketNameInvalid{Bucket: bucket})
}
// Heal bucket - create buckets on disks where it does not exist.
// get a random ID for lock instrumentation.
opsID := getOpsID()
nsMutex.Lock(bucket, "", opsID)
defer nsMutex.Unlock(bucket, "", opsID)
// Initialize sync waitgroup.
var wg = &sync.WaitGroup{}
// Initialize list of errors.
var dErrs = make([]error, len(xl.storageDisks))
// Make a volume entry on all underlying storage disks.
for index, disk := range xl.storageDisks {
if disk == nil {
dErrs[index] = traceError(errDiskNotFound)
continue
}
wg.Add(1)
// Make a volume inside a go-routine.
go func(index int, disk StorageAPI) {
defer wg.Done()
if _, err := disk.StatVol(bucket); err != nil {
if err != errVolumeNotFound {
dErrs[index] = traceError(err)
return
}
if err = disk.MakeVol(bucket); err != nil {
dErrs[index] = traceError(err)
}
}
}(index, disk)
}
// Wait for all make vol to finish.
wg.Wait()
// Do we have write quorum?.
if !isDiskQuorum(dErrs, xl.writeQuorum) {
// Purge successfully created buckets if we don't have writeQuorum.
xl.undoMakeBucket(bucket)
return toObjectErr(traceError(errXLWriteQuorum), bucket)
}
// Verify we have any other errors which should be returned as failure.
if reducedErr := reduceErrs(dErrs, []error{
errDiskNotFound,
errFaultyDisk,
errDiskAccessDenied,
}); reducedErr != nil {
return toObjectErr(reducedErr, bucket)
}
return nil
}
// HealObject heals a given object for all its missing entries.
// FIXME: If an object object was deleted and one disk was down, and later the disk comes back
// up again, heal on the object should delete it.
func (xl xlObjects) HealObject(bucket, object string) error {
// Verify if bucket is valid.
if !IsValidBucketName(bucket) {
return traceError(BucketNameInvalid{Bucket: bucket})
}
// Verify if object is valid.
if !IsValidObjectName(object) {
return traceError(ObjectNameInvalid{Bucket: bucket, Object: object})
}
// get a random ID for lock instrumentation.
opsID := getOpsID()
// Lock the object before healing.
nsMutex.RLock(bucket, object, opsID)
defer nsMutex.RUnlock(bucket, object, opsID)
partsMetadata, errs := readAllXLMetadata(xl.storageDisks, bucket, object)
if err := reduceErrs(errs, nil); err != nil {
return toObjectErr(err, bucket, object)
}
if !xlShouldHeal(partsMetadata, errs) {
// There is nothing to heal.
return nil
}
// List of disks having latest version of the object.
latestDisks, modTime := listOnlineDisks(xl.storageDisks, partsMetadata, errs)
// List of disks having outdated version of the object or missing object.
outDatedDisks := outDatedDisks(xl.storageDisks, partsMetadata, errs)
// Latest xlMetaV1 for reference.
latestMeta := pickValidXLMeta(partsMetadata, modTime)
for index, disk := range outDatedDisks {
// Before healing outdated disks, we need to remove xl.json
// and part files from "bucket/object/" so that
// rename(".minio.sys", "tmp/tmpuuid/", "bucket", "object/") succeeds.
if disk == nil {
// Not an outdated disk.
continue
}
if errs[index] != nil {
// If there was an error (most likely errFileNotFound)
continue
}
// Outdated object with the same name exists that needs to be deleted.
outDatedMeta := partsMetadata[index]
// Delete all the parts.
for partIndex := 0; partIndex < len(outDatedMeta.Parts); partIndex++ {
err := disk.DeleteFile(bucket, pathJoin(object, outDatedMeta.Parts[partIndex].Name))
if err != nil {
return traceError(err)
}
}
// Delete xl.json file.
err := disk.DeleteFile(bucket, pathJoin(object, xlMetaJSONFile))
if err != nil {
return traceError(err)
}
}
// Reorder so that we have data disks first and parity disks next.
latestDisks = getOrderedDisks(latestMeta.Erasure.Distribution, latestDisks)
outDatedDisks = getOrderedDisks(latestMeta.Erasure.Distribution, outDatedDisks)
partsMetadata = getOrderedPartsMetadata(latestMeta.Erasure.Distribution, partsMetadata)
// We write at temporary location and then rename to fianal location.
tmpID := getUUID()
// Checksum of the part files. checkSumInfos[index] will contain checksums
// of all the part files in the outDatedDisks[index]
checkSumInfos := make([][]checkSumInfo, len(outDatedDisks))
// Heal each part. erasureHealFile() will write the healed part to
// .minio/tmp/uuid/ which needs to be renamed later to the final location.
for partIndex := 0; partIndex < len(latestMeta.Parts); partIndex++ {
partName := latestMeta.Parts[partIndex].Name
partSize := latestMeta.Parts[partIndex].Size
erasure := latestMeta.Erasure
sumInfo, err := latestMeta.Erasure.GetCheckSumInfo(partName)
if err != nil {
return err
}
// Heal the part file.
checkSums, err := erasureHealFile(latestDisks, outDatedDisks,
bucket, pathJoin(object, partName),
minioMetaBucket, pathJoin(tmpMetaPrefix, tmpID, partName),
partSize, erasure.BlockSize, erasure.DataBlocks, erasure.ParityBlocks, sumInfo.Algorithm)
if err != nil {
return err
}
for index, sum := range checkSums {
if outDatedDisks[index] == nil {
continue
}
checkSumInfos[index] = append(checkSumInfos[index], checkSumInfo{partName, sumInfo.Algorithm, sum})
}
}
// xl.json should be written to all the healed disks.
for index, disk := range outDatedDisks {
if disk == nil {
continue
}
partsMetadata[index] = latestMeta
partsMetadata[index].Erasure.Checksum = checkSumInfos[index]
}
// Generate and write `xl.json` generated from other disks.
err := writeUniqueXLMetadata(outDatedDisks, minioMetaBucket, pathJoin(tmpMetaPrefix, tmpID), partsMetadata, diskCount(outDatedDisks))
if err != nil {
return toObjectErr(err, bucket, object)
}
// Rename from tmp location to the actual location.
for _, disk := range outDatedDisks {
if disk == nil {
continue
}
// Remove any lingering partial data from current namespace.
err = disk.DeleteFile(bucket, retainSlash(object))
if err != nil && err != errFileNotFound {
return traceError(err)
}
// Attempt a rename now from healed data to final location.
err = disk.RenameFile(minioMetaBucket, retainSlash(pathJoin(tmpMetaPrefix, tmpID)), bucket, retainSlash(object))
if err != nil {
return traceError(err)
}
}
return nil
}
// GetObjectInfo - reads object metadata and replies back ObjectInfo.
func (xl xlObjects) GetObjectInfo(bucket, object string) (ObjectInfo, error) {
// Verify if bucket is valid.