mirror of
https://github.com/minio/minio.git
synced 2025-11-07 21:02:58 -05:00
simplify the reader for speedtest (#13682)
additionally count only success operations, truncated incomplete calls don't need to be counted.
This commit is contained in:
@@ -19,7 +19,6 @@ package cmd
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/rand"
|
||||
"encoding/gob"
|
||||
"errors"
|
||||
"fmt"
|
||||
@@ -40,6 +39,7 @@ import (
|
||||
"github.com/minio/minio/internal/event"
|
||||
"github.com/minio/minio/internal/hash"
|
||||
"github.com/minio/minio/internal/logger"
|
||||
"github.com/minio/pkg/randreader"
|
||||
"github.com/tinylib/msgp/msgp"
|
||||
)
|
||||
|
||||
@@ -1125,31 +1125,8 @@ type SpeedtestResult struct {
|
||||
Error string
|
||||
}
|
||||
|
||||
// SpeedtestObject implements "random-read" object reader
|
||||
type SpeedtestObject struct {
|
||||
buf []byte
|
||||
remaining int
|
||||
totalBytesWritten *uint64
|
||||
}
|
||||
|
||||
func (bo *SpeedtestObject) Read(b []byte) (int, error) {
|
||||
if bo.remaining == 0 {
|
||||
return 0, io.EOF
|
||||
}
|
||||
if len(b) == 0 {
|
||||
return 0, nil
|
||||
}
|
||||
if len(b) > len(bo.buf) {
|
||||
b = b[:len(bo.buf)]
|
||||
}
|
||||
if len(b) > bo.remaining {
|
||||
b = b[:bo.remaining]
|
||||
}
|
||||
copy(b, bo.buf)
|
||||
bo.remaining -= len(b)
|
||||
|
||||
atomic.AddUint64(bo.totalBytesWritten, uint64(len(b)))
|
||||
return len(b), nil
|
||||
func newRandomReader(size int) io.Reader {
|
||||
return io.LimitReader(randreader.New(), int64(size))
|
||||
}
|
||||
|
||||
// Runs the speedtest on local MinIO process.
|
||||
@@ -1162,10 +1139,6 @@ func selfSpeedtest(ctx context.Context, size, concurrent int, duration time.Dura
|
||||
var retError string
|
||||
|
||||
bucket := minioMetaSpeedTestBucket
|
||||
|
||||
buf := make([]byte, humanize.MiByte)
|
||||
rand.Read(buf)
|
||||
|
||||
objCountPerThread := make([]uint64, concurrent)
|
||||
|
||||
uploadsStopped := false
|
||||
@@ -1187,7 +1160,7 @@ func selfSpeedtest(ctx context.Context, size, concurrent int, duration time.Dura
|
||||
go func(i int) {
|
||||
defer wg.Done()
|
||||
for {
|
||||
hashReader, err := hash.NewReader(&SpeedtestObject{buf, size, &totalBytesWritten},
|
||||
hashReader, err := hash.NewReader(newRandomReader(size),
|
||||
int64(size), "", "", int64(size))
|
||||
if err != nil {
|
||||
retError = err.Error()
|
||||
@@ -1195,7 +1168,7 @@ func selfSpeedtest(ctx context.Context, size, concurrent int, duration time.Dura
|
||||
break
|
||||
}
|
||||
reader := NewPutObjReader(hashReader)
|
||||
_, err = objAPI.PutObject(uploadsCtx, bucket, fmt.Sprintf("%s.%d.%d",
|
||||
objInfo, err := objAPI.PutObject(uploadsCtx, bucket, fmt.Sprintf("%s.%d.%d",
|
||||
objNamePrefix, i, objCountPerThread[i]), reader, ObjectOptions{})
|
||||
if err != nil && !uploadsStopped {
|
||||
retError = err.Error()
|
||||
@@ -1204,6 +1177,7 @@ func selfSpeedtest(ctx context.Context, size, concurrent int, duration time.Dura
|
||||
if err != nil {
|
||||
break
|
||||
}
|
||||
atomic.AddUint64(&totalBytesWritten, uint64(objInfo.Size))
|
||||
objCountPerThread[i]++
|
||||
}
|
||||
}(i)
|
||||
@@ -1243,8 +1217,12 @@ func selfSpeedtest(ctx context.Context, size, concurrent int, duration time.Dura
|
||||
}
|
||||
n, err := io.Copy(ioutil.Discard, r)
|
||||
r.Close()
|
||||
|
||||
atomic.AddUint64(&totalBytesRead, uint64(n))
|
||||
if err == nil {
|
||||
// Only capture success criteria - do not
|
||||
// have to capture failed reads, truncated
|
||||
// reads etc.
|
||||
atomic.AddUint64(&totalBytesRead, uint64(n))
|
||||
}
|
||||
if err != nil && !downloadsStopped {
|
||||
retError = err.Error()
|
||||
logger.LogIf(ctx, err)
|
||||
|
||||
Reference in New Issue
Block a user