diff --git a/cmd/bitrot-streaming.go b/cmd/bitrot-streaming.go index e5376bc2e..b18d71a10 100644 --- a/cmd/bitrot-streaming.go +++ b/cmd/bitrot-streaming.go @@ -25,6 +25,7 @@ import ( "hash" "io" + xhttp "github.com/minio/minio/cmd/http" "github.com/minio/minio/cmd/logger" "github.com/minio/minio/pkg/ioutil" ) @@ -118,6 +119,14 @@ func (b *streamingBitrotReader) Close() error { return nil } if closer, ok := b.rc.(io.Closer); ok { + // drain the body for connection re-use at network layer. + xhttp.DrainBody(struct { + io.Reader + io.Closer + }{ + Reader: b.rc, + Closer: closeWrapper(func() error { return nil }), + }) return closer.Close() } return nil diff --git a/cmd/erasure-heal_test.go b/cmd/erasure-heal_test.go index 1bb6aa506..e7cd3a300 100644 --- a/cmd/erasure-heal_test.go +++ b/cmd/erasure-heal_test.go @@ -24,6 +24,9 @@ import ( "io" "os" "testing" + + humanize "github.com/dustin/go-humanize" + "github.com/minio/minio/pkg/bpool" ) var erasureHealTests = []struct { @@ -136,8 +139,15 @@ func TestErasureHeal(t *testing.T) { erasure.ShardFileSize(test.size), test.algorithm, erasure.ShardSize(), true) } + // Number of buffers, max 2GB + n := (2 * humanize.GiByte) / (int(test.blocksize) * 2) + + // Initialize byte pool once for all sets, bpool size is set to + // setCount * setDriveCount with each memory upto blockSizeV2. + bp := bpool.NewBytePoolCap(n, int(test.blocksize), int(test.blocksize)*2) + // test case setup is complete - now call Heal() - err = erasure.Heal(context.Background(), readers, staleWriters, test.size) + err = erasure.Heal(context.Background(), readers, staleWriters, test.size, bp) closeBitrotReaders(readers) closeBitrotWriters(staleWriters) if err != nil && !test.shouldFail { diff --git a/cmd/erasure-healing.go b/cmd/erasure-healing.go index ce1938cc8..daba4b827 100644 --- a/cmd/erasure-healing.go +++ b/cmd/erasure-healing.go @@ -451,7 +451,7 @@ func (er erasureObjects) healObject(ctx context.Context, bucket string, object s tillOffset, DefaultBitrotAlgorithm, erasure.ShardSize(), true) } } - err = erasure.Heal(ctx, readers, writers, partSize) + err = erasure.Heal(ctx, readers, writers, partSize, er.bp) closeBitrotReaders(readers) closeBitrotWriters(writers) if err != nil { diff --git a/cmd/erasure-lowlevel-heal.go b/cmd/erasure-lowlevel-heal.go index a1ab73968..30deef702 100644 --- a/cmd/erasure-lowlevel-heal.go +++ b/cmd/erasure-lowlevel-heal.go @@ -20,30 +20,44 @@ package cmd import ( "context" "io" + "sync" "github.com/minio/minio/cmd/logger" + "github.com/minio/minio/pkg/bpool" ) // Heal heals the shard files on non-nil writers. Note that the quorum passed is 1 // as healing should continue even if it has been successful healing only one shard file. -func (e Erasure) Heal(ctx context.Context, readers []io.ReaderAt, writers []io.Writer, size int64) error { +func (e Erasure) Heal(ctx context.Context, readers []io.ReaderAt, writers []io.Writer, size int64, bp *bpool.BytePoolCap) error { r, w := io.Pipe() + var wg sync.WaitGroup + wg.Add(1) go func() { - if _, err := e.Decode(ctx, w, readers, 0, size, size, nil); err != nil { - w.CloseWithError(err) - return - } - w.Close() + defer wg.Done() + _, err := e.Decode(ctx, w, readers, 0, size, size, nil) + w.CloseWithError(err) }() - buf := make([]byte, e.blockSize) + + // Fetch buffer for I/O, returns from the pool if not allocates a new one and returns. + var buffer []byte + switch { + case size == 0: + buffer = make([]byte, 1) // Allocate atleast a byte to reach EOF + case size >= e.blockSize: + buffer = bp.Get() + defer bp.Put(buffer) + case size < e.blockSize: + // No need to allocate fully blockSizeV1 buffer if the incoming data is smaller. + buffer = make([]byte, size, 2*size+int64(e.parityBlocks+e.dataBlocks-1)) + } + // quorum is 1 because CreateFile should continue writing as long as we are writing to even 1 disk. - n, err := e.Encode(ctx, r, writers, buf, 1) - if err != nil { - return err - } - if n != size { + n, err := e.Encode(ctx, r, writers, buffer, 1) + if err == nil && n != size { logger.LogIf(ctx, errLessData) - return errLessData + err = errLessData } - return nil + r.CloseWithError(err) + wg.Wait() + return err }