Add support for profile deadlines and concurrent operations (#20244)

* Allow a maximum of 10 seconds to start profiling operations.
* Download up to 16 profiles concurrently, but only allow 15 seconds for
  each (does not include write time).
* Add cluster info as the first operation.
* Ignore remote download errors.
* Stop remote profiles if the request is terminated.
Author: Klaus Post, 2024-08-15 03:36:00 -07:00 (committed by GitHub)
Parent: b508264ac4
Commit: d96798ae7b
3 changed files with 55 additions and 37 deletions
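
Taken together, the changes boil down to two patterns: every remote profiling RPC gets its own deadline via context.WithTimeout, and profile downloads are fanned out with a hard cap on concurrency, skipping peers that error out. A minimal standalone sketch of that combination follows; the downloadProfile helper, the peer names, and the limits are illustrative stand-ins, not MinIO APIs (the real code uses WithNPeersThrottled and the peer REST client).

// Sketch: deadline per remote call + bounded fan-out, mirroring the shape of
// the change. All names here are hypothetical, not MinIO APIs.
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// downloadProfile stands in for one remote profile download; it honors ctx.
func downloadProfile(ctx context.Context, peer string) ([]byte, error) {
	select {
	case <-time.After(200 * time.Millisecond): // pretend remote work
		return []byte("profile-from-" + peer), nil
	case <-ctx.Done():
		return nil, ctx.Err()
	}
}

func main() {
	peers := []string{"peer1", "peer2", "peer3", "peer4"}

	const maxConcurrent = 16                  // cap like WithNPeersThrottled(..., 16)
	sem := make(chan struct{}, maxConcurrent) // simple semaphore
	var wg sync.WaitGroup

	for _, peer := range peers {
		wg.Add(1)
		go func(peer string) {
			defer wg.Done()
			sem <- struct{}{}
			defer func() { <-sem }()

			// Each remote call gets its own deadline; failures are logged
			// and skipped rather than failing the whole collection.
			ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
			defer cancel()

			data, err := downloadProfile(ctx, peer)
			if err != nil {
				fmt.Println("skipping", peer, ":", err)
				return
			}
			fmt.Println("got", len(data), "bytes from", peer)
		}(peer)
	}
	wg.Wait()
}

In the diff itself the parent request context is used instead of context.Background(), which is what makes terminating the admin request also cancel the in-flight remote profile operations.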

cmd/admin-handlers.go

@@ -1036,7 +1036,7 @@ func (a adminAPIHandlers) StartProfilingHandler(w http.ResponseWriter, r *http.R
 	// Start profiling on remote servers.
 	var hostErrs []NotificationPeerErr
 	for _, profiler := range profiles {
-		hostErrs = append(hostErrs, globalNotificationSys.StartProfiling(profiler)...)
+		hostErrs = append(hostErrs, globalNotificationSys.StartProfiling(ctx, profiler)...)
 
 		// Start profiling locally as well.
 		prof, err := startProfiler(profiler)
@@ -1117,7 +1117,11 @@ func (a adminAPIHandlers) ProfileHandler(w http.ResponseWriter, r *http.Request)
 	// Start profiling on remote servers.
 	for _, profiler := range profiles {
-		globalNotificationSys.StartProfiling(profiler)
+		// Limit start time to max 10s.
+		ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
+		globalNotificationSys.StartProfiling(ctx, profiler)
+		// StartProfiling blocks, so we can cancel now.
+		cancel()
 
 		// Start profiling locally as well.
 		prof, err := startProfiler(profiler)
@@ -1132,6 +1136,10 @@ func (a adminAPIHandlers) ProfileHandler(w http.ResponseWriter, r *http.Request)
 	for {
 		select {
 		case <-ctx.Done():
+			// Stop remote profiles
+			go globalNotificationSys.DownloadProfilingData(GlobalContext, io.Discard)
+
+			// Stop local
 			globalProfilerMu.Lock()
 			defer globalProfilerMu.Unlock()
 			for k, v := range globalProfiler {

cmd/notification.go

@@ -84,6 +84,9 @@ func WithNPeersThrottled(nerrs, wks int) *NotificationGroup
 	if nerrs <= 0 {
 		nerrs = 1
 	}
+	if wks > nerrs {
+		wks = nerrs
+	}
 	wk, _ := workers.New(wks)
 	return &NotificationGroup{errs: make([]NotificationPeerErr, nerrs), workers: wk, retryCount: 3}
 }
@@ -292,15 +295,15 @@ func (sys *NotificationSys) BackgroundHealStatus(ctx context.Context) ([]madmin.
 }
 
 // StartProfiling - start profiling on remote peers, by initiating a remote RPC.
-func (sys *NotificationSys) StartProfiling(profiler string) []NotificationPeerErr {
+func (sys *NotificationSys) StartProfiling(ctx context.Context, profiler string) []NotificationPeerErr {
 	ng := WithNPeers(len(sys.peerClients))
 	for idx, client := range sys.peerClients {
 		if client == nil {
 			continue
 		}
 		client := client
-		ng.Go(GlobalContext, func() error {
-			return client.StartProfiling(profiler)
+		ng.Go(ctx, func() error {
+			return client.StartProfiling(ctx, profiler)
 		}, idx, *client.host)
 	}
 	return ng.Wait()
@@ -313,28 +316,49 @@ func (sys *NotificationSys) DownloadProfilingData(ctx context.Context, writer io
 	zipWriter := zip.NewWriter(writer)
 	defer zipWriter.Close()
 
-	for _, client := range sys.peerClients {
+	// Start by embedding cluster info.
+	if b := getClusterMetaInfo(ctx); len(b) > 0 {
+		internalLogIf(ctx, embedFileInZip(zipWriter, "cluster.info", b, 0o600))
+	}
+
+	// Profiles can be quite big, so we limit to max 16 concurrent downloads.
+	ng := WithNPeersThrottled(len(sys.peerClients), 16)
+	var writeMu sync.Mutex
+	for i, client := range sys.peerClients {
 		if client == nil {
 			continue
 		}
-		data, err := client.DownloadProfileData()
-		if err != nil {
-			reqInfo := (&logger.ReqInfo{}).AppendTags("peerAddress", client.host.String())
-			ctx := logger.SetReqInfo(ctx, reqInfo)
-			peersLogOnceIf(ctx, err, client.host.String())
-			continue
-		}
-
-		profilingDataFound = true
-
-		for typ, data := range data {
-			err := embedFileInZip(zipWriter, fmt.Sprintf("profile-%s-%s", client.host.String(), typ), data, 0o600)
+		ng.Go(ctx, func() error {
+			// Give 15 seconds to each remote call.
+			// Errors are logged but not returned.
+			ctx, cancel := context.WithTimeout(ctx, 15*time.Second)
+			defer cancel()
+			data, err := client.DownloadProfileData(ctx)
 			if err != nil {
 				reqInfo := (&logger.ReqInfo{}).AppendTags("peerAddress", client.host.String())
 				ctx := logger.SetReqInfo(ctx, reqInfo)
 				peersLogOnceIf(ctx, err, client.host.String())
+				return nil
 			}
-		}
+			for typ, data := range data {
+				// zip writer only handles one concurrent write
+				writeMu.Lock()
+				profilingDataFound = true
+				err := embedFileInZip(zipWriter, fmt.Sprintf("profile-%s-%s", client.host.String(), typ), data, 0o600)
+				writeMu.Unlock()
+				if err != nil {
+					reqInfo := (&logger.ReqInfo{}).AppendTags("peerAddress", client.host.String())
+					ctx := logger.SetReqInfo(ctx, reqInfo)
+					peersLogOnceIf(ctx, err, client.host.String())
+				}
+			}
+			return nil
+		}, i, *client.host)
 	}
+	ng.Wait()
+	if ctx.Err() != nil {
+		return false
+	}
 
 	// Local host
@@ -359,9 +383,6 @@ func (sys *NotificationSys) DownloadProfilingData(ctx context.Context, writer io
 		err := embedFileInZip(zipWriter, fmt.Sprintf("profile-%s-%s", thisAddr, typ), data, 0o600)
 		internalLogIf(ctx, err)
 	}
-	if b := getClusterMetaInfo(ctx); len(b) > 0 {
-		internalLogIf(ctx, embedFileInZip(zipWriter, "cluster.info", b, 0o600))
-	}
 
 	return
 }
@@ -383,10 +404,6 @@ func (sys *NotificationSys) VerifyBinary(ctx context.Context, u *url.URL, sha256
 	// further discussion advised. Remove this comment and remove the worker model
 	// for this function in future.
 	maxWorkers := runtime.GOMAXPROCS(0) / 2
-	if maxWorkers > len(sys.peerClients) {
-		maxWorkers = len(sys.peerClients)
-	}
-
 	ng := WithNPeersThrottled(len(sys.peerClients), maxWorkers)
 	for idx, client := range sys.peerClients {
 		if client == nil {

cmd/peer-rest-client.go

@@ -105,13 +105,6 @@ func newPeerRESTClient(peer *xnet.Host, gridHost string) *peerRESTClient {
 	}
 }
 
-// Wrapper to restClient.Call to handle network errors, in case of network error the connection is marked disconnected
-// permanently. The only way to restore the connection is at the xl-sets layer by xlsets.monitorAndConnectEndpoints()
-// after verifying format.json
-func (client *peerRESTClient) call(method string, values url.Values, body io.Reader, length int64) (respBody io.ReadCloser, err error) {
-	return client.callWithContext(GlobalContext, method, values, body, length)
-}
-
 // Wrapper to restClient.Call to handle network errors, in case of network error the connection is marked disconnected
 // permanently. The only way to restore the connection is at the xl-sets layer by xlsets.monitorAndConnectEndpoints()
 // after verifying format.json
@@ -257,10 +250,10 @@ func (client *peerRESTClient) GetProcInfo(ctx context.Context) (info madmin.Proc
 }
 
 // StartProfiling - Issues profiling command on the peer node.
-func (client *peerRESTClient) StartProfiling(profiler string) error {
+func (client *peerRESTClient) StartProfiling(ctx context.Context, profiler string) error {
 	values := make(url.Values)
 	values.Set(peerRESTProfiler, profiler)
-	respBody, err := client.call(peerRESTMethodStartProfiling, values, nil, -1)
+	respBody, err := client.callWithContext(ctx, peerRESTMethodStartProfiling, values, nil, -1)
 	if err != nil {
 		return err
 	}
@@ -269,8 +262,8 @@ func (client *peerRESTClient) StartProfiling(profiler string) error {
 }
 
 // DownloadProfileData - download profiled data from a remote node.
-func (client *peerRESTClient) DownloadProfileData() (data map[string][]byte, err error) {
-	respBody, err := client.call(peerRESTMethodDownloadProfilingData, nil, nil, -1)
+func (client *peerRESTClient) DownloadProfileData(ctx context.Context) (data map[string][]byte, err error) {
+	respBody, err := client.callWithContext(ctx, peerRESTMethodDownloadProfilingData, nil, nil, -1)
 	if err != nil {
 		return
 	}
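
One detail worth noting from the DownloadProfilingData change above: the downloads now run concurrently, but archive/zip.Writer can only write one entry at a time, hence the writeMu around each embedFileInZip call. A small self-contained sketch of that guard, with made-up entry names and payloads standing in for the fetched profiles:

// Sketch: serializing zip entries written from multiple goroutines.
package main

import (
	"archive/zip"
	"bytes"
	"fmt"
	"sync"
)

func main() {
	var buf bytes.Buffer
	zw := zip.NewWriter(&buf)

	var (
		mu sync.Mutex // zip.Writer supports only one in-progress entry
		wg sync.WaitGroup
	)

	// Made-up payloads standing in for profile data fetched from peers.
	files := map[string][]byte{
		"profile-peer1-cpu": []byte("cpu data"),
		"profile-peer2-mem": []byte("mem data"),
	}

	for name, data := range files {
		wg.Add(1)
		go func(name string, data []byte) {
			defer wg.Done()

			// Creating the entry and writing its body must happen under the
			// same lock, or entries from different goroutines interleave.
			mu.Lock()
			defer mu.Unlock()

			w, err := zw.Create(name)
			if err == nil {
				_, err = w.Write(data)
			}
			if err != nil {
				fmt.Println("embedding", name, "failed:", err)
			}
		}(name, data)
	}
	wg.Wait()

	if err := zw.Close(); err != nil {
		fmt.Println("close failed:", err)
	}
	fmt.Println("zip size:", buf.Len(), "bytes")
}

This is also why the commit summary excludes write time from the per-download budget: the timeout in the diff bounds only the remote fetch, while the serialized write into the zip happens afterwards under the lock.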