From ff36baeaa77a5f5e881ab84029ff92fa0e3c2a35 Mon Sep 17 00:00:00 2001
From: Harshavardhana
Date: Tue, 4 May 2021 10:12:08 -0700
Subject: [PATCH] fix: attempt to drain the ReadFileStream for connection
 pooling (#12208)

Avoid TIME_WAIT buildup with GetObject requests: if pending callers
time out, the abandoned connections can accumulate in TIME_WAIT state.

Bonus: share the same buffer pool with the erasure healing logic.
This also fixes a race where parallel readers were never cleaned up
during the Encode() phase, because the pipe.Reader end was never
closed.

The reader is now closed right away upon an error during Encode, to
avoid a racy Close() while the stream is still being read.
---
 cmd/bitrot-streaming.go      |  9 ++++++++
 cmd/erasure-heal_test.go     | 12 ++++++++++-
 cmd/erasure-healing.go       |  2 +-
 cmd/erasure-lowlevel-heal.go | 42 ++++++++++++++++++++++++------------
 4 files changed, 49 insertions(+), 16 deletions(-)

diff --git a/cmd/bitrot-streaming.go b/cmd/bitrot-streaming.go
index e5376bc2e..b18d71a10 100644
--- a/cmd/bitrot-streaming.go
+++ b/cmd/bitrot-streaming.go
@@ -25,6 +25,7 @@ import (
 	"hash"
 	"io"
 
+	xhttp "github.com/minio/minio/cmd/http"
 	"github.com/minio/minio/cmd/logger"
 	"github.com/minio/minio/pkg/ioutil"
 )
@@ -118,6 +119,14 @@ func (b *streamingBitrotReader) Close() error {
 		return nil
 	}
 	if closer, ok := b.rc.(io.Closer); ok {
+		// Drain the body so the connection can be re-used at the network layer.
+		xhttp.DrainBody(struct {
+			io.Reader
+			io.Closer
+		}{
+			Reader: b.rc,
+			Closer: closeWrapper(func() error { return nil }),
+		})
 		return closer.Close()
 	}
 	return nil
diff --git a/cmd/erasure-heal_test.go b/cmd/erasure-heal_test.go
index 1bb6aa506..e7cd3a300 100644
--- a/cmd/erasure-heal_test.go
+++ b/cmd/erasure-heal_test.go
@@ -24,6 +24,9 @@ import (
 	"io"
 	"os"
 	"testing"
+
+	humanize "github.com/dustin/go-humanize"
+	"github.com/minio/minio/pkg/bpool"
 )
 
 var erasureHealTests = []struct {
@@ -136,8 +139,15 @@ func TestErasureHeal(t *testing.T) {
 			erasure.ShardFileSize(test.size), test.algorithm, erasure.ShardSize(), true)
 	}
 
+	// Number of buffers, capping total pool memory at 2 GiB.
+	n := (2 * humanize.GiByte) / (int(test.blocksize) * 2)
+
+	// Initialize the byte pool once for all test cases; each buffer has
+	// length blocksize and capacity 2x blocksize.
+	bp := bpool.NewBytePoolCap(n, int(test.blocksize), int(test.blocksize)*2)
+
 	// test case setup is complete - now call Heal()
-	err = erasure.Heal(context.Background(), readers, staleWriters, test.size)
+	err = erasure.Heal(context.Background(), readers, staleWriters, test.size, bp)
 	closeBitrotReaders(readers)
 	closeBitrotWriters(staleWriters)
 	if err != nil && !test.shouldFail {
diff --git a/cmd/erasure-healing.go b/cmd/erasure-healing.go
index ce1938cc8..daba4b827 100644
--- a/cmd/erasure-healing.go
+++ b/cmd/erasure-healing.go
@@ -451,7 +451,7 @@ func (er erasureObjects) healObject(ctx context.Context, bucket string, object s
 				tillOffset, DefaultBitrotAlgorithm, erasure.ShardSize(), true)
 		}
 	}
-	err = erasure.Heal(ctx, readers, writers, partSize)
+	err = erasure.Heal(ctx, readers, writers, partSize, er.bp)
 	closeBitrotReaders(readers)
 	closeBitrotWriters(writers)
 	if err != nil {
diff --git a/cmd/erasure-lowlevel-heal.go b/cmd/erasure-lowlevel-heal.go
index a1ab73968..30deef702 100644
--- a/cmd/erasure-lowlevel-heal.go
+++ b/cmd/erasure-lowlevel-heal.go
@@ -20,30 +20,44 @@ package cmd
 import (
 	"context"
 	"io"
+	"sync"
 
 	"github.com/minio/minio/cmd/logger"
+	"github.com/minio/minio/pkg/bpool"
 )
 
 // Heal heals the shard files on non-nil writers. Note that the quorum passed is 1
 // as healing should continue even if it has been successful healing only one shard file.
-func (e Erasure) Heal(ctx context.Context, readers []io.ReaderAt, writers []io.Writer, size int64) error {
+func (e Erasure) Heal(ctx context.Context, readers []io.ReaderAt, writers []io.Writer, size int64, bp *bpool.BytePoolCap) error {
 	r, w := io.Pipe()
+	var wg sync.WaitGroup
+	wg.Add(1)
 	go func() {
-		if _, err := e.Decode(ctx, w, readers, 0, size, size, nil); err != nil {
-			w.CloseWithError(err)
-			return
-		}
-		w.Close()
+		defer wg.Done()
+		_, err := e.Decode(ctx, w, readers, 0, size, size, nil)
+		w.CloseWithError(err)
 	}()
-	buf := make([]byte, e.blockSize)
+
+	// Fetch a buffer for I/O: reuse one from the pool when possible, otherwise allocate.
+	var buffer []byte
+	switch {
+	case size == 0:
+		buffer = make([]byte, 1) // Allocate at least one byte to reach EOF
+	case size >= e.blockSize:
+		buffer = bp.Get()
+		defer bp.Put(buffer)
+	case size < e.blockSize:
+		// No need to allocate a full blockSize buffer if the incoming data is smaller.
+		buffer = make([]byte, size, 2*size+int64(e.parityBlocks+e.dataBlocks-1))
+	}
+
 	// quorum is 1 because CreateFile should continue writing as long as we are writing to even 1 disk.
-	n, err := e.Encode(ctx, r, writers, buf, 1)
-	if err != nil {
-		return err
-	}
-	if n != size {
+	n, err := e.Encode(ctx, r, writers, buffer, 1)
+	if err == nil && n != size {
 		logger.LogIf(ctx, errLessData)
-		return errLessData
+		err = errLessData
 	}
-	return nil
+	r.CloseWithError(err)
+	wg.Wait()
+	return err
 }
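
The reworked Heal() above relies on a small io.Pipe shutdown discipline: the decoding goroutine always closes the write end with whatever error it got (a nil error closes the pipe normally), while the encoding side closes the read end with its own error before waiting on the WaitGroup, so a failed or short Encode can never leave the decoder blocked on a pipe write. Below is a minimal, self-contained sketch of that pattern; consume() is a hypothetical function and io.Copy stands in for e.Decode/e.Encode, it is not the MinIO code itself.

package main

import (
	"fmt"
	"io"
	"strings"
	"sync"
)

// consume demonstrates the producer/consumer pipe shutdown used in Erasure.Heal:
// the producer closes the write end with its result, the consumer closes the
// read end with its result and then waits for the producer to exit.
func consume(src io.Reader) error {
	r, w := io.Pipe()

	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		_, err := io.Copy(w, src) // stands in for e.Decode writing into the pipe
		w.CloseWithError(err)     // nil error closes the pipe normally
	}()

	_, err := io.Copy(io.Discard, r) // stands in for e.Encode reading from the pipe
	r.CloseWithError(err)            // unblocks the producer if we stopped early
	wg.Wait()                        // producer has fully exited before we return
	return err
}

func main() {
	fmt.Println(consume(strings.NewReader("hello world")))
}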
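
For the bitrot-streaming change, draining matters because Go's net/http transport only returns a connection to its keep-alive pool once the response body has been read to EOF and closed; closing a partially read body tears the connection down, and the side that closes first is the one left holding TIME_WAIT sockets. A sketch of the general drain-before-close pattern follows; drainAndClose, the 1 MiB drain cap, and the URL are illustrative assumptions, not the xhttp.DrainBody implementation.

package main

import (
	"io"
	"net/http"
	"time"
)

// drainAndClose reads and discards any unread bytes before closing, so the
// underlying TCP connection can be reused instead of being torn down.
func drainAndClose(rc io.ReadCloser) error {
	// Cap the drain so a huge leftover body cannot stall the caller (assumed limit).
	io.Copy(io.Discard, io.LimitReader(rc, 1<<20))
	return rc.Close()
}

func main() {
	client := &http.Client{Timeout: 10 * time.Second}
	resp, err := client.Get("http://example.com/") // placeholder URL
	if err != nil {
		return
	}
	defer drainAndClose(resp.Body)
	// ... read only part of resp.Body here ...
}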
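
The shared buffer pool is the other half of the change: instead of every heal allocating its own blockSize buffer, Heal() now borrows one from a capped pool (pkg/bpool's BytePoolCap in the diff, constructed via NewBytePoolCap and used through Get/Put). The following is a simplified stand-in showing the idea behind such a bounded free-list of byte slices; BytePool, NewBytePool, and the sizes in main are illustrative only, not the pkg/bpool implementation.

package main

import "fmt"

// BytePool is a bounded free-list of byte slices so hot paths can reuse
// large buffers instead of re-allocating them on every call.
type BytePool struct {
	c        chan []byte // bounded free-list
	width    int         // len of slices handed out
	capWidth int         // cap of slices handed out
}

func NewBytePool(maxSize, width, capWidth int) *BytePool {
	return &BytePool{c: make(chan []byte, maxSize), width: width, capWidth: capWidth}
}

// Get reuses a pooled buffer when one is available, otherwise allocates.
func (p *BytePool) Get() []byte {
	select {
	case b := <-p.c:
		return b
	default:
		return make([]byte, p.width, p.capWidth)
	}
}

// Put returns a buffer to the pool; if the pool is full or the buffer is too
// small, it is dropped and left for the garbage collector.
func (p *BytePool) Put(b []byte) {
	if cap(b) < p.width {
		return
	}
	select {
	case p.c <- b[:p.width]:
	default:
	}
}

func main() {
	bp := NewBytePool(4, 1<<20, 2<<20) // 4 buffers of 1 MiB length, 2 MiB capacity
	buf := bp.Get()
	fmt.Println(len(buf), cap(buf))
	bp.Put(buf)
}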