mirror of
https://github.com/minio/minio.git
synced 2025-02-04 10:26:01 -05:00
Copy and CopyPart changes for compression (#6669)
This PR fixes - The target object should be compressed even if the source object is not compressed. - The actual size for an encrypted object should be the `decryptedSize`
This commit is contained in:
parent
e29009d347
commit
ecb042aa1c
@ -763,9 +763,27 @@ func (api objectAPIHandlers) CopyObjectHandler(w http.ResponseWriter, r *http.Re
|
|||||||
|
|
||||||
var reader io.Reader
|
var reader io.Reader
|
||||||
var length = srcInfo.Size
|
var length = srcInfo.Size
|
||||||
|
|
||||||
|
// Set the actual size to the decrypted size if encrypted.
|
||||||
|
actualSize := srcInfo.Size
|
||||||
|
if crypto.IsEncrypted(srcInfo.UserDefined) {
|
||||||
|
actualSize, err = srcInfo.DecryptedSize()
|
||||||
|
if err != nil {
|
||||||
|
writeErrorResponse(w, toAPIErrorCode(err), r.URL)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// No need to compress for remote etcd calls
|
// No need to compress for remote etcd calls
|
||||||
// Pass the decompressed stream to such calls.
|
// Pass the decompressed stream to such calls.
|
||||||
if srcInfo.IsCompressed() && !isRemoteCallRequired(ctx, srcBucket, dstBucket, objectAPI) {
|
isCompressed := objectAPI.IsCompressionSupported() && isCompressible(r.Header, srcObject) && !isRemoteCallRequired(ctx, srcBucket, dstBucket, objectAPI)
|
||||||
|
if isCompressed {
|
||||||
|
// Storing the compression metadata.
|
||||||
|
srcInfo.UserDefined[ReservedMetadataPrefix+"compression"] = compressionAlgorithmV1
|
||||||
|
srcInfo.UserDefined[ReservedMetadataPrefix+"actual-size"] = strconv.FormatInt(actualSize, 10)
|
||||||
|
// Remove all source encrypted related metadata to
|
||||||
|
// avoid copying them in target object.
|
||||||
|
crypto.RemoveInternalEntries(srcInfo.UserDefined)
|
||||||
// Open a pipe for compression.
|
// Open a pipe for compression.
|
||||||
// Where pipeWriter is piped to srcInfo.Reader.
|
// Where pipeWriter is piped to srcInfo.Reader.
|
||||||
// gr writes to pipeWriter.
|
// gr writes to pipeWriter.
|
||||||
@ -788,26 +806,24 @@ func (api objectAPIHandlers) CopyObjectHandler(w http.ResponseWriter, r *http.Re
|
|||||||
reader = gr
|
reader = gr
|
||||||
}
|
}
|
||||||
|
|
||||||
srcInfo.Reader, err = hash.NewReader(reader, length, "", "", srcInfo.Size)
|
srcInfo.Reader, err = hash.NewReader(reader, length, "", "", actualSize)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
writeErrorResponse(w, toAPIErrorCode(err), r.URL)
|
writeErrorResponse(w, toAPIErrorCode(err), r.URL)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
var encMetadata = make(map[string]string)
|
var encMetadata = make(map[string]string)
|
||||||
if objectAPI.IsEncryptionSupported() && !srcInfo.IsCompressed() {
|
if objectAPI.IsEncryptionSupported() && !isCompressed {
|
||||||
// Encryption parameters not applicable for this object.
|
// Encryption parameters not applicable for this object.
|
||||||
if !crypto.IsEncrypted(srcInfo.UserDefined) && crypto.SSECopy.IsRequested(r.Header) {
|
if !crypto.IsEncrypted(srcInfo.UserDefined) && crypto.SSECopy.IsRequested(r.Header) {
|
||||||
writeErrorResponse(w, toAPIErrorCode(errInvalidEncryptionParameters), r.URL)
|
writeErrorResponse(w, toAPIErrorCode(errInvalidEncryptionParameters), r.URL)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// Encryption parameters not present for this object.
|
// Encryption parameters not present for this object.
|
||||||
if crypto.SSEC.IsEncrypted(srcInfo.UserDefined) && !crypto.SSECopy.IsRequested(r.Header) {
|
if crypto.SSEC.IsEncrypted(srcInfo.UserDefined) && !crypto.SSECopy.IsRequested(r.Header) {
|
||||||
writeErrorResponse(w, ErrInvalidSSECustomerAlgorithm, r.URL)
|
writeErrorResponse(w, ErrInvalidSSECustomerAlgorithm, r.URL)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
var oldKey, newKey []byte
|
var oldKey, newKey []byte
|
||||||
sseCopyS3 := crypto.S3.IsEncrypted(srcInfo.UserDefined)
|
sseCopyS3 := crypto.S3.IsEncrypted(srcInfo.UserDefined)
|
||||||
sseCopyC := crypto.SSEC.IsEncrypted(srcInfo.UserDefined) && crypto.SSECopy.IsRequested(r.Header)
|
sseCopyC := crypto.SSEC.IsEncrypted(srcInfo.UserDefined) && crypto.SSECopy.IsRequested(r.Header)
|
||||||
@ -973,8 +989,8 @@ func (api objectAPIHandlers) CopyObjectHandler(w http.ResponseWriter, r *http.Re
|
|||||||
host, port = "", ""
|
host, port = "", ""
|
||||||
}
|
}
|
||||||
|
|
||||||
if srcInfo.IsCompressed() {
|
if objInfo.IsCompressed() {
|
||||||
objInfo.Size = srcInfo.GetActualSize()
|
objInfo.Size = actualSize
|
||||||
}
|
}
|
||||||
|
|
||||||
// Notify object created event.
|
// Notify object created event.
|
||||||
@ -1470,9 +1486,7 @@ func (api objectAPIHandlers) CopyObjectPartHandler(w http.ResponseWriter, r *htt
|
|||||||
|
|
||||||
// Get the object offset & length
|
// Get the object offset & length
|
||||||
startOffset, length, _ := rs.GetOffsetLength(actualPartSize)
|
startOffset, length, _ := rs.GetOffsetLength(actualPartSize)
|
||||||
if rangeHeader != "" {
|
actualPartSize = length
|
||||||
actualPartSize = length
|
|
||||||
}
|
|
||||||
|
|
||||||
/// maximum copy size for multipart objects in a single operation
|
/// maximum copy size for multipart objects in a single operation
|
||||||
if isMaxAllowedPartSize(length) {
|
if isMaxAllowedPartSize(length) {
|
||||||
@ -1483,8 +1497,17 @@ func (api objectAPIHandlers) CopyObjectPartHandler(w http.ResponseWriter, r *htt
|
|||||||
var reader io.Reader
|
var reader io.Reader
|
||||||
var getLength = length
|
var getLength = length
|
||||||
|
|
||||||
// Need to decompress only for range-enabled copy parts.
|
var li ListPartsInfo
|
||||||
if srcInfo.IsCompressed() && rangeHeader != "" {
|
li, err = objectAPI.ListObjectParts(ctx, dstBucket, dstObject, uploadID, 0, 1)
|
||||||
|
if err != nil {
|
||||||
|
writeErrorResponse(w, toAPIErrorCode(err), r.URL)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
// Read compression metadata preserved in the init multipart for the decision.
|
||||||
|
_, compressPart := li.UserDefined[ReservedMetadataPrefix+"compression"]
|
||||||
|
isCompressed := compressPart
|
||||||
|
// Compress only if the compression is enabled during initial multipart.
|
||||||
|
if isCompressed {
|
||||||
// Open a pipe for compression.
|
// Open a pipe for compression.
|
||||||
// Where pipeWriter is piped to srcInfo.Reader.
|
// Where pipeWriter is piped to srcInfo.Reader.
|
||||||
// gr writes to pipeWriter.
|
// gr writes to pipeWriter.
|
||||||
@ -1510,13 +1533,7 @@ func (api objectAPIHandlers) CopyObjectPartHandler(w http.ResponseWriter, r *htt
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
if objectAPI.IsEncryptionSupported() && !srcInfo.IsCompressed() {
|
if objectAPI.IsEncryptionSupported() && !isCompressed {
|
||||||
var li ListPartsInfo
|
|
||||||
li, err = objectAPI.ListObjectParts(ctx, dstBucket, dstObject, uploadID, 0, 1)
|
|
||||||
if err != nil {
|
|
||||||
writeErrorResponse(w, toAPIErrorCode(err), r.URL)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
if crypto.IsEncrypted(li.UserDefined) {
|
if crypto.IsEncrypted(li.UserDefined) {
|
||||||
if !hasServerSideEncryptionHeader(r.Header) {
|
if !hasServerSideEncryptionHeader(r.Header) {
|
||||||
writeErrorResponse(w, ErrSSEMultipartEncrypted, r.URL)
|
writeErrorResponse(w, ErrSSEMultipartEncrypted, r.URL)
|
||||||
|
Loading…
x
Reference in New Issue
Block a user