Support speedtest autotune on the server side (#13086)

This commit is contained in:
Krishna Srinivas 2021-09-10 17:43:34 -07:00 committed by GitHub
parent 2807c11410
commit 03a2a74697
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 270 additions and 47 deletions

View File

@ -925,6 +925,12 @@ func (a adminAPIHandlers) SpeedtestHandler(w http.ResponseWriter, r *http.Reques
sizeStr := r.Form.Get(peerRESTSize) sizeStr := r.Form.Get(peerRESTSize)
durationStr := r.Form.Get(peerRESTDuration) durationStr := r.Form.Get(peerRESTDuration)
concurrentStr := r.Form.Get(peerRESTConcurrent) concurrentStr := r.Form.Get(peerRESTConcurrent)
autotuneStr := r.Form.Get("autotune")
var autotune bool
if autotuneStr != "" {
autotune = true
}
size, err := strconv.Atoi(sizeStr) size, err := strconv.Atoi(sizeStr)
if err != nil { if err != nil {
@ -941,9 +947,47 @@ func (a adminAPIHandlers) SpeedtestHandler(w http.ResponseWriter, r *http.Reques
duration = time.Second * 10 duration = time.Second * 10
} }
results := globalNotificationSys.Speedtest(ctx, size, concurrent, duration) throughputSize := size
iopsSize := size
if err := json.NewEncoder(w).Encode(results); err != nil { if autotune {
iopsSize = 4 * humanize.KiByte
}
keepAliveTicker := time.NewTicker(500 * time.Millisecond)
defer keepAliveTicker.Stop()
endBlankRepliesCh := make(chan error)
go func() {
for {
select {
case <-ctx.Done():
endBlankRepliesCh <- nil
return
case <-keepAliveTicker.C:
// Write a blank entry to prevent client from disconnecting
if err := json.NewEncoder(w).Encode(madmin.SpeedTestResult{}); err != nil {
endBlankRepliesCh <- err
return
}
w.(http.Flusher).Flush()
case endBlankRepliesCh <- nil:
return
}
}
}()
result, err := speedTest(ctx, throughputSize, iopsSize, concurrent, duration, autotune)
if <-endBlankRepliesCh != nil {
return
}
if err != nil {
writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL)
return
}
if err := json.NewEncoder(w).Encode(result); err != nil {
writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL) writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL)
return return
} }

View File

@ -1505,8 +1505,13 @@ func (sys *NotificationSys) GetClusterMetrics(ctx context.Context) chan Metric {
// Speedtest run GET/PUT tests at input concurrency for requested object size, // Speedtest run GET/PUT tests at input concurrency for requested object size,
// optionally you can extend the tests longer with time.Duration. // optionally you can extend the tests longer with time.Duration.
func (sys *NotificationSys) Speedtest(ctx context.Context, size int, concurrent int, duration time.Duration) []madmin.SpeedtestResult { func (sys *NotificationSys) Speedtest(ctx context.Context, size int, concurrent int, duration time.Duration) []SpeedtestResult {
results := make([]madmin.SpeedtestResult, len(sys.allPeerClients)) length := len(sys.allPeerClients)
if length == 0 {
// For single node erasure setup.
length = 1
}
results := make([]SpeedtestResult, length)
scheme := "http" scheme := "http"
if globalIsTLS { if globalIsTLS {
@ -1526,12 +1531,12 @@ func (sys *NotificationSys) Speedtest(ctx context.Context, size int, concurrent
Scheme: scheme, Scheme: scheme,
Host: sys.peerClients[index].host.String(), Host: sys.peerClients[index].host.String(),
} }
results[index].Endpoint = u.String() if err != nil {
results[index].Err = err results[index].Error = err.Error()
if err == nil { } else {
results[index].Uploads = r.Uploads results[index] = r
results[index].Downloads = r.Downloads
} }
results[index].Endpoint = u.String()
}(index) }(index)
} }
@ -1543,12 +1548,12 @@ func (sys *NotificationSys) Speedtest(ctx context.Context, size int, concurrent
Scheme: scheme, Scheme: scheme,
Host: globalLocalNodeName, Host: globalLocalNodeName,
} }
results[len(results)-1].Endpoint = u.String() if err != nil {
results[len(results)-1].Err = err results[len(results)-1].Error = err.Error()
if err == nil { } else {
results[len(results)-1].Uploads = r.Uploads results[len(results)-1] = r
results[len(results)-1].Downloads = r.Downloads
} }
results[len(results)-1].Endpoint = u.String()
}() }()
wg.Wait() wg.Wait()

View File

@ -1156,6 +1156,7 @@ func (s *peerRESTServer) GetPeerMetrics(w http.ResponseWriter, r *http.Request)
// SpeedtestResult return value of the speedtest function // SpeedtestResult return value of the speedtest function
type SpeedtestResult struct { type SpeedtestResult struct {
Endpoint string
Uploads uint64 Uploads uint64
Downloads uint64 Downloads uint64
Error string Error string
@ -1163,8 +1164,9 @@ type SpeedtestResult struct {
// SpeedtestObject implements "random-read" object reader // SpeedtestObject implements "random-read" object reader
type SpeedtestObject struct { type SpeedtestObject struct {
buf []byte buf []byte
remaining int remaining int
totalBytesWritten *uint64
} }
func (bo *SpeedtestObject) Read(b []byte) (int, error) { func (bo *SpeedtestObject) Read(b []byte) (int, error) {
@ -1182,17 +1184,20 @@ func (bo *SpeedtestObject) Read(b []byte) (int, error) {
} }
copy(b, bo.buf) copy(b, bo.buf)
bo.remaining -= len(b) bo.remaining -= len(b)
atomic.AddUint64(bo.totalBytesWritten, uint64(len(b)))
return len(b), nil return len(b), nil
} }
// Runs the speedtest on local MinIO process. // Runs the speedtest on local MinIO process.
func selfSpeedtest(ctx context.Context, size, concurrent int, duration time.Duration) (SpeedtestResult, error) { func selfSpeedtest(ctx context.Context, size, concurrent int, duration time.Duration) (SpeedtestResult, error) {
var result SpeedtestResult
objAPI := newObjectLayerFn() objAPI := newObjectLayerFn()
if objAPI == nil { if objAPI == nil {
return result, errServerNotInitialized return SpeedtestResult{}, errServerNotInitialized
} }
var retError string
bucket := minioMetaSpeedTestBucket bucket := minioMetaSpeedTestBucket
buf := make([]byte, humanize.MiByte) buf := make([]byte, humanize.MiByte)
@ -1200,15 +1205,16 @@ func selfSpeedtest(ctx context.Context, size, concurrent int, duration time.Dura
objCountPerThread := make([]uint64, concurrent) objCountPerThread := make([]uint64, concurrent)
var objUploadCount uint64 uploadsStopped := false
var objDownloadCount uint64 var totalBytesWritten uint64
uploadsCtx, uploadsCancel := context.WithCancel(context.Background())
var wg sync.WaitGroup var wg sync.WaitGroup
doneCh1 := make(chan struct{})
go func() { go func() {
time.Sleep(duration) time.Sleep(duration)
close(doneCh1) uploadsStopped = true
uploadsCancel()
}() }()
objNamePrefix := minioMetaSpeedTestBucketPrefix + uuid.New().String() objNamePrefix := minioMetaSpeedTestBucketPrefix + uuid.New().String()
@ -1218,35 +1224,37 @@ func selfSpeedtest(ctx context.Context, size, concurrent int, duration time.Dura
go func(i int) { go func(i int) {
defer wg.Done() defer wg.Done()
for { for {
hashReader, err := hash.NewReader(&SpeedtestObject{buf, size}, hashReader, err := hash.NewReader(&SpeedtestObject{buf, size, &totalBytesWritten},
int64(size), "", "", int64(size)) int64(size), "", "", int64(size))
if err != nil { if err != nil {
retError = err.Error()
logger.LogIf(ctx, err) logger.LogIf(ctx, err)
break break
} }
reader := NewPutObjReader(hashReader) reader := NewPutObjReader(hashReader)
_, err = objAPI.PutObject(ctx, bucket, fmt.Sprintf("%s.%d.%d", _, err = objAPI.PutObject(uploadsCtx, bucket, fmt.Sprintf("%s.%d.%d",
objNamePrefix, i, objCountPerThread[i]), reader, ObjectOptions{}) objNamePrefix, i, objCountPerThread[i]), reader, ObjectOptions{})
if err != nil { if err != nil && !uploadsStopped {
retError = err.Error()
logger.LogIf(ctx, err) logger.LogIf(ctx, err)
}
if err != nil {
break break
} }
objCountPerThread[i]++ objCountPerThread[i]++
atomic.AddUint64(&objUploadCount, 1)
select {
case <-doneCh1:
return
default:
}
} }
}(i) }(i)
} }
wg.Wait() wg.Wait()
doneCh2 := make(chan struct{}) downloadsStopped := false
var totalBytesRead uint64
downloadsCtx, downloadsCancel := context.WithCancel(context.Background())
go func() { go func() {
time.Sleep(duration) time.Sleep(duration)
close(doneCh2) downloadsStopped = true
downloadsCancel()
}() }()
wg.Add(concurrent) wg.Add(concurrent)
@ -1254,34 +1262,39 @@ func selfSpeedtest(ctx context.Context, size, concurrent int, duration time.Dura
go func(i int) { go func(i int) {
defer wg.Done() defer wg.Done()
var j uint64 var j uint64
if objCountPerThread[i] == 0 {
return
}
for { for {
if objCountPerThread[i] == j { if objCountPerThread[i] == j {
j = 0 j = 0
} }
r, err := objAPI.GetObjectNInfo(ctx, bucket, fmt.Sprintf("%s.%d.%d", r, err := objAPI.GetObjectNInfo(downloadsCtx, bucket, fmt.Sprintf("%s.%d.%d",
objNamePrefix, i, j), nil, nil, noLock, ObjectOptions{}) objNamePrefix, i, j), nil, nil, noLock, ObjectOptions{})
if err != nil { if err != nil && !downloadsStopped {
retError = err.Error()
logger.LogIf(ctx, err) logger.LogIf(ctx, err)
}
if err != nil {
break break
} }
_, err = io.Copy(ioutil.Discard, r) n, err := io.Copy(ioutil.Discard, r)
r.Close() r.Close()
if err != nil {
atomic.AddUint64(&totalBytesRead, uint64(n))
if err != nil && !downloadsStopped {
retError = err.Error()
logger.LogIf(ctx, err) logger.LogIf(ctx, err)
}
if err != nil {
break break
} }
j++ j++
atomic.AddUint64(&objDownloadCount, 1)
select {
case <-doneCh2:
return
default:
}
} }
}(i) }(i)
} }
wg.Wait() wg.Wait()
return SpeedtestResult{Uploads: objUploadCount, Downloads: objDownloadCount}, nil return SpeedtestResult{Uploads: totalBytesWritten, Downloads: totalBytesRead, Error: retError}, nil
} }
func (s *peerRESTServer) SpeedtestHandler(w http.ResponseWriter, r *http.Request) { func (s *peerRESTServer) SpeedtestHandler(w http.ResponseWriter, r *http.Request) {

View File

@ -37,6 +37,7 @@ import (
"runtime" "runtime"
"runtime/pprof" "runtime/pprof"
"runtime/trace" "runtime/trace"
"sort"
"strings" "strings"
"sync" "sync"
"time" "time"
@ -970,3 +971,163 @@ func auditLogInternal(ctx context.Context, bucket, object string, opts AuditLogO
ctx = logger.SetAuditEntry(ctx, &entry) ctx = logger.SetAuditEntry(ctx, &entry)
logger.AuditLog(ctx, nil, nil, nil) logger.AuditLog(ctx, nil, nil, nil)
} }
// Get the max throughput and iops numbers.
func speedTest(ctx context.Context, throughputSize, iopsSize int, concurrencyStart int, duration time.Duration, autotune bool) (madmin.SpeedTestResult, error) {
var result madmin.SpeedTestResult
objAPI := newObjectLayerFn()
if objAPI == nil {
return result, errServerNotInitialized
}
concurrency := concurrencyStart
throughputHighestGet := uint64(0)
throughputHighestPut := uint64(0)
var throughputHighestPutResults []SpeedtestResult
var throughputHighestGetResults []SpeedtestResult
for {
select {
case <-ctx.Done():
// If the client got disconnected stop the speedtest.
return result, errUnexpected
default:
}
results := globalNotificationSys.Speedtest(ctx, throughputSize, concurrency, duration)
sort.Slice(results, func(i, j int) bool {
return results[i].Endpoint < results[j].Endpoint
})
totalPut := uint64(0)
totalGet := uint64(0)
for _, result := range results {
totalPut += result.Uploads
totalGet += result.Downloads
}
if totalPut < throughputHighestPut && totalGet < throughputHighestGet {
break
}
if totalPut > throughputHighestPut {
throughputHighestPut = totalPut
throughputHighestPutResults = results
}
if totalGet > throughputHighestGet {
throughputHighestGet = totalGet
throughputHighestGetResults = results
}
if !autotune {
break
}
// Try with a higher concurrency to see if we get better throughput
concurrency += (concurrency + 1) / 2
}
concurrency = concurrencyStart
iopsHighestPut := uint64(0)
iopsHighestGet := uint64(0)
var iopsHighestPutResults []SpeedtestResult
var iopsHighestGetResults []SpeedtestResult
if autotune {
for {
select {
case <-ctx.Done():
// If the client got disconnected stop the speedtest.
return result, errUnexpected
default:
}
results := globalNotificationSys.Speedtest(ctx, iopsSize, concurrency, duration)
sort.Slice(results, func(i, j int) bool {
return results[i].Endpoint < results[j].Endpoint
})
totalPut := uint64(0)
totalGet := uint64(0)
for _, result := range results {
totalPut += result.Uploads
totalGet += result.Downloads
}
if totalPut < iopsHighestPut && totalGet < iopsHighestGet {
break
}
if totalPut > iopsHighestPut {
iopsHighestPut = totalPut
iopsHighestPutResults = results
}
if totalGet > iopsHighestGet {
iopsHighestGet = totalGet
iopsHighestGetResults = results
}
if !autotune {
break
}
// Try with a higher concurrency to see if we get better throughput
concurrency += (concurrency + 1) / 2
}
} else {
iopsHighestPut = throughputHighestPut
iopsHighestGet = throughputHighestGet
iopsHighestPutResults = throughputHighestPutResults
iopsHighestGetResults = throughputHighestGetResults
}
if len(throughputHighestPutResults) != len(iopsHighestPutResults) {
return result, errors.New("throughput and iops differ in number of nodes")
}
if len(throughputHighestGetResults) != len(iopsHighestGetResults) {
return result, errors.New("throughput and iops differ in number of nodes")
}
durationSecs := duration.Seconds()
result.PUTStats.ThroughputPerSec = throughputHighestPut / uint64(durationSecs)
result.PUTStats.ObjectsPerSec = iopsHighestPut / uint64(iopsSize) / uint64(durationSecs)
for i := 0; i < len(throughputHighestPutResults); i++ {
errStr := ""
if throughputHighestPutResults[i].Error != "" {
errStr = throughputHighestPutResults[i].Error
}
if iopsHighestPutResults[i].Error != "" {
errStr = iopsHighestPutResults[i].Error
}
result.PUTStats.Servers = append(result.PUTStats.Servers, madmin.SpeedTestStatServer{
Endpoint: throughputHighestPutResults[i].Endpoint,
ThroughputPerSec: throughputHighestPutResults[i].Uploads / uint64(durationSecs),
ObjectsPerSec: iopsHighestPutResults[i].Uploads / uint64(iopsSize) / uint64(durationSecs),
Err: errStr,
})
}
result.GETStats.ThroughputPerSec = throughputHighestGet / uint64(durationSecs)
result.GETStats.ObjectsPerSec = iopsHighestGet / uint64(iopsSize) / uint64(durationSecs)
for i := 0; i < len(throughputHighestGetResults); i++ {
errStr := ""
if throughputHighestGetResults[i].Error != "" {
errStr = throughputHighestGetResults[i].Error
}
if iopsHighestGetResults[i].Error != "" {
errStr = iopsHighestGetResults[i].Error
}
result.GETStats.Servers = append(result.GETStats.Servers, madmin.SpeedTestStatServer{
Endpoint: throughputHighestGetResults[i].Endpoint,
ThroughputPerSec: throughputHighestGetResults[i].Downloads / uint64(durationSecs),
ObjectsPerSec: iopsHighestGetResults[i].Downloads / uint64(iopsSize) / uint64(durationSecs),
Err: errStr,
})
}
numDisks := 0
if pools, ok := objAPI.(*erasureServerPools); ok {
for _, set := range pools.serverPools {
numDisks = set.setCount * set.setDriveCount
}
}
result.Disks = numDisks
result.Servers = len(globalNotificationSys.peerClients) + 1
result.Version = Version
return result, nil
}

2
go.mod
View File

@ -45,7 +45,7 @@ require (
github.com/minio/csvparser v1.0.0 github.com/minio/csvparser v1.0.0
github.com/minio/highwayhash v1.0.2 github.com/minio/highwayhash v1.0.2
github.com/minio/kes v0.14.0 github.com/minio/kes v0.14.0
github.com/minio/madmin-go v1.1.0 github.com/minio/madmin-go v1.1.5
github.com/minio/minio-go/v7 v7.0.14-0.20210908194250-617d530ffac5 github.com/minio/minio-go/v7 v7.0.14-0.20210908194250-617d530ffac5
github.com/minio/parquet-go v1.0.0 github.com/minio/parquet-go v1.0.0
github.com/minio/pkg v1.1.2 github.com/minio/pkg v1.1.2

4
go.sum
View File

@ -1025,8 +1025,8 @@ github.com/minio/kes v0.14.0 h1:plCGm4LwR++T1P1sXsJbyFRX54CE1WRuo9PAPj6MC3Q=
github.com/minio/kes v0.14.0/go.mod h1:OUensXz2BpgMfiogslKxv7Anyx/wj+6bFC6qA7BQcfA= github.com/minio/kes v0.14.0/go.mod h1:OUensXz2BpgMfiogslKxv7Anyx/wj+6bFC6qA7BQcfA=
github.com/minio/madmin-go v1.0.12/go.mod h1:BK+z4XRx7Y1v8SFWXsuLNqQqnq5BO/axJ8IDJfgyvfs= github.com/minio/madmin-go v1.0.12/go.mod h1:BK+z4XRx7Y1v8SFWXsuLNqQqnq5BO/axJ8IDJfgyvfs=
github.com/minio/madmin-go v1.0.17/go.mod h1:4nl9hvLWFnwCjkLfZSsZXEHgDODa2XSG6xGlIZyQ2oA= github.com/minio/madmin-go v1.0.17/go.mod h1:4nl9hvLWFnwCjkLfZSsZXEHgDODa2XSG6xGlIZyQ2oA=
github.com/minio/madmin-go v1.1.0 h1:Y7YOPQafIVBpW0kwYzlt7FGi0GV23dlr/u3jRRrB4cw= github.com/minio/madmin-go v1.1.5 h1:xfzHwQ/KeKDQZKLqllNSyexwOPM/tvc13UdCeVMzADY=
github.com/minio/madmin-go v1.1.0/go.mod h1:4nl9hvLWFnwCjkLfZSsZXEHgDODa2XSG6xGlIZyQ2oA= github.com/minio/madmin-go v1.1.5/go.mod h1:xIPJHUbyYhNDgeD9Wov5Fz5/p7DIW0u+q6Rs/+Xu2TM=
github.com/minio/mc v0.0.0-20210626002108-cebf3318546f h1:hyFvo5hSFw2K417YvDr/vAKlgCG69uTuhZW/5LNdL0U= github.com/minio/mc v0.0.0-20210626002108-cebf3318546f h1:hyFvo5hSFw2K417YvDr/vAKlgCG69uTuhZW/5LNdL0U=
github.com/minio/mc v0.0.0-20210626002108-cebf3318546f/go.mod h1:tuaonkPjVApCXkbtKENHBtsqUf7YTV33qmFrC+Pgp5g= github.com/minio/mc v0.0.0-20210626002108-cebf3318546f/go.mod h1:tuaonkPjVApCXkbtKENHBtsqUf7YTV33qmFrC+Pgp5g=
github.com/minio/md5-simd v1.1.0/go.mod h1:XpBqgZULrMYD3R+M28PcmP0CkI7PEMzB3U77ZrKZ0Gw= github.com/minio/md5-simd v1.1.0/go.mod h1:XpBqgZULrMYD3R+M28PcmP0CkI7PEMzB3U77ZrKZ0Gw=