add support for speedtest drive (#14182)

This commit is contained in:
Sidhartha Mani
2022-02-01 22:38:05 -08:00
committed by GitHub
parent a4e1de93a7
commit d7df6bc738
10 changed files with 497 additions and 143 deletions

View File

@@ -36,6 +36,7 @@ import (
"sort"
"strconv"
"strings"
"sync"
"time"
"github.com/dustin/go-humanize"
@@ -933,12 +934,17 @@ func (a adminAPIHandlers) BackgroundHealStatusHandler(w http.ResponseWriter, r *
}
}
// SpeedtestHandler - reports maximum speed of a cluster by performing PUT and
// SpeedtestHandler - Deprecated. See ObjectSpeedtestHandler
func (a adminAPIHandlers) SpeedtestHandler(w http.ResponseWriter, r *http.Request) {
a.ObjectSpeedtestHandler(w, r)
}
// ObjectSpeedtestHandler - reports maximum speed of a cluster by performing PUT and
// GET operations on the server, supports auto tuning by default by automatically
// increasing concurrency and stopping when we have reached the limits on the
// system.
func (a adminAPIHandlers) SpeedtestHandler(w http.ResponseWriter, r *http.Request) {
ctx := newContext(r, w, "SpeedtestHandler")
func (a adminAPIHandlers) ObjectSpeedtestHandler(w http.ResponseWriter, r *http.Request) {
ctx := newContext(r, w, "ObjectSpeedtestHandler")
defer logger.AuditLog(ctx, w, r, mustGetClaimsFromToken(r))
@@ -1018,6 +1024,97 @@ func (a adminAPIHandlers) SpeedtestHandler(w http.ResponseWriter, r *http.Reques
}
}
// NetSpeedtestHandler - reports maximum network throughput
func (a adminAPIHandlers) NetSpeedtestHandler(w http.ResponseWriter, r *http.Request) {
ctx := newContext(r, w, "NetSpeedtestHandler")
writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrNotImplemented), r.URL)
}
// DriveSpeedtestHandler - reports throughput of drives available in the cluster
func (a adminAPIHandlers) DriveSpeedtestHandler(w http.ResponseWriter, r *http.Request) {
ctx := newContext(r, w, "DriveSpeedtestHandler")
defer logger.AuditLog(ctx, w, r, mustGetClaimsFromToken(r))
objectAPI, _ := validateAdminReq(ctx, w, r, iampolicy.HealthInfoAdminAction)
if objectAPI == nil {
return
}
if !globalIsErasure {
writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrNotImplemented), r.URL)
return
}
// Freeze all incoming S3 API calls before running speedtest.
globalNotificationSys.ServiceFreeze(ctx, true)
// unfreeze all incoming S3 API calls after speedtest.
defer globalNotificationSys.ServiceFreeze(ctx, false)
serial := r.Form.Get("serial") == "true"
blockSizeStr := r.Form.Get("blocksize")
fileSizeStr := r.Form.Get("filesize")
blockSize, err := strconv.ParseUint(blockSizeStr, 10, 64)
if err != nil {
blockSize = 4 * humanize.MiByte // default value
}
fileSize, err := strconv.ParseUint(fileSizeStr, 10, 64)
if err != nil {
fileSize = 1 * humanize.GiByte // default value
}
opts := madmin.DriveSpeedTestOpts{
Serial: serial,
BlockSize: blockSize,
FileSize: fileSize,
}
keepAliveTicker := time.NewTicker(500 * time.Millisecond)
defer keepAliveTicker.Stop()
enc := json.NewEncoder(w)
ch := globalNotificationSys.DriveSpeedTest(ctx, opts)
var wg sync.WaitGroup
wg.Add(1)
// local driveSpeedTest
go func() {
defer wg.Done()
enc.Encode(driveSpeedTest(ctx, opts))
if wf, ok := w.(http.Flusher); ok {
wf.Flush()
}
}()
for {
select {
case <-ctx.Done():
goto endloop
case <-keepAliveTicker.C:
// Write a blank entry to prevent client from disconnecting
if err := enc.Encode(madmin.DriveSpeedTestResult{}); err != nil {
goto endloop
}
w.(http.Flusher).Flush()
case result, ok := <-ch:
if !ok {
goto endloop
}
if err := enc.Encode(result); err != nil {
goto endloop
}
w.(http.Flusher).Flush()
}
}
endloop:
wg.Wait()
}
// Admin API errors
const (
AdminUpdateUnexpectedFailure = "XMinioAdminUpdateUnexpectedFailure"

View File

@@ -221,6 +221,9 @@ func registerAdminRouter(router *mux.Router, enableConfigOps bool) {
}
adminRouter.Methods(http.MethodPost).Path(adminVersion + "/speedtest").HandlerFunc(httpTraceHdrs(adminAPI.SpeedtestHandler))
adminRouter.Methods(http.MethodPost).Path(adminVersion + "/speedtest/object").HandlerFunc(httpTraceHdrs(adminAPI.ObjectSpeedtestHandler))
adminRouter.Methods(http.MethodPost).Path(adminVersion + "/speedtest/drive").HandlerFunc(httpTraceHdrs(adminAPI.DriveSpeedtestHandler))
adminRouter.Methods(http.MethodPost).Path(adminVersion + "/speedtest/net").HandlerFunc(httpTraceHdrs(adminAPI.NetSpeedtestHandler))
// HTTP Trace
adminRouter.Methods(http.MethodGet).Path(adminVersion + "/trace").HandlerFunc(gz(http.HandlerFunc(adminAPI.TraceHandler)))

View File

@@ -1611,6 +1611,39 @@ func (sys *NotificationSys) Speedtest(ctx context.Context, size int,
return results
}
// DriveSpeedTest - Drive performance information
func (sys *NotificationSys) DriveSpeedTest(ctx context.Context, opts madmin.DriveSpeedTestOpts) chan madmin.DriveSpeedTestResult {
ch := make(chan madmin.DriveSpeedTestResult)
var wg sync.WaitGroup
for _, client := range sys.peerClients {
if client == nil {
continue
}
wg.Add(1)
go func(client *peerRESTClient) {
defer wg.Done()
resp, err := client.DriveSpeedTest(ctx, opts)
if err != nil {
resp.Error = err.Error()
}
ch <- resp
reqInfo := (&logger.ReqInfo{}).AppendTags("remotePeer", client.host.String())
ctx := logger.SetReqInfo(GlobalContext, reqInfo)
logger.LogIf(ctx, err)
}(client)
}
go func() {
wg.Wait()
close(ch)
}()
return ch
}
// ReloadSiteReplicationConfig - tells all peer minio nodes to reload the
// site-replication configuration.
func (sys *NotificationSys) ReloadSiteReplicationConfig(ctx context.Context) []error {

View File

@@ -1067,6 +1067,35 @@ func (client *peerRESTClient) Speedtest(ctx context.Context, size,
return result, nil
}
func (client *peerRESTClient) DriveSpeedTest(ctx context.Context, opts madmin.DriveSpeedTestOpts) (madmin.DriveSpeedTestResult, error) {
queryVals := make(url.Values)
if opts.Serial {
queryVals.Set("serial", "true")
}
queryVals.Set("blocksize", strconv.FormatUint(opts.BlockSize, 10))
queryVals.Set("filesize", strconv.FormatUint(opts.FileSize, 10))
respBody, err := client.callWithContext(ctx, peerRESTMethodDriveSpeedTest, queryVals, nil, -1)
if err != nil {
return madmin.DriveSpeedTestResult{}, err
}
defer http.DrainBody(respBody)
waitReader, err := waitForHTTPResponse(respBody)
if err != nil {
return madmin.DriveSpeedTestResult{}, err
}
var result madmin.DriveSpeedTestResult
err = gob.NewDecoder(waitReader).Decode(&result)
if err != nil {
return result, err
}
if result.Error != "" {
return result, errors.New(result.Error)
}
return result, nil
}
func (client *peerRESTClient) ReloadSiteReplicationConfig(ctx context.Context) error {
respBody, err := client.callWithContext(context.Background(), peerRESTMethodReloadSiteReplicationConfig, nil, nil, -1)
if err != nil {

View File

@@ -18,7 +18,7 @@
package cmd
const (
peerRESTVersion = "v19" // Add getlastdaytierstats
peerRESTVersion = "v20" // Add drivespeedtest
peerRESTVersionPrefix = SlashSeparator + peerRESTVersion
peerRESTPrefix = minioReservedBucketPath + "/peer"
peerRESTPath = peerRESTPrefix + peerRESTVersionPrefix
@@ -66,6 +66,7 @@ const (
peerRESTMethodGetPeerMetrics = "/peermetrics"
peerRESTMethodLoadTransitionTierConfig = "/loadtransitiontierconfig"
peerRESTMethodSpeedtest = "/speedtest"
peerRESTMethodDriveSpeedTest = "/drivespeedtest"
peerRESTMethodReloadSiteReplicationConfig = "/reloadsitereplicationconfig"
peerRESTMethodReloadPoolMeta = "/reloadpoolmeta"
peerRESTMethodGetLastDayTierStats = "/getlastdaytierstats"

View File

@@ -1343,6 +1343,45 @@ func (s *peerRESTServer) GetLastDayTierStatsHandler(w http.ResponseWriter, r *ht
logger.LogIf(ctx, gob.NewEncoder(w).Encode(result))
}
func (s *peerRESTServer) DriveSpeedTestHandler(w http.ResponseWriter, r *http.Request) {
if !s.IsValid(w, r) {
s.writeErrorResponse(w, errors.New("invalid request"))
return
}
objAPI := newObjectLayerFn()
if objAPI == nil {
s.writeErrorResponse(w, errServerNotInitialized)
return
}
serial := r.Form.Get("serial") == "true"
blockSizeStr := r.Form.Get("blocksize")
fileSizeStr := r.Form.Get("filesize")
blockSize, err := strconv.ParseUint(blockSizeStr, 10, 64)
if err != nil {
blockSize = 4 * humanize.MiByte // default value
}
fileSize, err := strconv.ParseUint(fileSizeStr, 10, 64)
if err != nil {
fileSize = 1 * humanize.GiByte // default value
}
opts := madmin.DriveSpeedTestOpts{
Serial: serial,
BlockSize: blockSize,
FileSize: fileSize,
}
done := keepHTTPResponseAlive(w)
result := driveSpeedTest(r.Context(), opts)
done(nil)
logger.LogIf(r.Context(), gob.NewEncoder(w).Encode(result))
}
// registerPeerRESTHandlers - register peer rest router.
func registerPeerRESTHandlers(router *mux.Router) {
server := &peerRESTServer{}
@@ -1389,6 +1428,7 @@ func registerPeerRESTHandlers(router *mux.Router) {
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodGetPeerMetrics).HandlerFunc(httpTraceHdrs(server.GetPeerMetrics))
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodLoadTransitionTierConfig).HandlerFunc(httpTraceHdrs(server.LoadTransitionTierConfigHandler))
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodSpeedtest).HandlerFunc(httpTraceHdrs(server.SpeedtestHandler))
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodDriveSpeedTest).HandlerFunc(httpTraceHdrs(server.DriveSpeedTestHandler))
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodReloadSiteReplicationConfig).HandlerFunc(httpTraceHdrs(server.ReloadSiteReplicationConfigHandler))
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodReloadPoolMeta).HandlerFunc(httpTraceHdrs(server.ReloadPoolMetaHandler))
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodGetLastDayTierStats).HandlerFunc(httpTraceHdrs(server.GetLastDayTierStatsHandler))

217
cmd/speedtest.go Normal file
View File

@@ -0,0 +1,217 @@
// Copyright (c) 2015-2021 MinIO, Inc.
//
// This file is part of MinIO Object Storage stack
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package cmd
import (
"context"
"os"
"path/filepath"
"sort"
"time"
"github.com/minio/dperf/pkg/dperf"
"github.com/minio/madmin-go"
)
var dperfUUID = mustGetUUID()
type speedTestOpts struct {
throughputSize int
concurrencyStart int
duration time.Duration
autotune bool
storageClass string
}
// Get the max throughput and iops numbers.
func objectSpeedTest(ctx context.Context, opts speedTestOpts) chan madmin.SpeedTestResult {
ch := make(chan madmin.SpeedTestResult, 1)
go func() {
defer close(ch)
concurrency := opts.concurrencyStart
throughputHighestGet := uint64(0)
throughputHighestPut := uint64(0)
var throughputHighestResults []SpeedtestResult
sendResult := func() {
var result madmin.SpeedTestResult
durationSecs := opts.duration.Seconds()
result.GETStats.ThroughputPerSec = throughputHighestGet / uint64(durationSecs)
result.GETStats.ObjectsPerSec = throughputHighestGet / uint64(opts.throughputSize) / uint64(durationSecs)
result.PUTStats.ThroughputPerSec = throughputHighestPut / uint64(durationSecs)
result.PUTStats.ObjectsPerSec = throughputHighestPut / uint64(opts.throughputSize) / uint64(durationSecs)
for i := 0; i < len(throughputHighestResults); i++ {
errStr := ""
if throughputHighestResults[i].Error != "" {
errStr = throughputHighestResults[i].Error
}
result.PUTStats.Servers = append(result.PUTStats.Servers, madmin.SpeedTestStatServer{
Endpoint: throughputHighestResults[i].Endpoint,
ThroughputPerSec: throughputHighestResults[i].Uploads / uint64(durationSecs),
ObjectsPerSec: throughputHighestResults[i].Uploads / uint64(opts.throughputSize) / uint64(durationSecs),
Err: errStr,
})
result.GETStats.Servers = append(result.GETStats.Servers, madmin.SpeedTestStatServer{
Endpoint: throughputHighestResults[i].Endpoint,
ThroughputPerSec: throughputHighestResults[i].Downloads / uint64(durationSecs),
ObjectsPerSec: throughputHighestResults[i].Downloads / uint64(opts.throughputSize) / uint64(durationSecs),
Err: errStr,
})
}
result.Size = opts.throughputSize
result.Disks = globalEndpoints.NEndpoints()
result.Servers = len(globalNotificationSys.peerClients) + 1
result.Version = Version
result.Concurrent = concurrency
ch <- result
}
for {
select {
case <-ctx.Done():
// If the client got disconnected stop the speedtest.
return
default:
}
results := globalNotificationSys.Speedtest(ctx,
opts.throughputSize, concurrency,
opts.duration, opts.storageClass)
sort.Slice(results, func(i, j int) bool {
return results[i].Endpoint < results[j].Endpoint
})
totalPut := uint64(0)
totalGet := uint64(0)
for _, result := range results {
totalPut += result.Uploads
totalGet += result.Downloads
}
if totalGet < throughputHighestGet {
// Following check is for situations
// when Writes() scale higher than Reads()
// - practically speaking this never happens
// and should never happen - however it has
// been seen recently due to hardware issues
// causes Reads() to go slower than Writes().
//
// Send such results anyways as this shall
// expose a problem underneath.
if totalPut > throughputHighestPut {
throughputHighestResults = results
throughputHighestPut = totalPut
// let the client see lower value as well
throughputHighestGet = totalGet
}
sendResult()
break
}
doBreak := float64(totalGet-throughputHighestGet)/float64(totalGet) < 0.025
throughputHighestGet = totalGet
throughputHighestResults = results
throughputHighestPut = totalPut
if doBreak {
sendResult()
break
}
for _, result := range results {
if result.Error != "" {
// Break out on errors.
sendResult()
return
}
}
sendResult()
if !opts.autotune {
break
}
// Try with a higher concurrency to see if we get better throughput
concurrency += (concurrency + 1) / 2
}
}()
return ch
}
// speedTest - Deprecated. See objectSpeedTest
func speedTest(ctx context.Context, opts speedTestOpts) chan madmin.SpeedTestResult {
return objectSpeedTest(ctx, opts)
}
func driveSpeedTest(ctx context.Context, opts madmin.DriveSpeedTestOpts) madmin.DriveSpeedTestResult {
perf := &dperf.DrivePerf{
Serial: opts.Serial,
BlockSize: opts.BlockSize,
FileSize: opts.FileSize,
}
paths := func() []string {
toRet := []string{}
paths := globalEndpoints.LocalDisksPaths()
for _, p := range paths {
dperfFilename := filepath.Join(p, ".minio.sys", "tmp", dperfUUID)
if _, err := os.Stat(dperfFilename); err == nil {
// file exists, so remove it
os.Remove(dperfFilename)
}
toRet = append(toRet, dperfFilename)
}
return toRet
}()
perfs, err := perf.Run(ctx, paths...)
return madmin.DriveSpeedTestResult{
Endpoint: globalLocalNodeName,
Version: Version,
DrivePerf: func() []madmin.DrivePerf {
results := []madmin.DrivePerf{}
for _, r := range perfs {
result := madmin.DrivePerf{
Path: r.Path,
ReadThroughput: r.ReadThroughput,
WriteThroughput: r.WriteThroughput,
Error: func() string {
if r.Error != nil {
return r.Error.Error()
}
return ""
}(),
}
results = append(results, result)
}
return results
}(),
Error: func() string {
if err != nil {
return err.Error()
}
return ""
}(),
}
}

View File

@@ -37,7 +37,6 @@ import (
"runtime"
"runtime/pprof"
"runtime/trace"
"sort"
"strings"
"sync"
"syscall"
@@ -997,136 +996,6 @@ func auditLogInternal(ctx context.Context, bucket, object string, opts AuditLogO
logger.AuditLog(ctx, nil, nil, nil)
}
type speedTestOpts struct {
throughputSize int
concurrencyStart int
duration time.Duration
autotune bool
storageClass string
}
// Get the max throughput and iops numbers.
func speedTest(ctx context.Context, opts speedTestOpts) chan madmin.SpeedTestResult {
ch := make(chan madmin.SpeedTestResult, 1)
go func() {
defer close(ch)
concurrency := opts.concurrencyStart
throughputHighestGet := uint64(0)
throughputHighestPut := uint64(0)
var throughputHighestResults []SpeedtestResult
sendResult := func() {
var result madmin.SpeedTestResult
durationSecs := opts.duration.Seconds()
result.GETStats.ThroughputPerSec = throughputHighestGet / uint64(durationSecs)
result.GETStats.ObjectsPerSec = throughputHighestGet / uint64(opts.throughputSize) / uint64(durationSecs)
result.PUTStats.ThroughputPerSec = throughputHighestPut / uint64(durationSecs)
result.PUTStats.ObjectsPerSec = throughputHighestPut / uint64(opts.throughputSize) / uint64(durationSecs)
for i := 0; i < len(throughputHighestResults); i++ {
errStr := ""
if throughputHighestResults[i].Error != "" {
errStr = throughputHighestResults[i].Error
}
result.PUTStats.Servers = append(result.PUTStats.Servers, madmin.SpeedTestStatServer{
Endpoint: throughputHighestResults[i].Endpoint,
ThroughputPerSec: throughputHighestResults[i].Uploads / uint64(durationSecs),
ObjectsPerSec: throughputHighestResults[i].Uploads / uint64(opts.throughputSize) / uint64(durationSecs),
Err: errStr,
})
result.GETStats.Servers = append(result.GETStats.Servers, madmin.SpeedTestStatServer{
Endpoint: throughputHighestResults[i].Endpoint,
ThroughputPerSec: throughputHighestResults[i].Downloads / uint64(durationSecs),
ObjectsPerSec: throughputHighestResults[i].Downloads / uint64(opts.throughputSize) / uint64(durationSecs),
Err: errStr,
})
}
result.Size = opts.throughputSize
result.Disks = globalEndpoints.NEndpoints()
result.Servers = len(globalNotificationSys.peerClients) + 1
result.Version = Version
result.Concurrent = concurrency
ch <- result
}
for {
select {
case <-ctx.Done():
// If the client got disconnected stop the speedtest.
return
default:
}
results := globalNotificationSys.Speedtest(ctx,
opts.throughputSize, concurrency,
opts.duration, opts.storageClass)
sort.Slice(results, func(i, j int) bool {
return results[i].Endpoint < results[j].Endpoint
})
totalPut := uint64(0)
totalGet := uint64(0)
for _, result := range results {
totalPut += result.Uploads
totalGet += result.Downloads
}
if totalGet < throughputHighestGet {
// Following check is for situations
// when Writes() scale higher than Reads()
// - practically speaking this never happens
// and should never happen - however it has
// been seen recently due to hardware issues
// causes Reads() to go slower than Writes().
//
// Send such results anyways as this shall
// expose a problem underneath.
if totalPut > throughputHighestPut {
throughputHighestResults = results
throughputHighestPut = totalPut
// let the client see lower value as well
throughputHighestGet = totalGet
}
sendResult()
break
}
doBreak := float64(totalGet-throughputHighestGet)/float64(totalGet) < 0.025
throughputHighestGet = totalGet
throughputHighestResults = results
throughputHighestPut = totalPut
if doBreak {
sendResult()
break
}
for _, result := range results {
if result.Error != "" {
// Break out on errors.
sendResult()
return
}
}
sendResult()
if !opts.autotune {
break
}
// Try with a higher concurrency to see if we get better throughput
concurrency += (concurrency + 1) / 2
}
}()
return ch
}
func newTLSConfig(getCert certs.GetCertificateFunc) *tls.Config {
if getCert == nil {
return nil