mirror of
https://github.com/minio/minio.git
synced 2025-11-07 12:52:58 -05:00
add more deadlines and pass around context under most situations (#19752)
This commit is contained in:
@@ -118,21 +118,30 @@ func (g *NotificationGroup) Go(ctx context.Context, f func() error, index int, a
|
||||
g.errs[index] = NotificationPeerErr{
|
||||
Host: addr,
|
||||
}
|
||||
for i := 0; i < g.retryCount; i++ {
|
||||
|
||||
retryCount := g.retryCount
|
||||
for i := 0; i < retryCount; i++ {
|
||||
g.errs[index].Err = nil
|
||||
if err := f(); err != nil {
|
||||
g.errs[index].Err = err
|
||||
|
||||
if contextCanceled(ctx) {
|
||||
// context already canceled no retries.
|
||||
retryCount = 0
|
||||
}
|
||||
|
||||
// Last iteration log the error.
|
||||
if i == g.retryCount-1 {
|
||||
if i == retryCount-1 {
|
||||
reqInfo := (&logger.ReqInfo{}).AppendTags("peerAddress", addr.String())
|
||||
ctx := logger.SetReqInfo(ctx, reqInfo)
|
||||
peersLogOnceIf(ctx, err, addr.String())
|
||||
}
|
||||
|
||||
// Wait for a minimum of 100ms and dynamically increase this based on number of attempts.
|
||||
if i < g.retryCount-1 {
|
||||
if i < retryCount-1 {
|
||||
time.Sleep(100*time.Millisecond + time.Duration(r.Float64()*float64(time.Second)))
|
||||
continue
|
||||
}
|
||||
continue
|
||||
}
|
||||
break
|
||||
}
|
||||
@@ -140,137 +149,137 @@ func (g *NotificationGroup) Go(ctx context.Context, f func() error, index int, a
|
||||
}
|
||||
|
||||
// DeletePolicy - deletes policy across all peers.
|
||||
func (sys *NotificationSys) DeletePolicy(policyName string) []NotificationPeerErr {
|
||||
func (sys *NotificationSys) DeletePolicy(ctx context.Context, policyName string) []NotificationPeerErr {
|
||||
ng := WithNPeers(len(sys.peerClients)).WithRetries(1)
|
||||
for idx, client := range sys.peerClients {
|
||||
client := client
|
||||
ng.Go(GlobalContext, func() error {
|
||||
ng.Go(ctx, func() error {
|
||||
if client == nil {
|
||||
return errPeerNotReachable
|
||||
}
|
||||
return client.DeletePolicy(policyName)
|
||||
return client.DeletePolicy(ctx, policyName)
|
||||
}, idx, *client.host)
|
||||
}
|
||||
return ng.Wait()
|
||||
}
|
||||
|
||||
// LoadPolicy - reloads a specific modified policy across all peers
|
||||
func (sys *NotificationSys) LoadPolicy(policyName string) []NotificationPeerErr {
|
||||
func (sys *NotificationSys) LoadPolicy(ctx context.Context, policyName string) []NotificationPeerErr {
|
||||
ng := WithNPeers(len(sys.peerClients)).WithRetries(1)
|
||||
for idx, client := range sys.peerClients {
|
||||
client := client
|
||||
ng.Go(GlobalContext, func() error {
|
||||
ng.Go(ctx, func() error {
|
||||
if client == nil {
|
||||
return errPeerNotReachable
|
||||
}
|
||||
return client.LoadPolicy(policyName)
|
||||
return client.LoadPolicy(ctx, policyName)
|
||||
}, idx, *client.host)
|
||||
}
|
||||
return ng.Wait()
|
||||
}
|
||||
|
||||
// LoadPolicyMapping - reloads a policy mapping across all peers
|
||||
func (sys *NotificationSys) LoadPolicyMapping(userOrGroup string, userType IAMUserType, isGroup bool) []NotificationPeerErr {
|
||||
func (sys *NotificationSys) LoadPolicyMapping(ctx context.Context, userOrGroup string, userType IAMUserType, isGroup bool) []NotificationPeerErr {
|
||||
ng := WithNPeers(len(sys.peerClients)).WithRetries(1)
|
||||
for idx, client := range sys.peerClients {
|
||||
client := client
|
||||
ng.Go(GlobalContext, func() error {
|
||||
ng.Go(ctx, func() error {
|
||||
if client == nil {
|
||||
return errPeerNotReachable
|
||||
}
|
||||
return client.LoadPolicyMapping(userOrGroup, userType, isGroup)
|
||||
return client.LoadPolicyMapping(ctx, userOrGroup, userType, isGroup)
|
||||
}, idx, *client.host)
|
||||
}
|
||||
return ng.Wait()
|
||||
}
|
||||
|
||||
// DeleteUser - deletes a specific user across all peers
|
||||
func (sys *NotificationSys) DeleteUser(accessKey string) []NotificationPeerErr {
|
||||
func (sys *NotificationSys) DeleteUser(ctx context.Context, accessKey string) []NotificationPeerErr {
|
||||
ng := WithNPeers(len(sys.peerClients)).WithRetries(1)
|
||||
for idx, client := range sys.peerClients {
|
||||
client := client
|
||||
ng.Go(GlobalContext, func() error {
|
||||
ng.Go(ctx, func() error {
|
||||
if client == nil {
|
||||
return errPeerNotReachable
|
||||
}
|
||||
return client.DeleteUser(accessKey)
|
||||
return client.DeleteUser(ctx, accessKey)
|
||||
}, idx, *client.host)
|
||||
}
|
||||
return ng.Wait()
|
||||
}
|
||||
|
||||
// LoadUser - reloads a specific user across all peers
|
||||
func (sys *NotificationSys) LoadUser(accessKey string, temp bool) []NotificationPeerErr {
|
||||
func (sys *NotificationSys) LoadUser(ctx context.Context, accessKey string, temp bool) []NotificationPeerErr {
|
||||
ng := WithNPeers(len(sys.peerClients)).WithRetries(1)
|
||||
for idx, client := range sys.peerClients {
|
||||
client := client
|
||||
ng.Go(GlobalContext, func() error {
|
||||
ng.Go(ctx, func() error {
|
||||
if client == nil {
|
||||
return errPeerNotReachable
|
||||
}
|
||||
return client.LoadUser(accessKey, temp)
|
||||
return client.LoadUser(ctx, accessKey, temp)
|
||||
}, idx, *client.host)
|
||||
}
|
||||
return ng.Wait()
|
||||
}
|
||||
|
||||
// LoadGroup - loads a specific group on all peers.
|
||||
func (sys *NotificationSys) LoadGroup(group string) []NotificationPeerErr {
|
||||
func (sys *NotificationSys) LoadGroup(ctx context.Context, group string) []NotificationPeerErr {
|
||||
ng := WithNPeers(len(sys.peerClients)).WithRetries(1)
|
||||
for idx, client := range sys.peerClients {
|
||||
client := client
|
||||
ng.Go(GlobalContext, func() error {
|
||||
ng.Go(ctx, func() error {
|
||||
if client == nil {
|
||||
return errPeerNotReachable
|
||||
}
|
||||
return client.LoadGroup(group)
|
||||
return client.LoadGroup(ctx, group)
|
||||
}, idx, *client.host)
|
||||
}
|
||||
return ng.Wait()
|
||||
}
|
||||
|
||||
// DeleteServiceAccount - deletes a specific service account across all peers
|
||||
func (sys *NotificationSys) DeleteServiceAccount(accessKey string) []NotificationPeerErr {
|
||||
func (sys *NotificationSys) DeleteServiceAccount(ctx context.Context, accessKey string) []NotificationPeerErr {
|
||||
ng := WithNPeers(len(sys.peerClients)).WithRetries(1)
|
||||
for idx, client := range sys.peerClients {
|
||||
client := client
|
||||
ng.Go(GlobalContext, func() error {
|
||||
ng.Go(ctx, func() error {
|
||||
if client == nil {
|
||||
return errPeerNotReachable
|
||||
}
|
||||
return client.DeleteServiceAccount(accessKey)
|
||||
return client.DeleteServiceAccount(ctx, accessKey)
|
||||
}, idx, *client.host)
|
||||
}
|
||||
return ng.Wait()
|
||||
}
|
||||
|
||||
// LoadServiceAccount - reloads a specific service account across all peers
|
||||
func (sys *NotificationSys) LoadServiceAccount(accessKey string) []NotificationPeerErr {
|
||||
func (sys *NotificationSys) LoadServiceAccount(ctx context.Context, accessKey string) []NotificationPeerErr {
|
||||
ng := WithNPeers(len(sys.peerClients)).WithRetries(1)
|
||||
for idx, client := range sys.peerClients {
|
||||
client := client
|
||||
ng.Go(GlobalContext, func() error {
|
||||
ng.Go(ctx, func() error {
|
||||
if client == nil {
|
||||
return errPeerNotReachable
|
||||
}
|
||||
return client.LoadServiceAccount(accessKey)
|
||||
return client.LoadServiceAccount(ctx, accessKey)
|
||||
}, idx, *client.host)
|
||||
}
|
||||
return ng.Wait()
|
||||
}
|
||||
|
||||
// BackgroundHealStatus - returns background heal status of all peers
|
||||
func (sys *NotificationSys) BackgroundHealStatus() ([]madmin.BgHealState, []NotificationPeerErr) {
|
||||
func (sys *NotificationSys) BackgroundHealStatus(ctx context.Context) ([]madmin.BgHealState, []NotificationPeerErr) {
|
||||
ng := WithNPeers(len(sys.peerClients))
|
||||
states := make([]madmin.BgHealState, len(sys.peerClients))
|
||||
for idx, client := range sys.peerClients {
|
||||
idx := idx
|
||||
client := client
|
||||
ng.Go(GlobalContext, func() error {
|
||||
ng.Go(ctx, func() error {
|
||||
if client == nil {
|
||||
return errPeerNotReachable
|
||||
}
|
||||
st, err := client.BackgroundHealStatus()
|
||||
st, err := client.BackgroundHealStatus(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -312,7 +321,7 @@ func (sys *NotificationSys) DownloadProfilingData(ctx context.Context, writer io
|
||||
if err != nil {
|
||||
reqInfo := (&logger.ReqInfo{}).AppendTags("peerAddress", client.host.String())
|
||||
ctx := logger.SetReqInfo(ctx, reqInfo)
|
||||
peersLogIf(ctx, err)
|
||||
peersLogOnceIf(ctx, err, client.host.String())
|
||||
continue
|
||||
}
|
||||
|
||||
@@ -323,7 +332,7 @@ func (sys *NotificationSys) DownloadProfilingData(ctx context.Context, writer io
|
||||
if err != nil {
|
||||
reqInfo := (&logger.ReqInfo{}).AppendTags("peerAddress", client.host.String())
|
||||
ctx := logger.SetReqInfo(ctx, reqInfo)
|
||||
peersLogIf(ctx, err)
|
||||
peersLogOnceIf(ctx, err, client.host.String())
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -465,7 +474,7 @@ func (sys *NotificationSys) GetLocks(ctx context.Context, r *http.Request) []*Pe
|
||||
if client == nil {
|
||||
return errPeerNotReachable
|
||||
}
|
||||
serverLocksResp, err := sys.peerClients[index].GetLocks()
|
||||
serverLocksResp, err := sys.peerClients[index].GetLocks(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -498,7 +507,7 @@ func (sys *NotificationSys) LoadBucketMetadata(ctx context.Context, bucketName s
|
||||
}
|
||||
client := client
|
||||
ng.Go(ctx, func() error {
|
||||
return client.LoadBucketMetadata(bucketName)
|
||||
return client.LoadBucketMetadata(ctx, bucketName)
|
||||
}, idx, *client.host)
|
||||
}
|
||||
for _, nErr := range ng.Wait() {
|
||||
@@ -528,7 +537,7 @@ func (sys *NotificationSys) DeleteBucketMetadata(ctx context.Context, bucketName
|
||||
}
|
||||
client := client
|
||||
ng.Go(ctx, func() error {
|
||||
return client.DeleteBucketMetadata(bucketName)
|
||||
return client.DeleteBucketMetadata(ctx, bucketName)
|
||||
}, idx, *client.host)
|
||||
}
|
||||
for _, nErr := range ng.Wait() {
|
||||
@@ -550,7 +559,7 @@ func (sys *NotificationSys) GetClusterAllBucketStats(ctx context.Context) []Buck
|
||||
if client == nil {
|
||||
return errPeerNotReachable
|
||||
}
|
||||
bsMap, err := client.GetAllBucketStats()
|
||||
bsMap, err := client.GetAllBucketStats(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -592,7 +601,7 @@ func (sys *NotificationSys) GetClusterBucketStats(ctx context.Context, bucketNam
|
||||
if client == nil {
|
||||
return errPeerNotReachable
|
||||
}
|
||||
bs, err := client.GetBucketStats(bucketName)
|
||||
bs, err := client.GetBucketStats(ctx, bucketName)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -625,7 +634,7 @@ func (sys *NotificationSys) GetClusterSiteMetrics(ctx context.Context) []SRMetri
|
||||
if client == nil {
|
||||
return errPeerNotReachable
|
||||
}
|
||||
sm, err := client.GetSRMetrics()
|
||||
sm, err := client.GetSRMetrics(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -666,6 +675,12 @@ func (sys *NotificationSys) ReloadPoolMeta(ctx context.Context) {
|
||||
// StopRebalance notifies all MinIO nodes to signal any ongoing rebalance
|
||||
// goroutine to stop.
|
||||
func (sys *NotificationSys) StopRebalance(ctx context.Context) {
|
||||
objAPI := newObjectLayerFn()
|
||||
if objAPI == nil {
|
||||
internalLogIf(ctx, errServerNotInitialized)
|
||||
return
|
||||
}
|
||||
|
||||
ng := WithNPeers(len(sys.peerClients))
|
||||
for idx, client := range sys.peerClients {
|
||||
if client == nil {
|
||||
@@ -683,12 +698,6 @@ func (sys *NotificationSys) StopRebalance(ctx context.Context) {
|
||||
}
|
||||
}
|
||||
|
||||
objAPI := newObjectLayerFn()
|
||||
if objAPI == nil {
|
||||
internalLogIf(ctx, errServerNotInitialized)
|
||||
return
|
||||
}
|
||||
|
||||
if pools, ok := objAPI.(*erasureServerPools); ok {
|
||||
pools.StopRebalance()
|
||||
}
|
||||
@@ -1047,7 +1056,7 @@ func getOfflineDisks(offlineHost string, endpoints EndpointServerPools) []madmin
|
||||
}
|
||||
|
||||
// StorageInfo returns disk information across all peers
|
||||
func (sys *NotificationSys) StorageInfo(objLayer ObjectLayer, metrics bool) StorageInfo {
|
||||
func (sys *NotificationSys) StorageInfo(ctx context.Context, objLayer ObjectLayer, metrics bool) StorageInfo {
|
||||
var storageInfo StorageInfo
|
||||
replies := make([]StorageInfo, len(sys.peerClients))
|
||||
|
||||
@@ -1059,7 +1068,7 @@ func (sys *NotificationSys) StorageInfo(objLayer ObjectLayer, metrics bool) Stor
|
||||
wg.Add(1)
|
||||
go func(client *peerRESTClient, idx int) {
|
||||
defer wg.Done()
|
||||
info, err := client.LocalStorageInfo(metrics)
|
||||
info, err := client.LocalStorageInfo(ctx, metrics)
|
||||
if err != nil {
|
||||
info.Disks = getOfflineDisks(client.host.String(), globalEndpoints)
|
||||
}
|
||||
@@ -1069,7 +1078,7 @@ func (sys *NotificationSys) StorageInfo(objLayer ObjectLayer, metrics bool) Stor
|
||||
wg.Wait()
|
||||
|
||||
// Add local to this server.
|
||||
replies = append(replies, objLayer.LocalStorageInfo(GlobalContext, metrics))
|
||||
replies = append(replies, objLayer.LocalStorageInfo(ctx, metrics))
|
||||
|
||||
storageInfo.Backend = objLayer.BackendInfo()
|
||||
for _, sinfo := range replies {
|
||||
@@ -1080,7 +1089,7 @@ func (sys *NotificationSys) StorageInfo(objLayer ObjectLayer, metrics bool) Stor
|
||||
}
|
||||
|
||||
// ServerInfo - calls ServerInfo RPC call on all peers.
|
||||
func (sys *NotificationSys) ServerInfo(metrics bool) []madmin.ServerProperties {
|
||||
func (sys *NotificationSys) ServerInfo(ctx context.Context, metrics bool) []madmin.ServerProperties {
|
||||
reply := make([]madmin.ServerProperties, len(sys.peerClients))
|
||||
var wg sync.WaitGroup
|
||||
for i, client := range sys.peerClients {
|
||||
@@ -1090,7 +1099,7 @@ func (sys *NotificationSys) ServerInfo(metrics bool) []madmin.ServerProperties {
|
||||
wg.Add(1)
|
||||
go func(client *peerRESTClient, idx int) {
|
||||
defer wg.Done()
|
||||
info, err := client.ServerInfo(metrics)
|
||||
info, err := client.ServerInfo(ctx, metrics)
|
||||
if err != nil {
|
||||
info.Endpoint = client.host.String()
|
||||
info.State = string(madmin.ItemOffline)
|
||||
|
||||
Reference in New Issue
Block a user