diff --git a/cmd/perf-tests.go b/cmd/perf-tests.go index b28e6e7ee..331c0784f 100644 --- a/cmd/perf-tests.go +++ b/cmd/perf-tests.go @@ -19,6 +19,7 @@ package cmd import ( "context" + "encoding/gob" "errors" "fmt" "io" @@ -367,29 +368,14 @@ func siteNetperf(ctx context.Context, duration time.Duration) madmin.SiteNetPerf for i := 0; i < connectionsPerPeer; i++ { go func() { defer wg.Done() - cli, err := globalSiteReplicationSys.getAdminClient(ctx, info.DeploymentID) - if err != nil { - return - } - rp := cli.GetEndpointURL() - reqURL := &url.URL{ - Scheme: rp.Scheme, - Host: rp.Host, - Path: adminPathPrefix + adminAPIVersionPrefix + adminAPISiteReplicationDevNull, - } - req, err := http.NewRequestWithContext(ctx, http.MethodPost, reqURL.String(), r) - if err != nil { - return - } - client := &http.Client{ - Timeout: duration + 10*time.Second, - Transport: globalRemoteTargetTransport, - } - resp, err := client.Do(req) - if err != nil { - return - } - defer xhttp.DrainBody(resp.Body) + ctx, cancel := context.WithTimeout(ctx, duration+10*time.Second) + defer cancel() + perfNetRequest( + ctx, + info.DeploymentID, + adminPathPrefix+adminAPIVersionPrefix+adminAPISiteReplicationDevNull, + r, + ) }() } } @@ -422,3 +408,41 @@ func siteNetperf(ctx context.Context, duration time.Duration) madmin.SiteNetPerf TotalConn: uint64(connectionsPerPeer), } } + +// perfNetRequest - reader for http.request.body +func perfNetRequest(ctx context.Context, deploymentID, reqPath string, reader io.Reader) (result madmin.SiteNetPerfNodeResult) { + result = madmin.SiteNetPerfNodeResult{} + cli, err := globalSiteReplicationSys.getAdminClient(ctx, deploymentID) + if err != nil { + result.Error = err.Error() + return + } + rp := cli.GetEndpointURL() + reqURL := &url.URL{ + Scheme: rp.Scheme, + Host: rp.Host, + Path: reqPath, + } + result.Endpoint = rp.String() + req, err := http.NewRequestWithContext(ctx, http.MethodPost, reqURL.String(), reader) + if err != nil { + result.Error = err.Error() + return + } + client := &http.Client{ + Transport: globalRemoteTargetTransport, + } + resp, err := client.Do(req) + if err != nil { + result.Error = err.Error() + return + } + defer xhttp.DrainBody(resp.Body) + err = gob.NewDecoder(resp.Body).Decode(&result) + // endpoint have been overwritten + result.Endpoint = rp.String() + if err != nil { + result.Error = err.Error() + } + return +} diff --git a/cmd/site-replication.go b/cmd/site-replication.go index 781fe9673..d102fb58e 100644 --- a/cmd/site-replication.go +++ b/cmd/site-replication.go @@ -22,12 +22,10 @@ import ( "context" "encoding/base64" "encoding/binary" - "encoding/gob" "encoding/json" "encoding/xml" "errors" "fmt" - "net/http" "net/url" "reflect" "runtime" @@ -43,7 +41,6 @@ import ( "github.com/minio/minio-go/v7/pkg/set" "github.com/minio/minio/internal/auth" sreplication "github.com/minio/minio/internal/bucket/replication" - xhttp "github.com/minio/minio/internal/http" "github.com/minio/minio/internal/logger" bktpolicy "github.com/minio/pkg/bucket/policy" iampolicy "github.com/minio/pkg/iam/policy" @@ -663,64 +660,38 @@ func (c *SiteReplicationSys) Netperf(ctx context.Context, duration time.Duration // will call siteNetperf, means call others's adminAPISiteReplicationDevNull if globalDeploymentID == info.DeploymentID { wg.Add(1) - go func() (err error) { + go func() { defer wg.Done() - result := &madmin.SiteNetPerfNodeResult{} - defer func() { - if err != nil { - result.Error = err.Error() - } - resultsMu.Lock() - results.NodeResults = append(results.NodeResults, *result) - resultsMu.Unlock() - }() + result := madmin.SiteNetPerfNodeResult{} cli, err := globalSiteReplicationSys.getAdminClient(ctx, info.DeploymentID) - if err != nil { - return err - } - *result = siteNetperf(ctx, duration) - result.Endpoint = cli.GetEndpointURL().String() - return nil - }() - continue - } - wg.Add(1) - go func() (err error) { - defer wg.Done() - result := madmin.SiteNetPerfNodeResult{} - defer func() { if err != nil { result.Error = err.Error() + } else { + result = siteNetperf(ctx, duration) + result.Endpoint = cli.GetEndpointURL().String() } resultsMu.Lock() results.NodeResults = append(results.NodeResults, result) resultsMu.Unlock() + return }() - cli, err := globalSiteReplicationSys.getAdminClient(ctx, info.DeploymentID) - if err != nil { - return err - } - rp := cli.GetEndpointURL() - reqURL := &url.URL{ - Scheme: rp.Scheme, - Host: rp.Host, - Path: adminPathPrefix + adminAPIVersionPrefix + adminAPISiteReplicationNetPerf, - } - result.Endpoint = rp.String() - req, err := http.NewRequestWithContext(ctx, http.MethodPost, reqURL.String(), nil) - if err != nil { - return err - } - client := &http.Client{ - Timeout: duration + 10*time.Second, - Transport: globalRemoteTargetTransport, - } - resp, err := client.Do(req) - if err != nil { - return err - } - defer xhttp.DrainBody(resp.Body) - return gob.NewDecoder(resp.Body).Decode(&result) + continue + } + wg.Add(1) + go func() { + defer wg.Done() + ctx, cancel := context.WithTimeout(ctx, duration+10*time.Second) + defer cancel() + result := perfNetRequest( + ctx, + info.DeploymentID, + adminPathPrefix+adminAPIVersionPrefix+adminAPISiteReplicationNetPerf, + nil, + ) + resultsMu.Lock() + results.NodeResults = append(results.NodeResults, result) + resultsMu.Unlock() + return }() } wg.Wait()