heal: Hold lock when reading xl.meta from disks (#12362)

Lock is hold in healObject() after reading xl.meta from disks the first
time. This commit will held the lock since the beginning of HealObject()

Co-authored-by: Anis Elleuch <anis@min.io>
This commit is contained in:
Anis Elleuch 2021-05-24 21:39:38 +01:00 committed by GitHub
parent 2baabd455b
commit abd32065aa
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -254,17 +254,7 @@ func (er erasureObjects) healObject(ctx context.Context, bucket string, object s
DataBlocks: len(storageDisks) - er.defaultParityCount,
}
if !opts.NoLock {
lk := er.NewNSLock(bucket, object)
lkctx, err := lk.GetLock(ctx, globalOperationTimeout)
if err != nil {
return result, err
}
ctx = lkctx.Context()
defer lk.Unlock(lkctx.Cancel)
}
// Re-read when we have lock...
// Re-read with data enabled
partsMetadata, errs := readAllFileInfo(ctx, storageDisks, bucket, object, versionID, true)
// List of disks having latest version of the object er.meta
@ -644,6 +634,17 @@ func defaultHealResult(lfi FileInfo, storageDisks []StorageAPI, storageEndpoints
}
if lfi.IsValid() {
result.ObjectSize = lfi.Size
result.ParityBlocks = lfi.Erasure.ParityBlocks
result.DataBlocks = lfi.Erasure.DataBlocks
} else {
// Default to most common configuration for erasure blocks.
result.ParityBlocks = defaultParityCount
result.DataBlocks = len(storageDisks) - defaultParityCount
}
if errs == nil {
// No disks related errors are provided, quit
return result
}
for index, disk := range storageDisks {
@ -677,15 +678,6 @@ func defaultHealResult(lfi FileInfo, storageDisks []StorageAPI, storageEndpoints
})
}
if !lfi.IsValid() {
// Default to most common configuration for erasure blocks.
result.ParityBlocks = defaultParityCount
result.DataBlocks = len(storageDisks) - defaultParityCount
} else {
result.ParityBlocks = lfi.Erasure.ParityBlocks
result.DataBlocks = lfi.Erasure.DataBlocks
}
return result
}
@ -869,23 +861,34 @@ func (er erasureObjects) HealObject(ctx context.Context, bucket, object, version
} else {
newReqInfo = logger.NewReqInfo("", "", globalDeploymentID, "", "Heal", bucket, object)
}
// Inherit context from the global context to avoid cancellation
healCtx := logger.SetReqInfo(GlobalContext, newReqInfo)
storageDisks := er.getDisks()
storageEndpoints := er.getEndpoints()
if !opts.NoLock {
lk := er.NewNSLock(bucket, object)
lkctx, err := lk.GetLock(healCtx, globalOperationTimeout)
if err != nil {
return defaultHealResult(FileInfo{}, storageDisks, storageEndpoints, nil, bucket, object, versionID, er.defaultParityCount), err
}
healCtx = lkctx.Context()
defer lk.Unlock(lkctx.Cancel)
}
// Healing directories handle it separately.
if HasSuffix(object, SlashSeparator) {
return er.healObjectDir(healCtx, bucket, object, opts.DryRun, opts.Remove)
}
storageDisks := er.getDisks()
storageEndpoints := er.getEndpoints()
// Read metadata files from all the disks
// When versionID is empty, we read directly from the `null` versionID for healing.
if versionID == "" {
versionID = nullVersionID
}
// Read metadata files from all the disks
partsMetadata, errs := readAllFileInfo(healCtx, storageDisks, bucket, object, versionID, false)
if isAllNotFound(errs) {