mirror of
https://github.com/minio/minio.git
synced 2025-01-11 15:03:22 -05:00
Use the same lock for the scanner and site replication healing (#15985)
This commit is contained in:
parent
52769e1e71
commit
3b1a9b9fdf
@ -64,11 +64,6 @@ const (
|
||||
var (
|
||||
globalHealConfig heal.Config
|
||||
|
||||
dataScannerLeaderLockTimeout = newDynamicTimeoutWithOpts(dynamicTimeoutOpts{
|
||||
timeout: 30 * time.Second,
|
||||
minimum: 10 * time.Second,
|
||||
retryInterval: time.Second,
|
||||
})
|
||||
// Sleeper values are updated when config is loaded.
|
||||
scannerSleeper = newDynamicSleeper(10, 10*time.Second, true)
|
||||
scannerCycle = uatomic.NewDuration(dataScannerStartDelay)
|
||||
@ -157,19 +152,9 @@ func saveBackgroundHealInfo(ctx context.Context, objAPI ObjectLayer, info backgr
|
||||
// runDataScanner will start a data scanner.
|
||||
// The function will block until the context is canceled.
|
||||
// There should only ever be one scanner running per cluster.
|
||||
func runDataScanner(pctx context.Context, objAPI ObjectLayer) {
|
||||
// Make sure only 1 scanner is running on the cluster.
|
||||
locker := objAPI.NewNSLock(minioMetaBucket, "scanner/runDataScanner.lock")
|
||||
lkctx, err := locker.GetLock(pctx, dataScannerLeaderLockTimeout)
|
||||
if err != nil {
|
||||
if intDataUpdateTracker.debug {
|
||||
logger.LogIf(pctx, err)
|
||||
}
|
||||
return
|
||||
}
|
||||
ctx := lkctx.Context()
|
||||
defer lkctx.Cancel()
|
||||
// No unlock for "leader" lock.
|
||||
func runDataScanner(ctx context.Context, objAPI ObjectLayer) {
|
||||
ctx, cancel := globalLeaderLock.GetLock(ctx)
|
||||
defer cancel()
|
||||
|
||||
// Load current bloom cycle
|
||||
var cycleInfo currentScannerCycle
|
||||
@ -181,7 +166,7 @@ func runDataScanner(pctx context.Context, objAPI ObjectLayer) {
|
||||
} else if len(buf) > 8 {
|
||||
cycleInfo.next = binary.LittleEndian.Uint64(buf[:8])
|
||||
buf = buf[8:]
|
||||
_, err = cycleInfo.UnmarshalMsg(buf)
|
||||
_, err := cycleInfo.UnmarshalMsg(buf)
|
||||
logger.LogIf(ctx, err)
|
||||
}
|
||||
|
||||
|
@ -291,6 +291,9 @@ var (
|
||||
// GlobalKMS initialized KMS configuration
|
||||
GlobalKMS kms.KMS
|
||||
|
||||
// Common lock for various subsystems performing the leader tasks
|
||||
globalLeaderLock *sharedLock
|
||||
|
||||
// Auto-Encryption, if enabled, turns any non-SSE-C request
|
||||
// into an SSE-S3 request. If enabled a valid, non-empty KMS
|
||||
// configuration must be present.
|
||||
|
@ -581,6 +581,8 @@ func serverMain(ctx *cli.Context) {
|
||||
xhttp.SetDeploymentID(globalDeploymentID)
|
||||
xhttp.SetMinIOVersion(Version)
|
||||
|
||||
globalLeaderLock = newSharedLock(GlobalContext, newObject, "leader.lock")
|
||||
|
||||
// Enable background operations for erasure coding
|
||||
initAutoHeal(GlobalContext, newObject)
|
||||
initHealMRF(GlobalContext, newObject)
|
||||
|
86
cmd/shared-lock.go
Normal file
86
cmd/shared-lock.go
Normal file
@ -0,0 +1,86 @@
|
||||
// Copyright (c) 2015-2022 MinIO, Inc.
|
||||
//
|
||||
// This file is part of MinIO Object Storage stack
|
||||
//
|
||||
// This program is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU Affero General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
//
|
||||
// This program is distributed in the hope that it will be useful
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU Affero General Public License for more details.
|
||||
//
|
||||
// You should have received a copy of the GNU Affero General Public License
|
||||
// along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
package cmd
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
)
|
||||
|
||||
var sharedLockTimeout = newDynamicTimeoutWithOpts(dynamicTimeoutOpts{
|
||||
timeout: 30 * time.Second,
|
||||
minimum: 10 * time.Second,
|
||||
retryInterval: time.Minute,
|
||||
})
|
||||
|
||||
type sharedLock struct {
|
||||
lockContext chan LockContext
|
||||
}
|
||||
|
||||
func (ld sharedLock) backgroundRoutine(ctx context.Context, objAPI ObjectLayer, lockName string) {
|
||||
for {
|
||||
locker := objAPI.NewNSLock(minioMetaBucket, lockName)
|
||||
lkctx, err := locker.GetLock(ctx, sharedLockTimeout)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-lkctx.Context().Done():
|
||||
// The context of the lock is canceled, this can happen
|
||||
// if one lock lost quorum due to cluster instability
|
||||
// in that case, try to lock again.
|
||||
break
|
||||
case ld.lockContext <- lkctx:
|
||||
// Send the lock context to anyone asking for it
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// mergeContext returns a context that is canceled as soon as either ctx1 or
// ctx2 is done, or when the returned cancel function is called.
//
// The merged context is derived from context.Background(), so it carries
// neither the values nor the deadline of ctx1 or ctx2; only their
// cancellation is propagated.
func mergeContext(ctx1, ctx2 context.Context) (context.Context, context.CancelFunc) {
	ctx, cancel := context.WithCancel(context.Background())
	go func() {
		select {
		case <-ctx1.Done():
		case <-ctx2.Done():
		case <-ctx.Done():
			// The caller canceled the merged context itself. Without this
			// case the goroutine would stay blocked until one of the
			// parents finishes, which may be never.
		}

		cancel()
	}()
	return ctx, cancel
}
|
||||
|
||||
func (ld sharedLock) GetLock(ctx context.Context) (context.Context, context.CancelFunc) {
|
||||
select {
|
||||
case l := <-ld.lockContext:
|
||||
return mergeContext(l.Context(), ctx)
|
||||
}
|
||||
}
|
||||
|
||||
func newSharedLock(ctx context.Context, objAPI ObjectLayer, lockName string) *sharedLock {
|
||||
l := &sharedLock{
|
||||
lockContext: make(chan LockContext),
|
||||
}
|
||||
go l.backgroundRoutine(ctx, objAPI, lockName)
|
||||
|
||||
return l
|
||||
}
|
@ -3577,12 +3577,6 @@ func (c *SiteReplicationSys) PeerEditReq(ctx context.Context, arg madmin.PeerInf
|
||||
|
||||
const siteHealTimeInterval = 10 * time.Second
|
||||
|
||||
var siteReplicationHealLockTimeout = newDynamicTimeoutWithOpts(dynamicTimeoutOpts{
|
||||
timeout: 30 * time.Second,
|
||||
minimum: 10 * time.Second,
|
||||
retryInterval: time.Second,
|
||||
})
|
||||
|
||||
func (c *SiteReplicationSys) startHealRoutine(ctx context.Context, objAPI ObjectLayer) {
|
||||
r := rand.New(rand.NewSource(time.Now().UnixNano()))
|
||||
// Run the site replication healing in a loop
|
||||
@ -3598,15 +3592,8 @@ func (c *SiteReplicationSys) startHealRoutine(ctx context.Context, objAPI Object
|
||||
}
|
||||
|
||||
func (c *SiteReplicationSys) healRoutine(ctx context.Context, objAPI ObjectLayer) {
|
||||
// Make sure only one node running site replication on the cluster.
|
||||
locker := objAPI.NewNSLock(minioMetaBucket, "site-replication/heal.lock")
|
||||
lkctx, err := locker.GetLock(ctx, siteReplicationHealLockTimeout)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
ctx = lkctx.Context()
|
||||
defer lkctx.Cancel()
|
||||
// No unlock for "leader" lock.
|
||||
ctx, cancel := globalLeaderLock.GetLock(ctx)
|
||||
defer cancel()
|
||||
|
||||
healTimer := time.NewTimer(siteHealTimeInterval)
|
||||
defer healTimer.Stop()
|
||||
|
Loading…
Reference in New Issue
Block a user