From 03a2a746970fa96a8bc6706f83b6d08d607a5669 Mon Sep 17 00:00:00 2001 From: Krishna Srinivas <634494+krishnasrinivas@users.noreply.github.com> Date: Fri, 10 Sep 2021 17:43:34 -0700 Subject: [PATCH] Support speedtest autotune on the server side (#13086) --- cmd/admin-handlers.go | 48 +++++++++++- cmd/notification.go | 29 +++++--- cmd/peer-rest-server.go | 73 ++++++++++-------- cmd/utils.go | 161 ++++++++++++++++++++++++++++++++++++++++ go.mod | 2 +- go.sum | 4 +- 6 files changed, 270 insertions(+), 47 deletions(-) diff --git a/cmd/admin-handlers.go b/cmd/admin-handlers.go index 84d32b3a1..1d33d0bd3 100644 --- a/cmd/admin-handlers.go +++ b/cmd/admin-handlers.go @@ -925,6 +925,12 @@ func (a adminAPIHandlers) SpeedtestHandler(w http.ResponseWriter, r *http.Reques sizeStr := r.Form.Get(peerRESTSize) durationStr := r.Form.Get(peerRESTDuration) concurrentStr := r.Form.Get(peerRESTConcurrent) + autotuneStr := r.Form.Get("autotune") + + var autotune bool + if autotuneStr != "" { + autotune = true + } size, err := strconv.Atoi(sizeStr) if err != nil { @@ -941,9 +947,47 @@ func (a adminAPIHandlers) SpeedtestHandler(w http.ResponseWriter, r *http.Reques duration = time.Second * 10 } - results := globalNotificationSys.Speedtest(ctx, size, concurrent, duration) + throughputSize := size + iopsSize := size - if err := json.NewEncoder(w).Encode(results); err != nil { + if autotune { + iopsSize = 4 * humanize.KiByte + } + + keepAliveTicker := time.NewTicker(500 * time.Millisecond) + defer keepAliveTicker.Stop() + + endBlankRepliesCh := make(chan error) + + go func() { + for { + select { + case <-ctx.Done(): + endBlankRepliesCh <- nil + return + case <-keepAliveTicker.C: + // Write a blank entry to prevent client from disconnecting + if err := json.NewEncoder(w).Encode(madmin.SpeedTestResult{}); err != nil { + endBlankRepliesCh <- err + return + } + w.(http.Flusher).Flush() + case endBlankRepliesCh <- nil: + return + } + } + }() + + result, err := speedTest(ctx, throughputSize, iopsSize, concurrent, duration, autotune) + if <-endBlankRepliesCh != nil { + return + } + if err != nil { + writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL) + return + } + + if err := json.NewEncoder(w).Encode(result); err != nil { writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL) return } diff --git a/cmd/notification.go b/cmd/notification.go index 8367bdfc2..42cf9e31b 100644 --- a/cmd/notification.go +++ b/cmd/notification.go @@ -1505,8 +1505,13 @@ func (sys *NotificationSys) GetClusterMetrics(ctx context.Context) chan Metric { // Speedtest run GET/PUT tests at input concurrency for requested object size, // optionally you can extend the tests longer with time.Duration. -func (sys *NotificationSys) Speedtest(ctx context.Context, size int, concurrent int, duration time.Duration) []madmin.SpeedtestResult { - results := make([]madmin.SpeedtestResult, len(sys.allPeerClients)) +func (sys *NotificationSys) Speedtest(ctx context.Context, size int, concurrent int, duration time.Duration) []SpeedtestResult { + length := len(sys.allPeerClients) + if length == 0 { + // For single node erasure setup. + length = 1 + } + results := make([]SpeedtestResult, length) scheme := "http" if globalIsTLS { @@ -1526,12 +1531,12 @@ func (sys *NotificationSys) Speedtest(ctx context.Context, size int, concurrent Scheme: scheme, Host: sys.peerClients[index].host.String(), } - results[index].Endpoint = u.String() - results[index].Err = err - if err == nil { - results[index].Uploads = r.Uploads - results[index].Downloads = r.Downloads + if err != nil { + results[index].Error = err.Error() + } else { + results[index] = r } + results[index].Endpoint = u.String() }(index) } @@ -1543,12 +1548,12 @@ func (sys *NotificationSys) Speedtest(ctx context.Context, size int, concurrent Scheme: scheme, Host: globalLocalNodeName, } - results[len(results)-1].Endpoint = u.String() - results[len(results)-1].Err = err - if err == nil { - results[len(results)-1].Uploads = r.Uploads - results[len(results)-1].Downloads = r.Downloads + if err != nil { + results[len(results)-1].Error = err.Error() + } else { + results[len(results)-1] = r } + results[len(results)-1].Endpoint = u.String() }() wg.Wait() diff --git a/cmd/peer-rest-server.go b/cmd/peer-rest-server.go index ba6a7654f..536533767 100644 --- a/cmd/peer-rest-server.go +++ b/cmd/peer-rest-server.go @@ -1156,6 +1156,7 @@ func (s *peerRESTServer) GetPeerMetrics(w http.ResponseWriter, r *http.Request) // SpeedtestResult return value of the speedtest function type SpeedtestResult struct { + Endpoint string Uploads uint64 Downloads uint64 Error string @@ -1163,8 +1164,9 @@ type SpeedtestResult struct { // SpeedtestObject implements "random-read" object reader type SpeedtestObject struct { - buf []byte - remaining int + buf []byte + remaining int + totalBytesWritten *uint64 } func (bo *SpeedtestObject) Read(b []byte) (int, error) { @@ -1182,17 +1184,20 @@ func (bo *SpeedtestObject) Read(b []byte) (int, error) { } copy(b, bo.buf) bo.remaining -= len(b) + + atomic.AddUint64(bo.totalBytesWritten, uint64(len(b))) return len(b), nil } // Runs the speedtest on local MinIO process. func selfSpeedtest(ctx context.Context, size, concurrent int, duration time.Duration) (SpeedtestResult, error) { - var result SpeedtestResult objAPI := newObjectLayerFn() if objAPI == nil { - return result, errServerNotInitialized + return SpeedtestResult{}, errServerNotInitialized } + var retError string + bucket := minioMetaSpeedTestBucket buf := make([]byte, humanize.MiByte) @@ -1200,15 +1205,16 @@ func selfSpeedtest(ctx context.Context, size, concurrent int, duration time.Dura objCountPerThread := make([]uint64, concurrent) - var objUploadCount uint64 - var objDownloadCount uint64 + uploadsStopped := false + var totalBytesWritten uint64 + uploadsCtx, uploadsCancel := context.WithCancel(context.Background()) var wg sync.WaitGroup - doneCh1 := make(chan struct{}) go func() { time.Sleep(duration) - close(doneCh1) + uploadsStopped = true + uploadsCancel() }() objNamePrefix := minioMetaSpeedTestBucketPrefix + uuid.New().String() @@ -1218,35 +1224,37 @@ func selfSpeedtest(ctx context.Context, size, concurrent int, duration time.Dura go func(i int) { defer wg.Done() for { - hashReader, err := hash.NewReader(&SpeedtestObject{buf, size}, + hashReader, err := hash.NewReader(&SpeedtestObject{buf, size, &totalBytesWritten}, int64(size), "", "", int64(size)) if err != nil { + retError = err.Error() logger.LogIf(ctx, err) break } reader := NewPutObjReader(hashReader) - _, err = objAPI.PutObject(ctx, bucket, fmt.Sprintf("%s.%d.%d", + _, err = objAPI.PutObject(uploadsCtx, bucket, fmt.Sprintf("%s.%d.%d", objNamePrefix, i, objCountPerThread[i]), reader, ObjectOptions{}) - if err != nil { + if err != nil && !uploadsStopped { + retError = err.Error() logger.LogIf(ctx, err) + } + if err != nil { break } objCountPerThread[i]++ - atomic.AddUint64(&objUploadCount, 1) - select { - case <-doneCh1: - return - default: - } } }(i) } wg.Wait() - doneCh2 := make(chan struct{}) + downloadsStopped := false + var totalBytesRead uint64 + downloadsCtx, downloadsCancel := context.WithCancel(context.Background()) + go func() { time.Sleep(duration) - close(doneCh2) + downloadsStopped = true + downloadsCancel() }() wg.Add(concurrent) @@ -1254,34 +1262,39 @@ func selfSpeedtest(ctx context.Context, size, concurrent int, duration time.Dura go func(i int) { defer wg.Done() var j uint64 + if objCountPerThread[i] == 0 { + return + } for { if objCountPerThread[i] == j { j = 0 } - r, err := objAPI.GetObjectNInfo(ctx, bucket, fmt.Sprintf("%s.%d.%d", + r, err := objAPI.GetObjectNInfo(downloadsCtx, bucket, fmt.Sprintf("%s.%d.%d", objNamePrefix, i, j), nil, nil, noLock, ObjectOptions{}) - if err != nil { + if err != nil && !downloadsStopped { + retError = err.Error() logger.LogIf(ctx, err) + } + if err != nil { break } - _, err = io.Copy(ioutil.Discard, r) + n, err := io.Copy(ioutil.Discard, r) r.Close() - if err != nil { + + atomic.AddUint64(&totalBytesRead, uint64(n)) + if err != nil && !downloadsStopped { + retError = err.Error() logger.LogIf(ctx, err) + } + if err != nil { break } j++ - atomic.AddUint64(&objDownloadCount, 1) - select { - case <-doneCh2: - return - default: - } } }(i) } wg.Wait() - return SpeedtestResult{Uploads: objUploadCount, Downloads: objDownloadCount}, nil + return SpeedtestResult{Uploads: totalBytesWritten, Downloads: totalBytesRead, Error: retError}, nil } func (s *peerRESTServer) SpeedtestHandler(w http.ResponseWriter, r *http.Request) { diff --git a/cmd/utils.go b/cmd/utils.go index 875a6efa6..d5a44c2b2 100644 --- a/cmd/utils.go +++ b/cmd/utils.go @@ -37,6 +37,7 @@ import ( "runtime" "runtime/pprof" "runtime/trace" + "sort" "strings" "sync" "time" @@ -970,3 +971,163 @@ func auditLogInternal(ctx context.Context, bucket, object string, opts AuditLogO ctx = logger.SetAuditEntry(ctx, &entry) logger.AuditLog(ctx, nil, nil, nil) } + +// Get the max throughput and iops numbers. +func speedTest(ctx context.Context, throughputSize, iopsSize int, concurrencyStart int, duration time.Duration, autotune bool) (madmin.SpeedTestResult, error) { + var result madmin.SpeedTestResult + + objAPI := newObjectLayerFn() + if objAPI == nil { + return result, errServerNotInitialized + } + + concurrency := concurrencyStart + + throughputHighestGet := uint64(0) + throughputHighestPut := uint64(0) + var throughputHighestPutResults []SpeedtestResult + var throughputHighestGetResults []SpeedtestResult + + for { + select { + case <-ctx.Done(): + // If the client got disconnected stop the speedtest. + return result, errUnexpected + default: + } + + results := globalNotificationSys.Speedtest(ctx, throughputSize, concurrency, duration) + sort.Slice(results, func(i, j int) bool { + return results[i].Endpoint < results[j].Endpoint + }) + totalPut := uint64(0) + totalGet := uint64(0) + for _, result := range results { + totalPut += result.Uploads + totalGet += result.Downloads + } + if totalPut < throughputHighestPut && totalGet < throughputHighestGet { + break + } + + if totalPut > throughputHighestPut { + throughputHighestPut = totalPut + throughputHighestPutResults = results + } + if totalGet > throughputHighestGet { + throughputHighestGet = totalGet + throughputHighestGetResults = results + } + if !autotune { + break + } + // Try with a higher concurrency to see if we get better throughput + concurrency += (concurrency + 1) / 2 + } + + concurrency = concurrencyStart + iopsHighestPut := uint64(0) + iopsHighestGet := uint64(0) + var iopsHighestPutResults []SpeedtestResult + var iopsHighestGetResults []SpeedtestResult + + if autotune { + for { + select { + case <-ctx.Done(): + // If the client got disconnected stop the speedtest. + return result, errUnexpected + default: + } + results := globalNotificationSys.Speedtest(ctx, iopsSize, concurrency, duration) + sort.Slice(results, func(i, j int) bool { + return results[i].Endpoint < results[j].Endpoint + }) + totalPut := uint64(0) + totalGet := uint64(0) + for _, result := range results { + totalPut += result.Uploads + totalGet += result.Downloads + } + if totalPut < iopsHighestPut && totalGet < iopsHighestGet { + break + } + if totalPut > iopsHighestPut { + iopsHighestPut = totalPut + iopsHighestPutResults = results + } + if totalGet > iopsHighestGet { + iopsHighestGet = totalGet + iopsHighestGetResults = results + } + if !autotune { + break + } + // Try with a higher concurrency to see if we get better throughput + concurrency += (concurrency + 1) / 2 + } + } else { + iopsHighestPut = throughputHighestPut + iopsHighestGet = throughputHighestGet + iopsHighestPutResults = throughputHighestPutResults + iopsHighestGetResults = throughputHighestGetResults + } + + if len(throughputHighestPutResults) != len(iopsHighestPutResults) { + return result, errors.New("throughput and iops differ in number of nodes") + } + + if len(throughputHighestGetResults) != len(iopsHighestGetResults) { + return result, errors.New("throughput and iops differ in number of nodes") + } + + durationSecs := duration.Seconds() + + result.PUTStats.ThroughputPerSec = throughputHighestPut / uint64(durationSecs) + result.PUTStats.ObjectsPerSec = iopsHighestPut / uint64(iopsSize) / uint64(durationSecs) + for i := 0; i < len(throughputHighestPutResults); i++ { + errStr := "" + if throughputHighestPutResults[i].Error != "" { + errStr = throughputHighestPutResults[i].Error + } + if iopsHighestPutResults[i].Error != "" { + errStr = iopsHighestPutResults[i].Error + } + result.PUTStats.Servers = append(result.PUTStats.Servers, madmin.SpeedTestStatServer{ + Endpoint: throughputHighestPutResults[i].Endpoint, + ThroughputPerSec: throughputHighestPutResults[i].Uploads / uint64(durationSecs), + ObjectsPerSec: iopsHighestPutResults[i].Uploads / uint64(iopsSize) / uint64(durationSecs), + Err: errStr, + }) + } + + result.GETStats.ThroughputPerSec = throughputHighestGet / uint64(durationSecs) + result.GETStats.ObjectsPerSec = iopsHighestGet / uint64(iopsSize) / uint64(durationSecs) + for i := 0; i < len(throughputHighestGetResults); i++ { + errStr := "" + if throughputHighestGetResults[i].Error != "" { + errStr = throughputHighestGetResults[i].Error + } + if iopsHighestGetResults[i].Error != "" { + errStr = iopsHighestGetResults[i].Error + } + result.GETStats.Servers = append(result.GETStats.Servers, madmin.SpeedTestStatServer{ + Endpoint: throughputHighestGetResults[i].Endpoint, + ThroughputPerSec: throughputHighestGetResults[i].Downloads / uint64(durationSecs), + ObjectsPerSec: iopsHighestGetResults[i].Downloads / uint64(iopsSize) / uint64(durationSecs), + Err: errStr, + }) + } + + numDisks := 0 + if pools, ok := objAPI.(*erasureServerPools); ok { + for _, set := range pools.serverPools { + numDisks = set.setCount * set.setDriveCount + } + } + result.Disks = numDisks + result.Servers = len(globalNotificationSys.peerClients) + 1 + result.Version = Version + + return result, nil +} diff --git a/go.mod b/go.mod index 5805372cd..317152a8e 100644 --- a/go.mod +++ b/go.mod @@ -45,7 +45,7 @@ require ( github.com/minio/csvparser v1.0.0 github.com/minio/highwayhash v1.0.2 github.com/minio/kes v0.14.0 - github.com/minio/madmin-go v1.1.0 + github.com/minio/madmin-go v1.1.5 github.com/minio/minio-go/v7 v7.0.14-0.20210908194250-617d530ffac5 github.com/minio/parquet-go v1.0.0 github.com/minio/pkg v1.1.2 diff --git a/go.sum b/go.sum index 6ebdf26d2..ed199bc2e 100644 --- a/go.sum +++ b/go.sum @@ -1025,8 +1025,8 @@ github.com/minio/kes v0.14.0 h1:plCGm4LwR++T1P1sXsJbyFRX54CE1WRuo9PAPj6MC3Q= github.com/minio/kes v0.14.0/go.mod h1:OUensXz2BpgMfiogslKxv7Anyx/wj+6bFC6qA7BQcfA= github.com/minio/madmin-go v1.0.12/go.mod h1:BK+z4XRx7Y1v8SFWXsuLNqQqnq5BO/axJ8IDJfgyvfs= github.com/minio/madmin-go v1.0.17/go.mod h1:4nl9hvLWFnwCjkLfZSsZXEHgDODa2XSG6xGlIZyQ2oA= -github.com/minio/madmin-go v1.1.0 h1:Y7YOPQafIVBpW0kwYzlt7FGi0GV23dlr/u3jRRrB4cw= -github.com/minio/madmin-go v1.1.0/go.mod h1:4nl9hvLWFnwCjkLfZSsZXEHgDODa2XSG6xGlIZyQ2oA= +github.com/minio/madmin-go v1.1.5 h1:xfzHwQ/KeKDQZKLqllNSyexwOPM/tvc13UdCeVMzADY= +github.com/minio/madmin-go v1.1.5/go.mod h1:xIPJHUbyYhNDgeD9Wov5Fz5/p7DIW0u+q6Rs/+Xu2TM= github.com/minio/mc v0.0.0-20210626002108-cebf3318546f h1:hyFvo5hSFw2K417YvDr/vAKlgCG69uTuhZW/5LNdL0U= github.com/minio/mc v0.0.0-20210626002108-cebf3318546f/go.mod h1:tuaonkPjVApCXkbtKENHBtsqUf7YTV33qmFrC+Pgp5g= github.com/minio/md5-simd v1.1.0/go.mod h1:XpBqgZULrMYD3R+M28PcmP0CkI7PEMzB3U77ZrKZ0Gw=