fix: check once ready for site-replication (#20149)

This commit is contained in:
jiuker 2024-07-27 01:27:42 +08:00 committed by GitHub
parent 1966668066
commit 132e7413ba
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 43 additions and 24 deletions

View File

@ -75,7 +75,7 @@ func (api objectAPIHandlers) PutBucketReplicationConfigHandler(w http.ResponseWr
writeErrorResponse(ctx, w, apiErr, r.URL) writeErrorResponse(ctx, w, apiErr, r.URL)
return return
} }
sameTarget, apiErr := validateReplicationDestination(ctx, bucket, replicationConfig, true) sameTarget, apiErr := validateReplicationDestination(ctx, bucket, replicationConfig, &validateReplicationDestinationOptions{CheckRemoteBucket: true})
if apiErr != noError { if apiErr != noError {
writeErrorResponse(ctx, w, apiErr, r.URL) writeErrorResponse(ctx, w, apiErr, r.URL)
return return
@ -559,7 +559,7 @@ func (api objectAPIHandlers) ValidateBucketReplicationCredsHandler(w http.Respon
lockEnabled = lcfg.Enabled() lockEnabled = lcfg.Enabled()
} }
sameTarget, apiErr := validateReplicationDestination(ctx, bucket, replicationConfig, true) sameTarget, apiErr := validateReplicationDestination(ctx, bucket, replicationConfig, &validateReplicationDestinationOptions{CheckRemoteBucket: true})
if apiErr != noError { if apiErr != noError {
writeErrorResponse(ctx, w, apiErr, r.URL) writeErrorResponse(ctx, w, apiErr, r.URL)
return return

View File

@ -91,7 +91,10 @@ func getReplicationConfig(ctx context.Context, bucketName string) (rc *replicati
// validateReplicationDestination returns error if replication destination bucket missing or not configured // validateReplicationDestination returns error if replication destination bucket missing or not configured
// It also returns true if replication destination is same as this server. // It also returns true if replication destination is same as this server.
func validateReplicationDestination(ctx context.Context, bucket string, rCfg *replication.Config, checkRemote bool) (bool, APIError) { func validateReplicationDestination(ctx context.Context, bucket string, rCfg *replication.Config, opts *validateReplicationDestinationOptions) (bool, APIError) {
if opts == nil {
opts = &validateReplicationDestinationOptions{}
}
var arns []string var arns []string
if rCfg.RoleArn != "" { if rCfg.RoleArn != "" {
arns = append(arns, rCfg.RoleArn) arns = append(arns, rCfg.RoleArn)
@ -113,7 +116,7 @@ func validateReplicationDestination(ctx context.Context, bucket string, rCfg *re
if clnt == nil { if clnt == nil {
return sameTarget, toAPIError(ctx, BucketRemoteTargetNotFound{Bucket: bucket}) return sameTarget, toAPIError(ctx, BucketRemoteTargetNotFound{Bucket: bucket})
} }
if checkRemote { // validate remote bucket if opts.CheckRemoteBucket { // validate remote bucket
found, err := clnt.BucketExists(ctx, arn.Bucket) found, err := clnt.BucketExists(ctx, arn.Bucket)
if err != nil { if err != nil {
return sameTarget, errorCodes.ToAPIErrWithErr(ErrRemoteDestinationNotFoundError, err) return sameTarget, errorCodes.ToAPIErrWithErr(ErrRemoteDestinationNotFoundError, err)
@ -133,24 +136,30 @@ func validateReplicationDestination(ctx context.Context, bucket string, rCfg *re
} }
} }
} }
// validate replication ARN against target endpoint // if checked bucket, then check the ready is unnecessary
c := globalBucketTargetSys.GetRemoteTargetClient(bucket, arnStr) if !opts.CheckRemoteBucket && opts.CheckReady {
if c != nil { endpoint := clnt.EndpointURL().String()
if err := checkRemoteEndpoint(ctx, c.EndpointURL()); err != nil { if errInt, ok := opts.checkReadyErr.Load(endpoint); !ok {
err = checkRemoteEndpoint(ctx, clnt.EndpointURL())
opts.checkReadyErr.Store(endpoint, err)
} else {
if errInt == nil {
err = nil
} else {
err = errInt.(error)
}
}
switch err.(type) { switch err.(type) {
case BucketRemoteIdenticalToSource: case BucketRemoteIdenticalToSource:
return true, errorCodes.ToAPIErrWithErr(ErrBucketRemoteIdenticalToSource, fmt.Errorf("remote target endpoint %s is self referential", c.EndpointURL().String())) return true, errorCodes.ToAPIErrWithErr(ErrBucketRemoteIdenticalToSource, fmt.Errorf("remote target endpoint %s is self referential", clnt.EndpointURL().String()))
default: default:
} }
} }
if c.EndpointURL().String() == clnt.EndpointURL().String() { // validate replication ARN against target endpoint
selfTarget, _ := isLocalHost(clnt.EndpointURL().Hostname(), clnt.EndpointURL().Port(), globalMinioPort) selfTarget, _ := isLocalHost(clnt.EndpointURL().Hostname(), clnt.EndpointURL().Port(), globalMinioPort)
if !sameTarget { if !sameTarget {
sameTarget = selfTarget sameTarget = selfTarget
} }
continue
}
}
} }
if len(arns) == 0 { if len(arns) == 0 {
@ -3741,3 +3750,12 @@ func (p *ReplicationPool) getMRF(ctx context.Context, bucket string) (ch <-chan
return mrfCh, nil return mrfCh, nil
} }
// validateReplicationDestinationOptions is used to configure the validation of the replication destination.
// validateReplicationDestination uses this to configure the validation.
type validateReplicationDestinationOptions struct {
CheckRemoteBucket bool
CheckReady bool
checkReadyErr sync.Map
}

View File

@ -1129,7 +1129,7 @@ func (c *SiteReplicationSys) PeerBucketConfigureReplHandler(ctx context.Context,
if err != nil { if err != nil {
return err return err
} }
sameTarget, apiErr := validateReplicationDestination(ctx, bucket, newReplicationConfig, true) sameTarget, apiErr := validateReplicationDestination(ctx, bucket, newReplicationConfig, &validateReplicationDestinationOptions{CheckRemoteBucket: true})
if apiErr != noError { if apiErr != noError {
return fmt.Errorf("bucket replication config validation error: %#v", apiErr) return fmt.Errorf("bucket replication config validation error: %#v", apiErr)
} }
@ -4453,6 +4453,7 @@ func (c *SiteReplicationSys) healBuckets(ctx context.Context, objAPI ObjectLayer
return err return err
} }
ilmExpiryCfgHealed := false ilmExpiryCfgHealed := false
opts := validateReplicationDestinationOptions{CheckReady: true}
for _, bi := range buckets { for _, bi := range buckets {
bucket := bi.Name bucket := bi.Name
info, err := c.siteReplicationStatus(ctx, objAPI, madmin.SRStatusOptions{ info, err := c.siteReplicationStatus(ctx, objAPI, madmin.SRStatusOptions{
@ -4472,7 +4473,7 @@ func (c *SiteReplicationSys) healBuckets(ctx context.Context, objAPI ObjectLayer
c.healVersioningMetadata(ctx, objAPI, bucket, info) c.healVersioningMetadata(ctx, objAPI, bucket, info)
c.healOLockConfigMetadata(ctx, objAPI, bucket, info) c.healOLockConfigMetadata(ctx, objAPI, bucket, info)
c.healSSEMetadata(ctx, objAPI, bucket, info) c.healSSEMetadata(ctx, objAPI, bucket, info)
c.healBucketReplicationConfig(ctx, objAPI, bucket, info) c.healBucketReplicationConfig(ctx, objAPI, bucket, info, &opts)
c.healBucketPolicies(ctx, objAPI, bucket, info) c.healBucketPolicies(ctx, objAPI, bucket, info)
c.healTagMetadata(ctx, objAPI, bucket, info) c.healTagMetadata(ctx, objAPI, bucket, info)
c.healBucketQuotaConfig(ctx, objAPI, bucket, info) c.healBucketQuotaConfig(ctx, objAPI, bucket, info)
@ -5172,7 +5173,7 @@ func (c *SiteReplicationSys) healBucket(ctx context.Context, objAPI ObjectLayer,
return nil return nil
} }
func (c *SiteReplicationSys) healBucketReplicationConfig(ctx context.Context, objAPI ObjectLayer, bucket string, info srStatusInfo) error { func (c *SiteReplicationSys) healBucketReplicationConfig(ctx context.Context, objAPI ObjectLayer, bucket string, info srStatusInfo, opts *validateReplicationDestinationOptions) error {
bs := info.BucketStats[bucket] bs := info.BucketStats[bucket]
c.RLock() c.RLock()
@ -5226,7 +5227,7 @@ func (c *SiteReplicationSys) healBucketReplicationConfig(ctx context.Context, ob
if rcfg != nil && !replMismatch { if rcfg != nil && !replMismatch {
// validate remote targets on current cluster for this bucket // validate remote targets on current cluster for this bucket
_, apiErr := validateReplicationDestination(ctx, bucket, rcfg, false) _, apiErr := validateReplicationDestination(ctx, bucket, rcfg, opts)
if apiErr != noError { if apiErr != noError {
replMismatch = true replMismatch = true
} }