mirror of
https://github.com/minio/minio.git
synced 2025-01-26 06:03:17 -05:00
Reload replication targets lazily if missing (#18333)
There can be rare situations where errors seen in bucket metadata load on startup or subsequent metadata updates can result in missing replication remotes. Attempt a refresh of remote targets backed by a good replication config lazily in 5 minute intervals if there ever occurs a situation where remote targets go AWOL.
This commit is contained in:
parent
b8d62a8068
commit
03dc65e12d
@ -575,7 +575,7 @@ func (api objectAPIHandlers) ValidateBucketReplicationCredsHandler(w http.Respon
|
|||||||
if rule.Status == replication.Disabled {
|
if rule.Status == replication.Disabled {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
clnt := globalBucketTargetSys.GetRemoteTargetClient(rule.Destination.Bucket)
|
clnt := globalBucketTargetSys.GetRemoteTargetClient(bucket, rule.Destination.Bucket)
|
||||||
if clnt == nil {
|
if clnt == nil {
|
||||||
writeErrorResponse(ctx, w, errorCodes.ToAPIErrWithErr(ErrRemoteTargetNotFoundError, fmt.Errorf("replication config with rule ID %s has a stale target", rule.ID)), r.URL)
|
writeErrorResponse(ctx, w, errorCodes.ToAPIErrWithErr(ErrRemoteTargetNotFoundError, fmt.Errorf("replication config with rule ID %s has a stale target", rule.ID)), r.URL)
|
||||||
return
|
return
|
||||||
|
@ -113,7 +113,7 @@ func validateReplicationDestination(ctx context.Context, bucket string, rCfg *re
|
|||||||
if arn.Type != madmin.ReplicationService {
|
if arn.Type != madmin.ReplicationService {
|
||||||
return sameTarget, toAPIError(ctx, BucketRemoteArnTypeInvalid{Bucket: bucket})
|
return sameTarget, toAPIError(ctx, BucketRemoteArnTypeInvalid{Bucket: bucket})
|
||||||
}
|
}
|
||||||
clnt := globalBucketTargetSys.GetRemoteTargetClient(arnStr)
|
clnt := globalBucketTargetSys.GetRemoteTargetClient(bucket, arnStr)
|
||||||
if clnt == nil {
|
if clnt == nil {
|
||||||
return sameTarget, toAPIError(ctx, BucketRemoteTargetNotFound{Bucket: bucket})
|
return sameTarget, toAPIError(ctx, BucketRemoteTargetNotFound{Bucket: bucket})
|
||||||
}
|
}
|
||||||
@ -138,7 +138,7 @@ func validateReplicationDestination(ctx context.Context, bucket string, rCfg *re
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
// validate replication ARN against target endpoint
|
// validate replication ARN against target endpoint
|
||||||
c := globalBucketTargetSys.GetRemoteTargetClient(arnStr)
|
c := globalBucketTargetSys.GetRemoteTargetClient(bucket, arnStr)
|
||||||
if c != nil {
|
if c != nil {
|
||||||
if err := checkRemoteEndpoint(ctx, c.EndpointURL()); err != nil {
|
if err := checkRemoteEndpoint(ctx, c.EndpointURL()); err != nil {
|
||||||
switch err.(type) {
|
switch err.(type) {
|
||||||
@ -280,7 +280,7 @@ func mustReplicate(ctx context.Context, bucket, object string, mopts mustReplica
|
|||||||
}
|
}
|
||||||
tgtArns := cfg.FilterTargetArns(opts)
|
tgtArns := cfg.FilterTargetArns(opts)
|
||||||
for _, tgtArn := range tgtArns {
|
for _, tgtArn := range tgtArns {
|
||||||
tgt := globalBucketTargetSys.GetRemoteTargetClient(tgtArn)
|
tgt := globalBucketTargetSys.GetRemoteTargetClient(bucket, tgtArn)
|
||||||
// the target online status should not be used here while deciding
|
// the target online status should not be used here while deciding
|
||||||
// whether to replicate as the target could be temporarily down
|
// whether to replicate as the target could be temporarily down
|
||||||
opts.TargetArn = tgtArn
|
opts.TargetArn = tgtArn
|
||||||
@ -383,7 +383,7 @@ func checkReplicateDelete(ctx context.Context, bucket string, dobj ObjectToDelet
|
|||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
tgt := globalBucketTargetSys.GetRemoteTargetClient(tgtArn)
|
tgt := globalBucketTargetSys.GetRemoteTargetClient(bucket, tgtArn)
|
||||||
// the target online status should not be used here while deciding
|
// the target online status should not be used here while deciding
|
||||||
// whether to replicate deletes as the target could be temporarily down
|
// whether to replicate deletes as the target could be temporarily down
|
||||||
tgtDsc := newReplicateTargetDecision(tgtArn, false, false)
|
tgtDsc := newReplicateTargetDecision(tgtArn, false, false)
|
||||||
@ -495,7 +495,7 @@ func replicateDelete(ctx context.Context, dobj DeletedObjectReplicationInfo, obj
|
|||||||
if dobj.TargetArn != "" && dobj.TargetArn != tgtEntry.Arn {
|
if dobj.TargetArn != "" && dobj.TargetArn != tgtEntry.Arn {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
tgtClnt := globalBucketTargetSys.GetRemoteTargetClient(tgtEntry.Arn)
|
tgtClnt := globalBucketTargetSys.GetRemoteTargetClient(bucket, tgtEntry.Arn)
|
||||||
if tgtClnt == nil {
|
if tgtClnt == nil {
|
||||||
// Skip stale targets if any and log them to be missing atleast once.
|
// Skip stale targets if any and log them to be missing atleast once.
|
||||||
logger.LogOnceIf(ctx, fmt.Errorf("failed to get target for bucket:%s arn:%s", bucket, tgtEntry.Arn), tgtEntry.Arn)
|
logger.LogOnceIf(ctx, fmt.Errorf("failed to get target for bucket:%s arn:%s", bucket, tgtEntry.Arn), tgtEntry.Arn)
|
||||||
@ -1023,7 +1023,7 @@ func replicateObject(ctx context.Context, ri ReplicateObjectInfo, objectAPI Obje
|
|||||||
var wg sync.WaitGroup
|
var wg sync.WaitGroup
|
||||||
var mu sync.Mutex
|
var mu sync.Mutex
|
||||||
for _, tgtArn := range tgtArns {
|
for _, tgtArn := range tgtArns {
|
||||||
tgt := globalBucketTargetSys.GetRemoteTargetClient(tgtArn)
|
tgt := globalBucketTargetSys.GetRemoteTargetClient(bucket, tgtArn)
|
||||||
if tgt == nil {
|
if tgt == nil {
|
||||||
logger.LogOnceIf(ctx, fmt.Errorf("failed to get target for bucket:%s arn:%s", bucket, tgtArn), tgtArn)
|
logger.LogOnceIf(ctx, fmt.Errorf("failed to get target for bucket:%s arn:%s", bucket, tgtArn), tgtArn)
|
||||||
sendEvent(eventArgs{
|
sendEvent(eventArgs{
|
||||||
@ -2188,7 +2188,7 @@ func proxyHeadToRepTarget(ctx context.Context, bucket, object string, rs *HTTPRa
|
|||||||
return nil, oi, proxy
|
return nil, oi, proxy
|
||||||
}
|
}
|
||||||
for _, t := range proxyTargets.Targets {
|
for _, t := range proxyTargets.Targets {
|
||||||
tgt = globalBucketTargetSys.GetRemoteTargetClient(t.Arn)
|
tgt = globalBucketTargetSys.GetRemoteTargetClient(bucket, t.Arn)
|
||||||
if tgt == nil || globalBucketTargetSys.isOffline(tgt.EndpointURL()) {
|
if tgt == nil || globalBucketTargetSys.isOffline(tgt.EndpointURL()) {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
@ -2560,7 +2560,7 @@ func (s *replicationResyncer) resyncBucket(ctx context.Context, objectAPI Object
|
|||||||
logger.LogIf(ctx, fmt.Errorf("replication resync failed for %s - arn specified %s is missing in the replication config", opts.bucket, opts.arn))
|
logger.LogIf(ctx, fmt.Errorf("replication resync failed for %s - arn specified %s is missing in the replication config", opts.bucket, opts.arn))
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
tgt := globalBucketTargetSys.GetRemoteTargetClient(opts.arn)
|
tgt := globalBucketTargetSys.GetRemoteTargetClient(opts.bucket, opts.arn)
|
||||||
if tgt == nil {
|
if tgt == nil {
|
||||||
logger.LogIf(ctx, fmt.Errorf("replication resync failed for %s - target could not be created for arn %s", opts.bucket, opts.arn))
|
logger.LogIf(ctx, fmt.Errorf("replication resync failed for %s - target could not be created for arn %s", opts.bucket, opts.arn))
|
||||||
return
|
return
|
||||||
|
@ -40,14 +40,29 @@ const (
|
|||||||
defaultHealthCheckReloadDuration = 30 * time.Minute
|
defaultHealthCheckReloadDuration = 30 * time.Minute
|
||||||
)
|
)
|
||||||
|
|
||||||
|
type arnTarget struct {
|
||||||
|
Client *TargetClient
|
||||||
|
lastRefresh time.Time
|
||||||
|
}
|
||||||
|
|
||||||
|
// arnErrs represents number of errors seen for a ARN and if update is in progress
|
||||||
|
// to refresh remote targets from bucket metadata.
|
||||||
|
type arnErrs struct {
|
||||||
|
count int64
|
||||||
|
updateInProgress bool
|
||||||
|
bucket string
|
||||||
|
}
|
||||||
|
|
||||||
// BucketTargetSys represents bucket targets subsystem
|
// BucketTargetSys represents bucket targets subsystem
|
||||||
type BucketTargetSys struct {
|
type BucketTargetSys struct {
|
||||||
sync.RWMutex
|
sync.RWMutex
|
||||||
arnRemotesMap map[string]*TargetClient
|
arnRemotesMap map[string]arnTarget
|
||||||
targetsMap map[string][]madmin.BucketTarget
|
targetsMap map[string][]madmin.BucketTarget
|
||||||
hMutex sync.RWMutex
|
hMutex sync.RWMutex
|
||||||
hc map[string]epHealth
|
hc map[string]epHealth
|
||||||
hcClient *madmin.AnonymousClient
|
hcClient *madmin.AnonymousClient
|
||||||
|
aMutex sync.RWMutex
|
||||||
|
arnErrsMap map[string]arnErrs // map of ARN to error count of failures to get target
|
||||||
}
|
}
|
||||||
|
|
||||||
type latencyStat struct {
|
type latencyStat struct {
|
||||||
@ -364,7 +379,7 @@ func (sys *BucketTargetSys) SetTarget(ctx context.Context, bucket string, tgt *m
|
|||||||
}
|
}
|
||||||
|
|
||||||
sys.targetsMap[bucket] = newtgts
|
sys.targetsMap[bucket] = newtgts
|
||||||
sys.arnRemotesMap[tgt.Arn] = clnt
|
sys.arnRemotesMap[tgt.Arn] = arnTarget{Client: clnt}
|
||||||
sys.updateBandwidthLimit(bucket, tgt.Arn, tgt.BandwidthLimit)
|
sys.updateBandwidthLimit(bucket, tgt.Arn, tgt.BandwidthLimit)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
@ -432,11 +447,71 @@ func (sys *BucketTargetSys) RemoveTarget(ctx context.Context, bucket, arnStr str
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (sys *BucketTargetSys) markRefreshInProgress(bucket, arn string) {
|
||||||
|
sys.aMutex.Lock()
|
||||||
|
defer sys.aMutex.Unlock()
|
||||||
|
if v, ok := sys.arnErrsMap[arn]; !ok {
|
||||||
|
sys.arnErrsMap[arn] = arnErrs{
|
||||||
|
updateInProgress: true,
|
||||||
|
count: v.count + 1,
|
||||||
|
bucket: bucket,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (sys *BucketTargetSys) markRefreshDone(bucket, arn string) {
|
||||||
|
sys.aMutex.Lock()
|
||||||
|
defer sys.aMutex.Unlock()
|
||||||
|
if v, ok := sys.arnErrsMap[arn]; ok {
|
||||||
|
sys.arnErrsMap[arn] = arnErrs{
|
||||||
|
updateInProgress: false,
|
||||||
|
count: v.count,
|
||||||
|
bucket: bucket,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (sys *BucketTargetSys) isReloadingTarget(bucket, arn string) bool {
|
||||||
|
sys.aMutex.RLock()
|
||||||
|
defer sys.aMutex.RUnlock()
|
||||||
|
if v, ok := sys.arnErrsMap[arn]; ok {
|
||||||
|
return v.updateInProgress
|
||||||
|
}
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
func (sys *BucketTargetSys) incTargetErr(bucket, arn string) {
|
||||||
|
sys.aMutex.Lock()
|
||||||
|
defer sys.aMutex.Unlock()
|
||||||
|
if v, ok := sys.arnErrsMap[arn]; ok {
|
||||||
|
sys.arnErrsMap[arn] = arnErrs{
|
||||||
|
updateInProgress: v.updateInProgress,
|
||||||
|
count: v.count + 1,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// GetRemoteTargetClient returns minio-go client for replication target instance
|
// GetRemoteTargetClient returns minio-go client for replication target instance
|
||||||
func (sys *BucketTargetSys) GetRemoteTargetClient(arn string) *TargetClient {
|
func (sys *BucketTargetSys) GetRemoteTargetClient(bucket, arn string) *TargetClient {
|
||||||
sys.RLock()
|
sys.RLock()
|
||||||
defer sys.RUnlock()
|
tgt := sys.arnRemotesMap[arn]
|
||||||
return sys.arnRemotesMap[arn]
|
sys.RUnlock()
|
||||||
|
|
||||||
|
if tgt.Client != nil {
|
||||||
|
return tgt.Client
|
||||||
|
}
|
||||||
|
defer func() { // lazy refresh remote targets
|
||||||
|
if tgt.Client == nil && !sys.isReloadingTarget(bucket, arn) && (tgt.lastRefresh.Equal(timeSentinel) || tgt.lastRefresh.Before(UTCNow().Add(-5*time.Minute))) {
|
||||||
|
tgts, err := globalBucketMetadataSys.GetBucketTargetsConfig(bucket)
|
||||||
|
if err == nil {
|
||||||
|
sys.markRefreshInProgress(bucket, arn)
|
||||||
|
sys.UpdateAllTargets(bucket, tgts)
|
||||||
|
sys.markRefreshDone(bucket, arn)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
sys.incTargetErr(bucket, arn)
|
||||||
|
}()
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetRemoteBucketTargetByArn returns BucketTarget for a ARN
|
// GetRemoteBucketTargetByArn returns BucketTarget for a ARN
|
||||||
@ -457,8 +532,9 @@ func (sys *BucketTargetSys) GetRemoteBucketTargetByArn(ctx context.Context, buck
|
|||||||
// NewBucketTargetSys - creates new replication system.
|
// NewBucketTargetSys - creates new replication system.
|
||||||
func NewBucketTargetSys(ctx context.Context) *BucketTargetSys {
|
func NewBucketTargetSys(ctx context.Context) *BucketTargetSys {
|
||||||
sys := &BucketTargetSys{
|
sys := &BucketTargetSys{
|
||||||
arnRemotesMap: make(map[string]*TargetClient),
|
arnRemotesMap: make(map[string]arnTarget),
|
||||||
targetsMap: make(map[string][]madmin.BucketTarget),
|
targetsMap: make(map[string][]madmin.BucketTarget),
|
||||||
|
arnErrsMap: make(map[string]arnErrs),
|
||||||
hc: make(map[string]epHealth),
|
hc: make(map[string]epHealth),
|
||||||
hcClient: newHCClient(),
|
hcClient: newHCClient(),
|
||||||
}
|
}
|
||||||
@ -502,7 +578,10 @@ func (sys *BucketTargetSys) UpdateAllTargets(bucket string, tgts *madmin.BucketT
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
sys.arnRemotesMap[tgt.Arn] = tgtClient
|
sys.arnRemotesMap[tgt.Arn] = arnTarget{
|
||||||
|
Client: tgtClient,
|
||||||
|
lastRefresh: UTCNow(),
|
||||||
|
}
|
||||||
sys.updateBandwidthLimit(bucket, tgt.Arn, tgt.BandwidthLimit)
|
sys.updateBandwidthLimit(bucket, tgt.Arn, tgt.BandwidthLimit)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -526,7 +605,7 @@ func (sys *BucketTargetSys) set(bucket BucketInfo, meta BucketMetadata) {
|
|||||||
logger.LogIf(GlobalContext, err)
|
logger.LogIf(GlobalContext, err)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
sys.arnRemotesMap[tgt.Arn] = tgtClient
|
sys.arnRemotesMap[tgt.Arn] = arnTarget{Client: tgtClient}
|
||||||
sys.updateBandwidthLimit(bucket.Name, tgt.Arn, tgt.BandwidthLimit)
|
sys.updateBandwidthLimit(bucket.Name, tgt.Arn, tgt.BandwidthLimit)
|
||||||
}
|
}
|
||||||
sys.targetsMap[bucket.Name] = cfg.Targets
|
sys.targetsMap[bucket.Name] = cfg.Targets
|
||||||
|
Loading…
x
Reference in New Issue
Block a user