fix: attempt to drain the ReadFileStream for connection pooling (#12208)

Avoid TIME_WAIT buildup with GetObject requests: if pending callers
time out before fully reading the stream, the connection cannot be
reused and is left behind in the TIME_WAIT state.
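
For context, the sketch below shows the drain-before-close idiom this fix relies on: Go's net/http Transport only returns a connection to its idle pool once the response body has been read to EOF and closed. It is a minimal illustration, not MinIO's actual code path; the drainBody helper and the health-check URL are made up for the example.

    package main

    import (
        "io"
        "io/ioutil"
        "net/http"
    )

    // drainBody is an illustrative helper; MinIO wraps the same idea in
    // xhttp.DrainBody, as seen in the diff below.
    func drainBody(rc io.ReadCloser) {
        if rc == nil {
            return
        }
        // Discard any unread bytes so the Transport can keep the connection
        // in its idle pool instead of closing it and leaving a TIME_WAIT
        // socket behind.
        io.Copy(ioutil.Discard, rc)
        rc.Close()
    }

    func main() {
        // Hypothetical endpoint, used only to show the call pattern.
        resp, err := http.Get("http://localhost:9000/minio/health/live")
        if err != nil {
            return
        }
        drainBody(resp.Body)
    }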

Bonus: share the same buffer pool with the erasure healing logic.
This also fixes a race where parallel readers were never cleaned up
during the Encode() phase, because the pipe.Reader end was never
closed.
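
As a rough sketch of what the shared pool provides, the snippet below exercises the same bpool.BytePoolCap API that appears later in the diff (NewBytePoolCap, Get, Put); the pool size and block size here are assumed values for illustration, not the ones MinIO configures.

    package main

    import (
        "github.com/minio/minio/pkg/bpool"
    )

    func main() {
        const blockSize = 1 << 20 // assumed 1 MiB block size, illustration only

        // Up to 16 pooled buffers, each blockSize long with 2*blockSize of
        // capacity, mirroring the NewBytePoolCap(n, blocksize, blocksize*2)
        // call in the diff.
        bp := bpool.NewBytePoolCap(16, blockSize, 2*blockSize)

        buf := bp.Get()   // reuse a pooled buffer, or allocate one if the pool is empty
        defer bp.Put(buf) // hand it back so reads and healing share the same memory
    }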

The pipe reader is now closed right away upon an error during Encode(),
to avoid a racy Close() while the stream is still being read.
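
A condensed sketch of that pipe lifecycle, using only the standard library: the producer goroutine stands in for Decode(), the failing consumer for Encode(). Closing the reader end with the error and then waiting on the goroutine is what prevents both the leaked reader and the racy Close().

    package main

    import (
        "errors"
        "fmt"
        "io"
        "sync"
    )

    func main() {
        r, w := io.Pipe()

        var wg sync.WaitGroup
        wg.Add(1)
        go func() {
            defer wg.Done()
            // Producer side (Decode() in the real code) writes healed data.
            _, err := w.Write([]byte("reconstructed shard data"))
            w.CloseWithError(err) // propagate success or failure to the reader
        }()

        // Consumer side (Encode() in the real code) fails partway through.
        buf := make([]byte, 4)
        _, _ = r.Read(buf)
        err := errors.New("write quorum lost") // illustrative failure

        // Closing the reader end unblocks the producer's pending Write, and
        // Wait() guarantees the goroutine has finished before we return.
        r.CloseWithError(err)
        wg.Wait()
        fmt.Println("heal aborted:", err)
    }
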
Harshavardhana 2021-05-04 10:12:08 -07:00 committed by GitHub
parent 860bf1bab2
commit ff36baeaa7
4 changed files with 49 additions and 16 deletions

View File

@@ -25,6 +25,7 @@ import (
"hash"
"io"
xhttp "github.com/minio/minio/cmd/http"
"github.com/minio/minio/cmd/logger"
"github.com/minio/minio/pkg/ioutil"
)
@@ -118,6 +119,14 @@ func (b *streamingBitrotReader) Close() error {
return nil
}
if closer, ok := b.rc.(io.Closer); ok {
// drain the body for connection re-use at network layer.
xhttp.DrainBody(struct {
io.Reader
io.Closer
}{
Reader: b.rc,
Closer: closeWrapper(func() error { return nil }),
})
return closer.Close()
}
return nil
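
The anonymous struct above pairs the reader with a no-op Closer, so DrainBody only discards the remaining bytes while the real Close() still happens exactly once via closer.Close(). Below is a standalone sketch of that composition using only the standard library; drain and nopCloser are stand-ins for the MinIO helpers, not actual MinIO APIs.

    package main

    import (
        "io"
        "io/ioutil"
        "strings"
    )

    // nopCloser plays the role of closeWrapper(func() error { return nil }).
    type nopCloser struct{}

    func (nopCloser) Close() error { return nil }

    // drain mimics a body-draining helper: read to EOF, then Close().
    func drain(rc io.ReadCloser) {
        io.Copy(ioutil.Discard, rc)
        rc.Close()
    }

    func main() {
        stream := ioutil.NopCloser(strings.NewReader("remaining shard bytes"))

        // drain sees a ReadCloser whose Close() is a no-op, so the underlying
        // stream is not closed out from under the caller.
        drain(struct {
            io.Reader
            io.Closer
        }{Reader: stream, Closer: nopCloser{}})

        stream.Close() // the real close happens once, explicitly
    }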

View File

@@ -24,6 +24,9 @@ import (
"io"
"os"
"testing"
humanize "github.com/dustin/go-humanize"
"github.com/minio/minio/pkg/bpool"
)
var erasureHealTests = []struct {
@@ -136,8 +139,15 @@ func TestErasureHeal(t *testing.T) {
erasure.ShardFileSize(test.size), test.algorithm, erasure.ShardSize(), true)
}
// Number of buffers, max 2GB
n := (2 * humanize.GiByte) / (int(test.blocksize) * 2)
// Initialize byte pool once for all sets, bpool size is set to
// setCount * setDriveCount with each memory up to blockSizeV2.
bp := bpool.NewBytePoolCap(n, int(test.blocksize), int(test.blocksize)*2)
// test case setup is complete - now call Heal()
err = erasure.Heal(context.Background(), readers, staleWriters, test.size)
err = erasure.Heal(context.Background(), readers, staleWriters, test.size, bp)
closeBitrotReaders(readers)
closeBitrotWriters(staleWriters)
if err != nil && !test.shouldFail {
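
To sanity-check the pool sizing above, the same formula can be evaluated with an assumed block size; 1 MiB is used here purely for illustration (the tests use test.blocksize), which yields 1024 buffers and roughly 2 GiB of total pooled capacity.

    package main

    import (
        "fmt"

        humanize "github.com/dustin/go-humanize"
    )

    func main() {
        // Assumed block size of 1 MiB, for illustration only.
        blocksize := 1 * humanize.MiByte

        // Same formula as the test: cap total pooled capacity at ~2 GiB,
        // since every pooled buffer may grow to 2*blocksize of capacity.
        n := (2 * humanize.GiByte) / (blocksize * 2)

        fmt.Println(n) // 1024 buffers, each 1 MiB long with 2 MiB capacity
    }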

View File

@@ -451,7 +451,7 @@ func (er erasureObjects) healObject(ctx context.Context, bucket string, object s
tillOffset, DefaultBitrotAlgorithm, erasure.ShardSize(), true)
}
}
err = erasure.Heal(ctx, readers, writers, partSize)
err = erasure.Heal(ctx, readers, writers, partSize, er.bp)
closeBitrotReaders(readers)
closeBitrotWriters(writers)
if err != nil {

View File

@@ -20,30 +20,44 @@ package cmd
import (
"context"
"io"
"sync"
"github.com/minio/minio/cmd/logger"
"github.com/minio/minio/pkg/bpool"
)
// Heal heals the shard files on non-nil writers. Note that the quorum passed is 1
// as healing should continue even if it succeeds in healing only one shard file.
func (e Erasure) Heal(ctx context.Context, readers []io.ReaderAt, writers []io.Writer, size int64) error {
func (e Erasure) Heal(ctx context.Context, readers []io.ReaderAt, writers []io.Writer, size int64, bp *bpool.BytePoolCap) error {
r, w := io.Pipe()
var wg sync.WaitGroup
wg.Add(1)
go func() {
if _, err := e.Decode(ctx, w, readers, 0, size, size, nil); err != nil {
w.CloseWithError(err)
return
}
w.Close()
defer wg.Done()
_, err := e.Decode(ctx, w, readers, 0, size, size, nil)
w.CloseWithError(err)
}()
buf := make([]byte, e.blockSize)
// Fetch a buffer for I/O: reuse one from the pool when possible, otherwise allocate a new one.
var buffer []byte
switch {
case size == 0:
buffer = make([]byte, 1) // Allocate at least a byte to reach EOF
case size >= e.blockSize:
buffer = bp.Get()
defer bp.Put(buffer)
case size < e.blockSize:
// No need to allocate a full blockSizeV1 buffer if the incoming data is smaller.
buffer = make([]byte, size, 2*size+int64(e.parityBlocks+e.dataBlocks-1))
}
// quorum is 1 because CreateFile should continue writing as long as we are writing to even 1 disk.
n, err := e.Encode(ctx, r, writers, buf, 1)
if err != nil {
return err
}
if n != size {
n, err := e.Encode(ctx, r, writers, buffer, 1)
if err == nil && n != size {
logger.LogIf(ctx, errLessData)
return errLessData
err = errLessData
}
return nil
r.CloseWithError(err)
wg.Wait()
return err
}
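
As a follow-up, the three-way buffer choice above can be read in isolation. The sketch below lifts it out of the Erasure receiver, taking blockSize, dataBlocks and parityBlocks as plain parameters; the helper name and the sizes in main are assumptions made for the example.

    package main

    import (
        "fmt"

        "github.com/minio/minio/pkg/bpool"
    )

    // healBuffer mirrors the switch above: zero-sized objects still need one
    // byte so Encode() can reach EOF, block-sized reads come from the shared
    // pool, and smaller objects get a right-sized allocation whose extra
    // capacity follows the 2*size+data+parity-1 math in the diff.
    func healBuffer(size, blockSize int64, dataBlocks, parityBlocks int, bp *bpool.BytePoolCap) []byte {
        switch {
        case size == 0:
            return make([]byte, 1)
        case size >= blockSize:
            return bp.Get() // the caller must bp.Put() this back when done
        default:
            return make([]byte, size, 2*size+int64(dataBlocks+parityBlocks-1))
        }
    }

    func main() {
        bp := bpool.NewBytePoolCap(4, 1<<20, 2<<20)

        buf := healBuffer(512, 1<<20, 4, 4, bp)
        fmt.Println(len(buf), cap(buf)) // 512 1031
    }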