Implement onboard diagnostics admin API (#9024)
- Implement a graph algorithm to test network bandwidth from every node to every other node
- Saturate any network bandwidth adaptively, accounting for slow and fast network capacity
- Implement parallel drive OBD tests
- Implement a paging mechanism for OBD tests to provide periodic updates to the client
- Implement Sys, Process, Host, Mem OBD infos

@@ -22,9 +22,12 @@ import (
	"crypto/tls"
	"encoding/gob"
	"io"
	"io/ioutil"
	"math"
	"math/rand"
	"net/url"
	"strconv"
	"sync"
	"sync/atomic"
	"time"

@@ -41,7 +44,13 @@ import (
	trace "github.com/minio/minio/pkg/trace"
)

// client to talk to peer NEndpoints.
const (
	kiB int64 = 1 << 10
	miB int64 = kiB << 10
	giB int64 = miB << 10
)

// client to talk to peer Nodes.
type peerRESTClient struct {
	host       *xnet.Host
	restClient *rest.Client
@@ -190,6 +199,321 @@ func (client *peerRESTClient) NetworkInfo() (info madmin.ServerNetworkHardwareIn
	return info, err
}

type networkOverloadedErr struct{}

var networkOverloaded networkOverloadedErr

func (n networkOverloadedErr) Error() string {
	return "network overloaded"
}

type progressReader struct {
	r            io.Reader
	progressChan chan int64
}

func (p *progressReader) Read(b []byte) (int, error) {
	n, err := p.r.Read(b)
	if err != nil && err != io.EOF {
		return n, err
	}
	p.progressChan <- int64(n)
	return n, err
}

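The progressReader above feeds per-Read byte counts into a channel so that a single goroutine can keep a running total while many uploads are in flight. A minimal, self-contained sketch of the same counting-reader pattern (all identifiers here are illustrative, not part of this commit):

// countingReader mirrors the progressReader idea: every chunk served by
// Read is reported on a channel, and one goroutine folds the reports into
// an atomic total.
package main

import (
	"bytes"
	"fmt"
	"io"
	"io/ioutil"
	"sync/atomic"
)

type countingReader struct {
	r  io.Reader
	ch chan int64
}

func (c *countingReader) Read(b []byte) (int, error) {
	n, err := c.r.Read(b)
	if err != nil && err != io.EOF {
		return n, err
	}
	c.ch <- int64(n) // report progress, including the final short read
	return n, err
}

func main() {
	var total int64
	ch := make(chan int64, 8)
	done := make(chan struct{})
	go func() {
		defer close(done)
		for v := range ch {
			atomic.AddInt64(&total, v)
		}
	}()

	src := &countingReader{r: bytes.NewReader(make([]byte, 1<<20)), ch: ch}
	if _, err := io.Copy(ioutil.Discard, src); err != nil {
		fmt.Println("copy failed:", err)
		return
	}
	close(ch)
	<-done
	fmt.Println("bytes transferred:", total) // 1048576
}
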
func (client *peerRESTClient) doNetOBDTest(ctx context.Context, dataSize int64, threadCount uint) (info madmin.NetOBDInfo, err error) {
	latencies := []float64{}
	throughputs := []float64{}

	buf := make([]byte, dataSize)

	buflimiter := make(chan struct{}, threadCount)
	errChan := make(chan error, threadCount)

	totalTransferred := int64(0)
	transferChan := make(chan int64, threadCount)
	go func() {
		for v := range transferChan {
			atomic.AddInt64(&totalTransferred, v)
		}
	}()

	// ensure enough samples to obtain normal distribution
	maxSamples := int(10 * threadCount)

	innerCtx, cancel := context.WithCancel(ctx)

	slowSamples := int32(0)
	maxSlowSamples := int32(maxSamples / 20)
	slowSample := func() {
		if slowSamples > maxSlowSamples { // 5% of total
			return
		}
		if atomic.AddInt32(&slowSamples, 1) >= maxSlowSamples {
			errChan <- networkOverloaded
			cancel()
		}
	}

	wg := sync.WaitGroup{}
	finish := func() {
		<-buflimiter
		wg.Done()
	}

	for i := 0; i < maxSamples; i++ {
		select {
		case <-ctx.Done():
			return info, ctx.Err()
		case err = <-errChan:
		case buflimiter <- struct{}{}:
			wg.Add(1)

			if innerCtx.Err() != nil {
				finish()
				continue
			}

			go func(i int) {
				bufReader := bytes.NewReader(buf)
				bufReadCloser := ioutil.NopCloser(&progressReader{
					r:            bufReader,
					progressChan: transferChan,
				})
				start := time.Now()
				before := atomic.LoadInt64(&totalTransferred)

				ctx, cancel := context.WithTimeout(innerCtx, 10*time.Second)
				defer cancel()
				respBody, err := client.callWithContext(ctx, peerRESTMethodNetOBDInfo, nil, bufReadCloser, dataSize)
				if err != nil {

					if netErr, ok := err.(*rest.NetworkError); ok {
						if urlErr, ok := netErr.Err.(*url.Error); ok {
							if urlErr.Err.Error() == context.DeadlineExceeded.Error() {
								slowSample()
								finish()
								return
							}
						}
					}

					errChan <- err
					finish()
					return
				}
				http.DrainBody(respBody)

				after := atomic.LoadInt64(&totalTransferred)
				finish()
				end := time.Now()

				latency := float64(end.Sub(start).Seconds())

				if latency > maxLatencyForSizeThreads(dataSize, threadCount) {
					slowSample()
				}

				/* Throughput = (total data transferred across all threads / time taken) */
				throughput := float64(float64((after - before)) / latency)

				latencies = append(latencies, latency)
				throughputs = append(throughputs, throughput)
			}(i)
		}
	}
	wg.Wait()

	if err != nil {
		return info, err
	}

	latency, throughput, err := xnet.ComputeOBDStats(latencies, throughputs)
	info = madmin.NetOBDInfo{
		Latency:    latency,
		Throughput: throughput,
	}
	return info, err
}

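doNetOBDTest treats any sample that misses its latency budget (or hits the 10s request deadline) as a "slow sample" and aborts the whole step once slow samples exceed 5% of the planned total. A distilled, runnable sketch of that overload-detection pattern, using a simulated probe in place of the peer REST call (nothing below is upstream code):

// Count slow samples atomically; cancel the shared context once they
// exceed the 5% budget, so no further work is scheduled for this step.
package main

import (
	"context"
	"fmt"
	"math/rand"
	"sync"
	"sync/atomic"
	"time"
)

func main() {
	maxSamples := int32(100)
	maxSlow := maxSamples / 20 // 5% budget
	var slow int32

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	slowSample := func() {
		if atomic.AddInt32(&slow, 1) >= maxSlow {
			cancel() // too many slow samples: treat the network as overloaded
		}
	}

	var wg sync.WaitGroup
	for i := int32(0); i < maxSamples; i++ {
		if ctx.Err() != nil {
			break // stop scheduling probes once overloaded
		}
		wg.Add(1)
		go func() {
			defer wg.Done()
			// Simulated probe: roughly 10% of probes miss the budget.
			if rand.Intn(10) == 0 {
				slowSample()
			}
			time.Sleep(time.Millisecond)
		}()
	}
	wg.Wait()
	fmt.Println("overloaded:", ctx.Err() != nil, "slow samples:", atomic.LoadInt32(&slow))
}
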
func maxLatencyForSizeThreads(size int64, threadCount uint) float64 {
	Gbit100 := 12.5 * float64(giB)
	Gbit40 := 5.00 * float64(giB)
	Gbit25 := 3.25 * float64(giB)
	Gbit10 := 1.25 * float64(giB)
	// Gbit1 := 0.25 * float64(giB)

	// Given the current defaults, each combination of size/thread
	// is supposed to fully saturate the intended pipe when all threads are active
	// i.e. if the test is performed in a perfectly controlled environment, i.e. without
	// CPU scheduling latencies and/or network jitters, then all threads working
	// simultaneously should result in each of them completing in 1s
	//
	// In reality, I've assumed a normal distribution of latency with expected mean of 1s and min of 0s
	// Then, 95% of threads should complete within 2 seconds (2 std. deviations from the mean). The 2s comes
	// from fitting the normal curve such that the mean is 1.
	//
	// i.e. we expect no more than 5% of threads to take longer than 2s to push the data.
	//
	// throughput | max latency
	//   100 Gbit | 2s
	//    40 Gbit | 2s
	//    25 Gbit | 2s
	//    10 Gbit | 2s
	//     1 Gbit | inf

	throughput := float64(int64(size) * int64(threadCount))
	if throughput >= Gbit100 {
		return 2.0
	} else if throughput >= Gbit40 {
		return 2.0
	} else if throughput >= Gbit25 {
		return 2.0
	} else if throughput >= Gbit10 {
		return 2.0
	}
	return math.MaxFloat64
}

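The thresholds line up with the step defaults used by NetOBDInfo below: the fastest step pushes 256 MiB across 50 threads, i.e. 12.5 GiB in aggregate per round, which exactly meets the Gbit100 cutoff (note the code approximates the strict 12.5 GB/s line rate of 100 Gbit/s as 12.5 GiB/s). A quick, illustrative check of that arithmetic:

package main

import "fmt"

func main() {
	const kiB = int64(1) << 10
	const miB = kiB << 10
	const giB = miB << 10

	size := 256 * miB       // payload per thread in the fastest step
	threads := int64(50)    // thread count in the fastest step
	aggregate := float64(size * threads) // bytes pushed per sampling round

	gbit100 := 12.5 * float64(giB) // the Gbit100 cutoff above
	fmt.Printf("aggregate: %.1f GiB, 100 Gbit threshold: %.1f GiB, saturates: %v\n",
		aggregate/float64(giB), gbit100/float64(giB), aggregate >= gbit100)
	// Output: aggregate: 12.5 GiB, 100 Gbit threshold: 12.5 GiB, saturates: true
}
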
// NetOBDInfo - fetch Net OBD information for a remote node.
func (client *peerRESTClient) NetOBDInfo(ctx context.Context) (info madmin.NetOBDInfo, err error) {

	// 100 Gbit -> 256 MiB * 50 threads
	// 40 Gbit  -> 256 MiB * 20 threads
	// 25 Gbit  -> 128 MiB * 25 threads
	// 10 Gbit  -> 128 MiB * 10 threads
	// 1 Gbit   -> 64 MiB  * 2 threads

	type step struct {
		size    int64
		threads uint
	}
	steps := []step{
		{ // 100 Gbit
			size:    256 * miB,
			threads: 50,
		},
		{ // 40 Gbit
			size:    256 * miB,
			threads: 20,
		},
		{ // 25 Gbit
			size:    128 * miB,
			threads: 25,
		},
		{ // 10 Gbit
			size:    128 * miB,
			threads: 10,
		},
		{ // 1 Gbit
			size:    64 * miB,
			threads: 2,
		},
	}

	for i := range steps {
		size := steps[i].size
		threads := steps[i].threads

		if info, err = client.doNetOBDTest(ctx, size, threads); err != nil {
			if err == networkOverloaded {
				continue
			}

			if netErr, ok := err.(*rest.NetworkError); ok {
				if urlErr, ok := netErr.Err.(*url.Error); ok {
					if urlErr.Err.Error() == context.Canceled.Error() {
						continue
					}
					if urlErr.Err.Error() == context.DeadlineExceeded.Error() {
						continue
					}
				}
			}
		}
		return info, err
	}
	return info, err
}

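NetOBDInfo walks the steps from the fastest assumed pipe downward and settles on the first step the network can absorb without overloading. A stand-alone sketch of that step-down control flow, with a hypothetical probe function in place of doNetOBDTest (all names and the probe's cutoff are illustrative):

package main

import (
	"errors"
	"fmt"
)

var errOverloaded = errors.New("network overloaded")

type step struct {
	size    int64 // payload size in MiB (illustrative units)
	threads uint
}

func main() {
	steps := []step{
		{256, 50}, // 100 Gbit
		{256, 20}, // 40 Gbit
		{128, 25}, // 25 Gbit
		{128, 10}, // 10 Gbit
		{64, 2},   // 1 Gbit
	}

	// Stand-in probe: pretend only the 10 Gbit tier fits this network.
	probe := func(s step) error {
		if s.size*int64(s.threads) > 128*10 {
			return errOverloaded
		}
		return nil
	}

	// Walk from the fastest tier down; the first step that does not
	// overload the network becomes the measurement.
	for _, s := range steps {
		if err := probe(s); errors.Is(err, errOverloaded) {
			continue // pipe too small for this tier, try the next one down
		}
		fmt.Printf("measured at size=%d MiB, threads=%d\n", s.size, s.threads)
		return
	}
	fmt.Println("all tiers overloaded")
}
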
// DispatchNetOBDInfo - dispatch other nodes to run Net OBD.
func (client *peerRESTClient) DispatchNetOBDInfo(ctx context.Context) (info madmin.ServerNetOBDInfo, err error) {
	respBody, err := client.callWithContext(ctx, peerRESTMethodDispatchNetOBDInfo, nil, nil, -1)
	if err != nil {
		return
	}
	defer http.DrainBody(respBody)
	err = gob.NewDecoder(respBody).Decode(&info)
	return
}

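Each of the fetchers below shares one shape: call the peer REST endpoint, drain the body on exit, and gob-decode the response into a typed struct. A minimal sketch of that gob round trip, with a bytes.Buffer standing in for the HTTP response body and an illustrative struct in place of the madmin types:

package main

import (
	"bytes"
	"encoding/gob"
	"fmt"
	"log"
)

// serverMemInfo is a hypothetical reply type for this sketch only.
type serverMemInfo struct {
	Addr  string
	Total uint64
}

func main() {
	// Server side: gob-encode the reply into the response body.
	var body bytes.Buffer
	if err := gob.NewEncoder(&body).Encode(serverMemInfo{Addr: "node-1:9000", Total: 64 << 30}); err != nil {
		log.Fatal(err)
	}

	// Client side, as in the fetchers below: decode the body into the result.
	var info serverMemInfo
	if err := gob.NewDecoder(&body).Decode(&info); err != nil {
		log.Fatal(err)
	}
	fmt.Printf("%+v\n", info) // {Addr:node-1:9000 Total:68719476736}
}
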
// DriveOBDInfo - fetch Drive OBD information for a remote node.
func (client *peerRESTClient) DriveOBDInfo(ctx context.Context) (info madmin.ServerDrivesOBDInfo, err error) {
	respBody, err := client.callWithContext(ctx, peerRESTMethodDriveOBDInfo, nil, nil, -1)
	if err != nil {
		return
	}
	defer http.DrainBody(respBody)
	err = gob.NewDecoder(respBody).Decode(&info)
	return info, err
}

// CPUOBDInfo - fetch CPU OBD information for a remote node.
func (client *peerRESTClient) CPUOBDInfo(ctx context.Context) (info madmin.ServerCPUOBDInfo, err error) {
	respBody, err := client.callWithContext(ctx, peerRESTMethodCPUOBDInfo, nil, nil, -1)
	if err != nil {
		return
	}
	defer http.DrainBody(respBody)
	err = gob.NewDecoder(respBody).Decode(&info)
	return info, err
}

// DiskHwOBDInfo - fetch Disk HW OBD information for a remote node.
func (client *peerRESTClient) DiskHwOBDInfo(ctx context.Context) (info madmin.ServerDiskHwOBDInfo, err error) {
	respBody, err := client.callWithContext(ctx, peerRESTMethodDiskHwOBDInfo, nil, nil, -1)
	if err != nil {
		return
	}
	defer http.DrainBody(respBody)
	err = gob.NewDecoder(respBody).Decode(&info)
	return info, err
}

// OsOBDInfo - fetch OsInfo OBD information for a remote node.
func (client *peerRESTClient) OsOBDInfo(ctx context.Context) (info madmin.ServerOsOBDInfo, err error) {
	respBody, err := client.callWithContext(ctx, peerRESTMethodOsInfoOBDInfo, nil, nil, -1)
	if err != nil {
		return
	}
	defer http.DrainBody(respBody)
	err = gob.NewDecoder(respBody).Decode(&info)
	return info, err
}

// MemOBDInfo - fetch MemInfo OBD information for a remote node.
func (client *peerRESTClient) MemOBDInfo(ctx context.Context) (info madmin.ServerMemOBDInfo, err error) {
	respBody, err := client.callWithContext(ctx, peerRESTMethodMemOBDInfo, nil, nil, -1)
	if err != nil {
		return
	}
	defer http.DrainBody(respBody)
	err = gob.NewDecoder(respBody).Decode(&info)
	return info, err
}

// ProcOBDInfo - fetch ProcInfo OBD information for a remote node.
func (client *peerRESTClient) ProcOBDInfo(ctx context.Context) (info madmin.ServerProcOBDInfo, err error) {
	respBody, err := client.callWithContext(ctx, peerRESTMethodProcOBDInfo, nil, nil, -1)
	if err != nil {
		return
	}
	defer http.DrainBody(respBody)
	err = gob.NewDecoder(respBody).Decode(&info)
	return info, err
}

// DrivePerfInfo - fetch Drive performance information for a remote node.
func (client *peerRESTClient) DrivePerfInfo(size int64) (info madmin.ServerDrivesPerfInfo, err error) {
	params := make(url.Values)