From abd32065aaae4080396a1b4b04a110454368b028 Mon Sep 17 00:00:00 2001 From: Anis Elleuch Date: Mon, 24 May 2021 21:39:38 +0100 Subject: [PATCH] heal: Hold lock when reading xl.meta from disks (#12362) Lock is hold in healObject() after reading xl.meta from disks the first time. This commit will held the lock since the beginning of HealObject() Co-authored-by: Anis Elleuch --- cmd/erasure-healing.go | 53 ++++++++++++++++++++++-------------------- 1 file changed, 28 insertions(+), 25 deletions(-) diff --git a/cmd/erasure-healing.go b/cmd/erasure-healing.go index c6fde2d4e..3b8216195 100644 --- a/cmd/erasure-healing.go +++ b/cmd/erasure-healing.go @@ -254,17 +254,7 @@ func (er erasureObjects) healObject(ctx context.Context, bucket string, object s DataBlocks: len(storageDisks) - er.defaultParityCount, } - if !opts.NoLock { - lk := er.NewNSLock(bucket, object) - lkctx, err := lk.GetLock(ctx, globalOperationTimeout) - if err != nil { - return result, err - } - ctx = lkctx.Context() - defer lk.Unlock(lkctx.Cancel) - } - - // Re-read when we have lock... + // Re-read with data enabled partsMetadata, errs := readAllFileInfo(ctx, storageDisks, bucket, object, versionID, true) // List of disks having latest version of the object er.meta @@ -644,6 +634,17 @@ func defaultHealResult(lfi FileInfo, storageDisks []StorageAPI, storageEndpoints } if lfi.IsValid() { result.ObjectSize = lfi.Size + result.ParityBlocks = lfi.Erasure.ParityBlocks + result.DataBlocks = lfi.Erasure.DataBlocks + } else { + // Default to most common configuration for erasure blocks. + result.ParityBlocks = defaultParityCount + result.DataBlocks = len(storageDisks) - defaultParityCount + } + + if errs == nil { + // No disks related errors are provided, quit + return result } for index, disk := range storageDisks { @@ -677,15 +678,6 @@ func defaultHealResult(lfi FileInfo, storageDisks []StorageAPI, storageEndpoints }) } - if !lfi.IsValid() { - // Default to most common configuration for erasure blocks. - result.ParityBlocks = defaultParityCount - result.DataBlocks = len(storageDisks) - defaultParityCount - } else { - result.ParityBlocks = lfi.Erasure.ParityBlocks - result.DataBlocks = lfi.Erasure.DataBlocks - } - return result } @@ -869,23 +861,34 @@ func (er erasureObjects) HealObject(ctx context.Context, bucket, object, version } else { newReqInfo = logger.NewReqInfo("", "", globalDeploymentID, "", "Heal", bucket, object) } + + // Inherit context from the global context to avoid cancellation healCtx := logger.SetReqInfo(GlobalContext, newReqInfo) + storageDisks := er.getDisks() + storageEndpoints := er.getEndpoints() + + if !opts.NoLock { + lk := er.NewNSLock(bucket, object) + lkctx, err := lk.GetLock(healCtx, globalOperationTimeout) + if err != nil { + return defaultHealResult(FileInfo{}, storageDisks, storageEndpoints, nil, bucket, object, versionID, er.defaultParityCount), err + } + healCtx = lkctx.Context() + defer lk.Unlock(lkctx.Cancel) + } + // Healing directories handle it separately. if HasSuffix(object, SlashSeparator) { return er.healObjectDir(healCtx, bucket, object, opts.DryRun, opts.Remove) } - storageDisks := er.getDisks() - storageEndpoints := er.getEndpoints() - - // Read metadata files from all the disks - // When versionID is empty, we read directly from the `null` versionID for healing. if versionID == "" { versionID = nullVersionID } + // Read metadata files from all the disks partsMetadata, errs := readAllFileInfo(healCtx, storageDisks, bucket, object, versionID, false) if isAllNotFound(errs) {