mirror of
https://github.com/minio/minio.git
synced 2024-12-24 06:05:55 -05:00
HealObjects should remove objects without quorum (#7407)
This PR adds a way to list objects without quorum such that they can purged by `mc admin heal --remove`
This commit is contained in:
parent
9629de8230
commit
4a698c731b
11
cmd/posix.go
11
cmd/posix.go
@ -1358,9 +1358,18 @@ func (s *posix) RenameFile(srcVolume, srcPath, dstVolume, dstPath string) (err e
|
||||
if err == nil && !isDirEmpty(dstFilePath) {
|
||||
return errFileAccessDenied
|
||||
}
|
||||
if !os.IsNotExist(err) {
|
||||
if err != nil && !os.IsNotExist(err) {
|
||||
return err
|
||||
}
|
||||
// Empty destination remove it before rename.
|
||||
if isDirEmpty(dstFilePath) {
|
||||
if err = os.Remove(dstFilePath); err != nil {
|
||||
if isSysErrNotEmpty(err) {
|
||||
return errFileAccessDenied
|
||||
}
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if err = renameAll(srcFilePath, dstFilePath); err != nil {
|
||||
|
@ -1326,20 +1326,10 @@ func (s *xlSets) HealObjects(ctx context.Context, bucket, prefix string, healObj
|
||||
|
||||
endWalkCh := make(chan struct{})
|
||||
isLeaf := func(bucket, entry string) bool {
|
||||
entry = strings.TrimSuffix(entry, slashSeparator)
|
||||
// Verify if we are at the leaf, a leaf is where we
|
||||
// see `xl.json` inside a directory.
|
||||
return s.getHashedSet(entry).isObject(bucket, entry)
|
||||
return hasSuffix(entry, xlMetaJSONFile)
|
||||
}
|
||||
|
||||
isLeafDir := func(bucket, entry string) bool {
|
||||
var ok bool
|
||||
for _, set := range s.sets {
|
||||
ok = set.isObjectDir(bucket, entry)
|
||||
if ok {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
@ -1354,7 +1344,7 @@ func (s *xlSets) HealObjects(ctx context.Context, bucket, prefix string, healObj
|
||||
if walkResult.err != nil {
|
||||
return toObjectErr(walkResult.err, bucket, prefix)
|
||||
}
|
||||
if err := healObjectFn(bucket, walkResult.entry); err != nil {
|
||||
if err := healObjectFn(bucket, strings.TrimSuffix(walkResult.entry, slashSeparator+xlMetaJSONFile)); err != nil {
|
||||
return toObjectErr(err, bucket, walkResult.entry)
|
||||
}
|
||||
if walkResult.end {
|
||||
|
@ -165,7 +165,7 @@ func disksWithAllParts(ctx context.Context, onlineDisks []StorageAPI, partsMetad
|
||||
|
||||
for i, onlineDisk := range onlineDisks {
|
||||
if onlineDisk == nil {
|
||||
dataErrs[i] = errDiskNotFound
|
||||
dataErrs[i] = errs[i]
|
||||
continue
|
||||
}
|
||||
|
||||
|
@ -207,26 +207,13 @@ func shouldHealObjectOnDisk(xlErr, dataErr error, meta xlMetaV1, quorumModTime t
|
||||
}
|
||||
|
||||
// Heals an object by re-writing corrupt/missing erasure blocks.
|
||||
func healObject(ctx context.Context, storageDisks []StorageAPI, bucket string, object string,
|
||||
quorum int, dryRun bool, scanMode madmin.HealScanMode) (result madmin.HealResultItem, err error) {
|
||||
func (xl xlObjects) healObject(ctx context.Context, bucket string, object string,
|
||||
partsMetadata []xlMetaV1, errs []error, latestXLMeta xlMetaV1,
|
||||
dryRun bool, remove bool, scanMode madmin.HealScanMode) (result madmin.HealResultItem, err error) {
|
||||
|
||||
partsMetadata, errs := readAllXLMetadata(ctx, storageDisks, bucket, object)
|
||||
dataBlocks := latestXLMeta.Erasure.DataBlocks
|
||||
|
||||
errCount := 0
|
||||
for _, err := range errs {
|
||||
if err != nil {
|
||||
errCount++
|
||||
}
|
||||
}
|
||||
|
||||
if errCount == len(errs) {
|
||||
// Only if we get errors from all the disks we return error. Else we need to
|
||||
// continue to return filled madmin.HealResultItem struct which includes info
|
||||
// on what disks the file is available etc.
|
||||
if reducedErr := reduceReadQuorumErrs(ctx, errs, nil, quorum); reducedErr != nil {
|
||||
return defaultHealResult(storageDisks, errs, bucket, object), toObjectErr(reducedErr, bucket, object)
|
||||
}
|
||||
}
|
||||
storageDisks := xl.getDisks()
|
||||
|
||||
// List of disks having latest version of the object xl.json
|
||||
// (by modtime).
|
||||
@ -237,10 +224,12 @@ func healObject(ctx context.Context, storageDisks []StorageAPI, bucket string, o
|
||||
|
||||
// Initialize heal result object
|
||||
result = madmin.HealResultItem{
|
||||
Type: madmin.HealItemObject,
|
||||
Bucket: bucket,
|
||||
Object: object,
|
||||
DiskCount: len(storageDisks),
|
||||
Type: madmin.HealItemObject,
|
||||
Bucket: bucket,
|
||||
Object: object,
|
||||
DiskCount: len(storageDisks),
|
||||
ParityBlocks: latestXLMeta.Erasure.ParityBlocks,
|
||||
DataBlocks: latestXLMeta.Erasure.DataBlocks,
|
||||
|
||||
// Initialize object size to -1, so we can detect if we are
|
||||
// unable to reliably find the object size.
|
||||
@ -308,11 +297,18 @@ func healObject(ctx context.Context, storageDisks []StorageAPI, bucket string, o
|
||||
|
||||
// If less than read quorum number of disks have all the parts
|
||||
// of the data, we can't reconstruct the erasure-coded data.
|
||||
if numAvailableDisks < quorum {
|
||||
// Default to most common configuration for erasure
|
||||
// blocks upon returning quorum error.
|
||||
result.ParityBlocks = len(storageDisks) / 2
|
||||
result.DataBlocks = len(storageDisks) / 2
|
||||
if numAvailableDisks < dataBlocks {
|
||||
// Check if xl.json, and corresponding parts are also missing.
|
||||
if m, ok := isObjectDangling(partsMetadata, errs, dataErrs); ok {
|
||||
writeQuorum := m.Erasure.DataBlocks + 1
|
||||
if m.Erasure.DataBlocks == 0 {
|
||||
writeQuorum = len(storageDisks)/2 + 1
|
||||
}
|
||||
if !dryRun && remove {
|
||||
err = xl.deleteObject(ctx, bucket, object, writeQuorum, false)
|
||||
}
|
||||
return defaultHealResult(latestXLMeta, storageDisks, errs, bucket, object), err
|
||||
}
|
||||
return result, toObjectErr(errXLReadQuorum, bucket, object)
|
||||
}
|
||||
|
||||
@ -329,7 +325,7 @@ func healObject(ctx context.Context, storageDisks []StorageAPI, bucket string, o
|
||||
|
||||
// Latest xlMetaV1 for reference. If a valid metadata is not
|
||||
// present, it is as good as object not found.
|
||||
latestMeta, pErr := pickValidXLMeta(ctx, partsMetadata, modTime, quorum)
|
||||
latestMeta, pErr := pickValidXLMeta(ctx, partsMetadata, modTime, dataBlocks)
|
||||
if pErr != nil {
|
||||
return result, toObjectErr(pErr, bucket, object)
|
||||
}
|
||||
@ -534,7 +530,7 @@ func (xl xlObjects) healObjectDir(ctx context.Context, bucket, object string, dr
|
||||
|
||||
// Populates default heal result item entries with possible values when we are returning prematurely.
|
||||
// This is to ensure that in any circumstance we are not returning empty arrays with wrong values.
|
||||
func defaultHealResult(storageDisks []StorageAPI, errs []error, bucket, object string) madmin.HealResultItem {
|
||||
func defaultHealResult(latestXLMeta xlMetaV1, storageDisks []StorageAPI, errs []error, bucket, object string) madmin.HealResultItem {
|
||||
// Initialize heal result object
|
||||
result := madmin.HealResultItem{
|
||||
Type: madmin.HealItemObject,
|
||||
@ -546,6 +542,9 @@ func defaultHealResult(storageDisks []StorageAPI, errs []error, bucket, object s
|
||||
// unable to reliably find the object size.
|
||||
ObjectSize: -1,
|
||||
}
|
||||
if latestXLMeta.IsValid() {
|
||||
result.ObjectSize = latestXLMeta.Stat.Size
|
||||
}
|
||||
|
||||
for index, disk := range storageDisks {
|
||||
if disk == nil {
|
||||
@ -577,9 +576,14 @@ func defaultHealResult(storageDisks []StorageAPI, errs []error, bucket, object s
|
||||
})
|
||||
}
|
||||
|
||||
// Default to most common configuration for erasure blocks.
|
||||
result.ParityBlocks = len(storageDisks) / 2
|
||||
result.DataBlocks = len(storageDisks) / 2
|
||||
if !latestXLMeta.IsValid() {
|
||||
// Default to most common configuration for erasure blocks.
|
||||
result.ParityBlocks = len(storageDisks) / 2
|
||||
result.DataBlocks = len(storageDisks) / 2
|
||||
} else {
|
||||
result.ParityBlocks = latestXLMeta.Erasure.ParityBlocks
|
||||
result.DataBlocks = latestXLMeta.Erasure.DataBlocks
|
||||
}
|
||||
|
||||
return result
|
||||
}
|
||||
@ -587,7 +591,7 @@ func defaultHealResult(storageDisks []StorageAPI, errs []error, bucket, object s
|
||||
// Object is considered dangling/corrupted if any only
|
||||
// if total disks - a combination of corrupted and missing
|
||||
// files is lesser than number of data blocks.
|
||||
func (xl xlObjects) isObjectDangling(metaArr []xlMetaV1, errs []error) (validMeta xlMetaV1, ok bool) {
|
||||
func isObjectDangling(metaArr []xlMetaV1, errs []error, dataErrs []error) (validMeta xlMetaV1, ok bool) {
|
||||
// We can consider an object data not reliable
|
||||
// when xl.json is not found in read quorum disks.
|
||||
// or when xl.json is not readable in read quorum disks.
|
||||
@ -599,6 +603,18 @@ func (xl xlObjects) isObjectDangling(metaArr []xlMetaV1, errs []error) (validMet
|
||||
corruptedXLJSON++
|
||||
}
|
||||
}
|
||||
var notFoundParts int
|
||||
for i := range dataErrs {
|
||||
// Only count part errors, if the error is not
|
||||
// same as xl.json error. This is to avoid
|
||||
// double counting when both parts and xl.json
|
||||
// are not available.
|
||||
if errs[i] != dataErrs[i] {
|
||||
if dataErrs[i] == errFileNotFound {
|
||||
notFoundParts++
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
for _, m := range metaArr {
|
||||
if !m.IsValid() {
|
||||
@ -613,15 +629,11 @@ func (xl xlObjects) isObjectDangling(metaArr []xlMetaV1, errs []error) (validMet
|
||||
return validMeta, true
|
||||
}
|
||||
|
||||
// We have valid meta, now verify if we have enough files with data blocks.
|
||||
return validMeta, (len(xl.getDisks()) - corruptedXLJSON - notFoundXLJSON) < validMeta.Erasure.DataBlocks
|
||||
// We have valid meta, now verify if we have enough files with parity blocks.
|
||||
return validMeta, corruptedXLJSON+notFoundXLJSON+notFoundParts > validMeta.Erasure.ParityBlocks
|
||||
}
|
||||
|
||||
// HealObject - heal the given object.
|
||||
//
|
||||
// FIXME: If an object object was deleted and one disk was down,
|
||||
// and later the disk comes back up again, heal on the object
|
||||
// should delete it.
|
||||
// HealObject - heal the given object, automatically deletes the object if stale/corrupted if `remove` is true.
|
||||
func (xl xlObjects) HealObject(ctx context.Context, bucket, object string, dryRun bool, remove bool, scanMode madmin.HealScanMode) (hr madmin.HealResultItem, err error) {
|
||||
// Create context that also contains information about the object and bucket.
|
||||
// The top level handler might not have this information.
|
||||
@ -641,13 +653,12 @@ func (xl xlObjects) HealObject(ctx context.Context, bucket, object string, dryRu
|
||||
|
||||
storageDisks := xl.getDisks()
|
||||
|
||||
// FIXME: Metadata is read again in the healObject() call below.
|
||||
// Read metadata files from all the disks
|
||||
partsMetadata, errs := readAllXLMetadata(healCtx, storageDisks, bucket, object)
|
||||
|
||||
// Check if the object is dangling, if yes and user requested
|
||||
// remove we simply delete it from namespace.
|
||||
if m, ok := xl.isObjectDangling(partsMetadata, errs); ok {
|
||||
if m, ok := isObjectDangling(partsMetadata, errs, []error{}); ok {
|
||||
writeQuorum := m.Erasure.DataBlocks + 1
|
||||
if m.Erasure.DataBlocks == 0 {
|
||||
writeQuorum = len(xl.getDisks())/2 + 1
|
||||
@ -655,21 +666,46 @@ func (xl xlObjects) HealObject(ctx context.Context, bucket, object string, dryRu
|
||||
if !dryRun && remove {
|
||||
err = xl.deleteObject(healCtx, bucket, object, writeQuorum, false)
|
||||
}
|
||||
return defaultHealResult(storageDisks, errs, bucket, object), err
|
||||
return defaultHealResult(xlMetaV1{}, storageDisks, errs, bucket, object), err
|
||||
}
|
||||
|
||||
latestXLMeta, err := getLatestXLMeta(healCtx, partsMetadata, errs)
|
||||
if err != nil {
|
||||
return defaultHealResult(storageDisks, errs, bucket, object), toObjectErr(err, bucket, object)
|
||||
return defaultHealResult(xlMetaV1{}, storageDisks, errs, bucket, object), toObjectErr(err, bucket, object)
|
||||
}
|
||||
|
||||
// Lock the object before healing.
|
||||
objectLock := xl.nsMutex.NewNSLock(bucket, object)
|
||||
if lerr := objectLock.GetRLock(globalHealingTimeout); lerr != nil {
|
||||
return defaultHealResult(storageDisks, errs, bucket, object), lerr
|
||||
return defaultHealResult(latestXLMeta, storageDisks, errs, bucket, object), lerr
|
||||
}
|
||||
defer objectLock.RUnlock()
|
||||
|
||||
errCount := 0
|
||||
for _, err := range errs {
|
||||
if err != nil {
|
||||
errCount++
|
||||
}
|
||||
}
|
||||
|
||||
if errCount == len(errs) {
|
||||
// Only if we get errors from all the disks we return error. Else we need to
|
||||
// continue to return filled madmin.HealResultItem struct which includes info
|
||||
// on what disks the file is available etc.
|
||||
if reducedErr := reduceReadQuorumErrs(ctx, errs, nil, latestXLMeta.Erasure.DataBlocks); reducedErr != nil {
|
||||
if m, ok := isObjectDangling(partsMetadata, errs, []error{}); ok {
|
||||
writeQuorum := m.Erasure.DataBlocks + 1
|
||||
if m.Erasure.DataBlocks == 0 {
|
||||
writeQuorum = len(storageDisks)/2 + 1
|
||||
}
|
||||
if !dryRun && remove {
|
||||
err = xl.deleteObject(ctx, bucket, object, writeQuorum, false)
|
||||
}
|
||||
}
|
||||
return defaultHealResult(latestXLMeta, storageDisks, errs, bucket, object), toObjectErr(reducedErr, bucket, object)
|
||||
}
|
||||
}
|
||||
|
||||
// Heal the object.
|
||||
return healObject(healCtx, xl.getDisks(), bucket, object, latestXLMeta.Erasure.DataBlocks, dryRun, scanMode)
|
||||
return xl.healObject(healCtx, bucket, object, partsMetadata, errs, latestXLMeta, dryRun, remove, scanMode)
|
||||
}
|
||||
|
@ -58,6 +58,91 @@ func TestUndoMakeBucket(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestHealObjectCorrupted(t *testing.T) {
|
||||
nDisks := 16
|
||||
fsDirs, err := getRandomDisks(nDisks)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
defer removeRoots(fsDirs)
|
||||
|
||||
// Everything is fine, should return nil
|
||||
obj, _, err := initObjectLayer(mustGetNewEndpointList(fsDirs...))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
bucket := "bucket"
|
||||
object := "object"
|
||||
data := bytes.Repeat([]byte("a"), 5*1024*1024)
|
||||
var opts ObjectOptions
|
||||
|
||||
err = obj.MakeBucketWithLocation(context.Background(), bucket, "")
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to make a bucket - %v", err)
|
||||
}
|
||||
|
||||
// Create an object with multiple parts uploaded in decreasing
|
||||
// part number.
|
||||
uploadID, err := obj.NewMultipartUpload(context.Background(), bucket, object, opts)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to create a multipart upload - %v", err)
|
||||
}
|
||||
|
||||
var uploadedParts []CompletePart
|
||||
for _, partID := range []int{2, 1} {
|
||||
pInfo, err1 := obj.PutObjectPart(context.Background(), bucket, object, uploadID, partID, mustGetPutObjReader(t, bytes.NewReader(data), int64(len(data)), "", ""), opts)
|
||||
if err1 != nil {
|
||||
t.Fatalf("Failed to upload a part - %v", err1)
|
||||
}
|
||||
uploadedParts = append(uploadedParts, CompletePart{
|
||||
PartNumber: pInfo.PartNumber,
|
||||
ETag: pInfo.ETag,
|
||||
})
|
||||
}
|
||||
|
||||
_, err = obj.CompleteMultipartUpload(context.Background(), bucket, object, uploadID, uploadedParts, ObjectOptions{})
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to complete multipart upload - %v", err)
|
||||
}
|
||||
|
||||
// Remove the object backend files from the first disk.
|
||||
xl := obj.(*xlObjects)
|
||||
firstDisk := xl.storageDisks[0]
|
||||
err = firstDisk.DeleteFile(bucket, filepath.Join(object, xlMetaJSONFile))
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to delete a file - %v", err)
|
||||
}
|
||||
|
||||
_, err = obj.HealObject(context.Background(), bucket, object, false, false, madmin.HealNormalScan)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to heal object - %v", err)
|
||||
}
|
||||
|
||||
_, err = firstDisk.StatFile(bucket, filepath.Join(object, xlMetaJSONFile))
|
||||
if err != nil {
|
||||
t.Errorf("Expected xl.json file to be present but stat failed - %v", err)
|
||||
}
|
||||
|
||||
// Delete xl.json from more than read quorum number of disks, to create a corrupted situation.
|
||||
for i := 0; i <= len(xl.storageDisks)/2; i++ {
|
||||
xl.storageDisks[i].DeleteFile(bucket, filepath.Join(object, xlMetaJSONFile))
|
||||
}
|
||||
|
||||
// Try healing now, expect to receive errDiskNotFound.
|
||||
_, err = obj.HealObject(context.Background(), bucket, object, false, true, madmin.HealDeepScan)
|
||||
if err != nil {
|
||||
t.Errorf("Expected nil but received %v", err)
|
||||
}
|
||||
|
||||
// since majority of xl.jsons are not available, object should be successfully deleted.
|
||||
_, err = obj.GetObjectInfo(context.Background(), bucket, object, ObjectOptions{})
|
||||
if _, ok := err.(ObjectNotFound); !ok {
|
||||
t.Errorf("Expect %v but received %v", ObjectNotFound{Bucket: bucket, Object: object}, err)
|
||||
}
|
||||
}
|
||||
|
||||
// Tests healing of object.
|
||||
func TestHealObjectXL(t *testing.T) {
|
||||
nDisks := 16
|
||||
|
Loading…
Reference in New Issue
Block a user