mirror of https://github.com/minio/minio.git
Avoid peer notification when peer is offline, tune retries (#16737)
This commit is contained in:
parent
901887e6bf
commit
a6057c35cc
|
@ -56,14 +56,23 @@ type NotificationPeerErr struct {
|
||||||
//
|
//
|
||||||
// A zero NotificationGroup is valid and does not cancel on error.
|
// A zero NotificationGroup is valid and does not cancel on error.
|
||||||
type NotificationGroup struct {
|
type NotificationGroup struct {
|
||||||
wg sync.WaitGroup
|
wg sync.WaitGroup
|
||||||
errs []NotificationPeerErr
|
errs []NotificationPeerErr
|
||||||
|
retryCount int
|
||||||
}
|
}
|
||||||
|
|
||||||
// WithNPeers returns a new NotificationGroup with length of errs slice upto nerrs,
|
// WithNPeers returns a new NotificationGroup with length of errs slice upto nerrs,
|
||||||
// upon Wait() errors are returned collected from all tasks.
|
// upon Wait() errors are returned collected from all tasks.
|
||||||
func WithNPeers(nerrs int) *NotificationGroup {
|
func WithNPeers(nerrs int) *NotificationGroup {
|
||||||
return &NotificationGroup{errs: make([]NotificationPeerErr, nerrs)}
|
return &NotificationGroup{errs: make([]NotificationPeerErr, nerrs), retryCount: 3}
|
||||||
|
}
|
||||||
|
|
||||||
|
// WithRetries sets the retry count for all function calls from the Go method.
|
||||||
|
func (g *NotificationGroup) WithRetries(retryCount int) *NotificationGroup {
|
||||||
|
if g != nil {
|
||||||
|
g.retryCount = retryCount
|
||||||
|
}
|
||||||
|
return g
|
||||||
}
|
}
|
||||||
|
|
||||||
// Wait blocks until all function calls from the Go method have returned, then
|
// Wait blocks until all function calls from the Go method have returned, then
|
||||||
|
@ -85,17 +94,17 @@ func (g *NotificationGroup) Go(ctx context.Context, f func() error, index int, a
|
||||||
g.errs[index] = NotificationPeerErr{
|
g.errs[index] = NotificationPeerErr{
|
||||||
Host: addr,
|
Host: addr,
|
||||||
}
|
}
|
||||||
for i := 0; i < 3; i++ {
|
for i := 0; i < g.retryCount; i++ {
|
||||||
if err := f(); err != nil {
|
if err := f(); err != nil {
|
||||||
g.errs[index].Err = err
|
g.errs[index].Err = err
|
||||||
// Last iteration log the error.
|
// Last iteration log the error.
|
||||||
if i == 2 {
|
if i == g.retryCount-1 {
|
||||||
reqInfo := (&logger.ReqInfo{}).AppendTags("peerAddress", addr.String())
|
reqInfo := (&logger.ReqInfo{}).AppendTags("peerAddress", addr.String())
|
||||||
ctx := logger.SetReqInfo(ctx, reqInfo)
|
ctx := logger.SetReqInfo(ctx, reqInfo)
|
||||||
logger.LogIf(ctx, err)
|
logger.LogIf(ctx, err)
|
||||||
}
|
}
|
||||||
// Wait for one second and no need wait after last attempt.
|
// Wait for one second and no need wait after last attempt.
|
||||||
if i < 2 {
|
if i < g.retryCount-1 {
|
||||||
time.Sleep(1 * time.Second)
|
time.Sleep(1 * time.Second)
|
||||||
}
|
}
|
||||||
continue
|
continue
|
||||||
|
@ -107,7 +116,7 @@ func (g *NotificationGroup) Go(ctx context.Context, f func() error, index int, a
|
||||||
|
|
||||||
// DeletePolicy - deletes policy across all peers.
|
// DeletePolicy - deletes policy across all peers.
|
||||||
func (sys *NotificationSys) DeletePolicy(policyName string) []NotificationPeerErr {
|
func (sys *NotificationSys) DeletePolicy(policyName string) []NotificationPeerErr {
|
||||||
ng := WithNPeers(len(sys.peerClients))
|
ng := WithNPeers(len(sys.peerClients)).WithRetries(1)
|
||||||
for idx, client := range sys.peerClients {
|
for idx, client := range sys.peerClients {
|
||||||
if client == nil {
|
if client == nil {
|
||||||
continue
|
continue
|
||||||
|
@ -122,7 +131,7 @@ func (sys *NotificationSys) DeletePolicy(policyName string) []NotificationPeerEr
|
||||||
|
|
||||||
// LoadPolicy - reloads a specific modified policy across all peers
|
// LoadPolicy - reloads a specific modified policy across all peers
|
||||||
func (sys *NotificationSys) LoadPolicy(policyName string) []NotificationPeerErr {
|
func (sys *NotificationSys) LoadPolicy(policyName string) []NotificationPeerErr {
|
||||||
ng := WithNPeers(len(sys.peerClients))
|
ng := WithNPeers(len(sys.peerClients)).WithRetries(1)
|
||||||
for idx, client := range sys.peerClients {
|
for idx, client := range sys.peerClients {
|
||||||
if client == nil {
|
if client == nil {
|
||||||
continue
|
continue
|
||||||
|
@ -137,7 +146,7 @@ func (sys *NotificationSys) LoadPolicy(policyName string) []NotificationPeerErr
|
||||||
|
|
||||||
// LoadPolicyMapping - reloads a policy mapping across all peers
|
// LoadPolicyMapping - reloads a policy mapping across all peers
|
||||||
func (sys *NotificationSys) LoadPolicyMapping(userOrGroup string, userType IAMUserType, isGroup bool) []NotificationPeerErr {
|
func (sys *NotificationSys) LoadPolicyMapping(userOrGroup string, userType IAMUserType, isGroup bool) []NotificationPeerErr {
|
||||||
ng := WithNPeers(len(sys.peerClients))
|
ng := WithNPeers(len(sys.peerClients)).WithRetries(1)
|
||||||
for idx, client := range sys.peerClients {
|
for idx, client := range sys.peerClients {
|
||||||
if client == nil {
|
if client == nil {
|
||||||
continue
|
continue
|
||||||
|
@ -152,7 +161,7 @@ func (sys *NotificationSys) LoadPolicyMapping(userOrGroup string, userType IAMUs
|
||||||
|
|
||||||
// DeleteUser - deletes a specific user across all peers
|
// DeleteUser - deletes a specific user across all peers
|
||||||
func (sys *NotificationSys) DeleteUser(accessKey string) []NotificationPeerErr {
|
func (sys *NotificationSys) DeleteUser(accessKey string) []NotificationPeerErr {
|
||||||
ng := WithNPeers(len(sys.peerClients))
|
ng := WithNPeers(len(sys.peerClients)).WithRetries(1)
|
||||||
for idx, client := range sys.peerClients {
|
for idx, client := range sys.peerClients {
|
||||||
if client == nil {
|
if client == nil {
|
||||||
continue
|
continue
|
||||||
|
@ -167,7 +176,7 @@ func (sys *NotificationSys) DeleteUser(accessKey string) []NotificationPeerErr {
|
||||||
|
|
||||||
// LoadUser - reloads a specific user across all peers
|
// LoadUser - reloads a specific user across all peers
|
||||||
func (sys *NotificationSys) LoadUser(accessKey string, temp bool) []NotificationPeerErr {
|
func (sys *NotificationSys) LoadUser(accessKey string, temp bool) []NotificationPeerErr {
|
||||||
ng := WithNPeers(len(sys.peerClients))
|
ng := WithNPeers(len(sys.peerClients)).WithRetries(1)
|
||||||
for idx, client := range sys.peerClients {
|
for idx, client := range sys.peerClients {
|
||||||
if client == nil {
|
if client == nil {
|
||||||
continue
|
continue
|
||||||
|
@ -182,7 +191,7 @@ func (sys *NotificationSys) LoadUser(accessKey string, temp bool) []Notification
|
||||||
|
|
||||||
// LoadGroup - loads a specific group on all peers.
|
// LoadGroup - loads a specific group on all peers.
|
||||||
func (sys *NotificationSys) LoadGroup(group string) []NotificationPeerErr {
|
func (sys *NotificationSys) LoadGroup(group string) []NotificationPeerErr {
|
||||||
ng := WithNPeers(len(sys.peerClients))
|
ng := WithNPeers(len(sys.peerClients)).WithRetries(1)
|
||||||
for idx, client := range sys.peerClients {
|
for idx, client := range sys.peerClients {
|
||||||
if client == nil {
|
if client == nil {
|
||||||
continue
|
continue
|
||||||
|
@ -195,7 +204,7 @@ func (sys *NotificationSys) LoadGroup(group string) []NotificationPeerErr {
|
||||||
|
|
||||||
// DeleteServiceAccount - deletes a specific service account across all peers
|
// DeleteServiceAccount - deletes a specific service account across all peers
|
||||||
func (sys *NotificationSys) DeleteServiceAccount(accessKey string) []NotificationPeerErr {
|
func (sys *NotificationSys) DeleteServiceAccount(accessKey string) []NotificationPeerErr {
|
||||||
ng := WithNPeers(len(sys.peerClients))
|
ng := WithNPeers(len(sys.peerClients)).WithRetries(1)
|
||||||
for idx, client := range sys.peerClients {
|
for idx, client := range sys.peerClients {
|
||||||
if client == nil {
|
if client == nil {
|
||||||
continue
|
continue
|
||||||
|
@ -210,7 +219,7 @@ func (sys *NotificationSys) DeleteServiceAccount(accessKey string) []Notificatio
|
||||||
|
|
||||||
// LoadServiceAccount - reloads a specific service account across all peers
|
// LoadServiceAccount - reloads a specific service account across all peers
|
||||||
func (sys *NotificationSys) LoadServiceAccount(accessKey string) []NotificationPeerErr {
|
func (sys *NotificationSys) LoadServiceAccount(accessKey string) []NotificationPeerErr {
|
||||||
ng := WithNPeers(len(sys.peerClients))
|
ng := WithNPeers(len(sys.peerClients)).WithRetries(1)
|
||||||
for idx, client := range sys.peerClients {
|
for idx, client := range sys.peerClients {
|
||||||
if client == nil {
|
if client == nil {
|
||||||
continue
|
continue
|
||||||
|
@ -468,7 +477,7 @@ func (sys *NotificationSys) DeleteBucketMetadata(ctx context.Context, bucketName
|
||||||
|
|
||||||
// GetClusterAllBucketStats - returns bucket stats for all buckets from all remote peers.
|
// GetClusterAllBucketStats - returns bucket stats for all buckets from all remote peers.
|
||||||
func (sys *NotificationSys) GetClusterAllBucketStats(ctx context.Context) []BucketStatsMap {
|
func (sys *NotificationSys) GetClusterAllBucketStats(ctx context.Context) []BucketStatsMap {
|
||||||
ng := WithNPeers(len(sys.peerClients))
|
ng := WithNPeers(len(sys.peerClients)).WithRetries(1)
|
||||||
replicationStats := make([]BucketStatsMap, len(sys.peerClients))
|
replicationStats := make([]BucketStatsMap, len(sys.peerClients))
|
||||||
for index, client := range sys.peerClients {
|
for index, client := range sys.peerClients {
|
||||||
index := index
|
index := index
|
||||||
|
@ -509,7 +518,7 @@ func (sys *NotificationSys) GetClusterAllBucketStats(ctx context.Context) []Buck
|
||||||
|
|
||||||
// GetClusterBucketStats - calls GetClusterBucketStats call on all peers for a cluster statistics view.
|
// GetClusterBucketStats - calls GetClusterBucketStats call on all peers for a cluster statistics view.
|
||||||
func (sys *NotificationSys) GetClusterBucketStats(ctx context.Context, bucketName string) []BucketStats {
|
func (sys *NotificationSys) GetClusterBucketStats(ctx context.Context, bucketName string) []BucketStats {
|
||||||
ng := WithNPeers(len(sys.peerClients))
|
ng := WithNPeers(len(sys.peerClients)).WithRetries(1)
|
||||||
bucketStats := make([]BucketStats, len(sys.peerClients))
|
bucketStats := make([]BucketStats, len(sys.peerClients))
|
||||||
for index, client := range sys.peerClients {
|
for index, client := range sys.peerClients {
|
||||||
index := index
|
index := index
|
||||||
|
|
|
@ -57,6 +57,10 @@ func (client *peerRESTClient) call(method string, values url.Values, body io.Rea
|
||||||
// permanently. The only way to restore the connection is at the xl-sets layer by xlsets.monitorAndConnectEndpoints()
|
// permanently. The only way to restore the connection is at the xl-sets layer by xlsets.monitorAndConnectEndpoints()
|
||||||
// after verifying format.json
|
// after verifying format.json
|
||||||
func (client *peerRESTClient) callWithContext(ctx context.Context, method string, values url.Values, body io.Reader, length int64) (respBody io.ReadCloser, err error) {
|
func (client *peerRESTClient) callWithContext(ctx context.Context, method string, values url.Values, body io.Reader, length int64) (respBody io.ReadCloser, err error) {
|
||||||
|
if client == nil || !client.IsOnline() {
|
||||||
|
return nil, errPeerNotReachable
|
||||||
|
}
|
||||||
|
|
||||||
if values == nil {
|
if values == nil {
|
||||||
values = make(url.Values)
|
values = make(url.Values)
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue