recognize slow networks to step down faster during netperf (#13473)

This commit is contained in:
Sidhartha Mani 2021-10-20 03:22:07 -07:00 committed by GitHub
parent d7fd396b7c
commit c57ff2640e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -137,16 +137,19 @@ func (client *peerRESTClient) doNetTest(ctx context.Context, dataSize int64, thr
// ensure enough samples to obtain normal distribution // ensure enough samples to obtain normal distribution
maxSamples := int(10 * threadCount) maxSamples := int(10 * threadCount)
if maxSamples > 50 {
maxSamples = 50
}
innerCtx, cancel := context.WithCancel(ctx) innerCtx, cancel := context.WithCancel(ctx)
slowSamples := int32(0) slowSamples := int32(0)
maxSlowSamples := int32(maxSamples / 20) maxSlowSamples := int32(maxSamples/20) + 1 // 5% of total
slowSample := func() { slowSample := func() {
if slowSamples > maxSlowSamples { // 5% of total if slowSamples > maxSlowSamples {
return return
} }
if atomic.AddInt32(&slowSamples, 1) >= maxSlowSamples { if atomic.AddInt32(&slowSamples, 1) > maxSlowSamples {
errChan <- networkOverloaded errChan <- networkOverloaded
cancel() cancel()
} }
@ -159,11 +162,19 @@ func (client *peerRESTClient) doNetTest(ctx context.Context, dataSize int64, thr
} }
for i := 0; i < maxSamples; i++ { for i := 0; i < maxSamples; i++ {
if slowSamples > maxSlowSamples {
break
}
select { select {
case <-ctx.Done(): case <-ctx.Done():
cancel()
return info, ctx.Err() return info, ctx.Err()
case err = <-errChan: case err = <-errChan:
case buflimiter <- struct{}{}: case buflimiter <- struct{}{}:
if slowSamples > maxSlowSamples {
break
}
wg.Add(1) wg.Add(1)
if innerCtx.Err() != nil { if innerCtx.Err() != nil {
@ -175,7 +186,7 @@ func (client *peerRESTClient) doNetTest(ctx context.Context, dataSize int64, thr
start := time.Now() start := time.Now()
before := atomic.LoadInt64(&totalTransferred) before := atomic.LoadInt64(&totalTransferred)
ctx, cancel := context.WithTimeout(innerCtx, 10*time.Second) ctx, cancel := context.WithTimeout(innerCtx, 3*time.Second)
defer cancel() defer cancel()
progress := io.LimitReader(&nullReader{}, dataSize) progress := io.LimitReader(&nullReader{}, dataSize)
@ -223,6 +234,9 @@ func (client *peerRESTClient) doNetTest(ctx context.Context, dataSize int64, thr
} }
wg.Wait() wg.Wait()
if slowSamples > maxSlowSamples {
return info, networkOverloaded
}
if err != nil { if err != nil {
return info, err return info, err
} }
@ -331,10 +345,6 @@ func (client *peerRESTClient) GetNetPerfInfo(ctx context.Context) (info madmin.P
if err == networkOverloaded { if err == networkOverloaded {
continue continue
} }
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
continue
}
} }
return info, err return info, err
} }