select: Add more compression formats (#13142)

Support Zstandard, LZ4, S2, and snappy as additional 
compression formats for S3 Select.
This commit is contained in:
Klaus Post
2021-09-06 09:09:53 -07:00
committed by GitHub
parent 42b1d92b2a
commit b2c92cdaaa
6 changed files with 62 additions and 30 deletions

View File

@@ -17,6 +17,8 @@
package s3select
import "strings"
// SelectError - represents s3 select error specified in
// https://docs.aws.amazon.com/AmazonS3/latest/API/RESTObjectSELECTContent.html#RESTObjectSELECTContent-responses-special-errors.
type SelectError interface {
@@ -66,25 +68,16 @@ func errMalformedXML(err error) *s3Error {
func errInvalidCompressionFormat(err error) *s3Error {
return &s3Error{
code: "InvalidCompressionFormat",
message: "The file is not in a supported compression format. Only GZIP and BZIP2 are supported.",
message: "The file is not in a supported compression format. GZIP, BZIP2, ZSTD, LZ4, S2 and SNAPPY are supported.",
statusCode: 400,
cause: err,
}
}
func errInvalidBZIP2CompressionFormat(err error) *s3Error {
func errInvalidCompression(err error, t CompressionType) *s3Error {
return &s3Error{
code: "InvalidCompressionFormat",
message: "BZIP2 is not applicable to the queried object. Please correct the request and try again.",
statusCode: 400,
cause: err,
}
}
func errInvalidGZIPCompressionFormat(err error) *s3Error {
return &s3Error{
code: "InvalidCompressionFormat",
message: "GZIP is not applicable to the queried object. Please correct the request and try again.",
message: strings.ToUpper(string(t)) + " is not applicable to the queried object. Please correct the request and try again.",
statusCode: 400,
cause: err,
}

View File

@@ -25,7 +25,10 @@ import (
"sync"
"sync/atomic"
"github.com/klauspost/compress/s2"
"github.com/klauspost/compress/zstd"
gzip "github.com/klauspost/pgzip"
"github.com/pierrec/lz4"
)
type countUpReader struct {
@@ -58,7 +61,7 @@ type progressReader struct {
processedReader *countUpReader
closedMu sync.Mutex
gzr *gzip.Reader
closer io.ReadCloser
closed bool
}
@@ -80,8 +83,8 @@ func (pr *progressReader) Close() error {
return nil
}
pr.closed = true
if pr.gzr != nil {
pr.gzr.Close()
if pr.closer != nil {
pr.closer.Close()
}
return pr.rc.Close()
}
@@ -102,23 +105,37 @@ func newProgressReader(rc io.ReadCloser, compType CompressionType) (*progressRea
rc: rc,
scannedReader: scannedReader,
}
var err error
var r io.Reader
switch compType {
case noneType:
r = scannedReader
case gzipType:
pr.gzr, err = gzip.NewReader(scannedReader)
gzr, err := gzip.NewReader(scannedReader)
if err != nil {
if errors.Is(err, gzip.ErrHeader) || errors.Is(err, gzip.ErrChecksum) {
return nil, errInvalidGZIPCompressionFormat(err)
return nil, errInvalidCompression(err, compType)
}
return nil, errTruncatedInput(err)
}
r = pr.gzr
r = gzr
pr.closer = gzr
case bzip2Type:
r = bzip2.NewReader(scannedReader)
case zstdType:
// Set a max window of 64MB. More than reasonable.
zr, err := zstd.NewReader(scannedReader, zstd.WithDecoderConcurrency(2), zstd.WithDecoderMaxWindow(64<<20))
if err != nil {
return nil, errInvalidCompression(err, compType)
}
r = zr
pr.closer = zr.IOReadCloser()
case lz4Type:
r = lz4.NewReader(scannedReader)
case s2Type:
r = s2.NewReader(scannedReader)
case snappyType:
r = s2.NewReader(scannedReader, s2.ReaderMaxBlockSize(64<<10))
default:
return nil, errInvalidCompressionFormat(fmt.Errorf("unknown compression type '%v'", compType))
}

View File

@@ -31,12 +31,16 @@ import (
"strings"
"sync"
"github.com/klauspost/compress/s2"
"github.com/klauspost/compress/zstd"
gzip "github.com/klauspost/pgzip"
"github.com/minio/minio/internal/s3select/csv"
"github.com/minio/minio/internal/s3select/json"
"github.com/minio/minio/internal/s3select/parquet"
"github.com/minio/minio/internal/s3select/simdj"
"github.com/minio/minio/internal/s3select/sql"
"github.com/minio/simdjson-go"
"github.com/pierrec/lz4"
)
type recordReader interface {
@@ -57,8 +61,13 @@ type CompressionType string
const (
noneType CompressionType = "none"
gzipType CompressionType = "gzip"
bzip2Type CompressionType = "bzip2"
gzipType CompressionType = "GZIP"
bzip2Type CompressionType = "BZIP2"
zstdType CompressionType = "ZSTD"
lz4Type CompressionType = "LZ4"
s2Type CompressionType = "S2"
snappyType CompressionType = "SNAPPY"
)
const (
@@ -87,13 +96,13 @@ func (c *CompressionType) UnmarshalXML(d *xml.Decoder, start xml.StartElement) e
return errMalformedXML(err)
}
parsedType := CompressionType(strings.ToLower(s))
if s == "" {
parsedType := CompressionType(strings.ToUpper(s))
if s == "" || parsedType == "NONE" {
parsedType = noneType
}
switch parsedType {
case noneType, gzipType, bzip2Type:
case noneType, gzipType, bzip2Type, snappyType, s2Type, zstdType, lz4Type:
default:
return errInvalidCompressionFormat(fmt.Errorf("invalid compression format '%v'", s))
}
@@ -127,7 +136,7 @@ func (input *InputSerialization) UnmarshalXML(d *xml.Decoder, start xml.StartEle
}
// If no compression is specified, set to noneType
if parsedInput.CompressionType == CompressionType("") {
if parsedInput.CompressionType == "" {
parsedInput.CompressionType = noneType
}
@@ -309,7 +318,19 @@ func (s3Select *S3Select) Open(getReader func(offset, length int64) (io.ReadClos
rc.Close()
var stErr bzip2.StructuralError
if errors.As(err, &stErr) {
return errInvalidBZIP2CompressionFormat(err)
return errInvalidCompression(err, s3Select.Input.CompressionType)
}
// Test these compressor errors
errs := []error{
gzip.ErrHeader, gzip.ErrChecksum,
s2.ErrCorrupt, s2.ErrUnsupported, s2.ErrCRC,
zstd.ErrBlockTooSmall, zstd.ErrMagicMismatch, zstd.ErrWindowSizeExceeded, zstd.ErrUnknownDictionary, zstd.ErrWindowSizeTooSmall,
lz4.ErrInvalid, lz4.ErrBlockDependency,
}
for _, e := range errs {
if errors.Is(err, e) {
return errInvalidCompression(err, s3Select.Input.CompressionType)
}
}
return err
}