mirror of
https://github.com/minio/minio.git
synced 2025-01-23 04:33:15 -05:00
8f7c739328
Capture average, p50, p99, p999 response times and ttfb values. These are needed for latency measurements and overall understanding of our speedtest results.
342 lines
8.4 KiB
Go
342 lines
8.4 KiB
Go
// Copyright (c) 2022 MinIO, Inc.
|
|
//
|
|
// This file is part of MinIO Object Storage stack
|
|
//
|
|
// This program is free software: you can redistribute it and/or modify
|
|
// it under the terms of the GNU Affero General Public License as published by
|
|
// the Free Software Foundation, either version 3 of the License, or
|
|
// (at your option) any later version.
|
|
//
|
|
// This program is distributed in the hope that it will be useful
|
|
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
// GNU Affero General Public License for more details.
|
|
//
|
|
// You should have received a copy of the GNU Affero General Public License
|
|
// along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
|
|
package cmd
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
"io/ioutil"
|
|
"math/rand"
|
|
"net/http"
|
|
"sync"
|
|
"sync/atomic"
|
|
"time"
|
|
|
|
"github.com/dustin/go-humanize"
|
|
"github.com/minio/madmin-go"
|
|
"github.com/minio/minio-go/v7"
|
|
"github.com/minio/pkg/randreader"
|
|
)
|
|
|
|
// SpeedTestResult return value of the speedtest function
|
|
type SpeedTestResult struct {
|
|
Endpoint string
|
|
Uploads uint64
|
|
Downloads uint64
|
|
UploadTimes madmin.TimeDurations
|
|
DownloadTimes madmin.TimeDurations
|
|
DownloadTTFB madmin.TimeDurations
|
|
Error string
|
|
}
|
|
|
|
func newRandomReader(size int) io.Reader {
|
|
return io.LimitReader(randreader.New(), int64(size))
|
|
}
|
|
|
|
type firstByteRecorder struct {
|
|
t *time.Time
|
|
r io.Reader
|
|
}
|
|
|
|
func (f *firstByteRecorder) Read(p []byte) (n int, err error) {
|
|
if f.t != nil || len(p) == 0 {
|
|
return f.r.Read(p)
|
|
}
|
|
// Read a single byte.
|
|
n, err = f.r.Read(p[:1])
|
|
if n > 0 {
|
|
t := time.Now()
|
|
f.t = &t
|
|
}
|
|
return n, err
|
|
}
|
|
|
|
// Runs the speedtest on local MinIO process.
|
|
func selfSpeedTest(ctx context.Context, opts speedTestOpts) (SpeedTestResult, error) {
|
|
objAPI := newObjectLayerFn()
|
|
if objAPI == nil {
|
|
return SpeedTestResult{}, errServerNotInitialized
|
|
}
|
|
|
|
var wg sync.WaitGroup
|
|
var errOnce sync.Once
|
|
var retError string
|
|
var totalBytesWritten uint64
|
|
var totalBytesRead uint64
|
|
|
|
objCountPerThread := make([]uint64, opts.concurrency)
|
|
|
|
uploadsCtx, uploadsCancel := context.WithCancel(context.Background())
|
|
defer uploadsCancel()
|
|
|
|
go func() {
|
|
time.Sleep(opts.duration)
|
|
uploadsCancel()
|
|
}()
|
|
|
|
objNamePrefix := pathJoin(speedTest, mustGetUUID())
|
|
|
|
userMetadata := make(map[string]string)
|
|
userMetadata[globalObjectPerfUserMetadata] = "true" // Bypass S3 API freeze
|
|
popts := minio.PutObjectOptions{
|
|
UserMetadata: userMetadata,
|
|
DisableContentSha256: true,
|
|
DisableMultipart: true,
|
|
}
|
|
|
|
var mu sync.Mutex
|
|
var uploadTimes madmin.TimeDurations
|
|
wg.Add(opts.concurrency)
|
|
for i := 0; i < opts.concurrency; i++ {
|
|
go func(i int) {
|
|
defer wg.Done()
|
|
for {
|
|
t := time.Now()
|
|
reader := newRandomReader(opts.objectSize)
|
|
tmpObjName := pathJoin(objNamePrefix, fmt.Sprintf("%d/%d", i, objCountPerThread[i]))
|
|
info, err := globalMinioClient.PutObject(uploadsCtx, opts.bucketName, tmpObjName, reader, int64(opts.objectSize), popts)
|
|
if err != nil {
|
|
if !contextCanceled(uploadsCtx) && !errors.Is(err, context.Canceled) {
|
|
errOnce.Do(func() {
|
|
retError = err.Error()
|
|
})
|
|
}
|
|
uploadsCancel()
|
|
return
|
|
}
|
|
response := time.Since(t)
|
|
atomic.AddUint64(&totalBytesWritten, uint64(info.Size))
|
|
objCountPerThread[i]++
|
|
mu.Lock()
|
|
uploadTimes = append(uploadTimes, response)
|
|
mu.Unlock()
|
|
}
|
|
}(i)
|
|
}
|
|
wg.Wait()
|
|
|
|
// We already saw write failures, no need to proceed into read's
|
|
if retError != "" {
|
|
return SpeedTestResult{
|
|
Uploads: totalBytesWritten,
|
|
Downloads: totalBytesRead,
|
|
UploadTimes: uploadTimes,
|
|
Error: retError,
|
|
}, nil
|
|
}
|
|
|
|
downloadsCtx, downloadsCancel := context.WithCancel(context.Background())
|
|
defer downloadsCancel()
|
|
go func() {
|
|
time.Sleep(opts.duration)
|
|
downloadsCancel()
|
|
}()
|
|
|
|
gopts := minio.GetObjectOptions{}
|
|
gopts.Set(globalObjectPerfUserMetadata, "true") // Bypass S3 API freeze
|
|
|
|
var downloadTimes madmin.TimeDurations
|
|
var downloadTTFB madmin.TimeDurations
|
|
wg.Add(opts.concurrency)
|
|
for i := 0; i < opts.concurrency; i++ {
|
|
go func(i int) {
|
|
defer wg.Done()
|
|
var j uint64
|
|
if objCountPerThread[i] == 0 {
|
|
return
|
|
}
|
|
for {
|
|
if objCountPerThread[i] == j {
|
|
j = 0
|
|
}
|
|
tmpObjName := pathJoin(objNamePrefix, fmt.Sprintf("%d/%d", i, j))
|
|
t := time.Now()
|
|
r, err := globalMinioClient.GetObject(downloadsCtx, opts.bucketName, tmpObjName, gopts)
|
|
if err != nil {
|
|
errResp, ok := err.(minio.ErrorResponse)
|
|
if ok && errResp.StatusCode == http.StatusNotFound {
|
|
continue
|
|
}
|
|
if !contextCanceled(downloadsCtx) && !errors.Is(err, context.Canceled) {
|
|
errOnce.Do(func() {
|
|
retError = err.Error()
|
|
})
|
|
}
|
|
downloadsCancel()
|
|
return
|
|
}
|
|
fbr := firstByteRecorder{
|
|
r: r,
|
|
}
|
|
n, err := io.Copy(ioutil.Discard, &fbr)
|
|
r.Close()
|
|
if err == nil {
|
|
response := time.Since(t)
|
|
ttfb := time.Since(*fbr.t)
|
|
// Only capture success criteria - do not
|
|
// have to capture failed reads, truncated
|
|
// reads etc.
|
|
atomic.AddUint64(&totalBytesRead, uint64(n))
|
|
mu.Lock()
|
|
downloadTimes = append(downloadTimes, response)
|
|
downloadTTFB = append(downloadTTFB, ttfb)
|
|
mu.Unlock()
|
|
}
|
|
if err != nil {
|
|
if !contextCanceled(downloadsCtx) && !errors.Is(err, context.Canceled) {
|
|
errOnce.Do(func() {
|
|
retError = err.Error()
|
|
})
|
|
}
|
|
downloadsCancel()
|
|
return
|
|
}
|
|
j++
|
|
}
|
|
}(i)
|
|
}
|
|
wg.Wait()
|
|
|
|
return SpeedTestResult{
|
|
Uploads: totalBytesWritten,
|
|
Downloads: totalBytesRead,
|
|
UploadTimes: uploadTimes,
|
|
DownloadTimes: downloadTimes,
|
|
DownloadTTFB: downloadTTFB,
|
|
Error: retError,
|
|
}, nil
|
|
}
|
|
|
|
// To collect RX stats during "mc support perf net"
|
|
// RXSample holds the RX bytes for the duration between
|
|
// the last peer to connect and the first peer to disconnect.
|
|
// This is to improve the RX throughput accuracy.
|
|
type netPerfRX struct {
|
|
RX uint64 // RX bytes
|
|
lastToConnect time.Time // time at which last peer to connect to us
|
|
firstToDisconnect time.Time // time at which the first peer disconnects from us
|
|
RXSample uint64 // RX bytes between lastToConnect and firstToDisconnect
|
|
activeConnections uint64
|
|
sync.RWMutex
|
|
}
|
|
|
|
func (n *netPerfRX) Connect() {
|
|
n.Lock()
|
|
defer n.Unlock()
|
|
n.activeConnections++
|
|
atomic.StoreUint64(&globalNetPerfRX.RX, 0)
|
|
n.lastToConnect = time.Now()
|
|
}
|
|
|
|
func (n *netPerfRX) Disconnect() {
|
|
n.Lock()
|
|
defer n.Unlock()
|
|
n.activeConnections--
|
|
if n.firstToDisconnect.IsZero() {
|
|
n.RXSample = atomic.LoadUint64(&n.RX)
|
|
n.firstToDisconnect = time.Now()
|
|
}
|
|
}
|
|
|
|
func (n *netPerfRX) ActiveConnections() uint64 {
|
|
n.RLock()
|
|
defer n.RUnlock()
|
|
return n.activeConnections
|
|
}
|
|
|
|
func (n *netPerfRX) Reset() {
|
|
n.RLock()
|
|
defer n.RUnlock()
|
|
n.RX = 0
|
|
n.RXSample = 0
|
|
n.lastToConnect = time.Time{}
|
|
n.firstToDisconnect = time.Time{}
|
|
}
|
|
|
|
// Reader to read random data.
|
|
type netperfReader struct {
|
|
n uint64
|
|
eof chan struct{}
|
|
buf []byte
|
|
}
|
|
|
|
func (m *netperfReader) Read(b []byte) (int, error) {
|
|
select {
|
|
case <-m.eof:
|
|
return 0, io.EOF
|
|
default:
|
|
}
|
|
n := copy(b, m.buf)
|
|
atomic.AddUint64(&m.n, uint64(n))
|
|
return n, nil
|
|
}
|
|
|
|
func netperf(ctx context.Context, duration time.Duration) madmin.NetperfNodeResult {
|
|
r := &netperfReader{eof: make(chan struct{})}
|
|
r.buf = make([]byte, 128*humanize.KiByte)
|
|
rand.Read(r.buf)
|
|
|
|
connectionsPerPeer := 16
|
|
|
|
if len(globalNotificationSys.peerClients) > 16 {
|
|
// For a large cluster it's enough to have 1 connection per peer to saturate the network.
|
|
connectionsPerPeer = 1
|
|
}
|
|
|
|
errStr := ""
|
|
var wg sync.WaitGroup
|
|
for index := range globalNotificationSys.peerClients {
|
|
if globalNotificationSys.peerClients[index] == nil {
|
|
continue
|
|
}
|
|
go func(index int) {
|
|
for i := 0; i < connectionsPerPeer; i++ {
|
|
wg.Add(1)
|
|
go func() {
|
|
defer wg.Done()
|
|
err := globalNotificationSys.peerClients[index].DevNull(ctx, r)
|
|
if err != nil {
|
|
errStr = err.Error()
|
|
}
|
|
}()
|
|
}
|
|
}(index)
|
|
}
|
|
|
|
time.Sleep(duration)
|
|
close(r.eof)
|
|
wg.Wait()
|
|
for {
|
|
if globalNetPerfRX.ActiveConnections() == 0 {
|
|
break
|
|
}
|
|
time.Sleep(time.Second)
|
|
}
|
|
rx := float64(globalNetPerfRX.RXSample)
|
|
delta := globalNetPerfRX.firstToDisconnect.Sub(globalNetPerfRX.lastToConnect)
|
|
if delta < 0 {
|
|
rx = 0
|
|
errStr = "network disconnection issues detected"
|
|
}
|
|
|
|
globalNetPerfRX.Reset()
|
|
return madmin.NetperfNodeResult{Endpoint: "", TX: r.n / uint64(duration.Seconds()), RX: uint64(rx / delta.Seconds()), Error: errStr}
|
|
}
|