Support live updates for clients during speedtest (#13566)

Krishna Srinivas authored 2021-11-02 15:27:03 -07:00, committed by GitHub
parent ad3f98b8e7, commit 58934e5881
4 changed files with 126 additions and 118 deletions

@@ -951,47 +951,31 @@ func (a adminAPIHandlers) SpeedtestHandler(w http.ResponseWriter, r *http.Reques
 		duration = time.Second * 10
 	}
 
-	throughputSize := size
 	keepAliveTicker := time.NewTicker(500 * time.Millisecond)
 	defer keepAliveTicker.Stop()
 
-	endBlankRepliesCh := make(chan error)
-
-	go func() {
-		for {
-			select {
-			case <-ctx.Done():
-				endBlankRepliesCh <- nil
-				return
-			case <-keepAliveTicker.C:
-				// Write a blank entry to prevent client from disconnecting
-				if err := json.NewEncoder(w).Encode(madmin.SpeedTestResult{}); err != nil {
-					endBlankRepliesCh <- err
-					return
-				}
-				w.(http.Flusher).Flush()
-			case endBlankRepliesCh <- nil:
-				return
-			}
-		}
-	}()
-
-	result, err := speedTest(ctx, throughputSize, concurrent, duration, autotune)
-	if <-endBlankRepliesCh != nil {
-		return
-	}
-	if err != nil {
-		writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL)
-		return
-	}
-	if err := json.NewEncoder(w).Encode(result); err != nil {
-		writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL)
-		return
-	}
-	objectAPI.DeleteBucket(ctx, pathJoin(minioMetaSpeedTestBucket, minioMetaSpeedTestBucketPrefix), DeleteBucketOptions{Force: true, NoRecreate: true})
+	ch := speedTest(ctx, size, concurrent, duration, autotune)
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		case <-keepAliveTicker.C:
+			// Write a blank entry to prevent client from disconnecting
+			if err := json.NewEncoder(w).Encode(madmin.SpeedTestResult{}); err != nil {
+				return
+			}
+			w.(http.Flusher).Flush()
+		case result, ok := <-ch:
+			if !ok {
+				defer objectAPI.DeleteBucket(context.Background(), pathJoin(minioMetaSpeedTestBucket, minioMetaSpeedTestBucketPrefix), DeleteBucketOptions{Force: true, NoRecreate: true})
+				return
+			}
+			if err := json.NewEncoder(w).Encode(result); err != nil {
+				writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL)
+				return
+			}
+			w.(http.Flusher).Flush()
+		}
+	}
 }
 
 // Admin API errors
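
With this change the handler no longer buffers a single final result: it streams every intermediate madmin.SpeedTestResult as a newline-delimited JSON object, interleaves blank keep-alive entries every 500ms, and ends the response once the channel closes. A client can consume the stream with a json.Decoder, skipping the blank entries. The sketch below is illustrative only: the endpoint path, the unauthenticated request, and the trimmed-down result struct are assumptions, not the actual madmin client API.

// Minimal sketch of a client consuming the streamed speedtest results.
// The endpoint path and the result struct are assumptions for illustration;
// the real admin API requires signed requests via the madmin-go client.
package main

import (
	"encoding/json"
	"fmt"
	"net/http"
)

// speedTestResult mirrors only the madmin.SpeedTestResult fields used here
// (assumed JSON field names).
type speedTestResult struct {
	Version    string `json:"version"`
	Disks      int    `json:"disks"`
	Servers    int    `json:"servers"`
	Size       int    `json:"size"`
	Concurrent int    `json:"concurrent"`
}

func main() {
	// Hypothetical unauthenticated call, for brevity only.
	resp, err := http.Get("http://localhost:9000/minio/admin/v3/speedtest?autotune=true")
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	dec := json.NewDecoder(resp.Body) // reads one JSON object per call
	for {
		var r speedTestResult
		if err := dec.Decode(&r); err != nil {
			break // io.EOF once the handler returns and the stream ends
		}
		if r.Version == "" {
			continue // blank keep-alive entry, written every 500ms
		}
		fmt.Printf("update: servers=%d disks=%d size=%d concurrent=%d\n",
			r.Servers, r.Disks, r.Size, r.Concurrent)
	}
}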

@@ -973,99 +973,111 @@ func auditLogInternal(ctx context.Context, bucket, object string, opts AuditLogO
 }
 
 // Get the max throughput and iops numbers.
-func speedTest(ctx context.Context, throughputSize, concurrencyStart int, duration time.Duration, autotune bool) (madmin.SpeedTestResult, error) {
-	var result madmin.SpeedTestResult
-
-	objAPI := newObjectLayerFn()
-	if objAPI == nil {
-		return result, errServerNotInitialized
-	}
-
-	concurrency := concurrencyStart
-	throughputHighestGet := uint64(0)
-	throughputHighestPut := uint64(0)
-	var throughputHighestGetResults []SpeedtestResult
-
-	for {
-		select {
-		case <-ctx.Done():
-			// If the client got disconnected stop the speedtest.
-			return result, errUnexpected
-		default:
-		}
-
-		results := globalNotificationSys.Speedtest(ctx, throughputSize, concurrency, duration)
-		sort.Slice(results, func(i, j int) bool {
-			return results[i].Endpoint < results[j].Endpoint
-		})
-		totalPut := uint64(0)
-		totalGet := uint64(0)
-		for _, result := range results {
-			totalPut += result.Uploads
-			totalGet += result.Downloads
-		}
-		if totalGet < throughputHighestGet {
-			break
-		}
-
-		doBreak := false
-		if float64(totalGet-throughputHighestGet)/float64(totalGet) < 0.025 {
-			doBreak = true
-		}
-
-		if totalGet > throughputHighestGet {
-			throughputHighestGet = totalGet
-			throughputHighestGetResults = results
-			throughputHighestPut = totalPut
-		}
-
-		if doBreak {
-			break
-		}
-
-		if !autotune {
-			break
-		}
-
-		// Try with a higher concurrency to see if we get better throughput
-		concurrency += (concurrency + 1) / 2
-	}
-
-	durationSecs := duration.Seconds()
-	result.GETStats.ThroughputPerSec = throughputHighestGet / uint64(durationSecs)
-	result.GETStats.ObjectsPerSec = throughputHighestGet / uint64(throughputSize) / uint64(durationSecs)
-	result.PUTStats.ThroughputPerSec = throughputHighestPut / uint64(durationSecs)
-	result.PUTStats.ObjectsPerSec = throughputHighestPut / uint64(throughputSize) / uint64(durationSecs)
-	for i := 0; i < len(throughputHighestGetResults); i++ {
-		errStr := ""
-		if throughputHighestGetResults[i].Error != "" {
-			errStr = throughputHighestGetResults[i].Error
-		}
-		result.PUTStats.Servers = append(result.PUTStats.Servers, madmin.SpeedTestStatServer{
-			Endpoint:         throughputHighestGetResults[i].Endpoint,
-			ThroughputPerSec: throughputHighestGetResults[i].Uploads / uint64(durationSecs),
-			ObjectsPerSec:    throughputHighestGetResults[i].Uploads / uint64(throughputSize) / uint64(durationSecs),
-			Err:              errStr,
-		})
-		result.GETStats.Servers = append(result.GETStats.Servers, madmin.SpeedTestStatServer{
-			Endpoint:         throughputHighestGetResults[i].Endpoint,
-			ThroughputPerSec: throughputHighestGetResults[i].Downloads / uint64(durationSecs),
-			ObjectsPerSec:    throughputHighestGetResults[i].Downloads / uint64(throughputSize) / uint64(durationSecs),
-			Err:              errStr,
-		})
-	}
-
-	numDisks := 0
-	if pools, ok := objAPI.(*erasureServerPools); ok {
-		for _, set := range pools.serverPools {
-			numDisks = set.setCount * set.setDriveCount
-		}
-	}
-	result.Disks = numDisks
-	result.Servers = len(globalNotificationSys.peerClients) + 1
-	result.Version = Version
-	return result, nil
-}
+func speedTest(ctx context.Context, throughputSize, concurrencyStart int, duration time.Duration, autotune bool) chan madmin.SpeedTestResult {
+	ch := make(chan madmin.SpeedTestResult, 1)
+	go func() {
+		defer close(ch)
+
+		objAPI := newObjectLayerFn()
+		if objAPI == nil {
+			return
+		}
+
+		concurrency := concurrencyStart
+		throughputHighestGet := uint64(0)
+		throughputHighestPut := uint64(0)
+		var throughputHighestGetResults []SpeedtestResult
+
+		sendResult := func() {
+			var result madmin.SpeedTestResult
+			durationSecs := duration.Seconds()
+			result.GETStats.ThroughputPerSec = throughputHighestGet / uint64(durationSecs)
+			result.GETStats.ObjectsPerSec = throughputHighestGet / uint64(throughputSize) / uint64(durationSecs)
+			result.PUTStats.ThroughputPerSec = throughputHighestPut / uint64(durationSecs)
+			result.PUTStats.ObjectsPerSec = throughputHighestPut / uint64(throughputSize) / uint64(durationSecs)
+			for i := 0; i < len(throughputHighestGetResults); i++ {
+				errStr := ""
+				if throughputHighestGetResults[i].Error != "" {
+					errStr = throughputHighestGetResults[i].Error
+				}
+				result.PUTStats.Servers = append(result.PUTStats.Servers, madmin.SpeedTestStatServer{
+					Endpoint:         throughputHighestGetResults[i].Endpoint,
+					ThroughputPerSec: throughputHighestGetResults[i].Uploads / uint64(durationSecs),
+					ObjectsPerSec:    throughputHighestGetResults[i].Uploads / uint64(throughputSize) / uint64(durationSecs),
+					Err:              errStr,
+				})
+				result.GETStats.Servers = append(result.GETStats.Servers, madmin.SpeedTestStatServer{
+					Endpoint:         throughputHighestGetResults[i].Endpoint,
+					ThroughputPerSec: throughputHighestGetResults[i].Downloads / uint64(durationSecs),
+					ObjectsPerSec:    throughputHighestGetResults[i].Downloads / uint64(throughputSize) / uint64(durationSecs),
+					Err:              errStr,
+				})
+			}
+
+			numDisks := 0
+			if pools, ok := objAPI.(*erasureServerPools); ok {
+				for _, set := range pools.serverPools {
+					numDisks = set.setCount * set.setDriveCount
+				}
+			}
+			result.Disks = numDisks
+			result.Servers = len(globalNotificationSys.peerClients) + 1
+			result.Version = Version
+			result.Size = throughputSize
+			result.Concurrent = concurrency
+			ch <- result
+		}
+
+		for {
+			select {
+			case <-ctx.Done():
+				// If the client got disconnected stop the speedtest.
+				return
+			default:
+			}
+
+			results := globalNotificationSys.Speedtest(ctx, throughputSize, concurrency, duration)
+			sort.Slice(results, func(i, j int) bool {
+				return results[i].Endpoint < results[j].Endpoint
+			})
+			totalPut := uint64(0)
+			totalGet := uint64(0)
+			for _, result := range results {
+				totalPut += result.Uploads
+				totalGet += result.Downloads
+			}
+			if totalGet < throughputHighestGet {
+				sendResult()
+				break
+			}
+
+			doBreak := false
+			if float64(totalGet-throughputHighestGet)/float64(totalGet) < 0.025 {
+				doBreak = true
+			}
+
+			if totalGet > throughputHighestGet {
+				throughputHighestGet = totalGet
+				throughputHighestGetResults = results
+				throughputHighestPut = totalPut
+			}
+
+			if doBreak {
+				sendResult()
+				break
+			}
+
+			if !autotune {
+				sendResult()
+				break
+			}
+
+			sendResult()
+
+			// Try with a higher concurrency to see if we get better throughput
+			concurrency += (concurrency + 1) / 2
+		}
+	}()
+	return ch
+}
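
speedTest now follows a common Go producer pattern: instead of returning (madmin.SpeedTestResult, error) once at the end, it returns a channel fed by a goroutine, sends the current best result after every autotune round, and closes the channel via defer close(ch) to signal completion, which the handler observes as ok == false. A minimal, self-contained sketch of the same pattern, with illustrative names that are not from the MinIO codebase:

// Self-contained sketch of the producer pattern used by speedTest above:
// a goroutine feeds a channel with intermediate results and closes it to
// signal completion. All identifiers here are illustrative.
package main

import (
	"context"
	"fmt"
	"time"
)

type measurement struct {
	Round      int
	Throughput uint64 // bytes/sec, stand-in for the GET/PUT stats
}

func measure(ctx context.Context, rounds int) chan measurement {
	ch := make(chan measurement, 1)
	go func() {
		defer close(ch) // consumer's `result, ok := <-ch` sees ok == false when done
		for i := 0; i < rounds; i++ {
			select {
			case <-ctx.Done():
				return // client disconnected: stop the run early
			default:
			}
			time.Sleep(100 * time.Millisecond) // stand-in for one speedtest round
			ch <- measurement{Round: i + 1, Throughput: uint64(i+1) * 100 << 20}
		}
	}()
	return ch
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()
	for m := range measure(ctx, 5) {
		fmt.Printf("round %d: %d bytes/sec\n", m.Round, m.Throughput)
	}
}

Buffering the channel with capacity 1, as the real code does, lets the producer hand off one result without blocking even while the consumer is busy writing the previous entry to the HTTP response.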