mirror of
https://github.com/minio/minio.git
synced 2025-01-23 12:43:16 -05:00
Fix SkipReader performance with small initial read (#20030)
If `SkipReader` is called with a small initial buffer it may be doing a huge number if Reads to skip the requested number of bytes. If a small buffer is provided grab a 32K buffer and use that. Fixes slow execution of `testAPIGetObjectWithMPHandler`. Bonuses: * Use `-short` with `-race` test. * Do all suite test types with `-short`. * Enable compressed+encrypted in `testAPIGetObjectWithMPHandler`. * Disable big file tests in `testAPIGetObjectWithMPHandler` when using `-short`.
This commit is contained in:
parent
ca0ce4c6ef
commit
2040559f71
@ -6,5 +6,5 @@ export GORACE="history_size=7"
|
||||
export MINIO_API_REQUESTS_MAX=10000
|
||||
|
||||
for d in $(go list ./...); do
|
||||
CGO_ENABLED=1 go test -v -race --timeout 100m "$d"
|
||||
CGO_ENABLED=1 go test -v -race -short --timeout 100m "$d"
|
||||
done
|
||||
|
@ -61,10 +61,9 @@ func NewDummyDataGen(totalLength, skipOffset int64) io.ReadSeeker {
|
||||
}
|
||||
|
||||
skipOffset %= int64(len(alphabets))
|
||||
as := make([]byte, 2*len(alphabets))
|
||||
copy(as, alphabets)
|
||||
copy(as[len(alphabets):], alphabets)
|
||||
b := as[skipOffset : skipOffset+int64(len(alphabets))]
|
||||
const multiply = 100
|
||||
as := bytes.Repeat(alphabets, multiply)
|
||||
b := as[skipOffset : skipOffset+int64(len(alphabets)*(multiply-1))]
|
||||
return &DummyDataGen{
|
||||
length: totalLength,
|
||||
b: b,
|
||||
|
@ -692,8 +692,8 @@ func testAPIGetObjectWithMPHandler(obj ObjectLayer, instanceType, bucketName str
|
||||
{"small-1", []int64{509}, make(map[string]string)},
|
||||
{"small-2", []int64{5 * oneMiB}, make(map[string]string)},
|
||||
// // // cases 4-7: multipart part objects
|
||||
{"mp-0", []int64{5 * oneMiB, 1}, make(map[string]string)},
|
||||
{"mp-1", []int64{5*oneMiB + 1, 1}, make(map[string]string)},
|
||||
{"mp-0", []int64{5 * oneMiB, 10}, make(map[string]string)},
|
||||
{"mp-1", []int64{5*oneMiB + 1, 10}, make(map[string]string)},
|
||||
{"mp-2", []int64{5487701, 5487799, 3}, make(map[string]string)},
|
||||
{"mp-3", []int64{10499807, 10499963, 7}, make(map[string]string)},
|
||||
// cases 8-11: small single part objects with encryption
|
||||
@ -702,17 +702,13 @@ func testAPIGetObjectWithMPHandler(obj ObjectLayer, instanceType, bucketName str
|
||||
{"enc-small-1", []int64{509}, mapCopy(metaWithSSEC)},
|
||||
{"enc-small-2", []int64{5 * oneMiB}, mapCopy(metaWithSSEC)},
|
||||
// cases 12-15: multipart part objects with encryption
|
||||
{"enc-mp-0", []int64{5 * oneMiB, 1}, mapCopy(metaWithSSEC)},
|
||||
{"enc-mp-1", []int64{5*oneMiB + 1, 1}, mapCopy(metaWithSSEC)},
|
||||
{"enc-mp-0", []int64{5 * oneMiB, 10}, mapCopy(metaWithSSEC)},
|
||||
{"enc-mp-1", []int64{5*oneMiB + 1, 10}, mapCopy(metaWithSSEC)},
|
||||
{"enc-mp-2", []int64{5487701, 5487799, 3}, mapCopy(metaWithSSEC)},
|
||||
{"enc-mp-3", []int64{10499807, 10499963, 7}, mapCopy(metaWithSSEC)},
|
||||
}
|
||||
// SSEC can't be used with compression
|
||||
globalCompressConfigMu.Lock()
|
||||
globalCompressEnabled := globalCompressConfig.Enabled
|
||||
globalCompressConfigMu.Unlock()
|
||||
if globalCompressEnabled {
|
||||
objectInputs = objectInputs[0:8]
|
||||
if testing.Short() {
|
||||
objectInputs = append(objectInputs[0:5], objectInputs[8:11]...)
|
||||
}
|
||||
// iterate through the above set of inputs and upload the object.
|
||||
for _, input := range objectInputs {
|
||||
@ -768,6 +764,7 @@ func testAPIGetObjectWithMPHandler(obj ObjectLayer, instanceType, bucketName str
|
||||
readers = append(readers, NewDummyDataGen(p, cumulativeSum))
|
||||
cumulativeSum += p
|
||||
}
|
||||
|
||||
refReader := io.LimitReader(ioutilx.NewSkipReader(io.MultiReader(readers...), off), length)
|
||||
if ok, msg := cmpReaders(refReader, rec.Body); !ok {
|
||||
t.Fatalf("(%s) Object: %s Case %d ByteRange: %s --> data mismatch! (msg: %s)", instanceType, oi.objectName, i+1, byteRange, msg)
|
||||
|
@ -559,21 +559,17 @@ func execExtended(t *testing.T, fn func(t *testing.T, init func(), bucketOptions
|
||||
t.Run("default", func(t *testing.T) {
|
||||
fn(t, nil, MakeBucketOptions{})
|
||||
})
|
||||
t.Run("defaultVerioned", func(t *testing.T) {
|
||||
t.Run("default+versioned", func(t *testing.T) {
|
||||
fn(t, nil, MakeBucketOptions{VersioningEnabled: true})
|
||||
})
|
||||
|
||||
if testing.Short() {
|
||||
return
|
||||
}
|
||||
|
||||
t.Run("compressed", func(t *testing.T) {
|
||||
fn(t, func() {
|
||||
resetCompressEncryption()
|
||||
enableCompression(t, false, []string{"*"}, []string{"*"})
|
||||
}, MakeBucketOptions{})
|
||||
})
|
||||
t.Run("compressedVerioned", func(t *testing.T) {
|
||||
t.Run("compressed+versioned", func(t *testing.T) {
|
||||
fn(t, func() {
|
||||
resetCompressEncryption()
|
||||
enableCompression(t, false, []string{"*"}, []string{"*"})
|
||||
@ -588,7 +584,7 @@ func execExtended(t *testing.T, fn func(t *testing.T, init func(), bucketOptions
|
||||
enableEncryption(t)
|
||||
}, MakeBucketOptions{})
|
||||
})
|
||||
t.Run("encryptedVerioned", func(t *testing.T) {
|
||||
t.Run("encrypted+versioned", func(t *testing.T) {
|
||||
fn(t, func() {
|
||||
resetCompressEncryption()
|
||||
enableEncryption(t)
|
||||
@ -603,7 +599,7 @@ func execExtended(t *testing.T, fn func(t *testing.T, init func(), bucketOptions
|
||||
enableCompression(t, true, []string{"*"}, []string{"*"})
|
||||
}, MakeBucketOptions{})
|
||||
})
|
||||
t.Run("compressed+encryptedVerioned", func(t *testing.T) {
|
||||
t.Run("compressed+encrypted+versioned", func(t *testing.T) {
|
||||
fn(t, func() {
|
||||
resetCompressEncryption()
|
||||
enableCompression(t, true, []string{"*"}, []string{"*"})
|
||||
|
@ -256,15 +256,26 @@ func (s *SkipReader) Read(p []byte) (int, error) {
|
||||
if l == 0 {
|
||||
return 0, nil
|
||||
}
|
||||
for s.skipCount > 0 {
|
||||
if l > s.skipCount {
|
||||
l = s.skipCount
|
||||
if s.skipCount > 0 {
|
||||
tmp := p
|
||||
if s.skipCount > l && l < copyBufferSize {
|
||||
// We may get a very small buffer, so we grab a temporary buffer.
|
||||
bufp := copyBufPool.Get().(*[]byte)
|
||||
buf := *bufp
|
||||
tmp = buf[:copyBufferSize]
|
||||
defer copyBufPool.Put(bufp)
|
||||
l = int64(len(tmp))
|
||||
}
|
||||
n, err := s.Reader.Read(p[:l])
|
||||
if err != nil {
|
||||
return 0, err
|
||||
for s.skipCount > 0 {
|
||||
if l > s.skipCount {
|
||||
l = s.skipCount
|
||||
}
|
||||
n, err := s.Reader.Read(tmp[:l])
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
s.skipCount -= int64(n)
|
||||
}
|
||||
s.skipCount -= int64(n)
|
||||
}
|
||||
return s.Reader.Read(p)
|
||||
}
|
||||
@ -274,9 +285,11 @@ func NewSkipReader(r io.Reader, n int64) io.Reader {
|
||||
return &SkipReader{r, n}
|
||||
}
|
||||
|
||||
const copyBufferSize = 32 * 1024
|
||||
|
||||
var copyBufPool = sync.Pool{
|
||||
New: func() interface{} {
|
||||
b := make([]byte, 32*1024)
|
||||
b := make([]byte, copyBufferSize)
|
||||
return &b
|
||||
},
|
||||
}
|
||||
@ -285,6 +298,7 @@ var copyBufPool = sync.Pool{
|
||||
func Copy(dst io.Writer, src io.Reader) (written int64, err error) {
|
||||
bufp := copyBufPool.Get().(*[]byte)
|
||||
buf := *bufp
|
||||
buf = buf[:copyBufferSize]
|
||||
defer copyBufPool.Put(bufp)
|
||||
|
||||
return io.CopyBuffer(dst, src, buf)
|
||||
|
Loading…
x
Reference in New Issue
Block a user