mirror of
https://github.com/minio/minio.git
synced 2025-11-07 12:52:58 -05:00
replication: centralize healthcheck for remote targets (#15516)
This PR moves the health check for remote targets out of the minio-go client so that it is managed centrally on the server. It also integrates the health check into site replication.
This commit is contained in:
@@ -19,6 +19,8 @@ package cmd
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"net/url"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
@@ -35,7 +37,9 @@ import (
|
||||
)
|
||||
|
||||
const (
|
||||
defaultHealthCheckDuration = 30 * time.Second
|
||||
defaultHealthCheckDuration = 5 * time.Second
|
||||
// default interval for reload of all remote target endpoints
|
||||
defaultHealthCheckReloadDuration = 30 * time.Minute
|
||||
)
|
||||
|
||||
// BucketTargetSys represents bucket targets subsystem
|
||||
@@ -43,6 +47,113 @@ type BucketTargetSys struct {
|
||||
sync.RWMutex
|
||||
arnRemotesMap map[string]*TargetClient
|
||||
targetsMap map[string][]madmin.BucketTarget
|
||||
hMutex sync.RWMutex
|
||||
hc map[string]epHealth
|
||||
hcClient *madmin.AnonymousClient
|
||||
}
|
||||
|
||||
// epHealth records the last known liveness state of one replication target
// endpoint. Values of this type are stored in BucketTargetSys.hc.
type epHealth struct {
	// Endpoint is the endpoint's host as used for the healthcheck map key;
	// Scheme is the URL scheme ("http" or "https") used to reach it.
	Endpoint, Scheme string
	// Online is true while the endpoint is considered reachable.
	Online bool
}
|
||||
|
||||
// isOffline returns current liveness result of remote target. Add endpoint to
|
||||
// healthcheck map if missing and default to online status
|
||||
func (sys *BucketTargetSys) isOffline(ep *url.URL) bool {
|
||||
sys.hMutex.RLock()
|
||||
defer sys.hMutex.RUnlock()
|
||||
if h, ok := sys.hc[ep.Host]; ok {
|
||||
return !h.Online
|
||||
}
|
||||
go sys.initHC(ep)
|
||||
return false
|
||||
}
|
||||
|
||||
func (sys *BucketTargetSys) initHC(ep *url.URL) {
|
||||
sys.hMutex.Lock()
|
||||
sys.hc[ep.Host] = epHealth{
|
||||
Endpoint: ep.Host,
|
||||
Scheme: ep.Scheme,
|
||||
Online: true,
|
||||
}
|
||||
sys.hMutex.Unlock()
|
||||
}
|
||||
|
||||
// newHCClient initializes an anonymous client for performing health check on the remote endpoints
|
||||
func newHCClient() *madmin.AnonymousClient {
|
||||
clnt, e := madmin.NewAnonymousClientNoEndpoint()
|
||||
if e != nil {
|
||||
logger.LogOnceIf(GlobalContext, fmt.Errorf("WARNING: Unable to initialize health check client"), string(replicationSubsystem))
|
||||
return nil
|
||||
}
|
||||
clnt.SetCustomTransport(globalRemoteTargetTransport)
|
||||
return clnt
|
||||
}
|
||||
|
||||
// heartBeat performs liveness check on remote endpoints.
|
||||
func (sys *BucketTargetSys) heartBeat(ctx context.Context) {
|
||||
hcTimer := time.NewTimer(defaultHealthCheckDuration)
|
||||
defer hcTimer.Stop()
|
||||
for {
|
||||
select {
|
||||
case <-hcTimer.C:
|
||||
sys.hMutex.RLock()
|
||||
var eps []madmin.ServerProperties
|
||||
for _, ep := range sys.hc {
|
||||
eps = append(eps, madmin.ServerProperties{Endpoint: ep.Endpoint, Scheme: ep.Scheme})
|
||||
}
|
||||
sys.hMutex.RUnlock()
|
||||
|
||||
if len(eps) > 0 {
|
||||
cctx, cancel := context.WithTimeout(ctx, 30*time.Second)
|
||||
defer cancel()
|
||||
m := map[string]epHealth{}
|
||||
for result := range sys.hcClient.Alive(cctx, madmin.AliveOpts{}, eps...) {
|
||||
var online bool
|
||||
if result.Error == nil {
|
||||
online = result.Online
|
||||
}
|
||||
m[result.Endpoint.Host] = epHealth{
|
||||
Endpoint: result.Endpoint.Host,
|
||||
Scheme: result.Endpoint.Scheme,
|
||||
Online: online,
|
||||
}
|
||||
}
|
||||
sys.hMutex.Lock()
|
||||
sys.hc = m
|
||||
sys.hMutex.Unlock()
|
||||
}
|
||||
hcTimer.Reset(defaultHealthCheckDuration)
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// periodically rebuild the healthcheck map from list of targets to clear
|
||||
// out stale endpoints
|
||||
func (sys *BucketTargetSys) reloadHealthCheckers(ctx context.Context) {
|
||||
m := make(map[string]epHealth)
|
||||
tgts := sys.ListTargets(ctx, "", "")
|
||||
for _, t := range tgts {
|
||||
if _, ok := m[t.Endpoint]; !ok {
|
||||
scheme := "http"
|
||||
if t.Secure {
|
||||
scheme = "https"
|
||||
}
|
||||
m[t.Endpoint] = epHealth{
|
||||
Online: true,
|
||||
Endpoint: t.Endpoint,
|
||||
Scheme: scheme,
|
||||
}
|
||||
}
|
||||
}
|
||||
sys.hMutex.Lock()
|
||||
// swap out the map
|
||||
sys.hc = m
|
||||
sys.hMutex.Unlock()
|
||||
}
|
||||
|
||||
// ListTargets lists bucket targets across tenant or for individual bucket, and returns
|
||||
@@ -91,9 +202,6 @@ func (sys *BucketTargetSys) Delete(bucket string) {
|
||||
return
|
||||
}
|
||||
for _, t := range tgts {
|
||||
if tgt, ok := sys.arnRemotesMap[t.Arn]; ok && tgt.healthCancelFn != nil {
|
||||
tgt.healthCancelFn()
|
||||
}
|
||||
delete(sys.arnRemotesMap, t.Arn)
|
||||
}
|
||||
delete(sys.targetsMap, bucket)
|
||||
@@ -116,7 +224,7 @@ func (sys *BucketTargetSys) SetTarget(ctx context.Context, bucket string, tgt *m
|
||||
if minio.ToErrorResponse(err).Code == "NoSuchBucket" {
|
||||
return BucketRemoteTargetNotFound{Bucket: tgt.TargetBucket}
|
||||
}
|
||||
return BucketRemoteConnectionErr{Bucket: tgt.TargetBucket, Err: err}
|
||||
return RemoteTargetConnectionErr{Bucket: tgt.TargetBucket, Err: err}
|
||||
}
|
||||
if tgt.Type == madmin.ReplicationService {
|
||||
if !globalBucketVersioningSys.Enabled(bucket) {
|
||||
@@ -124,7 +232,7 @@ func (sys *BucketTargetSys) SetTarget(ctx context.Context, bucket string, tgt *m
|
||||
}
|
||||
vcfg, err := clnt.GetBucketVersioning(ctx, tgt.TargetBucket)
|
||||
if err != nil {
|
||||
return BucketRemoteConnectionErr{Bucket: tgt.TargetBucket, Err: err}
|
||||
return RemoteTargetConnectionErr{Bucket: tgt.TargetBucket, Err: err}
|
||||
}
|
||||
if vcfg.Status != string(versioning.Enabled) {
|
||||
return BucketRemoteTargetNotVersioned{Bucket: tgt.TargetBucket}
|
||||
@@ -152,10 +260,7 @@ func (sys *BucketTargetSys) SetTarget(ctx context.Context, bucket string, tgt *m
|
||||
if !found && !update {
|
||||
newtgts = append(newtgts, *tgt)
|
||||
}
|
||||
// cancel health check for previous target client to avoid leak.
|
||||
if prevClnt, ok := sys.arnRemotesMap[tgt.Arn]; ok && prevClnt.healthCancelFn != nil {
|
||||
prevClnt.healthCancelFn()
|
||||
}
|
||||
|
||||
sys.targetsMap[bucket] = newtgts
|
||||
sys.arnRemotesMap[tgt.Arn] = clnt
|
||||
sys.updateBandwidthLimit(bucket, tgt.BandwidthLimit)
|
||||
@@ -227,9 +332,6 @@ func (sys *BucketTargetSys) RemoveTarget(ctx context.Context, bucket, arnStr str
|
||||
return BucketRemoteTargetNotFound{Bucket: bucket}
|
||||
}
|
||||
sys.targetsMap[bucket] = targets
|
||||
if tgt, ok := sys.arnRemotesMap[arnStr]; ok && tgt.healthCancelFn != nil {
|
||||
tgt.healthCancelFn()
|
||||
}
|
||||
delete(sys.arnRemotesMap, arnStr)
|
||||
sys.updateBandwidthLimit(bucket, 0)
|
||||
return nil
|
||||
@@ -258,11 +360,29 @@ func (sys *BucketTargetSys) GetRemoteBucketTargetByArn(ctx context.Context, buck
|
||||
}
|
||||
|
||||
// NewBucketTargetSys - creates new replication system.
|
||||
func NewBucketTargetSys() *BucketTargetSys {
|
||||
return &BucketTargetSys{
|
||||
func NewBucketTargetSys(ctx context.Context) *BucketTargetSys {
|
||||
sys := &BucketTargetSys{
|
||||
arnRemotesMap: make(map[string]*TargetClient),
|
||||
targetsMap: make(map[string][]madmin.BucketTarget),
|
||||
hc: make(map[string]epHealth),
|
||||
hcClient: newHCClient(),
|
||||
}
|
||||
// reload healthcheck endpoints map periodically to remove stale endpoints from the map.
|
||||
go func() {
|
||||
rTimer := time.NewTimer(defaultHealthCheckReloadDuration)
|
||||
defer rTimer.Stop()
|
||||
for {
|
||||
select {
|
||||
case <-rTimer.C:
|
||||
sys.reloadHealthCheckers(ctx)
|
||||
rTimer.Reset(defaultHealthCheckReloadDuration)
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
go sys.heartBeat(ctx)
|
||||
return sys
|
||||
}
|
||||
|
||||
// UpdateAllTargets updates target to reflect metadata updates
|
||||
@@ -276,9 +396,6 @@ func (sys *BucketTargetSys) UpdateAllTargets(bucket string, tgts *madmin.BucketT
|
||||
// Remove existingtarget and arn association
|
||||
if tgts, ok := sys.targetsMap[bucket]; ok {
|
||||
for _, t := range tgts {
|
||||
if tgt, ok := sys.arnRemotesMap[t.Arn]; ok && tgt.healthCancelFn != nil {
|
||||
tgt.healthCancelFn()
|
||||
}
|
||||
delete(sys.arnRemotesMap, t.Arn)
|
||||
}
|
||||
delete(sys.targetsMap, bucket)
|
||||
@@ -345,10 +462,6 @@ func (sys *BucketTargetSys) getRemoteTargetClient(tcfg *madmin.BucketTarget) (*T
|
||||
if tcfg.HealthCheckDuration >= 1 { // require minimum health check duration of 1 sec.
|
||||
hcDuration = tcfg.HealthCheckDuration
|
||||
}
|
||||
cancelFn, err := api.HealthCheck(hcDuration)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
tc := &TargetClient{
|
||||
Client: api,
|
||||
healthCheckDuration: hcDuration,
|
||||
@@ -356,9 +469,10 @@ func (sys *BucketTargetSys) getRemoteTargetClient(tcfg *madmin.BucketTarget) (*T
|
||||
Bucket: tcfg.TargetBucket,
|
||||
StorageClass: tcfg.StorageClass,
|
||||
disableProxy: tcfg.DisableProxy,
|
||||
healthCancelFn: cancelFn,
|
||||
ARN: tcfg.Arn,
|
||||
ResetID: tcfg.ResetID,
|
||||
Endpoint: tcfg.Endpoint,
|
||||
Secure: tcfg.Secure,
|
||||
}
|
||||
return tc, nil
|
||||
}
|
||||
@@ -432,7 +546,8 @@ type TargetClient struct {
|
||||
replicateSync bool
|
||||
StorageClass string // storage class on remote
|
||||
disableProxy bool
|
||||
healthCancelFn context.CancelFunc // cancellation function for client healthcheck
|
||||
ARN string // ARN to uniquely identify remote target
|
||||
ARN string // ARN to uniquely identify remote target
|
||||
ResetID string
|
||||
Endpoint string
|
||||
Secure bool
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user