Include speedtest as part of healthinfo api (#14696)

Execute the object, drive and net speedtests as part of the healthinfo API call
(if requested by the client), and include their results in the response.
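For context, a client opts into the individual perf tests through query parameters on the admin health-info request. The sketch below is only an illustration of how such a request could be assembled: the `madmin` constants and the `deadline` parameter are taken from the diff further down, while the endpoint path in the comment and the use of `url.Values` here are assumptions, not part of this change.

```go
package main

import (
	"fmt"
	"net/url"

	"github.com/minio/madmin-go"
)

func main() {
	// Query parameters inspected by HealthInfoHandler (see the diff below).
	q := url.Values{}
	q.Set(string(madmin.HealthDataTypePerfDrive), "true") // run the drive speedtest
	q.Set(string(madmin.HealthDataTypePerfObj), "true")   // run the object speedtest
	q.Set(string(madmin.HealthDataTypePerfNet), "true")   // run the network speedtest (distributed setups only)
	q.Set("deadline", "1h")                               // optional override; perf requests already default to a 1h deadline

	// Assumed endpoint path, for illustration only: /minio/admin/v3/healthinfo?<query>
	fmt.Println("healthinfo?" + q.Encode())
}
```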

The options for the speedtests have been picked from the default values used by
the `mc support perf` command.
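Concretely, the option values wired in by this change (all visible in the diff below) amount to roughly the following. This is only a restatement of those values for quick reference; whether they exactly match the `mc support perf` defaults is taken on the author's word, and the snippet itself is a sketch, not code from this commit.

```go
package main

import (
	"fmt"
	"time"

	"github.com/dustin/go-humanize"
	"github.com/minio/madmin-go"
)

func main() {
	// Drive speedtest options (see getAndWriteDrivePerfInfo in the diff):
	// all drives exercised in parallel, 4 MiB blocks over a 1 GiB file.
	driveOpts := madmin.DriveSpeedTestOpts{
		Serial:    false,
		BlockSize: 4 * humanize.MiByte,
		FileSize:  1 * humanize.GiByte,
	}

	// Object speedtest options (see getAndWriteObjPerfInfo in the diff):
	// 32 concurrent uploads of 64 MiB objects for 10s, with autotune enabled
	// unless free capacity would not allow the next concurrency step.
	objConcurrency := 32
	objSize := 64 * humanize.MiByte
	objDuration := 10 * time.Second

	// The network speedtest runs for a fixed 10s window (globalNotificationSys.Netperf).
	fmt.Println(driveOpts, objConcurrency, objSize, objDuration)
}
```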
Author: Shireesh Anjal
Date: 2022-04-13 01:47:44 +05:30 (committed by GitHub)
Parent: 5f94cec1e2
Commit: 5c53620a72
4 changed files with 152 additions and 80 deletions


@@ -1084,8 +1084,6 @@ func (a adminAPIHandlers) ObjectSpeedtestHandler(w http.ResponseWriter, r *http.
 		return
 	}
 
-	var bucketExists bool
 	sizeStr := r.Form.Get(peerRESTSize)
 	durationStr := r.Form.Get(peerRESTDuration)
 	concurrentStr := r.Form.Get(peerRESTConcurrent)
@@ -1111,49 +1109,29 @@ func (a adminAPIHandlers) ObjectSpeedtestHandler(w http.ResponseWriter, r *http.
 		duration = time.Second * 10
 	}
 
-	// ignores any errors here.
-	storageInfo, _ := objectAPI.StorageInfo(ctx)
-	capacityNeeded := uint64(concurrent * size)
-	capacity := uint64(GetTotalUsableCapacityFree(storageInfo.Disks, storageInfo))
+	sufficientCapacity, canAutotune, capacityErrMsg := validateObjPerfOptions(ctx, objectAPI, concurrent, size, autotune)
 
-	if capacity < capacityNeeded {
+	if !sufficientCapacity {
 		writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, AdminError{
-			Code: "XMinioSpeedtestInsufficientCapacity",
-			Message: fmt.Sprintf("not enough usable space available to perform speedtest - expected %s, got %s",
-				humanize.IBytes(capacityNeeded), humanize.IBytes(capacity)),
+			Code:       "XMinioSpeedtestInsufficientCapacity",
+			Message:    capacityErrMsg,
 			StatusCode: http.StatusInsufficientStorage,
 		}), r.URL)
 		return
 	}
 
-	// Verify if we can employ autotune without running out of capacity,
-	// if we do run out of capacity, make sure to turn-off autotuning
-	// in such situations.
-	newConcurrent := concurrent + (concurrent+1)/2
-	autoTunedCapacityNeeded := uint64(newConcurrent * size)
-	if autotune && capacity < autoTunedCapacityNeeded {
-		// Turn-off auto-tuning if next possible concurrency would reach beyond disk capacity.
+	if autotune && !canAutotune {
 		autotune = false
 	}
 
-	err = objectAPI.MakeBucketWithLocation(ctx, globalObjectPerfBucket, BucketOptions{})
+	bucketExists, err := makeObjectPerfBucket(ctx, objectAPI)
 	if err != nil {
-		if _, ok := err.(BucketExists); !ok {
-			// Only BucketExists error can be ignored.
-			writeErrorResponseJSON(ctx, w, toAPIError(ctx, err), r.URL)
-			return
-		}
-		bucketExists = true
+		writeErrorResponseJSON(ctx, w, toAPIError(ctx, err), r.URL)
+		return
 	}
 
-	deleteBucket := func() {
-		objectAPI.DeleteBucket(context.Background(), globalObjectPerfBucket, DeleteBucketOptions{
-			Force:      true,
-			NoRecreate: true,
-		})
-	}
 	if !bucketExists {
-		defer deleteBucket()
+		defer deleteObjectPerfBucket(objectAPI)
 	}
 
 	// Freeze all incoming S3 API calls before running speedtest.
// Freeze all incoming S3 API calls before running speedtest.
@@ -1166,7 +1144,7 @@ func (a adminAPIHandlers) ObjectSpeedtestHandler(w http.ResponseWriter, r *http.
 	defer keepAliveTicker.Stop()
 
 	enc := json.NewEncoder(w)
-	ch := speedTest(ctx, speedTestOpts{size, concurrent, duration, autotune, storageClass})
+	ch := objectSpeedTest(ctx, speedTestOpts{size, concurrent, duration, autotune, storageClass})
 	for {
 		select {
 		case <-ctx.Done():
@@ -1189,6 +1167,50 @@ func (a adminAPIHandlers) ObjectSpeedtestHandler(w http.ResponseWriter, r *http.
 	}
 }
 
+func makeObjectPerfBucket(ctx context.Context, objectAPI ObjectLayer) (bucketExists bool, err error) {
+	err = objectAPI.MakeBucketWithLocation(ctx, globalObjectPerfBucket, BucketOptions{})
+	if err != nil {
+		if _, ok := err.(BucketExists); !ok {
+			// Only BucketExists error can be ignored.
+			return false, err
+		}
+		bucketExists = true
+	}
+	return bucketExists, nil
+}
+
+func deleteObjectPerfBucket(objectAPI ObjectLayer) {
+	objectAPI.DeleteBucket(context.Background(), globalObjectPerfBucket, DeleteBucketOptions{
+		Force:      true,
+		NoRecreate: true,
+	})
+}
+
+func validateObjPerfOptions(ctx context.Context, objectAPI ObjectLayer, concurrent int, size int, autotune bool) (sufficientCapacity bool, canAutotune bool, capacityErrMsg string) {
+	storageInfo, _ := objectAPI.StorageInfo(ctx)
+	capacityNeeded := uint64(concurrent * size)
+	capacity := uint64(GetTotalUsableCapacityFree(storageInfo.Disks, storageInfo))
+
+	if capacity < capacityNeeded {
+		return false, false, fmt.Sprintf("not enough usable space available to perform speedtest - expected %s, got %s",
+			humanize.IBytes(capacityNeeded), humanize.IBytes(capacity))
+	}
+
+	// Verify if we can employ autotune without running out of capacity,
+	// if we do run out of capacity, make sure to turn-off autotuning
+	// in such situations.
+	if autotune {
+		newConcurrent := concurrent + (concurrent+1)/2
+		autoTunedCapacityNeeded := uint64(newConcurrent * size)
+		if capacity < autoTunedCapacityNeeded {
+			// Turn-off auto-tuning if next possible concurrency would reach beyond disk capacity.
+			return true, false, ""
+		}
+	}
+
+	return true, autotune, ""
+}
+
 // NetSpeedtestHandler - reports maximum network throughput
 func (a adminAPIHandlers) NetSpeedtestHandler(w http.ResponseWriter, r *http.Request) {
 	ctx := newContext(r, w, "NetSpeedtestHandler")
@@ -1787,7 +1809,9 @@ func (a adminAPIHandlers) HealthInfoHandler(w http.ResponseWriter, r *http.Reque
 	}
 
 	deadline := 10 * time.Second // Default deadline is 10secs for health diagnostics.
-	if query.Get("perfnet") != "" || query.Get("perfdrive") != "" {
+	if query.Get(string(madmin.HealthDataTypePerfNet)) != "" ||
+		query.Get(string(madmin.HealthDataTypePerfDrive)) != "" ||
+		query.Get(string(madmin.HealthDataTypePerfObj)) != "" {
 		deadline = 1 * time.Hour
 	}
 
 	if dstr := r.Form.Get("deadline"); dstr != "" {
@@ -2032,49 +2056,98 @@ func (a adminAPIHandlers) HealthInfoHandler(w http.ResponseWriter, r *http.Reque
 	}
 
 	getAndWriteDrivePerfInfo := func() {
-		if query.Get("perfdrive") == "true" {
-			localDPI := getDrivePerfInfos(deadlinedCtx, globalLocalNodeName)
-			anonymizeAddr(&localDPI)
-			healthInfo.Perf.Drives = append(healthInfo.Perf.Drives, localDPI)
-			partialWrite(healthInfo)
+		if query.Get(string(madmin.HealthDataTypePerfDrive)) == "true" {
+			// Freeze all incoming S3 API calls before running speedtest.
+			globalNotificationSys.ServiceFreeze(ctx, true)
+
+			// unfreeze all incoming S3 API calls after speedtest.
+			defer globalNotificationSys.ServiceFreeze(ctx, false)
 
-			perfCh := globalNotificationSys.GetDrivePerfInfos(deadlinedCtx)
-			for perfInfo := range perfCh {
-				anonymizeAddr(&perfInfo)
-				healthInfo.Perf.Drives = append(healthInfo.Perf.Drives, perfInfo)
-				partialWrite(healthInfo)
+			opts := madmin.DriveSpeedTestOpts{
+				Serial:    false,
+				BlockSize: 4 * humanize.MiByte,
+				FileSize:  1 * humanize.GiByte,
 			}
+
+			localDPI := driveSpeedTest(ctx, opts)
+			healthInfo.Perf.DrivePerf = append(healthInfo.Perf.DrivePerf, localDPI)
+
+			perfCh := globalNotificationSys.DriveSpeedTest(ctx, opts)
+			for perfInfo := range perfCh {
+				healthInfo.Perf.DrivePerf = append(healthInfo.Perf.DrivePerf, perfInfo)
+			}
+			partialWrite(healthInfo)
 		}
 	}
-	anonymizeNetPerfInfo := func(npi *madmin.NetPerfInfo) {
-		anonymizeAddr(npi)
-		rps := npi.RemotePeers
-		for idx, peer := range rps {
-			anonymizeAddr(&peer)
-			rps[idx] = peer
+	getAndWriteObjPerfInfo := func() {
+		if query.Get(string(madmin.HealthDataTypePerfObj)) == "true" {
+			concurrent := 32
+			size := 64 * humanize.MiByte
+			autotune := true
+
+			sufficientCapacity, canAutotune, capacityErrMsg := validateObjPerfOptions(ctx, objectAPI, concurrent, size, autotune)
+			if !sufficientCapacity {
+				healthInfo.Perf.Error = capacityErrMsg
+				partialWrite(healthInfo)
+				return
+			}
+
+			if !canAutotune {
+				autotune = false
+			}
+
+			bucketExists, err := makeObjectPerfBucket(ctx, objectAPI)
+			if err != nil {
+				healthInfo.Perf.Error = "Could not make object perf bucket: " + err.Error()
+				partialWrite(healthInfo)
+				return
+			}
+
+			if !bucketExists {
+				defer deleteObjectPerfBucket(objectAPI)
+			}
+
+			// Freeze all incoming S3 API calls before running speedtest.
+			globalNotificationSys.ServiceFreeze(ctx, true)
+
+			// unfreeze all incoming S3 API calls after speedtest.
+			defer globalNotificationSys.ServiceFreeze(ctx, false)
+
+			opts := speedTestOpts{
+				throughputSize:   size,
+				concurrencyStart: concurrent,
+				duration:         10 * time.Second,
+				autotune:         autotune,
+			}
+
+			perfCh := objectSpeedTest(ctx, opts)
+			for perfInfo := range perfCh {
+				healthInfo.Perf.ObjPerf = append(healthInfo.Perf.ObjPerf, perfInfo)
+			}
+			partialWrite(healthInfo)
 		}
-		npi.RemotePeers = rps
 	}
 	getAndWriteNetPerfInfo := func() {
-		if globalIsDistErasure && query.Get("perfnet") == "true" {
-			localNPI := globalNotificationSys.GetNetPerfInfo(deadlinedCtx)
-			anonymizeNetPerfInfo(&localNPI)
-			healthInfo.Perf.Net = append(healthInfo.Perf.Net, localNPI)
-			partialWrite(healthInfo)
-
-			netInfos := globalNotificationSys.DispatchNetPerfChan(deadlinedCtx)
-			for netInfo := range netInfos {
-				anonymizeNetPerfInfo(&netInfo)
-				healthInfo.Perf.Net = append(healthInfo.Perf.Net, netInfo)
-				partialWrite(healthInfo)
+		if query.Get(string(madmin.HealthDataTypePerfObj)) == "true" {
+			if !globalIsDistErasure {
+				return
 			}
-
-			ppi := globalNotificationSys.GetParallelNetPerfInfo(deadlinedCtx)
-			anonymizeNetPerfInfo(&ppi)
-			healthInfo.Perf.NetParallel = ppi
+
+			nsLock := objectAPI.NewNSLock(minioMetaBucket, "netperf")
+			lkctx, err := nsLock.GetLock(ctx, globalOperationTimeout)
+			if err != nil {
+				healthInfo.Perf.Error = "Could not acquire lock for netperf: " + err.Error()
+			} else {
+				defer nsLock.Unlock(lkctx.Cancel)
+
+				netPerf := globalNotificationSys.Netperf(ctx, time.Second*10)
+				for _, np := range netPerf {
+					np.Endpoint = anonAddr(np.Endpoint)
+					healthInfo.Perf.NetPerf = append(healthInfo.Perf.NetPerf, np)
+				}
+			}
 			partialWrite(healthInfo)
 		}
 	}
@@ -2108,6 +2181,7 @@ func (a adminAPIHandlers) HealthInfoHandler(w http.ResponseWriter, r *http.Reque
 	getAndWriteProcInfo()
 	getAndWriteMinioConfig()
 	getAndWriteDrivePerfInfo()
+	getAndWriteObjPerfInfo()
 	getAndWriteNetPerfInfo()
 	getAndWriteSysErrors()
 	getAndWriteSysServices()
@@ -2640,6 +2714,7 @@ func anonymizeHost(hostAnonymizer map[string]string, endpoint Endpoint, poolNum
 	if !found {
 		// In distributed setup, anonymized addr = 'poolNum.serverNum'
 		newHost := fmt.Sprintf("pool%d.server%d", poolNum, srvrNum)
+		schemePfx := endpoint.Scheme + "://"
 
 		// Hostname
 		mapIfNotPresent(hostAnonymizer, endpoint.Hostname(), newHost)
@@ -2649,6 +2724,7 @@ func anonymizeHost(hostAnonymizer map[string]string, endpoint Endpoint, poolNum
 			// Host + port
 			newHostPort = newHost + ":" + endpoint.Port()
 			mapIfNotPresent(hostAnonymizer, endpoint.Host, newHostPort)
+			mapIfNotPresent(hostAnonymizer, schemePfx+endpoint.Host, newHostPort)
 		}
 
 		newHostPortPath := newHostPort
@@ -2657,10 +2733,11 @@ func anonymizeHost(hostAnonymizer map[string]string, endpoint Endpoint, poolNum
 			currentHostPortPath := endpoint.Host + endpoint.Path
 			newHostPortPath = newHostPort + endpoint.Path
 			mapIfNotPresent(hostAnonymizer, currentHostPortPath, newHostPortPath)
+			mapIfNotPresent(hostAnonymizer, schemePfx+currentHostPortPath, newHostPortPath)
 		}
 
 		// Full url
-		hostAnonymizer[currentURL] = endpoint.Scheme + "://" + newHostPortPath
+		hostAnonymizer[currentURL] = schemePfx + newHostPortPath
 	}
 }