From 2bf6cf0e15b0bea9186f06109b9badb3133173ce Mon Sep 17 00:00:00 2001 From: Klaus Post Date: Sat, 11 Jan 2020 02:19:58 +0100 Subject: [PATCH] Enable multiple concurrent profile types (#8792) --- cmd/admin-handlers.go | 36 +++++++--- cmd/notification.go | 93 +++++++++++++------------ cmd/peer-rest-client.go | 2 +- cmd/peer-rest-server.go | 39 +++++++---- cmd/signals.go | 8 ++- cmd/utils.go | 149 +++++++++++++++++++++++++--------------- cmd/utils_test.go | 2 +- go.mod | 1 - 8 files changed, 202 insertions(+), 128 deletions(-) diff --git a/cmd/admin-handlers.go b/cmd/admin-handlers.go index f998c7cc9..27f73bbcf 100644 --- a/cmd/admin-handlers.go +++ b/cmd/admin-handlers.go @@ -571,30 +571,44 @@ func (a adminAPIHandlers) StartProfilingHandler(w http.ResponseWriter, r *http.R } vars := mux.Vars(r) - profiler := vars["profilerType"] - + profiles := strings.Split(vars["profilerType"], ",") thisAddr, err := xnet.ParseHost(GetLocalPeer(globalEndpoints)) if err != nil { writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL) return } - // Start profiling on remote servers. - hostErrs := globalNotificationSys.StartProfiling(profiler) + globalProfilerMu.Lock() + defer globalProfilerMu.Unlock() - // Start profiling locally as well. - { - if globalProfiler != nil { - globalProfiler.Stop() + if globalProfiler == nil { + globalProfiler = make(map[string]minioProfiler, 10) + } + + // Stop profiler of all types if already running + for k, v := range globalProfiler { + for _, p := range profiles { + if p == k { + v.Stop() + delete(globalProfiler, k) + } } - prof, err := startProfiler(profiler, "") + } + + // Start profiling on remote servers. + var hostErrs []NotificationPeerErr + for _, profiler := range profiles { + hostErrs = append(hostErrs, globalNotificationSys.StartProfiling(profiler)...) + + // Start profiling locally as well. + prof, err := startProfiler(profiler) if err != nil { hostErrs = append(hostErrs, NotificationPeerErr{ Host: *thisAddr, Err: err, }) } else { - globalProfiler = prof + globalProfiler[profiler] = prof hostErrs = append(hostErrs, NotificationPeerErr{ Host: *thisAddr, }) @@ -620,7 +634,7 @@ func (a adminAPIHandlers) StartProfilingHandler(w http.ResponseWriter, r *http.R return } - writeSuccessResponseJSON(w, []byte(startProfilingResultInBytes)) + writeSuccessResponseJSON(w, startProfilingResultInBytes) } // dummyFileInfo represents a dummy representation of a profile data file diff --git a/cmd/notification.go b/cmd/notification.go index af70efb92..9c32714c5 100644 --- a/cmd/notification.go +++ b/cmd/notification.go @@ -319,36 +319,39 @@ func (sys *NotificationSys) DownloadProfilingData(ctx context.Context, writer io profilingDataFound = true - // Send profiling data to zip as file - header, zerr := zip.FileInfoHeader(dummyFileInfo{ - name: fmt.Sprintf("profiling-%s.pprof", client.host.String()), - size: int64(len(data)), - mode: 0600, - modTime: UTCNow(), - isDir: false, - sys: nil, - }) - if zerr != nil { - reqInfo := (&logger.ReqInfo{}).AppendTags("peerAddress", client.host.String()) - ctx := logger.SetReqInfo(ctx, reqInfo) - logger.LogIf(ctx, zerr) - continue - } - zwriter, zerr := zipWriter.CreateHeader(header) - if zerr != nil { - reqInfo := (&logger.ReqInfo{}).AppendTags("peerAddress", client.host.String()) - ctx := logger.SetReqInfo(ctx, reqInfo) - logger.LogIf(ctx, zerr) - continue - } - if _, err = io.Copy(zwriter, bytes.NewBuffer(data)); err != nil { - reqInfo := (&logger.ReqInfo{}).AppendTags("peerAddress", client.host.String()) - ctx := logger.SetReqInfo(ctx, reqInfo) - logger.LogIf(ctx, err) - continue + for typ, data := range data { + // Send profiling data to zip as file + header, zerr := zip.FileInfoHeader(dummyFileInfo{ + name: fmt.Sprintf("profiling-%s-%s.pprof", client.host.String(), typ), + size: int64(len(data)), + mode: 0600, + modTime: UTCNow(), + isDir: false, + sys: nil, + }) + if zerr != nil { + reqInfo := (&logger.ReqInfo{}).AppendTags("peerAddress", client.host.String()) + ctx := logger.SetReqInfo(ctx, reqInfo) + logger.LogIf(ctx, zerr) + continue + } + zwriter, zerr := zipWriter.CreateHeader(header) + if zerr != nil { + reqInfo := (&logger.ReqInfo{}).AppendTags("peerAddress", client.host.String()) + ctx := logger.SetReqInfo(ctx, reqInfo) + logger.LogIf(ctx, zerr) + continue + } + if _, err = io.Copy(zwriter, bytes.NewBuffer(data)); err != nil { + reqInfo := (&logger.ReqInfo{}).AppendTags("peerAddress", client.host.String()) + ctx := logger.SetReqInfo(ctx, reqInfo) + logger.LogIf(ctx, err) + continue + } } } + // Local host thisAddr, err := xnet.ParseHost(GetLocalPeer(globalEndpoints)) if err != nil { logger.LogIf(ctx, err) @@ -366,25 +369,27 @@ func (sys *NotificationSys) DownloadProfilingData(ctx context.Context, writer io profilingDataFound = true // Send profiling data to zip as file - header, zerr := zip.FileInfoHeader(dummyFileInfo{ - name: fmt.Sprintf("profiling-%s.pprof", thisAddr), - size: int64(len(data)), - mode: 0600, - modTime: UTCNow(), - isDir: false, - sys: nil, - }) - if zerr != nil { - return profilingDataFound - } + for typ, data := range data { + header, zerr := zip.FileInfoHeader(dummyFileInfo{ + name: fmt.Sprintf("profiling-%s-%s.pprof", thisAddr, typ), + size: int64(len(data)), + mode: 0600, + modTime: UTCNow(), + isDir: false, + sys: nil, + }) + if zerr != nil { + return profilingDataFound + } - zwriter, zerr := zipWriter.CreateHeader(header) - if zerr != nil { - return profilingDataFound - } + zwriter, zerr := zipWriter.CreateHeader(header) + if zerr != nil { + return profilingDataFound + } - if _, err = io.Copy(zwriter, bytes.NewBuffer(data)); err != nil { - return profilingDataFound + if _, err = io.Copy(zwriter, bytes.NewBuffer(data)); err != nil { + return profilingDataFound + } } return profilingDataFound diff --git a/cmd/peer-rest-client.go b/cmd/peer-rest-client.go index fa6b0176f..9f92931c4 100644 --- a/cmd/peer-rest-client.go +++ b/cmd/peer-rest-client.go @@ -225,7 +225,7 @@ func (client *peerRESTClient) StartProfiling(profiler string) error { } // DownloadProfileData - download profiled data from a remote node. -func (client *peerRESTClient) DownloadProfileData() (data []byte, err error) { +func (client *peerRESTClient) DownloadProfileData() (data map[string][]byte, err error) { respBody, err := client.call(peerRESTMethodDownloadProfilingData, nil, nil, -1) if err != nil { return diff --git a/cmd/peer-rest-server.go b/cmd/peer-rest-server.go index 1a725cff9..77520617e 100644 --- a/cmd/peer-rest-server.go +++ b/cmd/peer-rest-server.go @@ -395,28 +395,41 @@ func (s *peerRESTServer) StartProfilingHandler(w http.ResponseWriter, r *http.Re } vars := mux.Vars(r) - profiler := vars[peerRESTProfiler] - if profiler == "" { + profiles := strings.Split(vars[peerRESTProfiler], ",") + if len(profiles) == 0 { s.writeErrorResponse(w, errors.New("profiler name is missing")) return } - - if globalProfiler != nil { - globalProfiler.Stop() + globalProfilerMu.Lock() + defer globalProfilerMu.Unlock() + if globalProfiler == nil { + globalProfiler = make(map[string]minioProfiler, 10) } - var err error - globalProfiler, err = startProfiler(profiler, "") - if err != nil { - s.writeErrorResponse(w, err) - return + // Stop profiler of all types if already running + for k, v := range globalProfiler { + for _, p := range profiles { + if p == k { + v.Stop() + delete(globalProfiler, k) + } + } + } + + for _, profiler := range profiles { + prof, err := startProfiler(profiler) + if err != nil { + s.writeErrorResponse(w, err) + return + } + globalProfiler[profiler] = prof } w.(http.Flusher).Flush() } -// DownloadProflingDataHandler - returns proflied data. -func (s *peerRESTServer) DownloadProflingDataHandler(w http.ResponseWriter, r *http.Request) { +// DownloadProfilingDataHandler - returns profiled data. +func (s *peerRESTServer) DownloadProfilingDataHandler(w http.ResponseWriter, r *http.Request) { if !s.IsValid(w, r) { s.writeErrorResponse(w, errors.New("Invalid request")) return @@ -1156,7 +1169,7 @@ func registerPeerRESTHandlers(router *mux.Router) { subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodLoadGroup).HandlerFunc(httpTraceAll(server.LoadGroupHandler)).Queries(restQueries(peerRESTGroup)...) subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodStartProfiling).HandlerFunc(httpTraceAll(server.StartProfilingHandler)).Queries(restQueries(peerRESTProfiler)...) - subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodDownloadProfilingData).HandlerFunc(httpTraceHdrs(server.DownloadProflingDataHandler)) + subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodDownloadProfilingData).HandlerFunc(httpTraceHdrs(server.DownloadProfilingDataHandler)) subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodTargetExists).HandlerFunc(httpTraceHdrs(server.TargetExistsHandler)).Queries(restQueries(peerRESTBucket)...) subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodSendEvent).HandlerFunc(httpTraceHdrs(server.SendEventHandler)).Queries(restQueries(peerRESTBucket)...) diff --git a/cmd/signals.go b/cmd/signals.go index 20c81d9cf..b4163bf62 100644 --- a/cmd/signals.go +++ b/cmd/signals.go @@ -28,8 +28,12 @@ func handleSignals() { // Custom exit function exit := func(success bool) { // If global profiler is set stop before we exit. - if globalProfiler != nil { - globalProfiler.Stop() + globalProfilerMu.Lock() + defer globalProfilerMu.Unlock() + if len(globalProfiler) > 0 { + for _, p := range globalProfiler { + p.Stop() + } } if success { diff --git a/cmd/utils.go b/cmd/utils.go index 6fae2e912..754fa0e36 100644 --- a/cmd/utils.go +++ b/cmd/utils.go @@ -32,7 +32,11 @@ import ( "os" "path/filepath" "reflect" + "runtime" + "runtime/pprof" + "runtime/trace" "strings" + "sync" "time" "github.com/beevik/ntp" @@ -42,7 +46,6 @@ import ( humanize "github.com/dustin/go-humanize" "github.com/gorilla/mux" - "github.com/pkg/profile" ) // IsErrIgnored returns whether given error is ignored or not. @@ -187,94 +190,130 @@ func contains(slice interface{}, elem interface{}) bool { // provide any API to calculate the profiler file path in the // disk since the name of this latter is randomly generated. type profilerWrapper struct { - stopFn func() - pathFn func() string + stopFn func() ([]byte, error) } -func (p profilerWrapper) Stop() { - p.stopFn() -} - -func (p profilerWrapper) Path() string { - return p.pathFn() +func (p profilerWrapper) Stop() ([]byte, error) { + return p.stopFn() } // Returns current profile data, returns error if there is no active // profiling in progress. Stops an active profile. -func getProfileData() ([]byte, error) { - if globalProfiler == nil { +func getProfileData() (map[string][]byte, error) { + globalProfilerMu.Lock() + defer globalProfilerMu.Unlock() + + if len(globalProfiler) == 0 { return nil, errors.New("profiler not enabled") } - profilerPath := globalProfiler.Path() - - // Stop the profiler - globalProfiler.Stop() - - profilerFile, err := os.Open(profilerPath) - if err != nil { - return nil, err + dst := make(map[string][]byte, len(globalProfiler)) + for typ, prof := range globalProfiler { + // Stop the profiler + var err error + buf, err := prof.Stop() + delete(globalProfiler, typ) + if err == nil { + dst[typ] = buf + } } - - return ioutil.ReadAll(profilerFile) + return dst, nil } // Starts a profiler returns nil if profiler is not enabled, caller needs to handle this. -func startProfiler(profilerType, dirPath string) (minioProfiler, error) { - var err error - if dirPath == "" { - dirPath, err = ioutil.TempDir("", "profile") - if err != nil { - return nil, err - } - } - - var profiler interface { - Stop() - } - - var profilerFileName string +func startProfiler(profilerType string) (minioProfiler, error) { + var prof profilerWrapper // Enable profiler and set the name of the file that pkg/pprof // library creates to store profiling data. switch profilerType { case "cpu": - profiler = profile.Start(profile.CPUProfile, profile.NoShutdownHook, profile.ProfilePath(dirPath)) - profilerFileName = "cpu.pprof" + dirPath, err := ioutil.TempDir("", "profile") + if err != nil { + return nil, err + } + fn := filepath.Join(dirPath, "cpu.out") + f, err := os.Create(fn) + if err != nil { + return nil, err + } + err = pprof.StartCPUProfile(f) + if err != nil { + return nil, err + } + prof.stopFn = func() ([]byte, error) { + pprof.StopCPUProfile() + err := f.Close() + if err != nil { + return nil, err + } + defer os.RemoveAll(dirPath) + return ioutil.ReadFile(fn) + } case "mem": - profiler = profile.Start(profile.MemProfile, profile.NoShutdownHook, profile.ProfilePath(dirPath)) - profilerFileName = "mem.pprof" + old := runtime.MemProfileRate + runtime.MemProfileRate = 1 + prof.stopFn = func() ([]byte, error) { + var buf bytes.Buffer + runtime.MemProfileRate = old + err := pprof.Lookup("heap").WriteTo(&buf, 0) + return buf.Bytes(), err + } case "block": - profiler = profile.Start(profile.BlockProfile, profile.NoShutdownHook, profile.ProfilePath(dirPath)) - profilerFileName = "block.pprof" + runtime.SetBlockProfileRate(1) + prof.stopFn = func() ([]byte, error) { + var buf bytes.Buffer + runtime.SetBlockProfileRate(0) + err := pprof.Lookup("block").WriteTo(&buf, 0) + return buf.Bytes(), err + } case "mutex": - profiler = profile.Start(profile.MutexProfile, profile.NoShutdownHook, profile.ProfilePath(dirPath)) - profilerFileName = "mutex.pprof" + runtime.SetMutexProfileFraction(1) + prof.stopFn = func() ([]byte, error) { + var buf bytes.Buffer + runtime.SetMutexProfileFraction(0) + err := pprof.Lookup("mutex").WriteTo(&buf, 0) + return buf.Bytes(), err + } case "trace": - profiler = profile.Start(profile.TraceProfile, profile.NoShutdownHook, profile.ProfilePath(dirPath)) - profilerFileName = "trace.out" + dirPath, err := ioutil.TempDir("", "profile") + if err != nil { + return nil, err + } + fn := filepath.Join(dirPath, "trace.out") + f, err := os.Create(fn) + if err != nil { + return nil, err + } + err = trace.Start(f) + if err != nil { + return nil, err + } + prof.stopFn = func() ([]byte, error) { + trace.Stop() + err := f.Close() + if err != nil { + return nil, err + } + defer os.RemoveAll(dirPath) + return ioutil.ReadFile(fn) + } default: return nil, errors.New("profiler type unknown") } - return &profilerWrapper{ - stopFn: profiler.Stop, - pathFn: func() string { - return filepath.Join(dirPath, profilerFileName) - }, - }, nil + return prof, nil } // minioProfiler - minio profiler interface. type minioProfiler interface { // Stop the profiler - Stop() - // Return the path of the profiling file - Path() string + Stop() ([]byte, error) } // Global profiler to be used by service go-routine. -var globalProfiler minioProfiler +var globalProfiler map[string]minioProfiler +var globalProfilerMu sync.Mutex // dump the request into a string in JSON format. func dumpRequest(r *http.Request) string { diff --git a/cmd/utils_test.go b/cmd/utils_test.go index 195e31bad..f3cbdb1d1 100644 --- a/cmd/utils_test.go +++ b/cmd/utils_test.go @@ -191,7 +191,7 @@ func TestURL2BucketObjectName(t *testing.T) { // Add tests for starting and stopping different profilers. func TestStartProfiler(t *testing.T) { - _, err := startProfiler("", "") + _, err := startProfiler("") if err == nil { t.Fatal("Expected a non nil error, but nil error returned for invalid profiler.") } diff --git a/go.mod b/go.mod index 8853a1011..d5e859c30 100644 --- a/go.mod +++ b/go.mod @@ -85,7 +85,6 @@ require ( github.com/ncw/directio v1.0.5 github.com/nsqio/go-nsq v1.0.7 github.com/pkg/errors v0.8.1 - github.com/pkg/profile v1.3.0 github.com/prometheus/client_golang v0.9.3-0.20190127221311-3c4408c8b829 github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90 // indirect github.com/rcrowley/go-metrics v0.0.0-20190704165056-9c2d0518ed81 // indirect