From 4d0715d226cc24ca5122cfd1b7283e6b84916723 Mon Sep 17 00:00:00 2001 From: Krishna Srinivas <634494+krishnasrinivas@users.noreply.github.com> Date: Tue, 8 Mar 2022 09:54:38 -0800 Subject: [PATCH] Implement netperf for "mc support perf net" (#14397) Co-authored-by: Klaus Post --- cmd/admin-handlers.go | 44 ++++++ cmd/admin-router.go | 2 +- cmd/globals.go | 5 + cmd/notification.go | 52 +++++++ cmd/peer-rest-client.go | 24 ++++ cmd/peer-rest-common.go | 4 +- cmd/peer-rest-server.go | 193 +++++++------------------- cmd/perf-tests.go | 295 ++++++++++++++++++++++++++++++++++++++++ 8 files changed, 470 insertions(+), 149 deletions(-) create mode 100644 cmd/perf-tests.go diff --git a/cmd/admin-handlers.go b/cmd/admin-handlers.go index 7f81bb60c..5fa5d71c7 100644 --- a/cmd/admin-handlers.go +++ b/cmd/admin-handlers.go @@ -936,6 +936,50 @@ func (a adminAPIHandlers) BackgroundHealStatusHandler(w http.ResponseWriter, r * } } +// NetperfHandler - perform mesh style network throughput test +func (a adminAPIHandlers) NetperfHandler(w http.ResponseWriter, r *http.Request) { + ctx := newContext(r, w, "NetperfHandler") + + defer logger.AuditLog(ctx, w, r, mustGetClaimsFromToken(r)) + + objectAPI, _ := validateAdminReq(ctx, w, r, iampolicy.HealthInfoAdminAction) + if objectAPI == nil { + return + } + + if !globalIsDistErasure { + writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrNotImplemented), r.URL) + return + } + + nsLock := objectAPI.NewNSLock(minioMetaBucket, "netperf") + lkctx, err := nsLock.GetLock(ctx, globalOperationTimeout) + if err != nil { + writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(toAPIErrorCode(ctx, err)), r.URL) + return + } + defer nsLock.Unlock(lkctx.Cancel) + + durationStr := r.Form.Get(peerRESTDuration) + duration, err := time.ParseDuration(durationStr) + if err != nil { + duration = globalNetPerfMinDuration + } + + if duration < globalNetPerfMinDuration { + // We need sample size of minimum 10 secs. + duration = globalNetPerfMinDuration + } + + duration = duration.Round(time.Second) + + results := globalNotificationSys.Netperf(ctx, duration) + enc := json.NewEncoder(w) + if err := enc.Encode(madmin.NetperfResult{NodeResults: results}); err != nil { + return + } +} + // SpeedtestHandler - Deprecated. See ObjectSpeedtestHandler func (a adminAPIHandlers) SpeedtestHandler(w http.ResponseWriter, r *http.Request) { a.ObjectSpeedtestHandler(w, r) diff --git a/cmd/admin-router.go b/cmd/admin-router.go index 196a5e293..f4b49a70f 100644 --- a/cmd/admin-router.go +++ b/cmd/admin-router.go @@ -224,7 +224,7 @@ func registerAdminRouter(router *mux.Router, enableConfigOps bool) { adminRouter.Methods(http.MethodPost).Path(adminVersion + "/speedtest").HandlerFunc(httpTraceHdrs(adminAPI.SpeedtestHandler)) adminRouter.Methods(http.MethodPost).Path(adminVersion + "/speedtest/object").HandlerFunc(httpTraceHdrs(adminAPI.ObjectSpeedtestHandler)) adminRouter.Methods(http.MethodPost).Path(adminVersion + "/speedtest/drive").HandlerFunc(httpTraceHdrs(adminAPI.DriveSpeedtestHandler)) - adminRouter.Methods(http.MethodPost).Path(adminVersion + "/speedtest/net").HandlerFunc(httpTraceHdrs(adminAPI.NetSpeedtestHandler)) + adminRouter.Methods(http.MethodPost).Path(adminVersion + "/speedtest/net").HandlerFunc(httpTraceHdrs(adminAPI.NetperfHandler)) // HTTP Trace adminRouter.Methods(http.MethodGet).Path(adminVersion + "/trace").HandlerFunc(gz(http.HandlerFunc(adminAPI.TraceHandler))) diff --git a/cmd/globals.go b/cmd/globals.go index b5bc8a0cb..0e3068969 100644 --- a/cmd/globals.go +++ b/cmd/globals.go @@ -345,6 +345,11 @@ var ( globalIsCICD bool globalRootDiskThreshold uint64 + + // Used for collecting stats for netperf + globalNetPerfMinDuration = time.Second * 10 + globalNetPerfRX netPerfRX + // Add new variable global values here. ) diff --git a/cmd/notification.go b/cmd/notification.go index e18f78e1b..b16fe527e 100644 --- a/cmd/notification.go +++ b/cmd/notification.go @@ -1553,6 +1553,58 @@ func (sys *NotificationSys) ServiceFreeze(ctx context.Context, freeze bool) []No return nerrs } +// Netperf - perform mesh style network throughput test +func (sys *NotificationSys) Netperf(ctx context.Context, duration time.Duration) []madmin.NetperfNodeResult { + length := len(sys.allPeerClients) + if length == 0 { + // For single node erasure setup. + return nil + } + results := make([]madmin.NetperfNodeResult, length) + + scheme := "http" + if globalIsTLS { + scheme = "https" + } + + var wg sync.WaitGroup + for index := range sys.peerClients { + if sys.peerClients[index] == nil { + continue + } + wg.Add(1) + go func(index int) { + defer wg.Done() + r, err := sys.peerClients[index].Netperf(ctx, duration) + u := &url.URL{ + Scheme: scheme, + Host: sys.peerClients[index].host.String(), + } + if err != nil { + results[index].Error = err.Error() + } else { + results[index] = r + } + results[index].Endpoint = u.String() + }(index) + } + + wg.Add(1) + go func() { + defer wg.Done() + r := netperf(ctx, duration) + u := &url.URL{ + Scheme: scheme, + Host: globalLocalNodeName, + } + results[len(results)-1] = r + results[len(results)-1].Endpoint = u.String() + }() + wg.Wait() + + return results +} + // Speedtest run GET/PUT tests at input concurrency for requested object size, // optionally you can extend the tests longer with time.Duration. func (sys *NotificationSys) Speedtest(ctx context.Context, size int, diff --git a/cmd/peer-rest-client.go b/cmd/peer-rest-client.go index fa963ad54..a5592f7a6 100644 --- a/cmd/peer-rest-client.go +++ b/cmd/peer-rest-client.go @@ -1119,3 +1119,27 @@ func (client *peerRESTClient) GetLastDayTierStats(ctx context.Context) (dailyAll } return dailyAllTierStats(result), nil } + +// DevNull - Used by netperf to pump data to peer +func (client *peerRESTClient) DevNull(ctx context.Context, r io.Reader) error { + respBody, err := client.callWithContext(ctx, peerRESTMethodDevNull, nil, r, -1) + if err != nil { + return err + } + defer http.DrainBody(respBody) + return err +} + +// Netperf - To initiate netperf on peer +func (client *peerRESTClient) Netperf(ctx context.Context, duration time.Duration) (madmin.NetperfNodeResult, error) { + var result madmin.NetperfNodeResult + values := make(url.Values) + values.Set(peerRESTDuration, duration.String()) + respBody, err := client.callWithContext(context.Background(), peerRESTMethodNetperf, values, nil, -1) + if err != nil { + return result, err + } + defer http.DrainBody(respBody) + err = gob.NewDecoder(respBody).Decode(&result) + return result, err +} diff --git a/cmd/peer-rest-common.go b/cmd/peer-rest-common.go index 3f65af4e1..5bfbd61af 100644 --- a/cmd/peer-rest-common.go +++ b/cmd/peer-rest-common.go @@ -18,7 +18,7 @@ package cmd const ( - peerRESTVersion = "v20" // Add drivespeedtest + peerRESTVersion = "v21" // Add netperf peerRESTVersionPrefix = SlashSeparator + peerRESTVersion peerRESTPrefix = minioReservedBucketPath + "/peer" peerRESTPath = peerRESTPrefix + peerRESTVersionPrefix @@ -70,6 +70,8 @@ const ( peerRESTMethodReloadSiteReplicationConfig = "/reloadsitereplicationconfig" peerRESTMethodReloadPoolMeta = "/reloadpoolmeta" peerRESTMethodGetLastDayTierStats = "/getlastdaytierstats" + peerRESTMethodDevNull = "/devnull" + peerRESTMethodNetperf = "/netperf" ) const ( diff --git a/cmd/peer-rest-server.go b/cmd/peer-rest-server.go index 826c35f34..471c0de04 100644 --- a/cmd/peer-rest-server.go +++ b/cmd/peer-rest-server.go @@ -27,20 +27,15 @@ import ( "net/http" "strconv" "strings" - "sync" "sync/atomic" "time" "github.com/dustin/go-humanize" - "github.com/google/uuid" "github.com/gorilla/mux" "github.com/minio/madmin-go" b "github.com/minio/minio/internal/bucket/bandwidth" "github.com/minio/minio/internal/event" - "github.com/minio/minio/internal/hash" - xhttp "github.com/minio/minio/internal/http" "github.com/minio/minio/internal/logger" - "github.com/minio/pkg/randreader" "github.com/tinylib/msgp/msgp" ) @@ -1143,148 +1138,6 @@ func (s *peerRESTServer) GetPeerMetrics(w http.ResponseWriter, r *http.Request) } } -// SpeedtestResult return value of the speedtest function -type SpeedtestResult struct { - Endpoint string - Uploads uint64 - Downloads uint64 - Error string -} - -func newRandomReader(size int) io.Reader { - return io.LimitReader(randreader.New(), int64(size)) -} - -// Runs the speedtest on local MinIO process. -func selfSpeedtest(ctx context.Context, size, concurrent int, duration time.Duration, storageClass string) (SpeedtestResult, error) { - objAPI := newObjectLayerFn() - if objAPI == nil { - return SpeedtestResult{}, errServerNotInitialized - } - - var errOnce sync.Once - var retError string - var wg sync.WaitGroup - var totalBytesWritten uint64 - var totalBytesRead uint64 - - objCountPerThread := make([]uint64, concurrent) - - uploadsCtx, uploadsCancel := context.WithCancel(context.Background()) - defer uploadsCancel() - - go func() { - time.Sleep(duration) - uploadsCancel() - }() - - objNamePrefix := "speedtest/objects/" + uuid.New().String() - - wg.Add(concurrent) - for i := 0; i < concurrent; i++ { - go func(i int) { - defer wg.Done() - for { - hashReader, err := hash.NewReader(newRandomReader(size), - int64(size), "", "", int64(size)) - if err != nil { - if !contextCanceled(uploadsCtx) && !errors.Is(err, context.Canceled) { - errOnce.Do(func() { - retError = err.Error() - }) - } - uploadsCancel() - return - } - reader := NewPutObjReader(hashReader) - objInfo, err := objAPI.PutObject(uploadsCtx, minioMetaBucket, fmt.Sprintf("%s.%d.%d", - objNamePrefix, i, objCountPerThread[i]), reader, ObjectOptions{ - UserDefined: map[string]string{ - xhttp.AmzStorageClass: storageClass, - }, - Speedtest: true, - }) - if err != nil { - objCountPerThread[i]-- - if !contextCanceled(uploadsCtx) && !errors.Is(err, context.Canceled) { - errOnce.Do(func() { - retError = err.Error() - }) - } - uploadsCancel() - return - } - atomic.AddUint64(&totalBytesWritten, uint64(objInfo.Size)) - objCountPerThread[i]++ - } - }(i) - } - wg.Wait() - - // We already saw write failures, no need to proceed into read's - if retError != "" { - return SpeedtestResult{Uploads: totalBytesWritten, Downloads: totalBytesRead, Error: retError}, nil - } - - downloadsCtx, downloadsCancel := context.WithCancel(context.Background()) - defer downloadsCancel() - go func() { - time.Sleep(duration) - downloadsCancel() - }() - - wg.Add(concurrent) - for i := 0; i < concurrent; i++ { - go func(i int) { - defer wg.Done() - var j uint64 - if objCountPerThread[i] == 0 { - return - } - for { - if objCountPerThread[i] == j { - j = 0 - } - r, err := objAPI.GetObjectNInfo(downloadsCtx, minioMetaBucket, fmt.Sprintf("%s.%d.%d", - objNamePrefix, i, j), nil, nil, noLock, ObjectOptions{}) - if err != nil { - if isErrObjectNotFound(err) { - continue - } - if !contextCanceled(downloadsCtx) && !errors.Is(err, context.Canceled) { - errOnce.Do(func() { - retError = err.Error() - }) - } - downloadsCancel() - return - } - n, err := io.Copy(ioutil.Discard, r) - r.Close() - if err == nil { - // Only capture success criteria - do not - // have to capture failed reads, truncated - // reads etc. - atomic.AddUint64(&totalBytesRead, uint64(n)) - } - if err != nil { - if !contextCanceled(downloadsCtx) && !errors.Is(err, context.Canceled) { - errOnce.Do(func() { - retError = err.Error() - }) - } - downloadsCancel() - return - } - j++ - } - }(i) - } - wg.Wait() - - return SpeedtestResult{Uploads: totalBytesWritten, Downloads: totalBytesRead, Error: retError}, nil -} - func (s *peerRESTServer) SpeedtestHandler(w http.ResponseWriter, r *http.Request) { if !s.IsValid(w, r) { s.writeErrorResponse(w, errors.New("invalid request")) @@ -1384,6 +1237,50 @@ func (s *peerRESTServer) DriveSpeedTestHandler(w http.ResponseWriter, r *http.Re logger.LogIf(r.Context(), gob.NewEncoder(w).Encode(result)) } +// DevNull - everything goes to io.Discard +func (s *peerRESTServer) DevNull(w http.ResponseWriter, r *http.Request) { + if !s.IsValid(w, r) { + s.writeErrorResponse(w, errors.New("invalid request")) + return + } + + globalNetPerfRX.Connect() + defer globalNetPerfRX.Disconnect() + + connectTime := time.Now() + ctx := newContext(r, w, "DevNull") + for { + n, err := io.CopyN(io.Discard, r.Body, 128*humanize.KiByte) + atomic.AddUint64(&globalNetPerfRX.RX, uint64(n)) + if err != nil && err != io.EOF { + // If there is a disconnection before globalNetPerfMinDuration (we give a margin of error of 1 sec) + // would mean the network is not stable. Logging here will help in debugging network issues. + if time.Since(connectTime) < (globalNetPerfMinDuration - time.Second) { + logger.LogIf(ctx, err) + } + } + if err != nil { + break + } + } +} + +// Netperf - perform netperf +func (s *peerRESTServer) Netperf(w http.ResponseWriter, r *http.Request) { + if !s.IsValid(w, r) { + s.writeErrorResponse(w, errors.New("invalid request")) + return + } + + durationStr := r.Form.Get(peerRESTDuration) + duration, err := time.ParseDuration(durationStr) + if err != nil || duration.Seconds() == 0 { + duration = time.Second * 10 + } + result := netperf(r.Context(), duration.Round(time.Second)) + logger.LogIf(r.Context(), gob.NewEncoder(w).Encode(result)) +} + // registerPeerRESTHandlers - register peer rest router. func registerPeerRESTHandlers(router *mux.Router) { server := &peerRESTServer{} @@ -1431,6 +1328,8 @@ func registerPeerRESTHandlers(router *mux.Router) { subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodLoadTransitionTierConfig).HandlerFunc(httpTraceHdrs(server.LoadTransitionTierConfigHandler)) subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodSpeedtest).HandlerFunc(httpTraceHdrs(server.SpeedtestHandler)) subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodDriveSpeedTest).HandlerFunc(httpTraceHdrs(server.DriveSpeedTestHandler)) + subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodNetperf).HandlerFunc(httpTraceHdrs(server.Netperf)) + subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodDevNull).HandlerFunc(httpTraceHdrs(server.DevNull)) subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodReloadSiteReplicationConfig).HandlerFunc(httpTraceHdrs(server.ReloadSiteReplicationConfigHandler)) subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodReloadPoolMeta).HandlerFunc(httpTraceHdrs(server.ReloadPoolMetaHandler)) subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodGetLastDayTierStats).HandlerFunc(httpTraceHdrs(server.GetLastDayTierStatsHandler)) diff --git a/cmd/perf-tests.go b/cmd/perf-tests.go new file mode 100644 index 000000000..3ac43d859 --- /dev/null +++ b/cmd/perf-tests.go @@ -0,0 +1,295 @@ +// Copyright (c) 2022 MinIO, Inc. +// +// This file is part of MinIO Object Storage stack +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package cmd + +import ( + "context" + "errors" + "fmt" + "io" + "io/ioutil" + "math/rand" + "sync" + "sync/atomic" + "time" + + "github.com/dustin/go-humanize" + "github.com/google/uuid" + "github.com/minio/madmin-go" + "github.com/minio/minio/internal/hash" + xhttp "github.com/minio/minio/internal/http" + "github.com/minio/pkg/randreader" +) + +// SpeedtestResult return value of the speedtest function +type SpeedtestResult struct { + Endpoint string + Uploads uint64 + Downloads uint64 + Error string +} + +func newRandomReader(size int) io.Reader { + return io.LimitReader(randreader.New(), int64(size)) +} + +// Runs the speedtest on local MinIO process. +func selfSpeedtest(ctx context.Context, size, concurrent int, duration time.Duration, storageClass string) (SpeedtestResult, error) { + objAPI := newObjectLayerFn() + if objAPI == nil { + return SpeedtestResult{}, errServerNotInitialized + } + + var errOnce sync.Once + var retError string + var wg sync.WaitGroup + var totalBytesWritten uint64 + var totalBytesRead uint64 + + objCountPerThread := make([]uint64, concurrent) + + uploadsCtx, uploadsCancel := context.WithCancel(context.Background()) + defer uploadsCancel() + + go func() { + time.Sleep(duration) + uploadsCancel() + }() + + objNamePrefix := "speedtest/objects/" + uuid.New().String() + + wg.Add(concurrent) + for i := 0; i < concurrent; i++ { + go func(i int) { + defer wg.Done() + for { + hashReader, err := hash.NewReader(newRandomReader(size), + int64(size), "", "", int64(size)) + if err != nil { + if !contextCanceled(uploadsCtx) && !errors.Is(err, context.Canceled) { + errOnce.Do(func() { + retError = err.Error() + }) + } + uploadsCancel() + return + } + reader := NewPutObjReader(hashReader) + objInfo, err := objAPI.PutObject(uploadsCtx, minioMetaBucket, fmt.Sprintf("%s.%d.%d", + objNamePrefix, i, objCountPerThread[i]), reader, ObjectOptions{ + UserDefined: map[string]string{ + xhttp.AmzStorageClass: storageClass, + }, + Speedtest: true, + }) + if err != nil { + objCountPerThread[i]-- + if !contextCanceled(uploadsCtx) && !errors.Is(err, context.Canceled) { + errOnce.Do(func() { + retError = err.Error() + }) + } + uploadsCancel() + return + } + atomic.AddUint64(&totalBytesWritten, uint64(objInfo.Size)) + objCountPerThread[i]++ + } + }(i) + } + wg.Wait() + + // We already saw write failures, no need to proceed into read's + if retError != "" { + return SpeedtestResult{Uploads: totalBytesWritten, Downloads: totalBytesRead, Error: retError}, nil + } + + downloadsCtx, downloadsCancel := context.WithCancel(context.Background()) + defer downloadsCancel() + go func() { + time.Sleep(duration) + downloadsCancel() + }() + + wg.Add(concurrent) + for i := 0; i < concurrent; i++ { + go func(i int) { + defer wg.Done() + var j uint64 + if objCountPerThread[i] == 0 { + return + } + for { + if objCountPerThread[i] == j { + j = 0 + } + r, err := objAPI.GetObjectNInfo(downloadsCtx, minioMetaBucket, fmt.Sprintf("%s.%d.%d", + objNamePrefix, i, j), nil, nil, noLock, ObjectOptions{}) + if err != nil { + if isErrObjectNotFound(err) { + continue + } + if !contextCanceled(downloadsCtx) && !errors.Is(err, context.Canceled) { + errOnce.Do(func() { + retError = err.Error() + }) + } + downloadsCancel() + return + } + n, err := io.Copy(ioutil.Discard, r) + r.Close() + if err == nil { + // Only capture success criteria - do not + // have to capture failed reads, truncated + // reads etc. + atomic.AddUint64(&totalBytesRead, uint64(n)) + } + if err != nil { + if !contextCanceled(downloadsCtx) && !errors.Is(err, context.Canceled) { + errOnce.Do(func() { + retError = err.Error() + }) + } + downloadsCancel() + return + } + j++ + } + }(i) + } + wg.Wait() + + return SpeedtestResult{Uploads: totalBytesWritten, Downloads: totalBytesRead, Error: retError}, nil +} + +// To collect RX stats during "mc support perf net" +// RXSample holds the RX bytes for the duration between +// the last peer to connect and the first peer to disconnect. +// This is to improve the RX throughput accuracy. +type netPerfRX struct { + RX uint64 // RX bytes + lastToConnect time.Time // time at which last peer to connect to us + firstToDisconnect time.Time // time at which the first peer disconnects from us + RXSample uint64 // RX bytes between lastToConnect and firstToDisconnect + activeConnections uint64 + sync.RWMutex +} + +func (n *netPerfRX) Connect() { + n.Lock() + defer n.Unlock() + n.activeConnections++ + atomic.StoreUint64(&globalNetPerfRX.RX, 0) + n.lastToConnect = time.Now() +} + +func (n *netPerfRX) Disconnect() { + n.Lock() + defer n.Unlock() + n.activeConnections-- + if n.firstToDisconnect.IsZero() { + n.RXSample = atomic.LoadUint64(&n.RX) + n.firstToDisconnect = time.Now() + } +} + +func (n *netPerfRX) ActiveConnections() uint64 { + n.RLock() + defer n.RUnlock() + return n.activeConnections +} + +func (n *netPerfRX) Reset() { + n.RLock() + defer n.RUnlock() + n.RX = 0 + n.RXSample = 0 + n.lastToConnect = time.Time{} + n.firstToDisconnect = time.Time{} +} + +// Reader to read random data. +type netperfReader struct { + n uint64 + eof chan struct{} + buf []byte +} + +func (m *netperfReader) Read(b []byte) (int, error) { + select { + case <-m.eof: + return 0, io.EOF + default: + } + n := copy(b, m.buf) + atomic.AddUint64(&m.n, uint64(n)) + return n, nil +} + +func netperf(ctx context.Context, duration time.Duration) madmin.NetperfNodeResult { + r := &netperfReader{eof: make(chan struct{})} + r.buf = make([]byte, 128*humanize.KiByte) + rand.Read(r.buf) + + connectionsPerPeer := 16 + + if len(globalNotificationSys.peerClients) > 16 { + // For a large cluster it's enough to have 1 connection per peer to saturate the network. + connectionsPerPeer = 1 + } + + errStr := "" + var wg sync.WaitGroup + for index := range globalNotificationSys.peerClients { + if globalNotificationSys.peerClients[index] == nil { + continue + } + go func(index int) { + for i := 0; i < connectionsPerPeer; i++ { + wg.Add(1) + go func() { + defer wg.Done() + err := globalNotificationSys.peerClients[index].DevNull(ctx, r) + if err != nil { + errStr = err.Error() + } + }() + } + }(index) + } + + time.Sleep(duration) + close(r.eof) + wg.Wait() + for { + if globalNetPerfRX.ActiveConnections() == 0 { + break + } + time.Sleep(time.Second) + } + rx := float64(globalNetPerfRX.RXSample) + delta := globalNetPerfRX.firstToDisconnect.Sub(globalNetPerfRX.lastToConnect) + if delta < 0 { + rx = 0 + errStr = "network disconnection issues detected" + } + + globalNetPerfRX.Reset() + return madmin.NetperfNodeResult{Endpoint: "", TX: r.n / uint64(duration.Seconds()), RX: uint64(rx / delta.Seconds()), Error: errStr} +}