mirror of
https://github.com/minio/minio.git
synced 2025-01-24 13:13:16 -05:00
69c0e18685
In a perf test, one node will run speed test with all nodes. If there is an error with a peer node, the peer node name is not included in the error hence confusing the user. This commit will add the peer endpoint string to the netperf error.
440 lines
11 KiB
Go
440 lines
11 KiB
Go
// Copyright (c) 2022 MinIO, Inc.
|
|
//
|
|
// This file is part of MinIO Object Storage stack
|
|
//
|
|
// This program is free software: you can redistribute it and/or modify
|
|
// it under the terms of the GNU Affero General Public License as published by
|
|
// the Free Software Foundation, either version 3 of the License, or
|
|
// (at your option) any later version.
|
|
//
|
|
// This program is distributed in the hope that it will be useful
|
|
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
// GNU Affero General Public License for more details.
|
|
//
|
|
// You should have received a copy of the GNU Affero General Public License
|
|
// along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
|
|
package cmd
|
|
|
|
import (
|
|
"context"
|
|
"encoding/gob"
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
"math/rand"
|
|
"net/http"
|
|
"net/url"
|
|
"sync"
|
|
"sync/atomic"
|
|
"time"
|
|
|
|
"github.com/dustin/go-humanize"
|
|
"github.com/minio/madmin-go/v3"
|
|
"github.com/minio/minio-go/v7"
|
|
xhttp "github.com/minio/minio/internal/http"
|
|
"github.com/minio/pkg/v2/randreader"
|
|
)
|
|
|
|
// SpeedTestResult return value of the speedtest function
|
|
type SpeedTestResult struct {
|
|
Endpoint string
|
|
Uploads uint64
|
|
Downloads uint64
|
|
UploadTimes madmin.TimeDurations
|
|
DownloadTimes madmin.TimeDurations
|
|
DownloadTTFB madmin.TimeDurations
|
|
Error string
|
|
}
|
|
|
|
func newRandomReader(size int) io.Reader {
|
|
return io.LimitReader(randreader.New(), int64(size))
|
|
}
|
|
|
|
type firstByteRecorder struct {
|
|
t *time.Time
|
|
r io.Reader
|
|
}
|
|
|
|
func (f *firstByteRecorder) Read(p []byte) (n int, err error) {
|
|
if f.t != nil || len(p) == 0 {
|
|
return f.r.Read(p)
|
|
}
|
|
// Read a single byte.
|
|
n, err = f.r.Read(p[:1])
|
|
if n > 0 {
|
|
t := time.Now()
|
|
f.t = &t
|
|
}
|
|
return n, err
|
|
}
|
|
|
|
// Runs the speedtest on local MinIO process.
|
|
func selfSpeedTest(ctx context.Context, opts speedTestOpts) (SpeedTestResult, error) {
|
|
objAPI := newObjectLayerFn()
|
|
if objAPI == nil {
|
|
return SpeedTestResult{}, errServerNotInitialized
|
|
}
|
|
|
|
var wg sync.WaitGroup
|
|
var errOnce sync.Once
|
|
var retError string
|
|
var totalBytesWritten uint64
|
|
var totalBytesRead uint64
|
|
|
|
objCountPerThread := make([]uint64, opts.concurrency)
|
|
|
|
uploadsCtx, uploadsCancel := context.WithTimeout(ctx, opts.duration)
|
|
defer uploadsCancel()
|
|
|
|
objNamePrefix := pathJoin(speedTest, mustGetUUID())
|
|
|
|
userMetadata := make(map[string]string)
|
|
userMetadata[globalObjectPerfUserMetadata] = "true" // Bypass S3 API freeze
|
|
popts := minio.PutObjectOptions{
|
|
UserMetadata: userMetadata,
|
|
DisableContentSha256: true,
|
|
DisableMultipart: true,
|
|
}
|
|
|
|
var mu sync.Mutex
|
|
var uploadTimes madmin.TimeDurations
|
|
wg.Add(opts.concurrency)
|
|
for i := 0; i < opts.concurrency; i++ {
|
|
go func(i int) {
|
|
defer wg.Done()
|
|
for {
|
|
t := time.Now()
|
|
reader := newRandomReader(opts.objectSize)
|
|
tmpObjName := pathJoin(objNamePrefix, fmt.Sprintf("%d/%d", i, objCountPerThread[i]))
|
|
info, err := globalMinioClient.PutObject(uploadsCtx, opts.bucketName, tmpObjName, reader, int64(opts.objectSize), popts)
|
|
if err != nil {
|
|
if !contextCanceled(uploadsCtx) && !errors.Is(err, context.Canceled) {
|
|
errOnce.Do(func() {
|
|
retError = err.Error()
|
|
})
|
|
}
|
|
uploadsCancel()
|
|
return
|
|
}
|
|
response := time.Since(t)
|
|
atomic.AddUint64(&totalBytesWritten, uint64(info.Size))
|
|
objCountPerThread[i]++
|
|
mu.Lock()
|
|
uploadTimes = append(uploadTimes, response)
|
|
mu.Unlock()
|
|
}
|
|
}(i)
|
|
}
|
|
wg.Wait()
|
|
|
|
// We already saw write failures, no need to proceed into read's
|
|
if retError != "" {
|
|
return SpeedTestResult{
|
|
Uploads: totalBytesWritten,
|
|
Downloads: totalBytesRead,
|
|
UploadTimes: uploadTimes,
|
|
Error: retError,
|
|
}, nil
|
|
}
|
|
|
|
downloadsCtx, downloadsCancel := context.WithTimeout(ctx, opts.duration)
|
|
defer downloadsCancel()
|
|
|
|
gopts := minio.GetObjectOptions{}
|
|
gopts.Set(globalObjectPerfUserMetadata, "true") // Bypass S3 API freeze
|
|
|
|
var downloadTimes madmin.TimeDurations
|
|
var downloadTTFB madmin.TimeDurations
|
|
wg.Add(opts.concurrency)
|
|
for i := 0; i < opts.concurrency; i++ {
|
|
go func(i int) {
|
|
defer wg.Done()
|
|
var j uint64
|
|
if objCountPerThread[i] == 0 {
|
|
return
|
|
}
|
|
for {
|
|
if objCountPerThread[i] == j {
|
|
j = 0
|
|
}
|
|
tmpObjName := pathJoin(objNamePrefix, fmt.Sprintf("%d/%d", i, j))
|
|
t := time.Now()
|
|
r, err := globalMinioClient.GetObject(downloadsCtx, opts.bucketName, tmpObjName, gopts)
|
|
if err != nil {
|
|
errResp, ok := err.(minio.ErrorResponse)
|
|
if ok && errResp.StatusCode == http.StatusNotFound {
|
|
continue
|
|
}
|
|
if !contextCanceled(downloadsCtx) && !errors.Is(err, context.Canceled) {
|
|
errOnce.Do(func() {
|
|
retError = err.Error()
|
|
})
|
|
}
|
|
downloadsCancel()
|
|
return
|
|
}
|
|
fbr := firstByteRecorder{
|
|
r: r,
|
|
}
|
|
n, err := io.Copy(io.Discard, &fbr)
|
|
r.Close()
|
|
if err == nil {
|
|
response := time.Since(t)
|
|
ttfb := time.Since(*fbr.t)
|
|
// Only capture success criteria - do not
|
|
// have to capture failed reads, truncated
|
|
// reads etc.
|
|
atomic.AddUint64(&totalBytesRead, uint64(n))
|
|
mu.Lock()
|
|
downloadTimes = append(downloadTimes, response)
|
|
downloadTTFB = append(downloadTTFB, ttfb)
|
|
mu.Unlock()
|
|
}
|
|
if err != nil {
|
|
if !contextCanceled(downloadsCtx) && !errors.Is(err, context.Canceled) {
|
|
errOnce.Do(func() {
|
|
retError = err.Error()
|
|
})
|
|
}
|
|
downloadsCancel()
|
|
return
|
|
}
|
|
j++
|
|
}
|
|
}(i)
|
|
}
|
|
wg.Wait()
|
|
|
|
return SpeedTestResult{
|
|
Uploads: totalBytesWritten,
|
|
Downloads: totalBytesRead,
|
|
UploadTimes: uploadTimes,
|
|
DownloadTimes: downloadTimes,
|
|
DownloadTTFB: downloadTTFB,
|
|
Error: retError,
|
|
}, nil
|
|
}
|
|
|
|
// To collect RX stats during "mc support perf net"
|
|
// RXSample holds the RX bytes for the duration between
|
|
// the last peer to connect and the first peer to disconnect.
|
|
// This is to improve the RX throughput accuracy.
|
|
type netPerfRX struct {
|
|
RX uint64 // RX bytes
|
|
lastToConnect time.Time // time at which last peer to connect to us
|
|
firstToDisconnect time.Time // time at which the first peer disconnects from us
|
|
RXSample uint64 // RX bytes between lastToConnect and firstToDisconnect
|
|
activeConnections uint64
|
|
sync.RWMutex
|
|
}
|
|
|
|
func (n *netPerfRX) Connect() {
|
|
n.Lock()
|
|
defer n.Unlock()
|
|
n.activeConnections++
|
|
atomic.StoreUint64(&n.RX, 0)
|
|
n.lastToConnect = time.Now()
|
|
}
|
|
|
|
func (n *netPerfRX) Disconnect() {
|
|
n.Lock()
|
|
defer n.Unlock()
|
|
n.activeConnections--
|
|
if n.firstToDisconnect.IsZero() {
|
|
n.RXSample = atomic.LoadUint64(&n.RX)
|
|
n.firstToDisconnect = time.Now()
|
|
}
|
|
}
|
|
|
|
func (n *netPerfRX) ActiveConnections() uint64 {
|
|
n.RLock()
|
|
defer n.RUnlock()
|
|
return n.activeConnections
|
|
}
|
|
|
|
func (n *netPerfRX) Reset() {
|
|
n.Lock()
|
|
defer n.Unlock()
|
|
n.RX = 0
|
|
n.RXSample = 0
|
|
n.lastToConnect = time.Time{}
|
|
n.firstToDisconnect = time.Time{}
|
|
}
|
|
|
|
// Reader to read random data.
|
|
type netperfReader struct {
|
|
n uint64
|
|
eof chan struct{}
|
|
buf []byte
|
|
}
|
|
|
|
func (m *netperfReader) Read(b []byte) (int, error) {
|
|
select {
|
|
case <-m.eof:
|
|
return 0, io.EOF
|
|
default:
|
|
}
|
|
n := copy(b, m.buf)
|
|
atomic.AddUint64(&m.n, uint64(n))
|
|
return n, nil
|
|
}
|
|
|
|
func netperf(ctx context.Context, duration time.Duration) madmin.NetperfNodeResult {
|
|
r := &netperfReader{eof: make(chan struct{})}
|
|
r.buf = make([]byte, 128*humanize.KiByte)
|
|
rand.Read(r.buf)
|
|
|
|
connectionsPerPeer := 16
|
|
|
|
if len(globalNotificationSys.peerClients) > 16 {
|
|
// For a large cluster it's enough to have 1 connection per peer to saturate the network.
|
|
connectionsPerPeer = 1
|
|
}
|
|
|
|
errStr := ""
|
|
var wg sync.WaitGroup
|
|
for index := range globalNotificationSys.peerClients {
|
|
if globalNotificationSys.peerClients[index] == nil {
|
|
continue
|
|
}
|
|
go func(index int) {
|
|
for i := 0; i < connectionsPerPeer; i++ {
|
|
wg.Add(1)
|
|
go func() {
|
|
defer wg.Done()
|
|
err := globalNotificationSys.peerClients[index].DevNull(ctx, r)
|
|
if err != nil {
|
|
errStr = fmt.Sprintf("error with %s: %s", globalNotificationSys.peerClients[index].String(), err.Error())
|
|
}
|
|
}()
|
|
}
|
|
}(index)
|
|
}
|
|
|
|
time.Sleep(duration)
|
|
close(r.eof)
|
|
wg.Wait()
|
|
for {
|
|
if globalNetPerfRX.ActiveConnections() == 0 {
|
|
break
|
|
}
|
|
time.Sleep(time.Second)
|
|
}
|
|
rx := float64(globalNetPerfRX.RXSample)
|
|
delta := globalNetPerfRX.firstToDisconnect.Sub(globalNetPerfRX.lastToConnect)
|
|
if delta < 0 {
|
|
rx = 0
|
|
errStr = "network disconnection issues detected"
|
|
}
|
|
|
|
globalNetPerfRX.Reset()
|
|
return madmin.NetperfNodeResult{Endpoint: "", TX: r.n / uint64(duration.Seconds()), RX: uint64(rx / delta.Seconds()), Error: errStr}
|
|
}
|
|
|
|
func siteNetperf(ctx context.Context, duration time.Duration) madmin.SiteNetPerfNodeResult {
|
|
r := &netperfReader{eof: make(chan struct{})}
|
|
r.buf = make([]byte, 128*humanize.KiByte)
|
|
rand.Read(r.buf)
|
|
|
|
clusterInfos, err := globalSiteReplicationSys.GetClusterInfo(ctx)
|
|
if err != nil {
|
|
return madmin.SiteNetPerfNodeResult{Error: err.Error()}
|
|
}
|
|
|
|
// Scale the number of connections from 32 -> 4 from small to large clusters.
|
|
connectionsPerPeer := 3 + (29+len(clusterInfos.Sites)-1)/len(clusterInfos.Sites)
|
|
|
|
errStr := ""
|
|
var wg sync.WaitGroup
|
|
|
|
for _, info := range clusterInfos.Sites {
|
|
// skip self
|
|
if globalDeploymentID == info.DeploymentID {
|
|
continue
|
|
}
|
|
info := info
|
|
wg.Add(connectionsPerPeer)
|
|
for i := 0; i < connectionsPerPeer; i++ {
|
|
go func() {
|
|
defer wg.Done()
|
|
ctx, cancel := context.WithTimeout(ctx, duration+10*time.Second)
|
|
defer cancel()
|
|
perfNetRequest(
|
|
ctx,
|
|
info.DeploymentID,
|
|
adminPathPrefix+adminAPIVersionPrefix+adminAPISiteReplicationDevNull,
|
|
r,
|
|
)
|
|
}()
|
|
}
|
|
}
|
|
|
|
time.Sleep(duration)
|
|
close(r.eof)
|
|
wg.Wait()
|
|
for {
|
|
if globalSiteNetPerfRX.ActiveConnections() == 0 || contextCanceled(ctx) {
|
|
break
|
|
}
|
|
time.Sleep(time.Second)
|
|
}
|
|
rx := float64(globalSiteNetPerfRX.RXSample)
|
|
delta := globalSiteNetPerfRX.firstToDisconnect.Sub(globalSiteNetPerfRX.lastToConnect)
|
|
// If the first disconnected before the last connected, we likely had a network issue.
|
|
if delta <= 0 {
|
|
rx = 0
|
|
errStr = "detected network disconnections, possibly an unstable network"
|
|
}
|
|
|
|
globalSiteNetPerfRX.Reset()
|
|
return madmin.SiteNetPerfNodeResult{
|
|
Endpoint: "",
|
|
TX: r.n,
|
|
TXTotalDuration: duration,
|
|
RX: uint64(rx),
|
|
RXTotalDuration: delta,
|
|
Error: errStr,
|
|
TotalConn: uint64(connectionsPerPeer),
|
|
}
|
|
}
|
|
|
|
// perfNetRequest - reader for http.request.body
|
|
func perfNetRequest(ctx context.Context, deploymentID, reqPath string, reader io.Reader) (result madmin.SiteNetPerfNodeResult) {
|
|
result = madmin.SiteNetPerfNodeResult{}
|
|
cli, err := globalSiteReplicationSys.getAdminClient(ctx, deploymentID)
|
|
if err != nil {
|
|
result.Error = err.Error()
|
|
return
|
|
}
|
|
rp := cli.GetEndpointURL()
|
|
reqURL := &url.URL{
|
|
Scheme: rp.Scheme,
|
|
Host: rp.Host,
|
|
Path: reqPath,
|
|
}
|
|
result.Endpoint = rp.String()
|
|
req, err := http.NewRequestWithContext(ctx, http.MethodPost, reqURL.String(), reader)
|
|
if err != nil {
|
|
result.Error = err.Error()
|
|
return
|
|
}
|
|
client := &http.Client{
|
|
Transport: globalRemoteTargetTransport,
|
|
}
|
|
resp, err := client.Do(req)
|
|
if err != nil {
|
|
result.Error = err.Error()
|
|
return
|
|
}
|
|
defer xhttp.DrainBody(resp.Body)
|
|
err = gob.NewDecoder(resp.Body).Decode(&result)
|
|
// endpoint have been overwritten
|
|
result.Endpoint = rp.String()
|
|
if err != nil {
|
|
result.Error = err.Error()
|
|
}
|
|
return
|
|
}
|