refactor the perf client for TTFB and TotalResponseTime (#17901)

This commit is contained in:
jiuker 2023-08-25 01:20:37 +08:00 committed by Harshavardhana
parent ba4566e86d
commit 02cc18ff29
2 changed files with 70 additions and 75 deletions

View File

@ -19,6 +19,7 @@ package cmd
import ( import (
"context" "context"
"encoding/gob"
"errors" "errors"
"fmt" "fmt"
"io" "io"
@ -367,29 +368,14 @@ func siteNetperf(ctx context.Context, duration time.Duration) madmin.SiteNetPerf
for i := 0; i < connectionsPerPeer; i++ { for i := 0; i < connectionsPerPeer; i++ {
go func() { go func() {
defer wg.Done() defer wg.Done()
cli, err := globalSiteReplicationSys.getAdminClient(ctx, info.DeploymentID) ctx, cancel := context.WithTimeout(ctx, duration+10*time.Second)
if err != nil { defer cancel()
return perfNetRequest(
} ctx,
rp := cli.GetEndpointURL() info.DeploymentID,
reqURL := &url.URL{ adminPathPrefix+adminAPIVersionPrefix+adminAPISiteReplicationDevNull,
Scheme: rp.Scheme, r,
Host: rp.Host, )
Path: adminPathPrefix + adminAPIVersionPrefix + adminAPISiteReplicationDevNull,
}
req, err := http.NewRequestWithContext(ctx, http.MethodPost, reqURL.String(), r)
if err != nil {
return
}
client := &http.Client{
Timeout: duration + 10*time.Second,
Transport: globalRemoteTargetTransport,
}
resp, err := client.Do(req)
if err != nil {
return
}
defer xhttp.DrainBody(resp.Body)
}() }()
} }
} }
@ -422,3 +408,41 @@ func siteNetperf(ctx context.Context, duration time.Duration) madmin.SiteNetPerf
TotalConn: uint64(connectionsPerPeer), TotalConn: uint64(connectionsPerPeer),
} }
} }
// perfNetRequest - reader for http.request.body
func perfNetRequest(ctx context.Context, deploymentID, reqPath string, reader io.Reader) (result madmin.SiteNetPerfNodeResult) {
result = madmin.SiteNetPerfNodeResult{}
cli, err := globalSiteReplicationSys.getAdminClient(ctx, deploymentID)
if err != nil {
result.Error = err.Error()
return
}
rp := cli.GetEndpointURL()
reqURL := &url.URL{
Scheme: rp.Scheme,
Host: rp.Host,
Path: reqPath,
}
result.Endpoint = rp.String()
req, err := http.NewRequestWithContext(ctx, http.MethodPost, reqURL.String(), reader)
if err != nil {
result.Error = err.Error()
return
}
client := &http.Client{
Transport: globalRemoteTargetTransport,
}
resp, err := client.Do(req)
if err != nil {
result.Error = err.Error()
return
}
defer xhttp.DrainBody(resp.Body)
err = gob.NewDecoder(resp.Body).Decode(&result)
// endpoint have been overwritten
result.Endpoint = rp.String()
if err != nil {
result.Error = err.Error()
}
return
}

View File

@ -22,12 +22,10 @@ import (
"context" "context"
"encoding/base64" "encoding/base64"
"encoding/binary" "encoding/binary"
"encoding/gob"
"encoding/json" "encoding/json"
"encoding/xml" "encoding/xml"
"errors" "errors"
"fmt" "fmt"
"net/http"
"net/url" "net/url"
"reflect" "reflect"
"runtime" "runtime"
@ -43,7 +41,6 @@ import (
"github.com/minio/minio-go/v7/pkg/set" "github.com/minio/minio-go/v7/pkg/set"
"github.com/minio/minio/internal/auth" "github.com/minio/minio/internal/auth"
sreplication "github.com/minio/minio/internal/bucket/replication" sreplication "github.com/minio/minio/internal/bucket/replication"
xhttp "github.com/minio/minio/internal/http"
"github.com/minio/minio/internal/logger" "github.com/minio/minio/internal/logger"
bktpolicy "github.com/minio/pkg/bucket/policy" bktpolicy "github.com/minio/pkg/bucket/policy"
iampolicy "github.com/minio/pkg/iam/policy" iampolicy "github.com/minio/pkg/iam/policy"
@ -663,64 +660,38 @@ func (c *SiteReplicationSys) Netperf(ctx context.Context, duration time.Duration
// will call siteNetperf, means call others's adminAPISiteReplicationDevNull // will call siteNetperf, means call others's adminAPISiteReplicationDevNull
if globalDeploymentID == info.DeploymentID { if globalDeploymentID == info.DeploymentID {
wg.Add(1) wg.Add(1)
go func() (err error) { go func() {
defer wg.Done()
result := &madmin.SiteNetPerfNodeResult{}
defer func() {
if err != nil {
result.Error = err.Error()
}
resultsMu.Lock()
results.NodeResults = append(results.NodeResults, *result)
resultsMu.Unlock()
}()
cli, err := globalSiteReplicationSys.getAdminClient(ctx, info.DeploymentID)
if err != nil {
return err
}
*result = siteNetperf(ctx, duration)
result.Endpoint = cli.GetEndpointURL().String()
return nil
}()
continue
}
wg.Add(1)
go func() (err error) {
defer wg.Done() defer wg.Done()
result := madmin.SiteNetPerfNodeResult{} result := madmin.SiteNetPerfNodeResult{}
defer func() { cli, err := globalSiteReplicationSys.getAdminClient(ctx, info.DeploymentID)
if err != nil { if err != nil {
result.Error = err.Error() result.Error = err.Error()
} else {
result = siteNetperf(ctx, duration)
result.Endpoint = cli.GetEndpointURL().String()
} }
resultsMu.Lock() resultsMu.Lock()
results.NodeResults = append(results.NodeResults, result) results.NodeResults = append(results.NodeResults, result)
resultsMu.Unlock() resultsMu.Unlock()
return
}() }()
cli, err := globalSiteReplicationSys.getAdminClient(ctx, info.DeploymentID) continue
if err != nil {
return err
} }
rp := cli.GetEndpointURL() wg.Add(1)
reqURL := &url.URL{ go func() {
Scheme: rp.Scheme, defer wg.Done()
Host: rp.Host, ctx, cancel := context.WithTimeout(ctx, duration+10*time.Second)
Path: adminPathPrefix + adminAPIVersionPrefix + adminAPISiteReplicationNetPerf, defer cancel()
} result := perfNetRequest(
result.Endpoint = rp.String() ctx,
req, err := http.NewRequestWithContext(ctx, http.MethodPost, reqURL.String(), nil) info.DeploymentID,
if err != nil { adminPathPrefix+adminAPIVersionPrefix+adminAPISiteReplicationNetPerf,
return err nil,
} )
client := &http.Client{ resultsMu.Lock()
Timeout: duration + 10*time.Second, results.NodeResults = append(results.NodeResults, result)
Transport: globalRemoteTargetTransport, resultsMu.Unlock()
} return
resp, err := client.Do(req)
if err != nil {
return err
}
defer xhttp.DrainBody(resp.Body)
return gob.NewDecoder(resp.Body).Decode(&result)
}() }()
} }
wg.Wait() wg.Wait()