mirror of
https://github.com/minio/minio.git
synced 2025-01-11 23:13:23 -05:00
bz2: limit max concurrent CPU (#13458)
Ensure that bz2 decompression will never take more than 50% CPU.
This commit is contained in:
parent
feabd0430c
commit
c2eb60df4a
@ -35,6 +35,9 @@ import (
|
|||||||
"github.com/pierrec/lz4"
|
"github.com/pierrec/lz4"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// Max bzip2 concurrency across calls. 50% of GOMAXPROCS.
|
||||||
|
var bz2Limiter = pbzip2.CreateConcurrencyPool((runtime.GOMAXPROCS(0) + 1) / 2)
|
||||||
|
|
||||||
func detect(r *bufio.Reader) format {
|
func detect(r *bufio.Reader) format {
|
||||||
z, err := r.Peek(4)
|
z, err := r.Peek(4)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -116,7 +119,9 @@ func untar(r io.Reader, putObject func(reader io.Reader, info os.FileInfo, name
|
|||||||
case formatBZ2:
|
case formatBZ2:
|
||||||
ctx, cancel := context.WithCancel(context.Background())
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
defer cancel()
|
defer cancel()
|
||||||
r = pbzip2.NewReader(ctx, bf, pbzip2.DecompressionOptions(pbzip2.BZConcurrency((runtime.GOMAXPROCS(0)+1)/2)))
|
r = pbzip2.NewReader(ctx, bf, pbzip2.DecompressionOptions(
|
||||||
|
pbzip2.BZConcurrency((runtime.GOMAXPROCS(0)+1)/2),
|
||||||
|
pbzip2.BZConcurrencyPool(bz2Limiter)))
|
||||||
case formatLZ4:
|
case formatLZ4:
|
||||||
r = lz4.NewReader(bf)
|
r = lz4.NewReader(bf)
|
||||||
case formatUnknown:
|
case formatUnknown:
|
||||||
|
2
go.mod
2
go.mod
@ -16,7 +16,7 @@ require (
|
|||||||
github.com/cheggaaa/pb v1.0.29
|
github.com/cheggaaa/pb v1.0.29
|
||||||
github.com/colinmarc/hdfs/v2 v2.2.0
|
github.com/colinmarc/hdfs/v2 v2.2.0
|
||||||
github.com/coredns/coredns v1.4.0
|
github.com/coredns/coredns v1.4.0
|
||||||
github.com/cosnicolaou/pbzip2 v1.0.0
|
github.com/cosnicolaou/pbzip2 v1.0.1
|
||||||
github.com/dchest/siphash v1.2.1
|
github.com/dchest/siphash v1.2.1
|
||||||
github.com/djherbis/atime v1.0.0
|
github.com/djherbis/atime v1.0.0
|
||||||
github.com/dswarbrick/smart v0.0.0-20190505152634-909a45200d6d
|
github.com/dswarbrick/smart v0.0.0-20190505152634-909a45200d6d
|
||||||
|
4
go.sum
4
go.sum
@ -258,8 +258,8 @@ github.com/coreos/go-systemd/v22 v22.3.1/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSV
|
|||||||
github.com/coreos/pkg v0.0.0-20160727233714-3ac0863d7acf/go.mod h1:E3G3o1h8I7cfcXa63jLwjI0eiQQMgzzUDFVpN/nH/eA=
|
github.com/coreos/pkg v0.0.0-20160727233714-3ac0863d7acf/go.mod h1:E3G3o1h8I7cfcXa63jLwjI0eiQQMgzzUDFVpN/nH/eA=
|
||||||
github.com/coreos/pkg v0.0.0-20180108230652-97fdf19511ea/go.mod h1:E3G3o1h8I7cfcXa63jLwjI0eiQQMgzzUDFVpN/nH/eA=
|
github.com/coreos/pkg v0.0.0-20180108230652-97fdf19511ea/go.mod h1:E3G3o1h8I7cfcXa63jLwjI0eiQQMgzzUDFVpN/nH/eA=
|
||||||
github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f/go.mod h1:E3G3o1h8I7cfcXa63jLwjI0eiQQMgzzUDFVpN/nH/eA=
|
github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f/go.mod h1:E3G3o1h8I7cfcXa63jLwjI0eiQQMgzzUDFVpN/nH/eA=
|
||||||
github.com/cosnicolaou/pbzip2 v1.0.0 h1:T/807kTuUNv7hYj4eYTIdGuJ41N5EcpYX6cOMm46Bdc=
|
github.com/cosnicolaou/pbzip2 v1.0.1 h1:f5Ix000Rtl9tr0Ne33wNLtljGl2nAyR4ZirJrz9qg+0=
|
||||||
github.com/cosnicolaou/pbzip2 v1.0.0/go.mod h1:cE04zhBMvwMrCLhsx6aLYh9cGsU9GyFB0oo/GmO+SkY=
|
github.com/cosnicolaou/pbzip2 v1.0.1/go.mod h1:cE04zhBMvwMrCLhsx6aLYh9cGsU9GyFB0oo/GmO+SkY=
|
||||||
github.com/cpuguy83/go-md2man v1.0.10/go.mod h1:SmD6nW6nTyfqj6ABTjUi3V3JVMnlJmwcJI5acqYI6dE=
|
github.com/cpuguy83/go-md2man v1.0.10/go.mod h1:SmD6nW6nTyfqj6ABTjUi3V3JVMnlJmwcJI5acqYI6dE=
|
||||||
github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU=
|
github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU=
|
||||||
github.com/cpuguy83/go-md2man/v2 v2.0.0/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU=
|
github.com/cpuguy83/go-md2man/v2 v2.0.0/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU=
|
||||||
|
@ -38,6 +38,9 @@ type countUpReader struct {
|
|||||||
bytesRead int64
|
bytesRead int64
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Max bzip2 concurrency across calls. 50% of GOMAXPROCS.
|
||||||
|
var bz2Limiter = pbzip2.CreateConcurrencyPool((runtime.GOMAXPROCS(0) + 1) / 2)
|
||||||
|
|
||||||
func (r *countUpReader) Read(p []byte) (n int, err error) {
|
func (r *countUpReader) Read(p []byte) (n int, err error) {
|
||||||
n, err = r.reader.Read(p)
|
n, err = r.reader.Read(p)
|
||||||
atomic.AddInt64(&r.bytesRead, int64(n))
|
atomic.AddInt64(&r.bytesRead, int64(n))
|
||||||
@ -124,7 +127,10 @@ func newProgressReader(rc io.ReadCloser, compType CompressionType) (*progressRea
|
|||||||
pr.closer = gzr
|
pr.closer = gzr
|
||||||
case bzip2Type:
|
case bzip2Type:
|
||||||
ctx, cancel := context.WithCancel(context.Background())
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
r = pbzip2.NewReader(ctx, scannedReader, pbzip2.DecompressionOptions(pbzip2.BZConcurrency((runtime.GOMAXPROCS(0)+1)/2)))
|
r = pbzip2.NewReader(ctx, scannedReader, pbzip2.DecompressionOptions(
|
||||||
|
pbzip2.BZConcurrency((runtime.GOMAXPROCS(0)+1)/2),
|
||||||
|
pbzip2.BZConcurrencyPool(bz2Limiter),
|
||||||
|
))
|
||||||
pr.closer = &nopReadCloser{fn: cancel}
|
pr.closer = &nopReadCloser{fn: cancel}
|
||||||
case zstdType:
|
case zstdType:
|
||||||
// Set a max window of 64MB. More than reasonable.
|
// Set a max window of 64MB. More than reasonable.
|
||||||
|
Loading…
Reference in New Issue
Block a user