Allow Compression + encryption (#11103)

This commit is contained in:
Klaus Post
2021-01-05 20:08:35 -08:00
committed by GitHub
parent 97a4c120e9
commit eb9172eecb
14 changed files with 399 additions and 234 deletions

View File

@@ -394,9 +394,6 @@ func (o ObjectInfo) IsCompressedOK() (bool, error) {
if !ok {
return false, nil
}
if crypto.IsEncrypted(o.UserDefined) {
return true, fmt.Errorf("compression %q and encryption enabled on same object", scheme)
}
switch scheme {
case compressionAlgorithmV1, compressionAlgorithmV2:
return true, nil
@@ -415,9 +412,6 @@ func (o ObjectInfo) GetActualETag(h http.Header) string {
// GetActualSize - returns the actual size of the stored object
func (o ObjectInfo) GetActualSize() (int64, error) {
if crypto.IsEncrypted(o.UserDefined) {
return o.DecryptedSize()
}
if o.IsCompressed() {
sizeStr, ok := o.UserDefined[ReservedMetadataPrefix+"actual-size"]
if !ok {
@@ -429,6 +423,10 @@ func (o ObjectInfo) GetActualSize() (int64, error) {
}
return size, nil
}
if crypto.IsEncrypted(o.UserDefined) {
return o.DecryptedSize()
}
return o.Size, nil
}
@@ -441,7 +439,7 @@ func isCompressible(header http.Header, object string) bool {
globalCompressConfigMu.Unlock()
_, ok := crypto.IsRequested(header)
if !cfg.Enabled || ok || excludeForCompression(header, object, cfg) {
if !cfg.Enabled || (ok && !cfg.AllowEncrypted) || excludeForCompression(header, object, cfg) {
return false
}
return true
@@ -461,16 +459,15 @@ func excludeForCompression(header http.Header, object string, cfg compress.Confi
}
// Filter compression includes.
if len(cfg.Extensions) == 0 || len(cfg.MimeTypes) == 0 {
return false
exclude := len(cfg.Extensions) > 0 || len(cfg.MimeTypes) > 0
if len(cfg.Extensions) > 0 && hasStringSuffixInSlice(objStr, cfg.Extensions) {
exclude = false
}
extensions := cfg.Extensions
mimeTypes := cfg.MimeTypes
if hasStringSuffixInSlice(objStr, extensions) || hasPattern(mimeTypes, contentType) {
return false
if len(cfg.MimeTypes) > 0 && hasPattern(cfg.MimeTypes, contentType) {
exclude = false
}
return true
return exclude
}
// Utility which returns if a string is present in the list.
@@ -520,21 +517,29 @@ func partNumberToRangeSpec(oi ObjectInfo, partNumber int) *HTTPRangeSpec {
}
// Returns the compressed offset which should be skipped.
func getCompressedOffsets(objectInfo ObjectInfo, offset int64) (int64, int64) {
var compressedOffset int64
// If encrypted offsets are adjusted for encrypted block headers/trailers.
// Since de-compression is after decryption encryption overhead is only added to compressedOffset.
func getCompressedOffsets(objectInfo ObjectInfo, offset int64) (compressedOffset int64, partSkip int64) {
var skipLength int64
var cumulativeActualSize int64
var firstPartIdx int
if len(objectInfo.Parts) > 0 {
for _, part := range objectInfo.Parts {
for i, part := range objectInfo.Parts {
cumulativeActualSize += part.ActualSize
if cumulativeActualSize <= offset {
compressedOffset += part.Size
} else {
firstPartIdx = i
skipLength = cumulativeActualSize - part.ActualSize
break
}
}
}
if isEncryptedMultipart(objectInfo) && firstPartIdx > 0 {
off, _, _, _, _, err := objectInfo.GetDecryptedRange(partNumberToRangeSpec(objectInfo, firstPartIdx))
logger.LogIf(context.Background(), err)
compressedOffset += off
}
return compressedOffset, offset - skipLength
}
@@ -604,9 +609,92 @@ func NewGetObjectReader(rs *HTTPRangeSpec, oi ObjectInfo, opts ObjectOptions, cl
isEncrypted = false
}
var skipLen int64
// Calculate range to read (different for
// e.g. encrypted/compressed objects)
// Calculate range to read (different for encrypted/compressed objects)
switch {
case isCompressed:
// If compressed, we start from the beginning of the part.
// Read the decompressed size from the meta.json.
actualSize, err := oi.GetActualSize()
if err != nil {
return nil, 0, 0, err
}
off, length = int64(0), oi.Size
decOff, decLength := int64(0), actualSize
if rs != nil {
off, length, err = rs.GetOffsetLength(actualSize)
if err != nil {
return nil, 0, 0, err
}
// In case of range based queries on multiparts, the offset and length are reduced.
off, decOff = getCompressedOffsets(oi, off)
decLength = length
length = oi.Size - off
// For negative length we read everything.
if decLength < 0 {
decLength = actualSize - decOff
}
// Reply back invalid range if the input offset and length fall out of range.
if decOff > actualSize || decOff+decLength > actualSize {
return nil, 0, 0, errInvalidRange
}
}
fn = func(inputReader io.Reader, h http.Header, pcfn CheckPreconditionFn, cFns ...func()) (r *GetObjectReader, err error) {
cFns = append(cleanUpFns, cFns...)
if opts.CheckPrecondFn != nil && opts.CheckPrecondFn(oi) {
// Call the cleanup funcs
for i := len(cFns) - 1; i >= 0; i-- {
cFns[i]()
}
return nil, PreConditionFailed{}
}
if isEncrypted {
copySource := h.Get(xhttp.AmzServerSideEncryptionCopyCustomerAlgorithm) != ""
// Attach decrypter on inputReader
inputReader, err = DecryptBlocksRequestR(inputReader, h, 0, opts.PartNumber, oi, copySource)
if err != nil {
// Call the cleanup funcs
for i := len(cFns) - 1; i >= 0; i-- {
cFns[i]()
}
return nil, err
}
}
// Decompression reader.
s2Reader := s2.NewReader(inputReader)
// Apply the skipLen and limit on the decompressed stream.
err = s2Reader.Skip(decOff)
if err != nil {
// Call the cleanup funcs
for i := len(cFns) - 1; i >= 0; i-- {
cFns[i]()
}
return nil, err
}
decReader := io.LimitReader(s2Reader, decLength)
if decLength > compReadAheadSize {
rah, err := readahead.NewReaderSize(decReader, compReadAheadBuffers, compReadAheadBufSize)
if err == nil {
decReader = rah
cFns = append(cFns, func() {
rah.Close()
})
}
}
oi.Size = decLength
// Assemble the GetObjectReader
r = &GetObjectReader{
ObjInfo: oi,
pReader: decReader,
cleanUpFns: cFns,
opts: opts,
}
return r, nil
}
case isEncrypted:
var seqNumber uint32
var partStart int
@@ -635,8 +723,7 @@ func NewGetObjectReader(rs *HTTPRangeSpec, oi ObjectInfo, opts ObjectOptions, cl
cFns = append(cleanUpFns, cFns...)
// Attach decrypter on inputReader
var decReader io.Reader
decReader, err = DecryptBlocksRequestR(inputReader, h,
off, length, seqNumber, partStart, oi, copySource)
decReader, err = DecryptBlocksRequestR(inputReader, h, seqNumber, partStart, oi, copySource)
if err != nil {
// Call the cleanup funcs
for i := len(cFns) - 1; i >= 0; i-- {
@@ -659,76 +746,6 @@ func NewGetObjectReader(rs *HTTPRangeSpec, oi ObjectInfo, opts ObjectOptions, cl
// decrypted stream
decReader = io.LimitReader(ioutil.NewSkipReader(decReader, skipLen), decRangeLength)
// Assemble the GetObjectReader
r = &GetObjectReader{
ObjInfo: oi,
pReader: decReader,
cleanUpFns: cFns,
opts: opts,
}
return r, nil
}
case isCompressed:
// Read the decompressed size from the meta.json.
actualSize, err := oi.GetActualSize()
if err != nil {
return nil, 0, 0, err
}
off, length = int64(0), oi.Size
decOff, decLength := int64(0), actualSize
if rs != nil {
off, length, err = rs.GetOffsetLength(actualSize)
if err != nil {
return nil, 0, 0, err
}
// In case of range based queries on multiparts, the offset and length are reduced.
off, decOff = getCompressedOffsets(oi, off)
decLength = length
length = oi.Size - off
// For negative length we read everything.
if decLength < 0 {
decLength = actualSize - decOff
}
// Reply back invalid range if the input offset and length fall out of range.
if decOff > actualSize || decOff+decLength > actualSize {
return nil, 0, 0, errInvalidRange
}
}
fn = func(inputReader io.Reader, _ http.Header, pcfn CheckPreconditionFn, cFns ...func()) (r *GetObjectReader, err error) {
cFns = append(cleanUpFns, cFns...)
if opts.CheckPrecondFn != nil && opts.CheckPrecondFn(oi) {
// Call the cleanup funcs
for i := len(cFns) - 1; i >= 0; i-- {
cFns[i]()
}
return nil, PreConditionFailed{}
}
// Decompression reader.
s2Reader := s2.NewReader(inputReader)
// Apply the skipLen and limit on the decompressed stream.
err = s2Reader.Skip(decOff)
if err != nil {
// Call the cleanup funcs
for i := len(cFns) - 1; i >= 0; i-- {
cFns[i]()
}
return nil, err
}
decReader := io.LimitReader(s2Reader, decLength)
if decLength > compReadAheadSize {
rah, err := readahead.NewReaderSize(decReader, compReadAheadBuffers, compReadAheadBufSize)
if err == nil {
decReader = rah
cFns = append(cFns, func() {
rah.Close()
})
}
}
oi.Size = decLength
// Assemble the GetObjectReader
r = &GetObjectReader{
ObjInfo: oi,