diff --git a/cmd/admin-handlers.go b/cmd/admin-handlers.go index 82ba40a83..439bf86a5 100644 --- a/cmd/admin-handlers.go +++ b/cmd/admin-handlers.go @@ -1431,7 +1431,7 @@ func getAggregatedBackgroundHealState(ctx context.Context, o ObjectLayer) (madmi if globalIsDistErasure { // Get heal status from other peers - peersHealStates, nerrs := globalNotificationSys.BackgroundHealStatus() + peersHealStates, nerrs := globalNotificationSys.BackgroundHealStatus(ctx) var errCount int for _, nerr := range nerrs { if nerr.Err != nil { @@ -2347,7 +2347,7 @@ func getServerInfo(ctx context.Context, pools, metrics bool, r *http.Request) ma notifyTarget := fetchLambdaInfo() local := getLocalServerProperty(globalEndpoints, r, metrics) - servers := globalNotificationSys.ServerInfo(metrics) + servers := globalNotificationSys.ServerInfo(ctx, metrics) servers = append(servers, local) var poolsInfo map[int]map[int]madmin.ErasureSetInfo diff --git a/cmd/erasure-server-pool.go b/cmd/erasure-server-pool.go index 2e627d572..91384f51e 100644 --- a/cmd/erasure-server-pool.go +++ b/cmd/erasure-server-pool.go @@ -700,7 +700,7 @@ func (z *erasureServerPools) LocalStorageInfo(ctx context.Context, metrics bool) } func (z *erasureServerPools) StorageInfo(ctx context.Context, metrics bool) StorageInfo { - return globalNotificationSys.StorageInfo(z, metrics) + return globalNotificationSys.StorageInfo(ctx, z, metrics) } func (z *erasureServerPools) NSScanner(ctx context.Context, updates chan<- DataUsageInfo, wantCycle uint32, healScanMode madmin.HealScanMode) error { diff --git a/cmd/iam.go b/cmd/iam.go index c4bb4b0d2..ae33d659f 100644 --- a/cmd/iam.go +++ b/cmd/iam.go @@ -572,7 +572,7 @@ func (sys *IAMSys) DeletePolicy(ctx context.Context, policyName string, notifyPe } // Notify all other MinIO peers to delete policy - for _, nerr := range globalNotificationSys.DeletePolicy(policyName) { + for _, nerr := range globalNotificationSys.DeletePolicy(ctx, policyName) { if nerr.Err != nil { logger.GetReqInfo(ctx).SetTags("peerAddress", nerr.Host.String()) iamLogIf(ctx, nerr.Err) @@ -637,7 +637,7 @@ func (sys *IAMSys) SetPolicy(ctx context.Context, policyName string, p policy.Po if !sys.HasWatcher() { // Notify all other MinIO peers to reload policy - for _, nerr := range globalNotificationSys.LoadPolicy(policyName) { + for _, nerr := range globalNotificationSys.LoadPolicy(ctx, policyName) { if nerr.Err != nil { logger.GetReqInfo(ctx).SetTags("peerAddress", nerr.Host.String()) iamLogIf(ctx, nerr.Err) @@ -659,7 +659,7 @@ func (sys *IAMSys) DeleteUser(ctx context.Context, accessKey string, notifyPeers // Notify all other MinIO peers to delete user. if notifyPeers && !sys.HasWatcher() { - for _, nerr := range globalNotificationSys.DeleteUser(accessKey) { + for _, nerr := range globalNotificationSys.DeleteUser(ctx, accessKey) { if nerr.Err != nil { logger.GetReqInfo(ctx).SetTags("peerAddress", nerr.Host.String()) iamLogIf(ctx, nerr.Err) @@ -685,7 +685,7 @@ func (sys *IAMSys) CurrentPolicies(policyName string) string { func (sys *IAMSys) notifyForUser(ctx context.Context, accessKey string, isTemp bool) { // Notify all other MinIO peers to reload user. if !sys.HasWatcher() { - for _, nerr := range globalNotificationSys.LoadUser(accessKey, isTemp) { + for _, nerr := range globalNotificationSys.LoadUser(ctx, accessKey, isTemp) { if nerr.Err != nil { logger.GetReqInfo(ctx).SetTags("peerAddress", nerr.Host.String()) iamLogIf(ctx, nerr.Err) @@ -930,7 +930,7 @@ func (sys *IAMSys) SetUserStatus(ctx context.Context, accessKey string, status m func (sys *IAMSys) notifyForServiceAccount(ctx context.Context, accessKey string) { // Notify all other Minio peers to reload the service account if !sys.HasWatcher() { - for _, nerr := range globalNotificationSys.LoadServiceAccount(accessKey) { + for _, nerr := range globalNotificationSys.LoadServiceAccount(ctx, accessKey) { if nerr.Err != nil { logger.GetReqInfo(ctx).SetTags("peerAddress", nerr.Host.String()) iamLogIf(ctx, nerr.Err) @@ -1251,7 +1251,7 @@ func (sys *IAMSys) DeleteServiceAccount(ctx context.Context, accessKey string, n } if notifyPeers && !sys.HasWatcher() { - for _, nerr := range globalNotificationSys.DeleteServiceAccount(accessKey) { + for _, nerr := range globalNotificationSys.DeleteServiceAccount(ctx, accessKey) { if nerr.Err != nil { logger.GetReqInfo(ctx).SetTags("peerAddress", nerr.Host.String()) iamLogIf(ctx, nerr.Err) @@ -1745,7 +1745,7 @@ func (sys *IAMSys) GetUser(ctx context.Context, accessKey string) (u UserIdentit // Notify all other MinIO peers to load group. func (sys *IAMSys) notifyForGroup(ctx context.Context, group string) { if !sys.HasWatcher() { - for _, nerr := range globalNotificationSys.LoadGroup(group) { + for _, nerr := range globalNotificationSys.LoadGroup(ctx, group) { if nerr.Err != nil { logger.GetReqInfo(ctx).SetTags("peerAddress", nerr.Host.String()) iamLogIf(ctx, nerr.Err) @@ -1847,7 +1847,7 @@ func (sys *IAMSys) PolicyDBSet(ctx context.Context, name, policy string, userTyp // Notify all other MinIO peers to reload policy if !sys.HasWatcher() { - for _, nerr := range globalNotificationSys.LoadPolicyMapping(name, userType, isGroup) { + for _, nerr := range globalNotificationSys.LoadPolicyMapping(ctx, name, userType, isGroup) { if nerr.Err != nil { logger.GetReqInfo(ctx).SetTags("peerAddress", nerr.Host.String()) iamLogIf(ctx, nerr.Err) @@ -1915,7 +1915,7 @@ func (sys *IAMSys) PolicyDBUpdateBuiltin(ctx context.Context, isAttach bool, // Notify all other MinIO peers to reload policy if !sys.HasWatcher() { - for _, nerr := range globalNotificationSys.LoadPolicyMapping(userOrGroup, regUser, isGroup) { + for _, nerr := range globalNotificationSys.LoadPolicyMapping(ctx, userOrGroup, regUser, isGroup) { if nerr.Err != nil { logger.GetReqInfo(ctx).SetTags("peerAddress", nerr.Host.String()) iamLogIf(ctx, nerr.Err) @@ -2007,7 +2007,7 @@ func (sys *IAMSys) PolicyDBUpdateLDAP(ctx context.Context, isAttach bool, // Notify all other MinIO peers to reload policy if !sys.HasWatcher() { - for _, nerr := range globalNotificationSys.LoadPolicyMapping(dn, userType, isGroup) { + for _, nerr := range globalNotificationSys.LoadPolicyMapping(ctx, dn, userType, isGroup) { if nerr.Err != nil { logger.GetReqInfo(ctx).SetTags("peerAddress", nerr.Host.String()) iamLogIf(ctx, nerr.Err) diff --git a/cmd/logging.go b/cmd/logging.go index f23e88656..b98e1be20 100644 --- a/cmd/logging.go +++ b/cmd/logging.go @@ -2,7 +2,9 @@ package cmd import ( "context" + "errors" + "github.com/minio/minio/internal/grid" "github.com/minio/minio/internal/logger" ) @@ -43,15 +45,21 @@ func authZLogIf(ctx context.Context, err error, errKind ...interface{}) { } func peersLogIf(ctx context.Context, err error, errKind ...interface{}) { - logger.LogIf(ctx, "peers", err, errKind...) + if !errors.Is(err, grid.ErrDisconnected) { + logger.LogIf(ctx, "peers", err, errKind...) + } } func peersLogAlwaysIf(ctx context.Context, err error, errKind ...interface{}) { - logger.LogAlwaysIf(ctx, "peers", err, errKind...) + if !errors.Is(err, grid.ErrDisconnected) { + logger.LogAlwaysIf(ctx, "peers", err, errKind...) + } } func peersLogOnceIf(ctx context.Context, err error, id string, errKind ...interface{}) { - logger.LogOnceIf(ctx, "peers", err, id, errKind...) + if !errors.Is(err, grid.ErrDisconnected) { + logger.LogOnceIf(ctx, "peers", err, id, errKind...) + } } func bugLogIf(ctx context.Context, err error, errKind ...interface{}) { diff --git a/cmd/notification.go b/cmd/notification.go index a6bb014a6..96541e929 100644 --- a/cmd/notification.go +++ b/cmd/notification.go @@ -118,21 +118,30 @@ func (g *NotificationGroup) Go(ctx context.Context, f func() error, index int, a g.errs[index] = NotificationPeerErr{ Host: addr, } - for i := 0; i < g.retryCount; i++ { + + retryCount := g.retryCount + for i := 0; i < retryCount; i++ { g.errs[index].Err = nil if err := f(); err != nil { g.errs[index].Err = err + + if contextCanceled(ctx) { + // context already canceled no retries. + retryCount = 0 + } + // Last iteration log the error. - if i == g.retryCount-1 { + if i == retryCount-1 { reqInfo := (&logger.ReqInfo{}).AppendTags("peerAddress", addr.String()) ctx := logger.SetReqInfo(ctx, reqInfo) peersLogOnceIf(ctx, err, addr.String()) } + // Wait for a minimum of 100ms and dynamically increase this based on number of attempts. - if i < g.retryCount-1 { + if i < retryCount-1 { time.Sleep(100*time.Millisecond + time.Duration(r.Float64()*float64(time.Second))) + continue } - continue } break } @@ -140,137 +149,137 @@ func (g *NotificationGroup) Go(ctx context.Context, f func() error, index int, a } // DeletePolicy - deletes policy across all peers. -func (sys *NotificationSys) DeletePolicy(policyName string) []NotificationPeerErr { +func (sys *NotificationSys) DeletePolicy(ctx context.Context, policyName string) []NotificationPeerErr { ng := WithNPeers(len(sys.peerClients)).WithRetries(1) for idx, client := range sys.peerClients { client := client - ng.Go(GlobalContext, func() error { + ng.Go(ctx, func() error { if client == nil { return errPeerNotReachable } - return client.DeletePolicy(policyName) + return client.DeletePolicy(ctx, policyName) }, idx, *client.host) } return ng.Wait() } // LoadPolicy - reloads a specific modified policy across all peers -func (sys *NotificationSys) LoadPolicy(policyName string) []NotificationPeerErr { +func (sys *NotificationSys) LoadPolicy(ctx context.Context, policyName string) []NotificationPeerErr { ng := WithNPeers(len(sys.peerClients)).WithRetries(1) for idx, client := range sys.peerClients { client := client - ng.Go(GlobalContext, func() error { + ng.Go(ctx, func() error { if client == nil { return errPeerNotReachable } - return client.LoadPolicy(policyName) + return client.LoadPolicy(ctx, policyName) }, idx, *client.host) } return ng.Wait() } // LoadPolicyMapping - reloads a policy mapping across all peers -func (sys *NotificationSys) LoadPolicyMapping(userOrGroup string, userType IAMUserType, isGroup bool) []NotificationPeerErr { +func (sys *NotificationSys) LoadPolicyMapping(ctx context.Context, userOrGroup string, userType IAMUserType, isGroup bool) []NotificationPeerErr { ng := WithNPeers(len(sys.peerClients)).WithRetries(1) for idx, client := range sys.peerClients { client := client - ng.Go(GlobalContext, func() error { + ng.Go(ctx, func() error { if client == nil { return errPeerNotReachable } - return client.LoadPolicyMapping(userOrGroup, userType, isGroup) + return client.LoadPolicyMapping(ctx, userOrGroup, userType, isGroup) }, idx, *client.host) } return ng.Wait() } // DeleteUser - deletes a specific user across all peers -func (sys *NotificationSys) DeleteUser(accessKey string) []NotificationPeerErr { +func (sys *NotificationSys) DeleteUser(ctx context.Context, accessKey string) []NotificationPeerErr { ng := WithNPeers(len(sys.peerClients)).WithRetries(1) for idx, client := range sys.peerClients { client := client - ng.Go(GlobalContext, func() error { + ng.Go(ctx, func() error { if client == nil { return errPeerNotReachable } - return client.DeleteUser(accessKey) + return client.DeleteUser(ctx, accessKey) }, idx, *client.host) } return ng.Wait() } // LoadUser - reloads a specific user across all peers -func (sys *NotificationSys) LoadUser(accessKey string, temp bool) []NotificationPeerErr { +func (sys *NotificationSys) LoadUser(ctx context.Context, accessKey string, temp bool) []NotificationPeerErr { ng := WithNPeers(len(sys.peerClients)).WithRetries(1) for idx, client := range sys.peerClients { client := client - ng.Go(GlobalContext, func() error { + ng.Go(ctx, func() error { if client == nil { return errPeerNotReachable } - return client.LoadUser(accessKey, temp) + return client.LoadUser(ctx, accessKey, temp) }, idx, *client.host) } return ng.Wait() } // LoadGroup - loads a specific group on all peers. -func (sys *NotificationSys) LoadGroup(group string) []NotificationPeerErr { +func (sys *NotificationSys) LoadGroup(ctx context.Context, group string) []NotificationPeerErr { ng := WithNPeers(len(sys.peerClients)).WithRetries(1) for idx, client := range sys.peerClients { client := client - ng.Go(GlobalContext, func() error { + ng.Go(ctx, func() error { if client == nil { return errPeerNotReachable } - return client.LoadGroup(group) + return client.LoadGroup(ctx, group) }, idx, *client.host) } return ng.Wait() } // DeleteServiceAccount - deletes a specific service account across all peers -func (sys *NotificationSys) DeleteServiceAccount(accessKey string) []NotificationPeerErr { +func (sys *NotificationSys) DeleteServiceAccount(ctx context.Context, accessKey string) []NotificationPeerErr { ng := WithNPeers(len(sys.peerClients)).WithRetries(1) for idx, client := range sys.peerClients { client := client - ng.Go(GlobalContext, func() error { + ng.Go(ctx, func() error { if client == nil { return errPeerNotReachable } - return client.DeleteServiceAccount(accessKey) + return client.DeleteServiceAccount(ctx, accessKey) }, idx, *client.host) } return ng.Wait() } // LoadServiceAccount - reloads a specific service account across all peers -func (sys *NotificationSys) LoadServiceAccount(accessKey string) []NotificationPeerErr { +func (sys *NotificationSys) LoadServiceAccount(ctx context.Context, accessKey string) []NotificationPeerErr { ng := WithNPeers(len(sys.peerClients)).WithRetries(1) for idx, client := range sys.peerClients { client := client - ng.Go(GlobalContext, func() error { + ng.Go(ctx, func() error { if client == nil { return errPeerNotReachable } - return client.LoadServiceAccount(accessKey) + return client.LoadServiceAccount(ctx, accessKey) }, idx, *client.host) } return ng.Wait() } // BackgroundHealStatus - returns background heal status of all peers -func (sys *NotificationSys) BackgroundHealStatus() ([]madmin.BgHealState, []NotificationPeerErr) { +func (sys *NotificationSys) BackgroundHealStatus(ctx context.Context) ([]madmin.BgHealState, []NotificationPeerErr) { ng := WithNPeers(len(sys.peerClients)) states := make([]madmin.BgHealState, len(sys.peerClients)) for idx, client := range sys.peerClients { idx := idx client := client - ng.Go(GlobalContext, func() error { + ng.Go(ctx, func() error { if client == nil { return errPeerNotReachable } - st, err := client.BackgroundHealStatus() + st, err := client.BackgroundHealStatus(ctx) if err != nil { return err } @@ -312,7 +321,7 @@ func (sys *NotificationSys) DownloadProfilingData(ctx context.Context, writer io if err != nil { reqInfo := (&logger.ReqInfo{}).AppendTags("peerAddress", client.host.String()) ctx := logger.SetReqInfo(ctx, reqInfo) - peersLogIf(ctx, err) + peersLogOnceIf(ctx, err, client.host.String()) continue } @@ -323,7 +332,7 @@ func (sys *NotificationSys) DownloadProfilingData(ctx context.Context, writer io if err != nil { reqInfo := (&logger.ReqInfo{}).AppendTags("peerAddress", client.host.String()) ctx := logger.SetReqInfo(ctx, reqInfo) - peersLogIf(ctx, err) + peersLogOnceIf(ctx, err, client.host.String()) } } } @@ -465,7 +474,7 @@ func (sys *NotificationSys) GetLocks(ctx context.Context, r *http.Request) []*Pe if client == nil { return errPeerNotReachable } - serverLocksResp, err := sys.peerClients[index].GetLocks() + serverLocksResp, err := sys.peerClients[index].GetLocks(ctx) if err != nil { return err } @@ -498,7 +507,7 @@ func (sys *NotificationSys) LoadBucketMetadata(ctx context.Context, bucketName s } client := client ng.Go(ctx, func() error { - return client.LoadBucketMetadata(bucketName) + return client.LoadBucketMetadata(ctx, bucketName) }, idx, *client.host) } for _, nErr := range ng.Wait() { @@ -528,7 +537,7 @@ func (sys *NotificationSys) DeleteBucketMetadata(ctx context.Context, bucketName } client := client ng.Go(ctx, func() error { - return client.DeleteBucketMetadata(bucketName) + return client.DeleteBucketMetadata(ctx, bucketName) }, idx, *client.host) } for _, nErr := range ng.Wait() { @@ -550,7 +559,7 @@ func (sys *NotificationSys) GetClusterAllBucketStats(ctx context.Context) []Buck if client == nil { return errPeerNotReachable } - bsMap, err := client.GetAllBucketStats() + bsMap, err := client.GetAllBucketStats(ctx) if err != nil { return err } @@ -592,7 +601,7 @@ func (sys *NotificationSys) GetClusterBucketStats(ctx context.Context, bucketNam if client == nil { return errPeerNotReachable } - bs, err := client.GetBucketStats(bucketName) + bs, err := client.GetBucketStats(ctx, bucketName) if err != nil { return err } @@ -625,7 +634,7 @@ func (sys *NotificationSys) GetClusterSiteMetrics(ctx context.Context) []SRMetri if client == nil { return errPeerNotReachable } - sm, err := client.GetSRMetrics() + sm, err := client.GetSRMetrics(ctx) if err != nil { return err } @@ -666,6 +675,12 @@ func (sys *NotificationSys) ReloadPoolMeta(ctx context.Context) { // StopRebalance notifies all MinIO nodes to signal any ongoing rebalance // goroutine to stop. func (sys *NotificationSys) StopRebalance(ctx context.Context) { + objAPI := newObjectLayerFn() + if objAPI == nil { + internalLogIf(ctx, errServerNotInitialized) + return + } + ng := WithNPeers(len(sys.peerClients)) for idx, client := range sys.peerClients { if client == nil { @@ -683,12 +698,6 @@ func (sys *NotificationSys) StopRebalance(ctx context.Context) { } } - objAPI := newObjectLayerFn() - if objAPI == nil { - internalLogIf(ctx, errServerNotInitialized) - return - } - if pools, ok := objAPI.(*erasureServerPools); ok { pools.StopRebalance() } @@ -1047,7 +1056,7 @@ func getOfflineDisks(offlineHost string, endpoints EndpointServerPools) []madmin } // StorageInfo returns disk information across all peers -func (sys *NotificationSys) StorageInfo(objLayer ObjectLayer, metrics bool) StorageInfo { +func (sys *NotificationSys) StorageInfo(ctx context.Context, objLayer ObjectLayer, metrics bool) StorageInfo { var storageInfo StorageInfo replies := make([]StorageInfo, len(sys.peerClients)) @@ -1059,7 +1068,7 @@ func (sys *NotificationSys) StorageInfo(objLayer ObjectLayer, metrics bool) Stor wg.Add(1) go func(client *peerRESTClient, idx int) { defer wg.Done() - info, err := client.LocalStorageInfo(metrics) + info, err := client.LocalStorageInfo(ctx, metrics) if err != nil { info.Disks = getOfflineDisks(client.host.String(), globalEndpoints) } @@ -1069,7 +1078,7 @@ func (sys *NotificationSys) StorageInfo(objLayer ObjectLayer, metrics bool) Stor wg.Wait() // Add local to this server. - replies = append(replies, objLayer.LocalStorageInfo(GlobalContext, metrics)) + replies = append(replies, objLayer.LocalStorageInfo(ctx, metrics)) storageInfo.Backend = objLayer.BackendInfo() for _, sinfo := range replies { @@ -1080,7 +1089,7 @@ func (sys *NotificationSys) StorageInfo(objLayer ObjectLayer, metrics bool) Stor } // ServerInfo - calls ServerInfo RPC call on all peers. -func (sys *NotificationSys) ServerInfo(metrics bool) []madmin.ServerProperties { +func (sys *NotificationSys) ServerInfo(ctx context.Context, metrics bool) []madmin.ServerProperties { reply := make([]madmin.ServerProperties, len(sys.peerClients)) var wg sync.WaitGroup for i, client := range sys.peerClients { @@ -1090,7 +1099,7 @@ func (sys *NotificationSys) ServerInfo(metrics bool) []madmin.ServerProperties { wg.Add(1) go func(client *peerRESTClient, idx int) { defer wg.Done() - info, err := client.ServerInfo(metrics) + info, err := client.ServerInfo(ctx, metrics) if err != nil { info.Endpoint = client.host.String() info.State = string(madmin.ItemOffline) diff --git a/cmd/object-api-errors.go b/cmd/object-api-errors.go index e04cb2dd0..62806ff31 100644 --- a/cmd/object-api-errors.go +++ b/cmd/object-api-errors.go @@ -705,6 +705,10 @@ func (e UnsupportedMetadata) Error() string { // isErrBucketNotFound - Check if error type is BucketNotFound. func isErrBucketNotFound(err error) bool { + if errors.Is(err, errVolumeNotFound) { + return true + } + var bkNotFound BucketNotFound return errors.As(err, &bkNotFound) } @@ -723,12 +727,20 @@ func isErrWriteQuorum(err error) bool { // isErrObjectNotFound - Check if error type is ObjectNotFound. func isErrObjectNotFound(err error) bool { + if errors.Is(err, errFileNotFound) { + return true + } + var objNotFound ObjectNotFound return errors.As(err, &objNotFound) } // isErrVersionNotFound - Check if error type is VersionNotFound. func isErrVersionNotFound(err error) bool { + if errors.Is(err, errFileVersionNotFound) { + return true + } + var versionNotFound VersionNotFound return errors.As(err, &versionNotFound) } diff --git a/cmd/peer-rest-client.go b/cmd/peer-rest-client.go index 206aefd1d..4a888484c 100644 --- a/cmd/peer-rest-client.go +++ b/cmd/peer-rest-client.go @@ -157,8 +157,8 @@ func (client *peerRESTClient) Close() error { } // GetLocks - fetch older locks for a remote node. -func (client *peerRESTClient) GetLocks() (lockMap map[string][]lockRequesterInfo, err error) { - resp, err := getLocksRPC.Call(context.Background(), client.gridConn(), grid.NewMSS()) +func (client *peerRESTClient) GetLocks(ctx context.Context) (lockMap map[string][]lockRequesterInfo, err error) { + resp, err := getLocksRPC.Call(ctx, client.gridConn(), grid.NewMSS()) if err != nil || resp == nil { return nil, err } @@ -166,16 +166,16 @@ func (client *peerRESTClient) GetLocks() (lockMap map[string][]lockRequesterInfo } // LocalStorageInfo - fetch server information for a remote node. -func (client *peerRESTClient) LocalStorageInfo(metrics bool) (info StorageInfo, err error) { - resp, err := localStorageInfoRPC.Call(context.Background(), client.gridConn(), grid.NewMSSWith(map[string]string{ +func (client *peerRESTClient) LocalStorageInfo(ctx context.Context, metrics bool) (info StorageInfo, err error) { + resp, err := localStorageInfoRPC.Call(ctx, client.gridConn(), grid.NewMSSWith(map[string]string{ peerRESTMetrics: strconv.FormatBool(metrics), })) return resp.ValueOrZero(), err } // ServerInfo - fetch server information for a remote node. -func (client *peerRESTClient) ServerInfo(metrics bool) (info madmin.ServerProperties, err error) { - resp, err := serverInfoRPC.Call(context.Background(), client.gridConn(), grid.NewMSSWith(map[string]string{peerRESTMetrics: strconv.FormatBool(metrics)})) +func (client *peerRESTClient) ServerInfo(ctx context.Context, metrics bool) (info madmin.ServerProperties, err error) { + resp, err := serverInfoRPC.Call(ctx, client.gridConn(), grid.NewMSSWith(map[string]string{peerRESTMetrics: strconv.FormatBool(metrics)})) return resp.ValueOrZero(), err } @@ -280,8 +280,8 @@ func (client *peerRESTClient) DownloadProfileData() (data map[string][]byte, err } // GetBucketStats - load bucket statistics -func (client *peerRESTClient) GetBucketStats(bucket string) (BucketStats, error) { - resp, err := getBucketStatsRPC.Call(context.Background(), client.gridConn(), grid.NewMSSWith(map[string]string{ +func (client *peerRESTClient) GetBucketStats(ctx context.Context, bucket string) (BucketStats, error) { + resp, err := getBucketStatsRPC.Call(ctx, client.gridConn(), grid.NewMSSWith(map[string]string{ peerRESTBucket: bucket, })) if err != nil || resp == nil { @@ -291,8 +291,8 @@ func (client *peerRESTClient) GetBucketStats(bucket string) (BucketStats, error) } // GetSRMetrics loads site replication metrics, optionally for a specific bucket -func (client *peerRESTClient) GetSRMetrics() (SRMetricsSummary, error) { - resp, err := getSRMetricsRPC.Call(context.Background(), client.gridConn(), grid.NewMSS()) +func (client *peerRESTClient) GetSRMetrics(ctx context.Context) (SRMetricsSummary, error) { + resp, err := getSRMetricsRPC.Call(ctx, client.gridConn(), grid.NewMSS()) if err != nil || resp == nil { return SRMetricsSummary{}, err } @@ -300,8 +300,8 @@ func (client *peerRESTClient) GetSRMetrics() (SRMetricsSummary, error) { } // GetAllBucketStats - load replication stats for all buckets -func (client *peerRESTClient) GetAllBucketStats() (BucketStatsMap, error) { - resp, err := getAllBucketStatsRPC.Call(context.Background(), client.gridConn(), grid.NewMSS()) +func (client *peerRESTClient) GetAllBucketStats(ctx context.Context) (BucketStatsMap, error) { + resp, err := getAllBucketStatsRPC.Call(ctx, client.gridConn(), grid.NewMSS()) if err != nil || resp == nil { return BucketStatsMap{}, err } @@ -309,40 +309,40 @@ func (client *peerRESTClient) GetAllBucketStats() (BucketStatsMap, error) { } // LoadBucketMetadata - load bucket metadata -func (client *peerRESTClient) LoadBucketMetadata(bucket string) error { - _, err := loadBucketMetadataRPC.Call(context.Background(), client.gridConn(), grid.NewMSSWith(map[string]string{ +func (client *peerRESTClient) LoadBucketMetadata(ctx context.Context, bucket string) error { + _, err := loadBucketMetadataRPC.Call(ctx, client.gridConn(), grid.NewMSSWith(map[string]string{ peerRESTBucket: bucket, })) return err } // DeleteBucketMetadata - Delete bucket metadata -func (client *peerRESTClient) DeleteBucketMetadata(bucket string) error { - _, err := deleteBucketMetadataRPC.Call(context.Background(), client.gridConn(), grid.NewMSSWith(map[string]string{ +func (client *peerRESTClient) DeleteBucketMetadata(ctx context.Context, bucket string) error { + _, err := deleteBucketMetadataRPC.Call(ctx, client.gridConn(), grid.NewMSSWith(map[string]string{ peerRESTBucket: bucket, })) return err } // DeletePolicy - delete a specific canned policy. -func (client *peerRESTClient) DeletePolicy(policyName string) (err error) { - _, err = deletePolicyRPC.Call(context.Background(), client.gridConn(), grid.NewMSSWith(map[string]string{ +func (client *peerRESTClient) DeletePolicy(ctx context.Context, policyName string) (err error) { + _, err = deletePolicyRPC.Call(ctx, client.gridConn(), grid.NewMSSWith(map[string]string{ peerRESTPolicy: policyName, })) return err } // LoadPolicy - reload a specific canned policy. -func (client *peerRESTClient) LoadPolicy(policyName string) (err error) { - _, err = loadPolicyRPC.Call(context.Background(), client.gridConn(), grid.NewMSSWith(map[string]string{ +func (client *peerRESTClient) LoadPolicy(ctx context.Context, policyName string) (err error) { + _, err = loadPolicyRPC.Call(ctx, client.gridConn(), grid.NewMSSWith(map[string]string{ peerRESTPolicy: policyName, })) return err } // LoadPolicyMapping - reload a specific policy mapping -func (client *peerRESTClient) LoadPolicyMapping(userOrGroup string, userType IAMUserType, isGroup bool) error { - _, err := loadPolicyMappingRPC.Call(context.Background(), client.gridConn(), grid.NewMSSWith(map[string]string{ +func (client *peerRESTClient) LoadPolicyMapping(ctx context.Context, userOrGroup string, userType IAMUserType, isGroup bool) error { + _, err := loadPolicyMappingRPC.Call(ctx, client.gridConn(), grid.NewMSSWith(map[string]string{ peerRESTUserOrGroup: userOrGroup, peerRESTUserType: strconv.Itoa(int(userType)), peerRESTIsGroup: strconv.FormatBool(isGroup), @@ -351,24 +351,24 @@ func (client *peerRESTClient) LoadPolicyMapping(userOrGroup string, userType IAM } // DeleteUser - delete a specific user. -func (client *peerRESTClient) DeleteUser(accessKey string) (err error) { - _, err = deleteUserRPC.Call(context.Background(), client.gridConn(), grid.NewMSSWith(map[string]string{ +func (client *peerRESTClient) DeleteUser(ctx context.Context, accessKey string) (err error) { + _, err = deleteUserRPC.Call(ctx, client.gridConn(), grid.NewMSSWith(map[string]string{ peerRESTUser: accessKey, })) return err } // DeleteServiceAccount - delete a specific service account. -func (client *peerRESTClient) DeleteServiceAccount(accessKey string) (err error) { - _, err = deleteSvcActRPC.Call(context.Background(), client.gridConn(), grid.NewMSSWith(map[string]string{ +func (client *peerRESTClient) DeleteServiceAccount(ctx context.Context, accessKey string) (err error) { + _, err = deleteSvcActRPC.Call(ctx, client.gridConn(), grid.NewMSSWith(map[string]string{ peerRESTUser: accessKey, })) return err } // LoadUser - reload a specific user. -func (client *peerRESTClient) LoadUser(accessKey string, temp bool) (err error) { - _, err = loadUserRPC.Call(context.Background(), client.gridConn(), grid.NewMSSWith(map[string]string{ +func (client *peerRESTClient) LoadUser(ctx context.Context, accessKey string, temp bool) (err error) { + _, err = loadUserRPC.Call(ctx, client.gridConn(), grid.NewMSSWith(map[string]string{ peerRESTUser: accessKey, peerRESTUserTemp: strconv.FormatBool(temp), })) @@ -376,16 +376,16 @@ func (client *peerRESTClient) LoadUser(accessKey string, temp bool) (err error) } // LoadServiceAccount - reload a specific service account. -func (client *peerRESTClient) LoadServiceAccount(accessKey string) (err error) { - _, err = loadSvcActRPC.Call(context.Background(), client.gridConn(), grid.NewMSSWith(map[string]string{ +func (client *peerRESTClient) LoadServiceAccount(ctx context.Context, accessKey string) (err error) { + _, err = loadSvcActRPC.Call(ctx, client.gridConn(), grid.NewMSSWith(map[string]string{ peerRESTUser: accessKey, })) return err } // LoadGroup - send load group command to peers. -func (client *peerRESTClient) LoadGroup(group string) error { - _, err := loadGroupRPC.Call(context.Background(), client.gridConn(), grid.NewMSSWith(map[string]string{ +func (client *peerRESTClient) LoadGroup(ctx context.Context, group string) error { + _, err := loadGroupRPC.Call(ctx, client.gridConn(), grid.NewMSSWith(map[string]string{ peerRESTGroup: group, })) return err @@ -436,8 +436,8 @@ func (client *peerRESTClient) SignalService(sig serviceSignal, subSys string, dr return err } -func (client *peerRESTClient) BackgroundHealStatus() (madmin.BgHealState, error) { - resp, err := getBackgroundHealStatusRPC.Call(context.Background(), client.gridConn(), grid.NewMSS()) +func (client *peerRESTClient) BackgroundHealStatus(ctx context.Context) (madmin.BgHealState, error) { + resp, err := getBackgroundHealStatusRPC.Call(ctx, client.gridConn(), grid.NewMSS()) return resp.ValueOrZero(), err } diff --git a/cmd/storage-rest-client.go b/cmd/storage-rest-client.go index 486b97d85..4f3e4b365 100644 --- a/cmd/storage-rest-client.go +++ b/cmd/storage-rest-client.go @@ -38,6 +38,7 @@ import ( "github.com/minio/minio/internal/cachevalue" "github.com/minio/minio/internal/grid" xhttp "github.com/minio/minio/internal/http" + "github.com/minio/minio/internal/ioutil" xioutil "github.com/minio/minio/internal/ioutil" "github.com/minio/minio/internal/rest" xnet "github.com/minio/pkg/v2/net" @@ -662,6 +663,13 @@ func (client *storageRESTClient) ListDir(ctx context.Context, origvolume, volume // DeleteFile - deletes a file. func (client *storageRESTClient) Delete(ctx context.Context, volume string, path string, deleteOpts DeleteOptions) error { + if !deleteOpts.Immediate { + // add deadlines for all non-immediate purges + var cancel context.CancelFunc + ctx, cancel = context.WithTimeout(ctx, globalDriveConfig.GetMaxTimeout()) + defer cancel() + } + _, err := storageDeleteFileRPC.Call(ctx, client.gridConn, &DeleteFileHandlerParams{ DiskID: *client.diskID.Load(), Volume: volume, @@ -727,6 +735,9 @@ func (client *storageRESTClient) DeleteVersions(ctx context.Context, volume stri // RenameFile - renames a file. func (client *storageRESTClient) RenameFile(ctx context.Context, srcVolume, srcPath, dstVolume, dstPath string) (err error) { + ctx, cancel := context.WithTimeout(ctx, globalDriveConfig.GetMaxTimeout()) + defer cancel() + _, err = storageRenameFileRPC.Call(ctx, client.gridConn, &RenameFileHandlerParams{ DiskID: *client.diskID.Load(), SrcVolume: srcVolume, @@ -782,6 +793,7 @@ func (client *storageRESTClient) StatInfoFile(ctx context.Context, volume, path } rd := msgpNewReader(respReader) defer readMsgpReaderPoolPut(rd) + for { var st StatInfo err = st.DecodeMsg(rd) @@ -791,6 +803,7 @@ func (client *storageRESTClient) StatInfoFile(ctx context.Context, volume, path } break } + stat = append(stat, st) } @@ -815,7 +828,7 @@ func (client *storageRESTClient) ReadMultiple(ctx context.Context, req ReadMulti pr, pw := io.Pipe() go func() { - pw.CloseWithError(waitForHTTPStream(respBody, pw)) + pw.CloseWithError(waitForHTTPStream(respBody, ioutil.NewDeadlineWriter(pw, globalDriveConfig.GetMaxTimeout()))) }() mr := msgp.NewReader(pr) defer readMsgpReaderPoolPut(mr) @@ -868,7 +881,6 @@ func newStorageRESTClient(endpoint Endpoint, healthCheck bool, gm *grid.Manager) } restClient := rest.NewClient(serverURL, globalInternodeTransport, newCachedAuthToken()) - if healthCheck { // Use a separate client to avoid recursive calls. healthClient := rest.NewClient(serverURL, globalInternodeTransport, newCachedAuthToken()) diff --git a/internal/grid/manager.go b/internal/grid/manager.go index 94a00062b..58cc82429 100644 --- a/internal/grid/manager.go +++ b/internal/grid/manager.go @@ -20,7 +20,9 @@ package grid import ( "context" "crypto/tls" + "errors" "fmt" + "io" "net/http" "runtime/debug" "strings" @@ -150,7 +152,7 @@ func (m *Manager) Handler() http.HandlerFunc { } ctx := req.Context() if err := m.authRequest(req); err != nil { - gridLogOnceIf(ctx, fmt.Errorf("auth %s: %w", req.RemoteAddr, err), req.RemoteAddr+err.Error()) + gridLogOnceIf(ctx, fmt.Errorf("auth %s: %w", req.RemoteAddr, err), req.RemoteAddr) w.WriteHeader(http.StatusForbidden) return } @@ -167,7 +169,10 @@ func (m *Manager) Handler() http.HandlerFunc { if err == nil { return } - gridLogOnceIf(ctx, err, err.Error()) + if errors.Is(err, io.EOF) { + return + } + gridLogOnceIf(ctx, err, req.RemoteAddr) resp := connectResp{ ID: m.ID, Accepted: false,