diff --git a/cmd/admin-bucket-handlers.go b/cmd/admin-bucket-handlers.go index c70a71f9d..3ca303797 100644 --- a/cmd/admin-bucket-handlers.go +++ b/cmd/admin-bucket-handlers.go @@ -249,7 +249,7 @@ func (a adminAPIHandlers) SetRemoteTargetHandler(w http.ResponseWriter, r *http. } if err = globalBucketTargetSys.SetTarget(ctx, bucket, &target, update); err != nil { switch err.(type) { - case BucketRemoteConnectionErr: + case RemoteTargetConnectionErr: writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErrWithErr(ErrReplicationRemoteConnectionError, err), r.URL) default: writeErrorResponseJSON(ctx, w, toAPIError(ctx, err), r.URL) diff --git a/cmd/api-errors.go b/cmd/api-errors.go index 5bdc58034..7f73367c8 100644 --- a/cmd/api-errors.go +++ b/cmd/api-errors.go @@ -888,7 +888,7 @@ var errorCodes = errorCodeMap{ }, ErrReplicationRemoteConnectionError: { Code: "XMinioAdminReplicationRemoteConnectionError", - Description: "Remote service connection error - please check remote service credentials and target bucket", + Description: "Remote service connection error", HTTPStatusCode: http.StatusNotFound, }, ErrReplicationBandwidthLimitError: { @@ -2074,7 +2074,7 @@ func toAPIErrorCode(ctx context.Context, err error) (apiErr APIErrorCode) { apiErr = ErrRemoteDestinationNotFoundError case BucketRemoteTargetNotFound: apiErr = ErrRemoteTargetNotFoundError - case BucketRemoteConnectionErr: + case RemoteTargetConnectionErr: apiErr = ErrReplicationRemoteConnectionError case BucketRemoteAlreadyExists: apiErr = ErrBucketRemoteAlreadyExists diff --git a/cmd/bucket-replication-handlers.go b/cmd/bucket-replication-handlers.go index b52470377..c0bc1ac24 100644 --- a/cmd/bucket-replication-handlers.go +++ b/cmd/bucket-replication-handlers.go @@ -310,7 +310,7 @@ func (api objectAPIHandlers) ResetBucketReplicationStartHandler(w http.ResponseW rinfo.Targets = append(rinfo.Targets, ResyncTarget{Arn: tgtArns[0], ResetID: target.ResetID}) if err = globalBucketTargetSys.SetTarget(ctx, bucket, &target, true); err != nil { switch err.(type) { - case BucketRemoteConnectionErr: + case RemoteTargetConnectionErr: writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErrWithErr(ErrReplicationRemoteConnectionError, err), r.URL) default: writeErrorResponseJSON(ctx, w, toAPIError(ctx, err), r.URL) diff --git a/cmd/bucket-replication.go b/cmd/bucket-replication.go index 296413ea1..0e2e626bb 100644 --- a/cmd/bucket-replication.go +++ b/cmd/bucket-replication.go @@ -534,7 +534,7 @@ func replicateDeleteToTarget(ctx context.Context, dobj DeletedObjectReplicationI if dobj.VersionID != "" && rinfo.VersionPurgeStatus == Complete { return } - if tgt.IsOffline() { + if globalBucketTargetSys.isOffline(tgt.EndpointURL()) { logger.LogIf(ctx, fmt.Errorf("remote target is offline for bucket:%s arn:%s", dobj.Bucket, tgt.ARN)) sendEvent(eventArgs{ BucketName: dobj.Bucket, @@ -1039,7 +1039,7 @@ func replicateObjectToTarget(ctx context.Context, ri ReplicateObjectInfo, object rinfo.ReplicationResynced = true return } - if tgt.IsOffline() { + if globalBucketTargetSys.isOffline(tgt.EndpointURL()) { logger.LogIf(ctx, fmt.Errorf("remote target is offline for bucket:%s arn:%s", bucket, tgt.ARN)) sendEvent(eventArgs{ EventName: event.ObjectReplicationNotTracked, @@ -1659,7 +1659,7 @@ func proxyHeadToRepTarget(ctx context.Context, bucket, object string, rs *HTTPRa } for _, t := range proxyTargets.Targets { tgt = globalBucketTargetSys.GetRemoteTargetClient(ctx, t.Arn) - if tgt == nil || tgt.IsOffline() { + if tgt == nil || globalBucketTargetSys.isOffline(tgt.EndpointURL()) { continue } // if proxying explicitly disabled on remote target diff --git a/cmd/bucket-targets.go b/cmd/bucket-targets.go index dc8c52e3e..e1b5c592c 100644 --- a/cmd/bucket-targets.go +++ b/cmd/bucket-targets.go @@ -19,6 +19,8 @@ package cmd import ( "context" + "fmt" + "net/url" "sync" "time" @@ -35,7 +37,9 @@ import ( ) const ( - defaultHealthCheckDuration = 30 * time.Second + defaultHealthCheckDuration = 5 * time.Second + // default interval for reload of all remote target endpoints + defaultHealthCheckReloadDuration = 30 * time.Minute ) // BucketTargetSys represents bucket targets subsystem @@ -43,6 +47,113 @@ type BucketTargetSys struct { sync.RWMutex arnRemotesMap map[string]*TargetClient targetsMap map[string][]madmin.BucketTarget + hMutex sync.RWMutex + hc map[string]epHealth + hcClient *madmin.AnonymousClient +} + +// epHealth struct represents health of a replication target endpoint. +type epHealth struct { + Endpoint string + Scheme string + Online bool +} + +// isOffline returns current liveness result of remote target. Add endpoint to +// healthcheck map if missing and default to online status +func (sys *BucketTargetSys) isOffline(ep *url.URL) bool { + sys.hMutex.RLock() + defer sys.hMutex.RUnlock() + if h, ok := sys.hc[ep.Host]; ok { + return !h.Online + } + go sys.initHC(ep) + return false +} + +func (sys *BucketTargetSys) initHC(ep *url.URL) { + sys.hMutex.Lock() + sys.hc[ep.Host] = epHealth{ + Endpoint: ep.Host, + Scheme: ep.Scheme, + Online: true, + } + sys.hMutex.Unlock() +} + +// newHCClient initializes an anonymous client for performing health check on the remote endpoints +func newHCClient() *madmin.AnonymousClient { + clnt, e := madmin.NewAnonymousClientNoEndpoint() + if e != nil { + logger.LogOnceIf(GlobalContext, fmt.Errorf("WARNING: Unable to initialize health check client"), string(replicationSubsystem)) + return nil + } + clnt.SetCustomTransport(globalRemoteTargetTransport) + return clnt +} + +// heartBeat performs liveness check on remote endpoints. +func (sys *BucketTargetSys) heartBeat(ctx context.Context) { + hcTimer := time.NewTimer(defaultHealthCheckDuration) + defer hcTimer.Stop() + for { + select { + case <-hcTimer.C: + sys.hMutex.RLock() + var eps []madmin.ServerProperties + for _, ep := range sys.hc { + eps = append(eps, madmin.ServerProperties{Endpoint: ep.Endpoint, Scheme: ep.Scheme}) + } + sys.hMutex.RUnlock() + + if len(eps) > 0 { + cctx, cancel := context.WithTimeout(ctx, 30*time.Second) + defer cancel() + m := map[string]epHealth{} + for result := range sys.hcClient.Alive(cctx, madmin.AliveOpts{}, eps...) { + var online bool + if result.Error == nil { + online = result.Online + } + m[result.Endpoint.Host] = epHealth{ + Endpoint: result.Endpoint.Host, + Scheme: result.Endpoint.Scheme, + Online: online, + } + } + sys.hMutex.Lock() + sys.hc = m + sys.hMutex.Unlock() + } + hcTimer.Reset(defaultHealthCheckDuration) + case <-ctx.Done(): + return + } + } +} + +// periodically rebuild the healthcheck map from list of targets to clear +// out stale endpoints +func (sys *BucketTargetSys) reloadHealthCheckers(ctx context.Context) { + m := make(map[string]epHealth) + tgts := sys.ListTargets(ctx, "", "") + for _, t := range tgts { + if _, ok := m[t.Endpoint]; !ok { + scheme := "http" + if t.Secure { + scheme = "https" + } + m[t.Endpoint] = epHealth{ + Online: true, + Endpoint: t.Endpoint, + Scheme: scheme, + } + } + } + sys.hMutex.Lock() + // swap out the map + sys.hc = m + sys.hMutex.Unlock() } // ListTargets lists bucket targets across tenant or for individual bucket, and returns @@ -91,9 +202,6 @@ func (sys *BucketTargetSys) Delete(bucket string) { return } for _, t := range tgts { - if tgt, ok := sys.arnRemotesMap[t.Arn]; ok && tgt.healthCancelFn != nil { - tgt.healthCancelFn() - } delete(sys.arnRemotesMap, t.Arn) } delete(sys.targetsMap, bucket) @@ -116,7 +224,7 @@ func (sys *BucketTargetSys) SetTarget(ctx context.Context, bucket string, tgt *m if minio.ToErrorResponse(err).Code == "NoSuchBucket" { return BucketRemoteTargetNotFound{Bucket: tgt.TargetBucket} } - return BucketRemoteConnectionErr{Bucket: tgt.TargetBucket, Err: err} + return RemoteTargetConnectionErr{Bucket: tgt.TargetBucket, Err: err} } if tgt.Type == madmin.ReplicationService { if !globalBucketVersioningSys.Enabled(bucket) { @@ -124,7 +232,7 @@ func (sys *BucketTargetSys) SetTarget(ctx context.Context, bucket string, tgt *m } vcfg, err := clnt.GetBucketVersioning(ctx, tgt.TargetBucket) if err != nil { - return BucketRemoteConnectionErr{Bucket: tgt.TargetBucket, Err: err} + return RemoteTargetConnectionErr{Bucket: tgt.TargetBucket, Err: err} } if vcfg.Status != string(versioning.Enabled) { return BucketRemoteTargetNotVersioned{Bucket: tgt.TargetBucket} @@ -152,10 +260,7 @@ func (sys *BucketTargetSys) SetTarget(ctx context.Context, bucket string, tgt *m if !found && !update { newtgts = append(newtgts, *tgt) } - // cancel health check for previous target client to avoid leak. - if prevClnt, ok := sys.arnRemotesMap[tgt.Arn]; ok && prevClnt.healthCancelFn != nil { - prevClnt.healthCancelFn() - } + sys.targetsMap[bucket] = newtgts sys.arnRemotesMap[tgt.Arn] = clnt sys.updateBandwidthLimit(bucket, tgt.BandwidthLimit) @@ -227,9 +332,6 @@ func (sys *BucketTargetSys) RemoveTarget(ctx context.Context, bucket, arnStr str return BucketRemoteTargetNotFound{Bucket: bucket} } sys.targetsMap[bucket] = targets - if tgt, ok := sys.arnRemotesMap[arnStr]; ok && tgt.healthCancelFn != nil { - tgt.healthCancelFn() - } delete(sys.arnRemotesMap, arnStr) sys.updateBandwidthLimit(bucket, 0) return nil @@ -258,11 +360,29 @@ func (sys *BucketTargetSys) GetRemoteBucketTargetByArn(ctx context.Context, buck } // NewBucketTargetSys - creates new replication system. -func NewBucketTargetSys() *BucketTargetSys { - return &BucketTargetSys{ +func NewBucketTargetSys(ctx context.Context) *BucketTargetSys { + sys := &BucketTargetSys{ arnRemotesMap: make(map[string]*TargetClient), targetsMap: make(map[string][]madmin.BucketTarget), + hc: make(map[string]epHealth), + hcClient: newHCClient(), } + // reload healthcheck endpoints map periodically to remove stale endpoints from the map. + go func() { + rTimer := time.NewTimer(defaultHealthCheckReloadDuration) + defer rTimer.Stop() + for { + select { + case <-rTimer.C: + sys.reloadHealthCheckers(ctx) + rTimer.Reset(defaultHealthCheckReloadDuration) + case <-ctx.Done(): + return + } + } + }() + go sys.heartBeat(ctx) + return sys } // UpdateAllTargets updates target to reflect metadata updates @@ -276,9 +396,6 @@ func (sys *BucketTargetSys) UpdateAllTargets(bucket string, tgts *madmin.BucketT // Remove existingtarget and arn association if tgts, ok := sys.targetsMap[bucket]; ok { for _, t := range tgts { - if tgt, ok := sys.arnRemotesMap[t.Arn]; ok && tgt.healthCancelFn != nil { - tgt.healthCancelFn() - } delete(sys.arnRemotesMap, t.Arn) } delete(sys.targetsMap, bucket) @@ -345,10 +462,6 @@ func (sys *BucketTargetSys) getRemoteTargetClient(tcfg *madmin.BucketTarget) (*T if tcfg.HealthCheckDuration >= 1 { // require minimum health check duration of 1 sec. hcDuration = tcfg.HealthCheckDuration } - cancelFn, err := api.HealthCheck(hcDuration) - if err != nil { - return nil, err - } tc := &TargetClient{ Client: api, healthCheckDuration: hcDuration, @@ -356,9 +469,10 @@ func (sys *BucketTargetSys) getRemoteTargetClient(tcfg *madmin.BucketTarget) (*T Bucket: tcfg.TargetBucket, StorageClass: tcfg.StorageClass, disableProxy: tcfg.DisableProxy, - healthCancelFn: cancelFn, ARN: tcfg.Arn, ResetID: tcfg.ResetID, + Endpoint: tcfg.Endpoint, + Secure: tcfg.Secure, } return tc, nil } @@ -432,7 +546,8 @@ type TargetClient struct { replicateSync bool StorageClass string // storage class on remote disableProxy bool - healthCancelFn context.CancelFunc // cancellation function for client healthcheck - ARN string // ARN to uniquely identify remote target + ARN string // ARN to uniquely identify remote target ResetID string + Endpoint string + Secure bool } diff --git a/cmd/object-api-errors.go b/cmd/object-api-errors.go index 8ff6866bc..5f895b4ac 100644 --- a/cmd/object-api-errors.go +++ b/cmd/object-api-errors.go @@ -422,11 +422,18 @@ func (e BucketRemoteTargetNotFound) Error() string { return "Remote target not found: " + e.Bucket } -// BucketRemoteConnectionErr remote target connection failure. -type BucketRemoteConnectionErr GenericError +// RemoteTargetConnectionErr remote target connection failure. +type RemoteTargetConnectionErr struct { + Err error + Bucket string + Endpoint string +} -func (e BucketRemoteConnectionErr) Error() string { - return fmt.Sprintf("Remote service endpoint or target bucket not available: %s \n\t%s", e.Bucket, e.Err.Error()) +func (e RemoteTargetConnectionErr) Error() string { + if e.Bucket != "" { + return fmt.Sprintf("Remote service endpoint offline or target bucket/remote service credentials invalid: %s \n\t%s", e.Bucket, e.Err.Error()) + } + return fmt.Sprintf("Remote service endpoint %s not available\n\t%s", e.Endpoint, e.Err.Error()) } // BucketRemoteAlreadyExists remote already exists for this target type. diff --git a/cmd/server-main.go b/cmd/server-main.go index 7394dbc9d..90589889d 100644 --- a/cmd/server-main.go +++ b/cmd/server-main.go @@ -290,7 +290,7 @@ func initAllSubsystems() { } // Create new bucket replication subsytem - globalBucketTargetSys = NewBucketTargetSys() + globalBucketTargetSys = NewBucketTargetSys(GlobalContext) // Create new ILM tier configuration subsystem globalTierConfigMgr = NewTierConfigMgr() diff --git a/cmd/site-replication.go b/cmd/site-replication.go index c27fe8ce6..e63514010 100644 --- a/cmd/site-replication.go +++ b/cmd/site-replication.go @@ -2217,6 +2217,9 @@ func getAdminClient(endpoint, accessKey, secretKey string) (*madmin.AdminClient, if err != nil { return nil, err } + if globalBucketTargetSys.isOffline(epURL) { + return nil, RemoteTargetConnectionErr{Endpoint: epURL.String(), Err: fmt.Errorf("remote target is offline for endpoint %s", epURL.String())} + } client, err := madmin.New(epURL.Host, accessKey, secretKey, epURL.Scheme == "https") if err != nil { return nil, err @@ -2230,6 +2233,10 @@ func getS3Client(pc madmin.PeerSite) (*minioClient.Client, error) { if err != nil { return nil, err } + if globalBucketTargetSys.isOffline(ep) { + return nil, RemoteTargetConnectionErr{Endpoint: ep.String(), Err: fmt.Errorf("remote target is offline for endpoint %s", ep.String())} + } + return minioClient.New(ep.Host, &minioClient.Options{ Creds: credentials.NewStaticV4(pc.AccessKey, pc.SecretKey, ""), Secure: ep.Scheme == "https", diff --git a/go.mod b/go.mod index dcac1a1a8..e826b795b 100644 --- a/go.mod +++ b/go.mod @@ -48,7 +48,7 @@ require ( github.com/minio/dperf v0.4.2 github.com/minio/highwayhash v1.0.2 github.com/minio/kes v0.20.0 - github.com/minio/madmin-go v1.4.17 + github.com/minio/madmin-go v1.4.20 github.com/minio/minio-go/v7 v7.0.34 github.com/minio/pkg v1.3.0 github.com/minio/selfupdate v0.5.0 diff --git a/go.sum b/go.sum index 55eaa5215..4ecd416cb 100644 --- a/go.sum +++ b/go.sum @@ -622,8 +622,8 @@ github.com/minio/highwayhash v1.0.2/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLT github.com/minio/kes v0.20.0 h1:1tyC51Rr8zTregTESuT/QN/iebNMX7B9t7d3xLNMEpE= github.com/minio/kes v0.20.0/go.mod h1:3FW1BQkMGQW78yhy+69tUq5bdcf5rnXJizyeKB9a/tc= github.com/minio/madmin-go v1.3.5/go.mod h1:vGKGboQgGIWx4DuDUaXixjlIEZOCIp6ivJkQoiVaACc= -github.com/minio/madmin-go v1.4.17 h1:o8b42lrJW9qyIthIa+24IzygZjmPm04iSUSBITcvzYU= -github.com/minio/madmin-go v1.4.17/go.mod h1:ez87VmMtsxP7DRxjKJKD4RDNW+nhO2QF9KSzwxBDQ98= +github.com/minio/madmin-go v1.4.20 h1:OpPxc8uIaevJMpBGSZ2TXJvFGvPSDn0IT3VJPh7w90M= +github.com/minio/madmin-go v1.4.20/go.mod h1:ez87VmMtsxP7DRxjKJKD4RDNW+nhO2QF9KSzwxBDQ98= github.com/minio/mc v0.0.0-20220805080128-351d021b924b h1:ikMXncKqNE/0acH6us6yy3v+gJBP7nGv/3Rc9F7vRio= github.com/minio/mc v0.0.0-20220805080128-351d021b924b/go.mod h1:YUXIqqgGfFknByv0eeJSMBQl/WGuEN0XkpW68/ghBm0= github.com/minio/md5-simd v1.1.0/go.mod h1:XpBqgZULrMYD3R+M28PcmP0CkI7PEMzB3U77ZrKZ0Gw=