feat: support perf site replication (#17477)

This commit is contained in:
jiuker
2023-07-06 13:28:26 +08:00
committed by GitHub
parent 6efcf9c982
commit 2dbb1cff4a
8 changed files with 266 additions and 9 deletions

View File

@@ -22,10 +22,12 @@ import (
"context"
"encoding/base64"
"encoding/binary"
"encoding/gob"
"encoding/json"
"encoding/xml"
"errors"
"fmt"
"net/http"
"net/url"
"reflect"
"runtime"
@@ -41,6 +43,7 @@ import (
"github.com/minio/minio-go/v7/pkg/set"
"github.com/minio/minio/internal/auth"
sreplication "github.com/minio/minio/internal/bucket/replication"
xhttp "github.com/minio/minio/internal/http"
"github.com/minio/minio/internal/logger"
bktpolicy "github.com/minio/pkg/bucket/policy"
iampolicy "github.com/minio/pkg/iam/policy"
@@ -647,6 +650,83 @@ func (c *SiteReplicationSys) validateIDPSettings(ctx context.Context, peers []Pe
return true, nil
}
// Netperf for site-replication net perf
func (c *SiteReplicationSys) Netperf(ctx context.Context, duration time.Duration) (results madmin.SiteNetPerfResult, err error) {
infos, err := globalSiteReplicationSys.GetClusterInfo(ctx)
if err != nil {
return results, err
}
var wg sync.WaitGroup
var resultsMu sync.RWMutex
for _, info := range infos.Sites {
info := info
// will call siteNetperf, means call others's adminAPISiteReplicationDevNull
if globalDeploymentID == info.DeploymentID {
wg.Add(1)
go func() (err error) {
defer wg.Done()
result := &madmin.SiteNetPerfNodeResult{}
defer func() {
if err != nil {
result.Error = err.Error()
}
resultsMu.Lock()
results.NodeResults = append(results.NodeResults, *result)
resultsMu.Unlock()
}()
cli, err := globalSiteReplicationSys.getAdminClient(ctx, info.DeploymentID)
if err != nil {
return err
}
*result = siteNetperf(ctx, duration)
result.Endpoint = cli.GetEndpointURL().String()
return nil
}()
continue
}
wg.Add(1)
go func() (err error) {
defer wg.Done()
result := madmin.SiteNetPerfNodeResult{}
defer func() {
if err != nil {
result.Error = err.Error()
}
resultsMu.Lock()
results.NodeResults = append(results.NodeResults, result)
resultsMu.Unlock()
}()
cli, err := globalSiteReplicationSys.getAdminClient(ctx, info.DeploymentID)
if err != nil {
return err
}
rp := cli.GetEndpointURL()
reqURL := &url.URL{
Scheme: rp.Scheme,
Host: rp.Host,
Path: adminPathPrefix + adminAPIVersionPrefix + adminAPISiteReplicationNetPerf,
}
result.Endpoint = rp.String()
req, err := http.NewRequestWithContext(ctx, http.MethodPost, reqURL.String(), nil)
if err != nil {
return err
}
client := &http.Client{
Timeout: duration + 10*time.Second,
Transport: globalRemoteTargetTransport,
}
resp, err := client.Do(req)
if err != nil {
return err
}
defer xhttp.DrainBody(resp.Body)
return gob.NewDecoder(resp.Body).Decode(&result)
}()
}
wg.Wait()
return
}
// GetClusterInfo - returns site replication information.
func (c *SiteReplicationSys) GetClusterInfo(ctx context.Context) (info madmin.SiteReplicationInfo, err error) {
c.RLock()