From c2eb60df4ac7229bf09bd2e0e1a65ce22b12ef74 Mon Sep 17 00:00:00 2001 From: Klaus Post Date: Mon, 18 Oct 2021 08:44:36 -0700 Subject: [PATCH] bz2: limit max concurrent CPU (#13458) Ensure that bz2 decompression will never take more than 50% CPU. --- cmd/untar.go | 7 ++++++- go.mod | 2 +- go.sum | 4 ++-- internal/s3select/progress.go | 8 +++++++- 4 files changed, 16 insertions(+), 5 deletions(-) diff --git a/cmd/untar.go b/cmd/untar.go index b13d313ea..6492feffb 100644 --- a/cmd/untar.go +++ b/cmd/untar.go @@ -35,6 +35,9 @@ import ( "github.com/pierrec/lz4" ) +// Max bzip2 concurrency across calls. 50% of GOMAXPROCS. +var bz2Limiter = pbzip2.CreateConcurrencyPool((runtime.GOMAXPROCS(0) + 1) / 2) + func detect(r *bufio.Reader) format { z, err := r.Peek(4) if err != nil { @@ -116,7 +119,9 @@ func untar(r io.Reader, putObject func(reader io.Reader, info os.FileInfo, name case formatBZ2: ctx, cancel := context.WithCancel(context.Background()) defer cancel() - r = pbzip2.NewReader(ctx, bf, pbzip2.DecompressionOptions(pbzip2.BZConcurrency((runtime.GOMAXPROCS(0)+1)/2))) + r = pbzip2.NewReader(ctx, bf, pbzip2.DecompressionOptions( + pbzip2.BZConcurrency((runtime.GOMAXPROCS(0)+1)/2), + pbzip2.BZConcurrencyPool(bz2Limiter))) case formatLZ4: r = lz4.NewReader(bf) case formatUnknown: diff --git a/go.mod b/go.mod index 500a5032b..220136a04 100644 --- a/go.mod +++ b/go.mod @@ -16,7 +16,7 @@ require ( github.com/cheggaaa/pb v1.0.29 github.com/colinmarc/hdfs/v2 v2.2.0 github.com/coredns/coredns v1.4.0 - github.com/cosnicolaou/pbzip2 v1.0.0 + github.com/cosnicolaou/pbzip2 v1.0.1 github.com/dchest/siphash v1.2.1 github.com/djherbis/atime v1.0.0 github.com/dswarbrick/smart v0.0.0-20190505152634-909a45200d6d diff --git a/go.sum b/go.sum index c5e6aef8f..1d30a8582 100644 --- a/go.sum +++ b/go.sum @@ -258,8 +258,8 @@ github.com/coreos/go-systemd/v22 v22.3.1/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSV github.com/coreos/pkg v0.0.0-20160727233714-3ac0863d7acf/go.mod h1:E3G3o1h8I7cfcXa63jLwjI0eiQQMgzzUDFVpN/nH/eA= github.com/coreos/pkg v0.0.0-20180108230652-97fdf19511ea/go.mod h1:E3G3o1h8I7cfcXa63jLwjI0eiQQMgzzUDFVpN/nH/eA= github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f/go.mod h1:E3G3o1h8I7cfcXa63jLwjI0eiQQMgzzUDFVpN/nH/eA= -github.com/cosnicolaou/pbzip2 v1.0.0 h1:T/807kTuUNv7hYj4eYTIdGuJ41N5EcpYX6cOMm46Bdc= -github.com/cosnicolaou/pbzip2 v1.0.0/go.mod h1:cE04zhBMvwMrCLhsx6aLYh9cGsU9GyFB0oo/GmO+SkY= +github.com/cosnicolaou/pbzip2 v1.0.1 h1:f5Ix000Rtl9tr0Ne33wNLtljGl2nAyR4ZirJrz9qg+0= +github.com/cosnicolaou/pbzip2 v1.0.1/go.mod h1:cE04zhBMvwMrCLhsx6aLYh9cGsU9GyFB0oo/GmO+SkY= github.com/cpuguy83/go-md2man v1.0.10/go.mod h1:SmD6nW6nTyfqj6ABTjUi3V3JVMnlJmwcJI5acqYI6dE= github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU= github.com/cpuguy83/go-md2man/v2 v2.0.0/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU= diff --git a/internal/s3select/progress.go b/internal/s3select/progress.go index 8f2a490e6..a777b429f 100644 --- a/internal/s3select/progress.go +++ b/internal/s3select/progress.go @@ -38,6 +38,9 @@ type countUpReader struct { bytesRead int64 } +// Max bzip2 concurrency across calls. 50% of GOMAXPROCS. +var bz2Limiter = pbzip2.CreateConcurrencyPool((runtime.GOMAXPROCS(0) + 1) / 2) + func (r *countUpReader) Read(p []byte) (n int, err error) { n, err = r.reader.Read(p) atomic.AddInt64(&r.bytesRead, int64(n)) @@ -124,7 +127,10 @@ func newProgressReader(rc io.ReadCloser, compType CompressionType) (*progressRea pr.closer = gzr case bzip2Type: ctx, cancel := context.WithCancel(context.Background()) - r = pbzip2.NewReader(ctx, scannedReader, pbzip2.DecompressionOptions(pbzip2.BZConcurrency((runtime.GOMAXPROCS(0)+1)/2))) + r = pbzip2.NewReader(ctx, scannedReader, pbzip2.DecompressionOptions( + pbzip2.BZConcurrency((runtime.GOMAXPROCS(0)+1)/2), + pbzip2.BZConcurrencyPool(bz2Limiter), + )) pr.closer = &nopReadCloser{fn: cancel} case zstdType: // Set a max window of 64MB. More than reasonable.