Implement netperf for "mc support perf net" (#14397)

Co-authored-by: Klaus Post <klauspost@gmail.com>
This commit is contained in:
Krishna Srinivas
2022-03-08 09:54:38 -08:00
committed by GitHub
parent 8a274169da
commit 4d0715d226
8 changed files with 470 additions and 149 deletions

View File

@@ -27,20 +27,15 @@ import (
"net/http"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/dustin/go-humanize"
"github.com/google/uuid"
"github.com/gorilla/mux"
"github.com/minio/madmin-go"
b "github.com/minio/minio/internal/bucket/bandwidth"
"github.com/minio/minio/internal/event"
"github.com/minio/minio/internal/hash"
xhttp "github.com/minio/minio/internal/http"
"github.com/minio/minio/internal/logger"
"github.com/minio/pkg/randreader"
"github.com/tinylib/msgp/msgp"
)
@@ -1143,148 +1138,6 @@ func (s *peerRESTServer) GetPeerMetrics(w http.ResponseWriter, r *http.Request)
}
}
// SpeedtestResult return value of the speedtest function
type SpeedtestResult struct {
Endpoint string
Uploads uint64
Downloads uint64
Error string
}
func newRandomReader(size int) io.Reader {
return io.LimitReader(randreader.New(), int64(size))
}
// Runs the speedtest on local MinIO process.
func selfSpeedtest(ctx context.Context, size, concurrent int, duration time.Duration, storageClass string) (SpeedtestResult, error) {
objAPI := newObjectLayerFn()
if objAPI == nil {
return SpeedtestResult{}, errServerNotInitialized
}
var errOnce sync.Once
var retError string
var wg sync.WaitGroup
var totalBytesWritten uint64
var totalBytesRead uint64
objCountPerThread := make([]uint64, concurrent)
uploadsCtx, uploadsCancel := context.WithCancel(context.Background())
defer uploadsCancel()
go func() {
time.Sleep(duration)
uploadsCancel()
}()
objNamePrefix := "speedtest/objects/" + uuid.New().String()
wg.Add(concurrent)
for i := 0; i < concurrent; i++ {
go func(i int) {
defer wg.Done()
for {
hashReader, err := hash.NewReader(newRandomReader(size),
int64(size), "", "", int64(size))
if err != nil {
if !contextCanceled(uploadsCtx) && !errors.Is(err, context.Canceled) {
errOnce.Do(func() {
retError = err.Error()
})
}
uploadsCancel()
return
}
reader := NewPutObjReader(hashReader)
objInfo, err := objAPI.PutObject(uploadsCtx, minioMetaBucket, fmt.Sprintf("%s.%d.%d",
objNamePrefix, i, objCountPerThread[i]), reader, ObjectOptions{
UserDefined: map[string]string{
xhttp.AmzStorageClass: storageClass,
},
Speedtest: true,
})
if err != nil {
objCountPerThread[i]--
if !contextCanceled(uploadsCtx) && !errors.Is(err, context.Canceled) {
errOnce.Do(func() {
retError = err.Error()
})
}
uploadsCancel()
return
}
atomic.AddUint64(&totalBytesWritten, uint64(objInfo.Size))
objCountPerThread[i]++
}
}(i)
}
wg.Wait()
// We already saw write failures, no need to proceed into read's
if retError != "" {
return SpeedtestResult{Uploads: totalBytesWritten, Downloads: totalBytesRead, Error: retError}, nil
}
downloadsCtx, downloadsCancel := context.WithCancel(context.Background())
defer downloadsCancel()
go func() {
time.Sleep(duration)
downloadsCancel()
}()
wg.Add(concurrent)
for i := 0; i < concurrent; i++ {
go func(i int) {
defer wg.Done()
var j uint64
if objCountPerThread[i] == 0 {
return
}
for {
if objCountPerThread[i] == j {
j = 0
}
r, err := objAPI.GetObjectNInfo(downloadsCtx, minioMetaBucket, fmt.Sprintf("%s.%d.%d",
objNamePrefix, i, j), nil, nil, noLock, ObjectOptions{})
if err != nil {
if isErrObjectNotFound(err) {
continue
}
if !contextCanceled(downloadsCtx) && !errors.Is(err, context.Canceled) {
errOnce.Do(func() {
retError = err.Error()
})
}
downloadsCancel()
return
}
n, err := io.Copy(ioutil.Discard, r)
r.Close()
if err == nil {
// Only capture success criteria - do not
// have to capture failed reads, truncated
// reads etc.
atomic.AddUint64(&totalBytesRead, uint64(n))
}
if err != nil {
if !contextCanceled(downloadsCtx) && !errors.Is(err, context.Canceled) {
errOnce.Do(func() {
retError = err.Error()
})
}
downloadsCancel()
return
}
j++
}
}(i)
}
wg.Wait()
return SpeedtestResult{Uploads: totalBytesWritten, Downloads: totalBytesRead, Error: retError}, nil
}
func (s *peerRESTServer) SpeedtestHandler(w http.ResponseWriter, r *http.Request) {
if !s.IsValid(w, r) {
s.writeErrorResponse(w, errors.New("invalid request"))
@@ -1384,6 +1237,50 @@ func (s *peerRESTServer) DriveSpeedTestHandler(w http.ResponseWriter, r *http.Re
logger.LogIf(r.Context(), gob.NewEncoder(w).Encode(result))
}
// DevNull - everything goes to io.Discard
func (s *peerRESTServer) DevNull(w http.ResponseWriter, r *http.Request) {
if !s.IsValid(w, r) {
s.writeErrorResponse(w, errors.New("invalid request"))
return
}
globalNetPerfRX.Connect()
defer globalNetPerfRX.Disconnect()
connectTime := time.Now()
ctx := newContext(r, w, "DevNull")
for {
n, err := io.CopyN(io.Discard, r.Body, 128*humanize.KiByte)
atomic.AddUint64(&globalNetPerfRX.RX, uint64(n))
if err != nil && err != io.EOF {
// If there is a disconnection before globalNetPerfMinDuration (we give a margin of error of 1 sec)
// would mean the network is not stable. Logging here will help in debugging network issues.
if time.Since(connectTime) < (globalNetPerfMinDuration - time.Second) {
logger.LogIf(ctx, err)
}
}
if err != nil {
break
}
}
}
// Netperf - perform netperf
func (s *peerRESTServer) Netperf(w http.ResponseWriter, r *http.Request) {
if !s.IsValid(w, r) {
s.writeErrorResponse(w, errors.New("invalid request"))
return
}
durationStr := r.Form.Get(peerRESTDuration)
duration, err := time.ParseDuration(durationStr)
if err != nil || duration.Seconds() == 0 {
duration = time.Second * 10
}
result := netperf(r.Context(), duration.Round(time.Second))
logger.LogIf(r.Context(), gob.NewEncoder(w).Encode(result))
}
// registerPeerRESTHandlers - register peer rest router.
func registerPeerRESTHandlers(router *mux.Router) {
server := &peerRESTServer{}
@@ -1431,6 +1328,8 @@ func registerPeerRESTHandlers(router *mux.Router) {
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodLoadTransitionTierConfig).HandlerFunc(httpTraceHdrs(server.LoadTransitionTierConfigHandler))
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodSpeedtest).HandlerFunc(httpTraceHdrs(server.SpeedtestHandler))
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodDriveSpeedTest).HandlerFunc(httpTraceHdrs(server.DriveSpeedTestHandler))
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodNetperf).HandlerFunc(httpTraceHdrs(server.Netperf))
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodDevNull).HandlerFunc(httpTraceHdrs(server.DevNull))
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodReloadSiteReplicationConfig).HandlerFunc(httpTraceHdrs(server.ReloadSiteReplicationConfigHandler))
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodReloadPoolMeta).HandlerFunc(httpTraceHdrs(server.ReloadPoolMetaHandler))
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodGetLastDayTierStats).HandlerFunc(httpTraceHdrs(server.GetLastDayTierStatsHandler))