skip disks under scanning when healing disks (#17822)
Bonus:
- avoid DiskInfo() calls when blocks are missing; instead, heal the object via an MRF operation.
- lower the max_sleep default to 250ms; healing will not pause longer than that.
This commit is contained in:
parent 6e860b6dc5
commit c45bc32d98
@@ -75,9 +75,9 @@ func waitForLowIO(maxIO int, maxWait time.Duration, currentIO func() int) {
 		if tmpMaxWait > 0 {
 			if tmpMaxWait < waitTick {
 				time.Sleep(tmpMaxWait)
-			} else {
-				time.Sleep(waitTick)
+				return
 			}
+			time.Sleep(waitTick)
 			tmpMaxWait -= waitTick
 		}
 		if tmpMaxWait <= 0 {
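Read as a whole, the new control flow sleeps in fixed ticks and bails out once the remaining wait budget is smaller than one tick. A rough standalone sketch of that flow, assuming the surrounding maxIO guard and for loop that are not visible in the hunk:

package main

import (
	"fmt"
	"time"
)

// waitLoopSketch mirrors the post-change flow: while I/O is above the limit,
// sleep in waitTick slices; once the remaining budget is smaller than one
// tick, sleep the remainder and stop waiting entirely.
func waitLoopSketch(maxIO int, maxWait time.Duration, currentIO func() int) {
	if maxIO <= 0 {
		return // no limit configured, run at full speed (assumed guard)
	}
	const waitTick = 100 * time.Millisecond
	tmpMaxWait := maxWait
	for currentIO() >= maxIO {
		if tmpMaxWait > 0 {
			if tmpMaxWait < waitTick {
				time.Sleep(tmpMaxWait)
				return
			}
			time.Sleep(waitTick)
			tmpMaxWait -= waitTick
		}
		if tmpMaxWait <= 0 {
			return
		}
	}
}

func main() {
	start := time.Now()
	// Pretend I/O stays busy; with a 300ms budget we expect ~3 ticks of sleep.
	waitLoopSketch(1, 300*time.Millisecond, func() int { return 2 })
	fmt.Println("waited", time.Since(start).Round(10*time.Millisecond))
}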
@@ -412,9 +412,14 @@ func (er erasureObjects) getObjectWithFileInfo(ctx context.Context, bucket, obje
 			switch scan {
 			case madmin.HealNormalScan, madmin.HealDeepScan:
 				healOnce.Do(func() {
-					if _, healing := er.getOnlineDisksWithHealing(); !healing {
-						go healObject(bucket, object, fi.VersionID, scan)
-					}
+					globalMRFState.addPartialOp(partialOperation{
+						bucket:    bucket,
+						object:    object,
+						versionID: fi.VersionID,
+						queued:    time.Now(),
+						setIndex:  er.setIndex,
+						poolIndex: er.poolIndex,
+					})
 				})
 				// Healing is triggered and we have written
 				// successfully the content to client for
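This replacement avoids the DiskInfo() round-trips that er.getOnlineDisksWithHealing() performs on the read path and simply queues the object for MRF (most-recently-failed) healing. A minimal, self-contained sketch of that queue-instead-of-probe pattern; partialOp and mrfQueue are illustrative stand-ins, not MinIO's actual globalMRFState:

package main

import (
	"fmt"
	"time"
)

// partialOp describes an object that was read with missing or damaged parts
// and should be healed later; field names here are illustrative.
type partialOp struct {
	bucket, object, versionID string
	queued                    time.Time
	setIndex, poolIndex       int
}

// mrfQueue is a tiny stand-in for an MRF-style heal queue: enqueue is
// non-blocking so the read path never stalls on healing bookkeeping.
type mrfQueue struct {
	ops chan partialOp
}

func (q *mrfQueue) addPartialOp(op partialOp) {
	select {
	case q.ops <- op:
	default:
		// Queue full: drop; the regular scanner-driven heal picks it up later.
	}
}

func main() {
	q := &mrfQueue{ops: make(chan partialOp, 1024)}
	// On the GET path, instead of probing every drive with DiskInfo(),
	// just record the partial read and move on.
	q.addPartialOp(partialOp{
		bucket: "mybucket", object: "a/b/c", versionID: "",
		queued: time.Now(), setIndex: 0, poolIndex: 0,
	})
	fmt.Println("queued heal requests:", len(q.ops))
}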
@@ -732,9 +737,14 @@ func (er erasureObjects) getObjectFileInfo(ctx context.Context, bucket, object s
 	// additionally do not heal delete markers inline, let them be
 	// healed upon regular heal process.
 	if !fi.Deleted && missingBlocks > 0 && missingBlocks < readQuorum {
-		if _, healing := er.getOnlineDisksWithHealing(); !healing {
-			go healObject(bucket, object, fi.VersionID, madmin.HealNormalScan)
-		}
+		globalMRFState.addPartialOp(partialOperation{
+			bucket:    bucket,
+			object:    object,
+			versionID: fi.VersionID,
+			queued:    time.Now(),
+			setIndex:  er.setIndex,
+			poolIndex: er.poolIndex,
+		})
 	}
 
 	return fi, metaArr, onlineDisks, nil
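The same pattern applies on the metadata read path: only objects (not delete markers) with some, but fewer than read-quorum, missing blocks are queued. A hedged sketch of that gating condition; the helper name and the quorum numbers below are illustrative:

package main

import "fmt"

// shouldQueueHeal mirrors the condition in the hunk above: heal only when the
// object is not a delete marker, at least one block is missing, and the number
// of missing blocks is still below read quorum (i.e. the object is readable).
func shouldQueueHeal(deleted bool, missingBlocks, readQuorum int) bool {
	return !deleted && missingBlocks > 0 && missingBlocks < readQuorum
}

func main() {
	// Illustrative 12-drive set with 8 data blocks, so a read quorum of 8.
	fmt.Println(shouldQueueHeal(false, 2, 8)) // true: readable but degraded
	fmt.Println(shouldQueueHeal(false, 9, 8)) // false: below read quorum, the read fails anyway
	fmt.Println(shouldQueueHeal(true, 2, 8))  // false: delete markers heal via the regular process
}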
@@ -309,6 +309,7 @@ func (er erasureObjects) getOnlineDisksWithHealing() (newDisks []StorageAPI, hea
 	}
 	wg.Wait()
 
+	var scanningDisks []StorageAPI
 	for i, info := range infos {
 		// Check if one of the drives in the set is being healed.
 		// this information is used by scanner to skip healing
@@ -317,9 +318,16 @@
 			healing = true
 			continue
 		}
-		newDisks = append(newDisks, disks[i])
+		if !info.Scanning {
+			newDisks = append(newDisks, disks[i])
+		} else {
+			scanningDisks = append(scanningDisks, disks[i])
+		}
 	}
 
+	// Prefer new disks over disks which are currently being scanned.
+	newDisks = append(newDisks, scanningDisks...)
+
 	return newDisks, healing
 }
 
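This is the core of the change: drives whose scanner is currently running stay usable, but they are appended after idle drives, so healing prefers quieter disks. A small sketch of that partition-and-prefer ordering; the diskInfo shape here is a simplified stand-in for the DiskInfo() result:

package main

import "fmt"

// diskInfo is a simplified stand-in for the per-drive info returned by
// DiskInfo(); only the fields needed for ordering are shown.
type diskInfo struct {
	endpoint string
	healing  bool
	scanning bool
}

// orderForHealing drops drives that are themselves being healed and puts
// drives with an active scanner after the idle ones, mirroring the hunk above.
func orderForHealing(infos []diskInfo) (ordered []string, healing bool) {
	var scanningDisks []string
	for _, info := range infos {
		if info.healing {
			healing = true
			continue
		}
		if !info.scanning {
			ordered = append(ordered, info.endpoint)
		} else {
			scanningDisks = append(scanningDisks, info.endpoint)
		}
	}
	// Prefer idle disks over disks which are currently being scanned.
	ordered = append(ordered, scanningDisks...)
	return ordered, healing
}

func main() {
	disks, healing := orderForHealing([]diskInfo{
		{endpoint: "/mnt/disk1", scanning: true},
		{endpoint: "/mnt/disk2"},
		{endpoint: "/mnt/disk3", healing: true},
		{endpoint: "/mnt/disk4"},
	})
	fmt.Println(disks, healing) // [/mnt/disk2 /mnt/disk4 /mnt/disk1] true
}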
@@ -192,7 +192,7 @@ func (er *erasureObjects) healErasureSet(ctx context.Context, buckets []string,
 		tracker.setObject("")
 		tracker.setBucket(bucket)
 		// Heal current bucket again in case if it is failed
-		// in the being of erasure set healing
+		// in the beginning of erasure set healing
 		if _, err := er.HealBucket(ctx, bucket, madmin.HealOpts{
 			ScanMode: scanMode,
 		}); err != nil {
@@ -241,7 +241,7 @@ func (er *erasureObjects) healErasureSet(ctx context.Context, buckets []string,
 	}
 
 	// Collect updates to tracker from concurrent healEntry calls
-	results := make(chan healEntryResult)
+	results := make(chan healEntryResult, 1000)
 	go func() {
 		for res := range results {
 			if res.entryDone {
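Giving the results channel a 1000-entry buffer lets the parallel healEntry workers hand off tracker updates without blocking on the single consumer goroutine. A tiny sketch of that buffered producer/consumer shape; the healEntryResult fields below are placeholders:

package main

import (
	"fmt"
	"sync"
)

type healEntryResult struct {
	bytes   uint64
	success bool
}

func main() {
	// Buffered: workers only block once the consumer falls 1000 results behind.
	results := make(chan healEntryResult, 1000)

	var done sync.WaitGroup
	done.Add(1)
	go func() {
		defer done.Done()
		var healed, failed int
		for res := range results {
			if res.success {
				healed++
			} else {
				failed++
			}
		}
		fmt.Println("healed:", healed, "failed:", failed)
	}()

	var workers sync.WaitGroup
	for i := 0; i < 8; i++ {
		workers.Add(1)
		go func() {
			defer workers.Done()
			for j := 0; j < 100; j++ {
				results <- healEntryResult{bytes: 1 << 20, success: j%10 != 0}
			}
		}()
	}
	workers.Wait()
	close(results)
	done.Wait()
}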
@@ -256,6 +256,15 @@ func (er *erasureObjects) healErasureSet(ctx context.Context, buckets []string,
 		}
 	}()
 
+	send := func(result healEntryResult) bool {
+		select {
+		case <-ctx.Done():
+			return false
+		case results <- result:
+			return true
+		}
+	}
+
 	// Note: updates from healEntry to tracker must be sent on results channel.
 	healEntry := func(bucket string, entry metaCacheEntry) {
 		if entry.name == "" && len(entry.metadata) == 0 {
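The new send helper centralizes the "write to results unless the context is cancelled" select that used to be repeated at every call site; callers can bail out when it returns false. A generic version of the same pattern, written here as an illustration rather than MinIO's own helper:

package main

import (
	"context"
	"fmt"
	"time"
)

// trySend writes v to ch unless ctx is cancelled first, reporting whether the
// value was delivered. This is the same shape as the send helper above.
func trySend[T any](ctx context.Context, ch chan<- T, v T) bool {
	select {
	case <-ctx.Done():
		return false
	case ch <- v:
		return true
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
	defer cancel()

	results := make(chan int) // unbuffered and nobody reading: the send must wait
	if !trySend(ctx, results, 42) {
		fmt.Println("context cancelled before the result could be delivered")
	}
}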
@@ -302,12 +311,7 @@ func (er *erasureObjects) healErasureSet(ctx context.Context, buckets []string,
 				result = healEntrySuccess(0)
 			}
 
-			select {
-			case <-ctx.Done():
-				return
-			case results <- result:
-			}
-
+			send(result)
 			return
 		}
 
@@ -342,10 +346,8 @@ func (er *erasureObjects) healErasureSet(ctx context.Context, buckets []string,
 			}
 			bgSeq.logHeal(madmin.HealItemObject)
 
-			select {
-			case <-ctx.Done():
+			if !send(result) {
 				return
-			case results <- result:
 			}
 		}
 		// All versions resulted in 'ObjectNotFound'
@@ -93,7 +93,7 @@ type xlStorageDiskIDCheck struct {
 
 func (p *xlStorageDiskIDCheck) getMetrics() DiskMetrics {
 	p.metricsCache.Once.Do(func() {
-		p.metricsCache.TTL = 100 * time.Millisecond
+		p.metricsCache.TTL = 1 * time.Second
 		p.metricsCache.Update = func() (interface{}, error) {
 			diskMetric := DiskMetrics{
 				LastMinute: make(map[string]AccElem, len(p.apiLatencies)),
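Raising the metrics cache TTL from 100ms to 1s means the per-drive metrics snapshot is recomputed at most once per second. A hedged sketch of that once-initialized, TTL-based cache pattern; cachedValue below is illustrative, not MinIO's internal metricsCache type:

package main

import (
	"fmt"
	"sync"
	"time"
)

// cachedValue recomputes its value via Update at most once per TTL;
// readers in between get the last computed value.
type cachedValue struct {
	Once   sync.Once
	TTL    time.Duration
	Update func() (interface{}, error)

	mu      sync.Mutex
	last    time.Time
	current interface{}
}

func (c *cachedValue) Get() interface{} {
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.current == nil || time.Since(c.last) > c.TTL {
		if v, err := c.Update(); err == nil {
			c.current, c.last = v, time.Now()
		}
	}
	return c.current
}

func main() {
	var calls int
	cache := &cachedValue{}
	cache.Once.Do(func() {
		cache.TTL = 1 * time.Second // was effectively 100ms before the change
		cache.Update = func() (interface{}, error) {
			calls++
			return fmt.Sprintf("metrics snapshot #%d", calls), nil
		}
	})
	for i := 0; i < 3; i++ {
		fmt.Println(cache.Get()) // Update runs only once within the 1s window
	}
}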
@@ -97,7 +97,7 @@ var DefaultKVS = config.KVS{
 	},
 	config.KV{
 		Key:   Sleep,
-		Value: "1s",
+		Value: "250ms",
 	},
 	config.KV{
 		Key:   IOCount,
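Lowering the default heal sleep from 1s to 250ms caps how long healing pauses between objects, matching the commit note that waits beyond 250ms should not stall healing. A minimal sketch of reading and applying such a setting; the helper and fallback behavior here are assumptions for illustration, not MinIO's actual heal config parser:

package main

import (
	"fmt"
	"time"
)

// loadMaxSleep parses a max-sleep setting with a 250ms fallback, mirroring the
// new default above; the result bounds the per-object wait in the heal throttle.
func loadMaxSleep(raw string) time.Duration {
	const defaultMaxSleep = 250 * time.Millisecond
	if raw == "" {
		return defaultMaxSleep
	}
	d, err := time.ParseDuration(raw)
	if err != nil || d < 0 {
		return defaultMaxSleep
	}
	return d
}

func main() {
	fmt.Println(loadMaxSleep(""))      // 250ms (new default)
	fmt.Println(loadMaxSleep("1s"))    // 1s (old default, still accepted if configured)
	fmt.Println(loadMaxSleep("bogus")) // falls back to 250ms
}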