Mirror of https://github.com/minio/minio.git
Revert heal locks (#12365)
A lot of healing is likely to be on non-existing objects, and locks are very expensive and will slow down scanning significantly.
In cases where all copies are valid, or all are broken, allow rejection without locking.
Keep the existing behavior, but move the check for
dangling objects to after the lock has been acquired.
```
_, err = getLatestFileInfo(ctx, partsMetadata, errs)
if err != nil {
	return er.purgeObjectDangling(ctx, bucket, object, versionID, partsMetadata, errs, []error{}, opts)
}
```
Revert "heal: Hold lock when reading xl.meta from disks (#12362)"
This reverts commit abd32065aa
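The pattern being restored here is: do a cheap read of the per-disk metadata without any lock, reject objects that are clearly healthy (or clearly gone) right away, and only take the expensive namespace lock, then re-read, when repair work is actually needed. Below is a minimal, self-contained Go sketch of that check-then-lock-then-recheck idea; the names in it (metaStore, readAll, consistent, heal) are hypothetical stand-ins and not MinIO code.
```
// Hypothetical sketch of the "check without lock, re-check under lock"
// pattern described in the commit message above. Not MinIO code.
package main

import (
	"fmt"
	"sync"
)

type fileMeta struct {
	ModTime int64
	DataDir string
	Valid   bool
}

type metaStore struct {
	mu    sync.Mutex
	metas []fileMeta
}

// readAll simulates reading per-disk metadata (with or without the lock held).
func (s *metaStore) readAll() []fileMeta {
	out := make([]fileMeta, len(s.metas))
	copy(out, s.metas)
	return out
}

// consistent reports whether every entry matches the first one,
// mirroring the intent of the fileInfoConsistent check in the diff below.
func consistent(metas []fileMeta) bool {
	if len(metas) == 0 {
		return false
	}
	ref := metas[0]
	for _, m := range metas[1:] {
		if !m.Valid || m.ModTime != ref.ModTime || m.DataDir != ref.DataDir {
			return false
		}
	}
	return ref.Valid
}

// heal does a cheap unlocked read first and takes the lock only when
// repair work appears to be needed, re-checking under the lock.
func (s *metaStore) heal() string {
	// Quick read without the lock: reject healthy objects early.
	if quick := s.readAll(); consistent(quick) {
		return "skip: object already consistent"
	}

	// Only now take the lock and re-read before healing.
	s.mu.Lock()
	defer s.mu.Unlock()

	metas := s.readAll()
	if consistent(metas) {
		return "skip: became consistent while waiting for the lock"
	}
	return fmt.Sprintf("heal: %d copies need repair", len(metas))
}

func main() {
	s := &metaStore{metas: []fileMeta{
		{ModTime: 1, DataDir: "a", Valid: true},
		{ModTime: 2, DataDir: "a", Valid: true}, // mismatched ModTime forces healing
	}}
	fmt.Println(s.heal())
}
```
The re-check under the lock mirrors the commit's re-read of the metadata after GetLock succeeds, so the decision to heal or purge is never based solely on the earlier unlocked read.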
parent 4840974d7a
commit 3fff50120b
@@ -189,6 +189,35 @@ func getLatestFileInfo(ctx context.Context, partsMetadata []FileInfo, errs []err
	return latestFileInfo, nil
}

// fileInfoConsistent whether all fileinfos are consistent with each other.
// Will return false if any fileinfo mismatches.
func fileInfoConsistent(ctx context.Context, partsMetadata []FileInfo, errs []error) bool {
	// There should be atleast half correct entries, if not return failure
	if reducedErr := reduceReadQuorumErrs(ctx, errs, nil, len(partsMetadata)/2); reducedErr != nil {
		return false
	}
	if len(partsMetadata) == 1 {
		return true
	}
	// Reference
	ref := partsMetadata[0]
	if !ref.IsValid() {
		return false
	}
	for _, meta := range partsMetadata[1:] {
		if !meta.IsValid() {
			return false
		}
		if !meta.ModTime.Equal(ref.ModTime) {
			return false
		}
		if meta.DataDir != ref.DataDir {
			return false
		}
	}
	return true
}

// disksWithAllParts - This function needs to be called with
// []StorageAPI returned by listOnlineDisks. Returns,
//
@@ -254,9 +254,23 @@ func (er erasureObjects) healObject(ctx context.Context, bucket string, object s
		DataBlocks: len(storageDisks) - er.defaultParityCount,
	}

	// Re-read with data enabled
	if !opts.NoLock {
		lk := er.NewNSLock(bucket, object)
		lkctx, err := lk.GetLock(ctx, globalOperationTimeout)
		if err != nil {
			return result, err
		}
		ctx = lkctx.Context()
		defer lk.Unlock(lkctx.Cancel)
	}

	// Re-read when we have lock...
	partsMetadata, errs := readAllFileInfo(ctx, storageDisks, bucket, object, versionID, true)

	_, err = getLatestFileInfo(ctx, partsMetadata, errs)
	if err != nil {
		return er.purgeObjectDangling(ctx, bucket, object, versionID, partsMetadata, errs, []error{}, opts)
	}
	// List of disks having latest version of the object er.meta
	// (by modtime).
	_, modTime, dataDir := listOnlineDisks(storageDisks, partsMetadata, errs)
@@ -679,6 +693,15 @@ func defaultHealResult(lfi FileInfo, storageDisks []StorageAPI, storageEndpoints
		})
	}

	if !lfi.IsValid() {
		// Default to most common configuration for erasure blocks.
		result.ParityBlocks = defaultParityCount
		result.DataBlocks = len(storageDisks) - defaultParityCount
	} else {
		result.ParityBlocks = lfi.Erasure.ParityBlocks
		result.DataBlocks = lfi.Erasure.DataBlocks
	}

	return result
}
@@ -867,34 +890,23 @@ func (er erasureObjects) HealObject(ctx context.Context, bucket, object, version
	} else {
		newReqInfo = logger.NewReqInfo("", "", globalDeploymentID, "", "Heal", bucket, object)
	}

	// Inherit context from the global context to avoid cancellation
	healCtx := logger.SetReqInfo(GlobalContext, newReqInfo)

	storageDisks := er.getDisks()
	storageEndpoints := er.getEndpoints()

	if !opts.NoLock {
		lk := er.NewNSLock(bucket, object)
		lkctx, err := lk.GetLock(healCtx, globalOperationTimeout)
		if err != nil {
			return defaultHealResult(FileInfo{}, storageDisks, storageEndpoints, nil, bucket, object, versionID, er.defaultParityCount), err
		}
		healCtx = lkctx.Context()
		defer lk.Unlock(lkctx.Cancel)
	}

	// Healing directories handle it separately.
	if HasSuffix(object, SlashSeparator) {
		return er.healObjectDir(healCtx, bucket, object, opts.DryRun, opts.Remove)
	}

	storageDisks := er.getDisks()
	storageEndpoints := er.getEndpoints()

	// When versionID is empty, we read directly from the `null` versionID for healing.
	if versionID == "" {
		versionID = nullVersionID
	}

	// Read metadata files from all the disks
	// Perform quick read without lock.
	// This allows to quickly check if all is ok or all are missing.
	partsMetadata, errs := readAllFileInfo(healCtx, storageDisks, bucket, object, versionID, false)
	if isAllNotFound(errs) {
		err = toObjectErr(errFileNotFound, bucket, object)
@@ -905,9 +917,12 @@ func (er erasureObjects) HealObject(ctx context.Context, bucket, object, version
		return defaultHealResult(FileInfo{}, storageDisks, storageEndpoints, errs, bucket, object, versionID, er.defaultParityCount), err
	}

	_, err = getLatestFileInfo(healCtx, partsMetadata, errs)
	if err != nil {
		return er.purgeObjectDangling(healCtx, bucket, object, versionID, partsMetadata, errs, []error{}, opts)
	// Return early if all ok and not deep scanning.
	if opts.ScanMode == madmin.HealNormalScan && fileInfoConsistent(ctx, partsMetadata, errs) {
		fi, err := getLatestFileInfo(ctx, partsMetadata, errs)
		if err == nil && fi.VersionID == versionID {
			return defaultHealResult(fi, storageDisks, storageEndpoints, errs, bucket, object, versionID, fi.Erasure.ParityBlocks), nil
		}
	}

	// Heal the object.