Switch to Snappy -> S2 compression (#8189)

Klaus Post
2019-09-25 23:08:24 -07:00
committed by Harshavardhana
parent be313f1758
commit ff726969aa
12 changed files with 224 additions and 160 deletions


@@ -33,7 +33,8 @@ import (
     "time"
     "unicode/utf8"

-    snappy "github.com/golang/snappy"
+    "github.com/klauspost/compress/s2"
+    "github.com/klauspost/readahead"
     "github.com/minio/minio-go/v6/pkg/s3utils"
     "github.com/minio/minio/cmd/crypto"
     xhttp "github.com/minio/minio/cmd/http"
@@ -56,6 +57,12 @@ const (
     minioMetaTmpBucket = minioMetaBucket + "/tmp"
     // DNS separator (period), used for bucket name validation.
     dnsDelimiter = "."
+
+    // Use read-ahead on compressed files bigger than this.
+    compReadAheadSize = 100 << 20
+    // Read this many buffers ahead.
+    compReadAheadBuffers = 5
+    // Size of each buffer.
+    compReadAheadBufSize = 1 << 20
 )

 // isMinioBucket returns true if given bucket is a MinIO internal
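(For scale: 100 << 20 is 100 MiB and 1 << 20 is 1 MiB, so the read-ahead added further below only engages when the decompressed response length exceeds 100 MiB, and then buffers at most 5 × 1 MiB of decompressed data ahead of the consumer.)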
@@ -337,6 +344,22 @@ func (o ObjectInfo) IsCompressed() bool {
     return ok
 }

+// IsCompressedOK returns whether the object is compressed and can be decompressed.
+func (o ObjectInfo) IsCompressedOK() (bool, error) {
+    scheme, ok := o.UserDefined[ReservedMetadataPrefix+"compression"]
+    if !ok {
+        return false, nil
+    }
+    if crypto.IsEncrypted(o.UserDefined) {
+        return true, fmt.Errorf("compression %q and encryption enabled on same object", scheme)
+    }
+    switch scheme {
+    case compressionAlgorithmV1, compressionAlgorithmV2:
+        return true, nil
+    }
+    return true, fmt.Errorf("unknown compression scheme: %s", scheme)
+}
+
 // GetActualSize - read the decompressed size from the meta json.
 func (o ObjectInfo) GetActualSize() int64 {
     metadata := o.UserDefined
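To make the contract of IsCompressedOK concrete, here is a hypothetical test sketch (not part of this commit, assuming it sits in the same cmd package as ObjectInfo) covering the three outcomes the method distinguishes:

// Hypothetical test sketch, assuming the cmd package of this diff.
package cmd

import "testing"

func TestIsCompressedOK(t *testing.T) {
    // No reserved compression key: not compressed, no error.
    plain := ObjectInfo{UserDefined: map[string]string{}}
    if ok, err := plain.IsCompressedOK(); ok || err != nil {
        t.Fatalf("want uncompressed, got ok=%v err=%v", ok, err)
    }

    // Known scheme: compressed and decompressible.
    known := ObjectInfo{UserDefined: map[string]string{
        ReservedMetadataPrefix + "compression": compressionAlgorithmV2,
    }}
    if ok, err := known.IsCompressedOK(); !ok || err != nil {
        t.Fatalf("want compressed, got ok=%v err=%v", ok, err)
    }

    // Unknown scheme: still reported as compressed, but with an error
    // so callers such as NewGetObjectReader can refuse to serve it.
    unknown := ObjectInfo{UserDefined: map[string]string{
        ReservedMetadataPrefix + "compression": "unknown/scheme",
    }}
    if ok, err := unknown.IsCompressedOK(); !ok || err == nil {
        t.Fatalf("want compressed with error, got ok=%v err=%v", ok, err)
    }
}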
@@ -364,29 +387,34 @@ func isCompressible(header http.Header, object string) bool {
 func excludeForCompression(header http.Header, object string) bool {
     objStr := object
     contentType := header.Get(xhttp.ContentType)
-    if globalIsCompressionEnabled {
-        // We strictly disable compression for standard extensions/content-types (`compressed`).
-        if hasStringSuffixInSlice(objStr, standardExcludeCompressExtensions) || hasPattern(standardExcludeCompressContentTypes, contentType) {
-            return true
-        }
-        // Filter compression includes.
-        if len(globalCompressExtensions) > 0 || len(globalCompressMimeTypes) > 0 {
-            extensions := globalCompressExtensions
-            mimeTypes := globalCompressMimeTypes
-            if hasStringSuffixInSlice(objStr, extensions) || hasPattern(mimeTypes, contentType) {
-                return false
-            }
-            return true
-        }
-        return false
-    }
-    return true
+    if !globalIsCompressionEnabled {
+        return true
+    }
+
+    // We strictly disable compression for standard extensions/content-types (`compressed`).
+    if hasStringSuffixInSlice(objStr, standardExcludeCompressExtensions) || hasPattern(standardExcludeCompressContentTypes, contentType) {
+        return true
+    }
+
+    // Filter compression includes.
+    if len(globalCompressExtensions) == 0 || len(globalCompressMimeTypes) == 0 {
+        return false
+    }
+
+    extensions := globalCompressExtensions
+    mimeTypes := globalCompressMimeTypes
+    if hasStringSuffixInSlice(objStr, extensions) || hasPattern(mimeTypes, contentType) {
+        return false
+    }
+    return true
 }

 // Utility which returns if a string is present in the list.
 // Comparison is case insensitive.
 func hasStringSuffixInSlice(str string, list []string) bool {
     str = strings.ToLower(str)
     for _, v := range list {
-        if strings.HasSuffix(str, v) {
+        if strings.HasSuffix(str, strings.ToLower(v)) {
             return true
         }
     }
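The one-line change in hasStringSuffixInSlice above fixes a case-sensitivity bug: the input is lower-cased but the configured list entries were not, so an entry containing any upper-case letter could never match. A standalone before/after sketch:

package main

import (
    "fmt"
    "strings"
)

// The fixed helper, as in the hunk above.
func hasStringSuffixInSlice(str string, list []string) bool {
    str = strings.ToLower(str)
    for _, v := range list {
        if strings.HasSuffix(str, strings.ToLower(v)) {
            return true
        }
    }
    return false
}

func main() {
    // After the fix: a mixed-case configured extension matches.
    fmt.Println(hasStringSuffixInSlice("backup.ZIP", []string{".Zip"})) // true

    // The old comparison: str was lowered but v was not, so this was false.
    fmt.Println(strings.HasSuffix(strings.ToLower("backup.ZIP"), ".Zip")) // false
}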
@@ -413,7 +441,7 @@ func getPartFile(entries []string, partNumber int, etag string) string {
     return ""
 }

-// Returs the compressed offset which should be skipped.
+// Returns the compressed offset which should be skipped.
 func getCompressedOffsets(objectInfo ObjectInfo, offset int64) (int64, int64) {
     var compressedOffset int64
     var skipLength int64
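The hunk shows only the head of getCompressedOffsets. As a rough sketch of the idea (the real ObjectInfo part layout may differ; partInfo and its fields here are illustrative): whole compressed parts that end before the requested decompressed offset can be skipped without decompressing, and the remainder is skipped on the decompressed stream, the decOff later fed to s2Reader.Skip.

// partInfo is a stand-in for the real per-part metadata: Size is the
// stored (compressed) size, ActualSize the decompressed size.
type partInfo struct {
    Size       int64
    ActualSize int64
}

// compressedOffsets returns (1) how many compressed bytes can be skipped
// without decompressing and (2) how many decompressed bytes must still be
// skipped inside the first part that contains the requested offset.
func compressedOffsets(parts []partInfo, offset int64) (compressedOffset, decSkip int64) {
    for _, part := range parts {
        if offset < part.ActualSize {
            break // the requested offset falls inside this part
        }
        compressedOffset += part.Size // skip the whole compressed part
        offset -= part.ActualSize     // consumed this many decompressed bytes
    }
    return compressedOffset, offset
}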
@@ -494,7 +522,10 @@ func NewGetObjectReader(rs *HTTPRangeSpec, oi ObjectInfo, pcfn CheckCopyPrecondi
     }()

     isEncrypted := crypto.IsEncrypted(oi.UserDefined)
-    isCompressed := oi.IsCompressed()
+    isCompressed, err := oi.IsCompressedOK()
+    if err != nil {
+        return nil, 0, 0, err
+    }
     var skipLen int64
     // Calculate range to read (different for
     // e.g. encrypted/compressed objects)
@@ -575,7 +606,7 @@ func NewGetObjectReader(rs *HTTPRangeSpec, oi ObjectInfo, pcfn CheckCopyPrecondi
         if err != nil {
             return nil, 0, 0, err
         }
-        // Incase of range based queries on multiparts, the offset and length are reduced.
+        // In case of range based queries on multiparts, the offset and length are reduced.
         off, decOff = getCompressedOffsets(oi, off)
         decLength = length
         length = oi.Size - off
@@ -602,10 +633,23 @@ func NewGetObjectReader(rs *HTTPRangeSpec, oi ObjectInfo, pcfn CheckCopyPrecondi
             }
         }
         // Decompression reader.
-        snappyReader := snappy.NewReader(inputReader)
-        // Apply the skipLen and limit on the
-        // decompressed stream
-        decReader := io.LimitReader(ioutil.NewSkipReader(snappyReader, decOff), decLength)
+        s2Reader := s2.NewReader(inputReader)
+        // Apply the skipLen and limit on the decompressed stream.
+        err = s2Reader.Skip(decOff)
+        if err != nil {
+            return nil, err
+        }
+        decReader := io.LimitReader(s2Reader, decLength)
+        if decLength > compReadAheadSize {
+            rah, err := readahead.NewReaderSize(decReader, compReadAheadBuffers, compReadAheadBufSize)
+            if err == nil {
+                decReader = rah
+                cFns = append(cFns, func() {
+                    rah.Close()
+                })
+            }
+        }
         oi.Size = decLength

         // Assemble the GetObjectReader
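Putting the new read path together, a self-contained sketch (illustrative sizes and payload; the real code only adds the read-ahead when decLength exceeds compReadAheadSize) of skipping into an S2 stream, limiting it, and wrapping it in a read-ahead:

package main

import (
    "bytes"
    "fmt"
    "io"
    "strings"

    "github.com/klauspost/compress/s2"
    "github.com/klauspost/readahead"
)

func main() {
    // Compress a payload with S2.
    var buf bytes.Buffer
    w := s2.NewWriter(&buf)
    if _, err := io.Copy(w, strings.NewReader("hello, compressed world")); err != nil {
        panic(err)
    }
    if err := w.Close(); err != nil {
        panic(err)
    }

    // Serve a range: skip 7 decompressed bytes, read the next 10.
    const decOff, decLength = 7, 10
    r := s2.NewReader(&buf)
    if err := r.Skip(decOff); err != nil {
        panic(err)
    }
    var dec io.Reader = io.LimitReader(r, decLength)

    // For large responses the commit wraps this in a read-ahead
    // (5 buffers of 1 MiB); shown here unconditionally.
    if ra, err := readahead.NewReaderSize(dec, 5, 1<<20); err == nil {
        defer ra.Close()
        dec = ra
    }

    out, _ := io.ReadAll(dec)
    fmt.Printf("%q\n", out) // "compressed"
}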
@@ -760,55 +804,29 @@ func CleanMinioInternalMetadataKeys(metadata map[string]string) map[string]strin
     return newMeta
 }

-// snappyCompressReader compresses data as it reads
-// from the underlying io.Reader.
-type snappyCompressReader struct {
-    r      io.Reader
-    w      *snappy.Writer
-    closed bool
-    buf    bytes.Buffer
-}
-
-func newSnappyCompressReader(r io.Reader) *snappyCompressReader {
-    cr := &snappyCompressReader{r: r}
-    cr.w = snappy.NewBufferedWriter(&cr.buf)
-    return cr
-}
-
-func (cr *snappyCompressReader) Read(p []byte) (int, error) {
-    if cr.closed {
-        // if snappy writer is closed r has been completely read,
-        // return any remaining data in buf.
-        return cr.buf.Read(p)
-    }
-
-    // read from original using p as buffer
-    nr, readErr := cr.r.Read(p)
-
-    // write read bytes to snappy writer
-    nw, err := cr.w.Write(p[:nr])
-    if err != nil {
-        return 0, err
-    }
-    if nw != nr {
-        return 0, io.ErrShortWrite
-    }
-
-    // if last of data from reader, close snappy writer to flush
-    if readErr == io.EOF {
-        err := cr.w.Close()
-        cr.closed = true
-        if err != nil {
-            return 0, err
-        }
-    }
-
-    // read compressed bytes out of buf
-    n, err := cr.buf.Read(p)
-    if readErr != io.EOF && (err == nil || err == io.EOF) {
-        err = readErr
-    }
-    return n, err
-}
+// newS2CompressReader will read data from r, compress it and return the compressed data as a Reader.
+// Use Close to ensure resources are released on incomplete streams.
+func newS2CompressReader(r io.Reader) io.ReadCloser {
+    pr, pw := io.Pipe()
+    comp := s2.NewWriter(pw)
+    // Copy input to compressor.
+    go func() {
+        _, err := io.Copy(comp, r)
+        if err != nil {
+            comp.Close()
+            pw.CloseWithError(err)
+            return
+        }
+        // Close the stream.
+        err = comp.Close()
+        if err != nil {
+            pw.CloseWithError(err)
+            return
+        }
+        // Everything ok, do regular close.
+        pw.Close()
+    }()
+    return pr
+}

 // Returns error if the cancelCh has been closed (indicating that S3 client has disconnected)
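For reference, a roundtrip sketch of how the pipe-based compressor is consumed; newS2CompressReader is reproduced from the hunk above, the surrounding harness is illustrative:

package main

import (
    "bytes"
    "fmt"
    "io"

    "github.com/klauspost/compress/s2"
)

// newS2CompressReader as introduced by this commit (copied for the demo).
func newS2CompressReader(r io.Reader) io.ReadCloser {
    pr, pw := io.Pipe()
    comp := s2.NewWriter(pw)
    go func() {
        _, err := io.Copy(comp, r)
        if err != nil {
            comp.Close()
            pw.CloseWithError(err)
            return
        }
        if err := comp.Close(); err != nil {
            pw.CloseWithError(err)
            return
        }
        pw.Close()
    }()
    return pr
}

func main() {
    payload := bytes.Repeat([]byte("minio "), 1000)

    // Upload path: stream plaintext in, compressed bytes out.
    cr := newS2CompressReader(bytes.NewReader(payload))
    defer cr.Close() // releases the pipe on incomplete reads
    compressed, err := io.ReadAll(cr)
    if err != nil {
        panic(err)
    }

    // Download path: decompress and verify the roundtrip.
    restored, err := io.ReadAll(s2.NewReader(bytes.NewReader(compressed)))
    if err != nil {
        panic(err)
    }
    fmt.Println(len(compressed), "compressed bytes;", bytes.Equal(payload, restored))
}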