Use concurrent bz2 decompression (#13360)

Testing with `mc sql --compression BZIP2 --csv-input "rd=\n,fh=USE,fd=;" --query="select COUNT(*) from S3Object" local2/testbucket/nyc-taxi-data-10M.csv.bz2`

Before 96.98s, after 10.79s. Uses about 70% CPU while running.
This commit is contained in:
Klaus Post 2021-10-14 11:11:07 -07:00 committed by GitHub
parent 974073a2e5
commit 5e53f767c4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 31 additions and 4 deletions

View File

@ -21,12 +21,14 @@ import (
"archive/tar" "archive/tar"
"bufio" "bufio"
"bytes" "bytes"
"compress/bzip2" "context"
"fmt" "fmt"
"io" "io"
"os" "os"
"path" "path"
"runtime"
"github.com/cosnicolaou/pbzip2"
"github.com/klauspost/compress/s2" "github.com/klauspost/compress/s2"
"github.com/klauspost/compress/zstd" "github.com/klauspost/compress/zstd"
gzip "github.com/klauspost/pgzip" gzip "github.com/klauspost/pgzip"
@ -112,7 +114,9 @@ func untar(r io.Reader, putObject func(reader io.Reader, info os.FileInfo, name
defer dec.Close() defer dec.Close()
r = dec r = dec
case formatBZ2: case formatBZ2:
r = bzip2.NewReader(bf) ctx, cancel := context.WithCancel(context.Background())
defer cancel()
r = pbzip2.NewReader(ctx, bf, pbzip2.DecompressionOptions(pbzip2.BZConcurrency((runtime.GOMAXPROCS(0)+1)/2)))
case formatLZ4: case formatLZ4:
r = lz4.NewReader(bf) r = lz4.NewReader(bf)
case formatUnknown: case formatUnknown:

1
go.mod
View File

@ -16,6 +16,7 @@ require (
github.com/cheggaaa/pb v1.0.29 github.com/cheggaaa/pb v1.0.29
github.com/colinmarc/hdfs/v2 v2.2.0 github.com/colinmarc/hdfs/v2 v2.2.0
github.com/coredns/coredns v1.4.0 github.com/coredns/coredns v1.4.0
github.com/cosnicolaou/pbzip2 v1.0.0
github.com/dchest/siphash v1.2.1 github.com/dchest/siphash v1.2.1
github.com/djherbis/atime v1.0.0 github.com/djherbis/atime v1.0.0
github.com/dswarbrick/smart v0.0.0-20190505152634-909a45200d6d github.com/dswarbrick/smart v0.0.0-20190505152634-909a45200d6d

2
go.sum
View File

@ -258,6 +258,8 @@ github.com/coreos/go-systemd/v22 v22.3.1/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSV
github.com/coreos/pkg v0.0.0-20160727233714-3ac0863d7acf/go.mod h1:E3G3o1h8I7cfcXa63jLwjI0eiQQMgzzUDFVpN/nH/eA= github.com/coreos/pkg v0.0.0-20160727233714-3ac0863d7acf/go.mod h1:E3G3o1h8I7cfcXa63jLwjI0eiQQMgzzUDFVpN/nH/eA=
github.com/coreos/pkg v0.0.0-20180108230652-97fdf19511ea/go.mod h1:E3G3o1h8I7cfcXa63jLwjI0eiQQMgzzUDFVpN/nH/eA= github.com/coreos/pkg v0.0.0-20180108230652-97fdf19511ea/go.mod h1:E3G3o1h8I7cfcXa63jLwjI0eiQQMgzzUDFVpN/nH/eA=
github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f/go.mod h1:E3G3o1h8I7cfcXa63jLwjI0eiQQMgzzUDFVpN/nH/eA= github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f/go.mod h1:E3G3o1h8I7cfcXa63jLwjI0eiQQMgzzUDFVpN/nH/eA=
github.com/cosnicolaou/pbzip2 v1.0.0 h1:T/807kTuUNv7hYj4eYTIdGuJ41N5EcpYX6cOMm46Bdc=
github.com/cosnicolaou/pbzip2 v1.0.0/go.mod h1:cE04zhBMvwMrCLhsx6aLYh9cGsU9GyFB0oo/GmO+SkY=
github.com/cpuguy83/go-md2man v1.0.10/go.mod h1:SmD6nW6nTyfqj6ABTjUi3V3JVMnlJmwcJI5acqYI6dE= github.com/cpuguy83/go-md2man v1.0.10/go.mod h1:SmD6nW6nTyfqj6ABTjUi3V3JVMnlJmwcJI5acqYI6dE=
github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU= github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU=
github.com/cpuguy83/go-md2man/v2 v2.0.0/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU= github.com/cpuguy83/go-md2man/v2 v2.0.0/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU=

View File

@ -18,13 +18,15 @@
package s3select package s3select
import ( import (
"compress/bzip2" "context"
"errors" "errors"
"fmt" "fmt"
"io" "io"
"runtime"
"sync" "sync"
"sync/atomic" "sync/atomic"
"github.com/cosnicolaou/pbzip2"
"github.com/klauspost/compress/s2" "github.com/klauspost/compress/s2"
"github.com/klauspost/compress/zstd" "github.com/klauspost/compress/zstd"
gzip "github.com/klauspost/pgzip" gzip "github.com/klauspost/pgzip"
@ -121,7 +123,9 @@ func newProgressReader(rc io.ReadCloser, compType CompressionType) (*progressRea
r = gzr r = gzr
pr.closer = gzr pr.closer = gzr
case bzip2Type: case bzip2Type:
r = bzip2.NewReader(scannedReader) ctx, cancel := context.WithCancel(context.Background())
r = pbzip2.NewReader(ctx, scannedReader, pbzip2.DecompressionOptions(pbzip2.BZConcurrency((runtime.GOMAXPROCS(0)+1)/2)))
pr.closer = &nopReadCloser{fn: cancel}
case zstdType: case zstdType:
// Set a max window of 64MB. More than reasonable. // Set a max window of 64MB. More than reasonable.
zr, err := zstd.NewReader(scannedReader, zstd.WithDecoderConcurrency(2), zstd.WithDecoderMaxWindow(64<<20)) zr, err := zstd.NewReader(scannedReader, zstd.WithDecoderConcurrency(2), zstd.WithDecoderMaxWindow(64<<20))
@ -143,3 +147,19 @@ func newProgressReader(rc io.ReadCloser, compType CompressionType) (*progressRea
return &pr, nil return &pr, nil
} }
type nopReadCloser struct {
fn func()
}
func (n2 *nopReadCloser) Read(p []byte) (n int, err error) {
panic("should not be called")
}
func (n2 *nopReadCloser) Close() error {
if n2.fn != nil {
n2.fn()
}
n2.fn = nil
return nil
}