From aa0c28809bbadaeab30c2010c319eff5eef0a332 Mon Sep 17 00:00:00 2001 From: Krishna Srinivas <634494+krishnasrinivas@users.noreply.github.com> Date: Tue, 27 Jul 2021 12:55:56 -0700 Subject: [PATCH] Server side speedtest implementation (#12750) --- cmd/admin-handlers.go | 47 +++++++++++ cmd/admin-router.go | 2 + cmd/notification.go | 39 +++++++++ cmd/object-api-utils.go | 4 +- cmd/peer-rest-client.go | 18 ++++ cmd/peer-rest-common.go | 4 + cmd/peer-rest-server.go | 180 ++++++++++++++++++++++++++++++++++++++++ cmd/xl-storage.go | 2 +- go.mod | 2 +- go.sum | 3 +- 10 files changed, 297 insertions(+), 4 deletions(-) diff --git a/cmd/admin-handlers.go b/cmd/admin-handlers.go index 7fb9e96e9..9f7666907 100644 --- a/cmd/admin-handlers.go +++ b/cmd/admin-handlers.go @@ -39,6 +39,7 @@ import ( "strings" "time" + humanize "github.com/dustin/go-humanize" "github.com/gorilla/mux" "github.com/klauspost/compress/zip" "github.com/minio/kes" @@ -910,6 +911,52 @@ func (a adminAPIHandlers) BackgroundHealStatusHandler(w http.ResponseWriter, r * } } +func (a adminAPIHandlers) SpeedtestHandler(w http.ResponseWriter, r *http.Request) { + ctx := newContext(r, w, "SpeedtestHandler") + + defer logger.AuditLog(ctx, w, r, mustGetClaimsFromToken(r)) + + objectAPI, _ := validateAdminReq(ctx, w, r, iampolicy.HealAdminAction) + if objectAPI == nil { + return + } + + if !globalIsErasure { + writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrNotImplemented), r.URL) + return + } + + sizeStr := r.URL.Query().Get(peerRESTSize) + durationStr := r.URL.Query().Get(peerRESTDuration) + concurrentStr := r.URL.Query().Get(peerRESTConcurrent) + + size, err := strconv.Atoi(sizeStr) + if err != nil { + size = 64 * humanize.MiByte + } + + concurrent, err := strconv.Atoi(concurrentStr) + if err != nil { + concurrent = 32 + } + + duration, err := time.ParseDuration(durationStr) + if err != nil { + duration = time.Second * 10 + } + + results := globalNotificationSys.Speedtest(ctx, size, concurrent, duration) + + if err := json.NewEncoder(w).Encode(results); err != nil { + writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL) + return + } + + objectAPI.DeleteBucket(ctx, pathJoin(minioMetaSpeedTestBucket, minioMetaSpeedTestBucketPrefix), true) + + w.(http.Flusher).Flush() +} + func validateAdminReq(ctx context.Context, w http.ResponseWriter, r *http.Request, action iampolicy.AdminAction) (ObjectLayer, auth.Credentials) { var cred auth.Credentials var adminAPIErr APIErrorCode diff --git a/cmd/admin-router.go b/cmd/admin-router.go index 0496e3119..095720919 100644 --- a/cmd/admin-router.go +++ b/cmd/admin-router.go @@ -204,6 +204,8 @@ func registerAdminRouter(router *mux.Router, enableConfigOps bool) { Queries("paths", "{paths:.*}").HandlerFunc(gz(httpTraceHdrs(adminAPI.ForceUnlockHandler))) } + adminRouter.Methods(http.MethodPost).Path(adminVersion + "/speedtest").HandlerFunc(httpTraceHdrs(adminAPI.SpeedtestHandler)) + // HTTP Trace adminRouter.Methods(http.MethodGet).Path(adminVersion + "/trace").HandlerFunc(gz(adminAPI.TraceHandler)) diff --git a/cmd/notification.go b/cmd/notification.go index 00d9b4135..1b3157021 100644 --- a/cmd/notification.go +++ b/cmd/notification.go @@ -1427,3 +1427,42 @@ func (sys *NotificationSys) GetClusterMetrics(ctx context.Context) chan Metric { }(&wg, ch) return ch } + +// Speedtest run GET/PUT tests at input concurrency for requested object size, +// optionally you can extend the tests longer with time.Duration. +func (sys *NotificationSys) Speedtest(ctx context.Context, size int, concurrent int, duration time.Duration) []madmin.SpeedtestResult { + results := make([]madmin.SpeedtestResult, len(sys.peerClients)+1) + + var wg sync.WaitGroup + for index := range sys.peerClients { + if sys.peerClients[index] == nil { + continue + } + wg.Add(1) + go func(index int) { + defer wg.Done() + r, err := sys.peerClients[index].Speedtest(ctx, size, concurrent, duration) + results[index].Endpoint = sys.peerClients[index].String() + results[index].Err = err + if err == nil { + results[index].Uploads = r.Uploads + results[index].Downloads = r.Downloads + } + }(index) + } + + wg.Add(1) + go func() { + defer wg.Done() + r, err := selfSpeedtest(ctx, size, concurrent, duration) + results[len(results)-1].Endpoint = globalMinioEndpoint + results[len(results)-1].Err = err + if err == nil { + results[len(results)-1].Uploads = r.Uploads + results[len(results)-1].Downloads = r.Downloads + } + }() + wg.Wait() + + return results +} diff --git a/cmd/object-api-utils.go b/cmd/object-api-utils.go index 4929a0f54..a71cea3bf 100644 --- a/cmd/object-api-utils.go +++ b/cmd/object-api-utils.go @@ -61,7 +61,9 @@ const ( // MinIO tmp meta prefix. minioMetaTmpBucket = minioMetaBucket + "/tmp" // MinIO tmp meta prefix for deleted objects. - minioMetaTmpDeletedBucket = minioMetaTmpBucket + "/.trash" + minioMetaTmpDeletedBucket = minioMetaTmpBucket + "/.trash" + minioMetaSpeedTestBucket = minioMetaBucket + "/speedtest" + minioMetaSpeedTestBucketPrefix = "objects/" // DNS separator (period), used for bucket name validation. dnsDelimiter = "." diff --git a/cmd/peer-rest-client.go b/cmd/peer-rest-client.go index f33a1a829..ff794a319 100644 --- a/cmd/peer-rest-client.go +++ b/cmd/peer-rest-client.go @@ -969,3 +969,21 @@ func (client *peerRESTClient) GetPeerMetrics(ctx context.Context) (<-chan Metric }(ch) return ch, nil } + +func (client *peerRESTClient) Speedtest(ctx context.Context, size, concurrent int, duration time.Duration) (SpeedtestResult, error) { + values := make(url.Values) + values.Set(peerRESTSize, strconv.Itoa(size)) + values.Set(peerRESTConcurrent, strconv.Itoa(concurrent)) + values.Set(peerRESTDuration, duration.String()) + + respBody, err := client.callWithContext(context.Background(), peerRESTMethodSpeedtest, values, nil, -1) + if err != nil { + return SpeedtestResult{}, err + } + defer http.DrainBody(respBody) + + dec := gob.NewDecoder(respBody) + var result SpeedtestResult + err = dec.Decode(&result) + return result, err +} diff --git a/cmd/peer-rest-common.go b/cmd/peer-rest-common.go index 5f0290b10..7315ae2cb 100644 --- a/cmd/peer-rest-common.go +++ b/cmd/peer-rest-common.go @@ -62,6 +62,7 @@ const ( peerRESTMethodUpdateMetacacheListing = "/updatemetacache" peerRESTMethodGetPeerMetrics = "/peermetrics" peerRESTMethodLoadTransitionTierConfig = "/loadtransitiontierconfig" + peerRESTMethodSpeedtest = "/speedtest" ) const ( @@ -81,6 +82,9 @@ const ( peerRESTTraceS3 = "s3" peerRESTTraceOS = "os" peerRESTTraceThreshold = "threshold" + peerRESTSize = "size" + peerRESTConcurrent = "concurrent" + peerRESTDuration = "duration" peerRESTListenBucket = "bucket" peerRESTListenPrefix = "prefix" diff --git a/cmd/peer-rest-server.go b/cmd/peer-rest-server.go index f75a4ad39..0bdc0a29d 100644 --- a/cmd/peer-rest-server.go +++ b/cmd/peer-rest-server.go @@ -19,6 +19,7 @@ package cmd import ( "context" + "crypto/rand" "encoding/gob" "errors" "fmt" @@ -27,12 +28,17 @@ import ( "net/http" "strconv" "strings" + "sync" + "sync/atomic" "time" + humanize "github.com/dustin/go-humanize" + "github.com/google/uuid" "github.com/gorilla/mux" "github.com/minio/madmin-go" b "github.com/minio/minio/internal/bucket/bandwidth" "github.com/minio/minio/internal/event" + "github.com/minio/minio/internal/hash" "github.com/minio/minio/internal/logger" "github.com/tinylib/msgp/msgp" ) @@ -1097,6 +1103,179 @@ func (s *peerRESTServer) GetPeerMetrics(w http.ResponseWriter, r *http.Request) w.(http.Flusher).Flush() } +// SpeedtestResult return value of the speedtest function +type SpeedtestResult struct { + Uploads uint64 + Downloads uint64 +} + +// SpeedtestObject implements "random-read" object reader +type SpeedtestObject struct { + buf []byte + remaining int +} + +func (bo *SpeedtestObject) Read(b []byte) (int, error) { + if bo.remaining == 0 { + return 0, io.EOF + } + if len(b) == 0 { + return 0, nil + } + if len(b) > len(bo.buf) { + b = b[:len(bo.buf)] + } + if len(b) > bo.remaining { + b = b[:bo.remaining] + } + copy(b, bo.buf) + bo.remaining -= len(b) + return len(b), nil +} + +// Runs the speedtest on local MinIO process. +func selfSpeedtest(ctx context.Context, size, concurrent int, duration time.Duration) (SpeedtestResult, error) { + var result SpeedtestResult + objAPI := newObjectLayerFn() + if objAPI == nil { + return result, errServerNotInitialized + } + + bucket := minioMetaSpeedTestBucket + + buf := make([]byte, humanize.MiByte) + rand.Read(buf) + + objCountPerThread := make([]uint64, concurrent) + + var objUploadCount uint64 + var objDownloadCount uint64 + + var wg sync.WaitGroup + + doneCh1 := make(chan struct{}) + go func() { + time.Sleep(duration) + close(doneCh1) + }() + + objNamePrefix := minioMetaSpeedTestBucketPrefix + uuid.New().String() + + wg.Add(concurrent) + for i := 0; i < concurrent; i++ { + go func(i int) { + defer wg.Done() + for { + hashReader, err := hash.NewReader(&SpeedtestObject{buf, size}, + int64(size), "", "", int64(size)) + if err != nil { + logger.LogIf(ctx, err) + break + } + reader := NewPutObjReader(hashReader) + _, err = objAPI.PutObject(ctx, bucket, fmt.Sprintf("%s.%d.%d", + objNamePrefix, i, objCountPerThread[i]), reader, ObjectOptions{}) + if err != nil { + logger.LogIf(ctx, err) + break + } + objCountPerThread[i]++ + atomic.AddUint64(&objUploadCount, 1) + select { + case <-doneCh1: + return + default: + } + } + }(i) + } + wg.Wait() + + doneCh2 := make(chan struct{}) + go func() { + time.Sleep(duration) + close(doneCh2) + }() + + wg.Add(concurrent) + for i := 0; i < concurrent; i++ { + go func(i int) { + defer wg.Done() + var j uint64 + for { + if objCountPerThread[i] == j { + j = 0 + } + r, err := objAPI.GetObjectNInfo(ctx, bucket, fmt.Sprintf("%s.%d.%d", + objNamePrefix, i, j), nil, nil, noLock, ObjectOptions{}) + if err != nil { + logger.LogIf(ctx, err) + break + } + _, err = io.Copy(ioutil.Discard, r) + r.Close() + if err != nil { + logger.LogIf(ctx, err) + break + } + j++ + atomic.AddUint64(&objDownloadCount, 1) + select { + case <-doneCh2: + return + default: + } + } + }(i) + } + wg.Wait() + return SpeedtestResult{objUploadCount, objDownloadCount}, nil +} + +func (s *peerRESTServer) SpeedtestHandler(w http.ResponseWriter, r *http.Request) { + if !s.IsValid(w, r) { + s.writeErrorResponse(w, errors.New("invalid request")) + } + + objAPI := newObjectLayerFn() + if objAPI == nil { + s.writeErrorResponse(w, errServerNotInitialized) + return + } + + sizeStr := r.URL.Query().Get(peerRESTSize) + durationStr := r.URL.Query().Get(peerRESTDuration) + concurrentStr := r.URL.Query().Get(peerRESTConcurrent) + + size, err := strconv.Atoi(sizeStr) + if err != nil { + size = 64 * humanize.MiByte + } + + concurrent, err := strconv.Atoi(concurrentStr) + if err != nil { + concurrent = 32 + } + + duration, err := time.ParseDuration(durationStr) + if err != nil { + duration = time.Second * 10 + } + + result, err := selfSpeedtest(r.Context(), size, concurrent, duration) + if err != nil { + s.writeErrorResponse(w, err) + return + } + + enc := gob.NewEncoder(w) + if err := enc.Encode(result); err != nil { + s.writeErrorResponse(w, errors.New("Encoding report failed: "+err.Error())) + return + } + w.(http.Flusher).Flush() +} + // registerPeerRESTHandlers - register peer rest router. func registerPeerRESTHandlers(router *mux.Router) { server := &peerRESTServer{} @@ -1139,4 +1318,5 @@ func registerPeerRESTHandlers(router *mux.Router) { subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodUpdateMetacacheListing).HandlerFunc(httpTraceHdrs(server.UpdateMetacacheListingHandler)) subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodGetPeerMetrics).HandlerFunc(httpTraceHdrs(server.GetPeerMetrics)) subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodLoadTransitionTierConfig).HandlerFunc(httpTraceHdrs(server.LoadTransitionTierConfigHandler)) + subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodSpeedtest).HandlerFunc(httpTraceHdrs(server.SpeedtestHandler)) } diff --git a/cmd/xl-storage.go b/cmd/xl-storage.go index 1a9a9b2fe..34ee2af07 100644 --- a/cmd/xl-storage.go +++ b/cmd/xl-storage.go @@ -292,7 +292,7 @@ func newXLStorage(ep Endpoint) (*xlStorage, error) { } // Create all necessary bucket folders if possible. - if err = p.MakeVolBulk(context.TODO(), minioMetaBucket, minioMetaTmpBucket, minioMetaMultipartBucket, dataUsageBucket); err != nil { + if err = p.MakeVolBulk(context.TODO(), minioMetaBucket, minioMetaTmpBucket, minioMetaMultipartBucket, dataUsageBucket, minioMetaSpeedTestBucket); err != nil { return nil, err } diff --git a/go.mod b/go.mod index 337eda20f..40071f7e7 100644 --- a/go.mod +++ b/go.mod @@ -45,7 +45,7 @@ require ( github.com/minio/csvparser v1.0.0 github.com/minio/highwayhash v1.0.2 github.com/minio/kes v0.14.0 - github.com/minio/madmin-go v1.0.17 + github.com/minio/madmin-go v1.0.19 github.com/minio/minio-go/v7 v7.0.13-0.20210715203016-9e713532886e github.com/minio/parquet-go v1.0.0 github.com/minio/pkg v1.0.10 diff --git a/go.sum b/go.sum index cf0462ab1..98b933f92 100644 --- a/go.sum +++ b/go.sum @@ -1023,8 +1023,9 @@ github.com/minio/kes v0.14.0 h1:plCGm4LwR++T1P1sXsJbyFRX54CE1WRuo9PAPj6MC3Q= github.com/minio/kes v0.14.0/go.mod h1:OUensXz2BpgMfiogslKxv7Anyx/wj+6bFC6qA7BQcfA= github.com/minio/madmin-go v1.0.6/go.mod h1:BK+z4XRx7Y1v8SFWXsuLNqQqnq5BO/axJ8IDJfgyvfs= github.com/minio/madmin-go v1.0.12/go.mod h1:BK+z4XRx7Y1v8SFWXsuLNqQqnq5BO/axJ8IDJfgyvfs= -github.com/minio/madmin-go v1.0.17 h1:VMEn4nMKf0X3uNH0u+fZcn17KSwVkQGwyER/igG556E= github.com/minio/madmin-go v1.0.17/go.mod h1:4nl9hvLWFnwCjkLfZSsZXEHgDODa2XSG6xGlIZyQ2oA= +github.com/minio/madmin-go v1.0.19 h1:XAp2rpo9OwzKAHIq5+EkAt148+lIeFyeo7cgVLNCWC8= +github.com/minio/madmin-go v1.0.19/go.mod h1:4nl9hvLWFnwCjkLfZSsZXEHgDODa2XSG6xGlIZyQ2oA= github.com/minio/mc v0.0.0-20210626002108-cebf3318546f h1:hyFvo5hSFw2K417YvDr/vAKlgCG69uTuhZW/5LNdL0U= github.com/minio/mc v0.0.0-20210626002108-cebf3318546f/go.mod h1:tuaonkPjVApCXkbtKENHBtsqUf7YTV33qmFrC+Pgp5g= github.com/minio/md5-simd v1.1.0/go.mod h1:XpBqgZULrMYD3R+M28PcmP0CkI7PEMzB3U77ZrKZ0Gw=