Enable compression with encryption in CopyObject API (#20411)

When the encryption and compression are both enabled, the
the server will avoid compressing the data for no apparent reason

This commit will enable it and update unit tests.
This commit is contained in:
Anis Eleuch 2024-09-12 21:10:44 +01:00 committed by GitHub
parent 5862582cd7
commit 398ffb1136
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 61 additions and 25 deletions

View File

@ -1333,16 +1333,12 @@ func (api objectAPIHandlers) CopyObjectHandler(w http.ResponseWriter, r *http.Re
}
}
// Check if either the source is encrypted or the destination will be encrypted.
objectEncryption := crypto.Requested(r.Header)
objectEncryption = objectEncryption || crypto.IsSourceEncrypted(srcInfo.UserDefined)
var compressMetadata map[string]string
// No need to compress for remote etcd calls
// Pass the decompressed stream to such calls.
isDstCompressed := isCompressible(r.Header, dstObject) &&
length > minCompressibleSize &&
!isRemoteCopyRequired(ctx, srcBucket, dstBucket, objectAPI) && !objectEncryption
!isRemoteCopyRequired(ctx, srcBucket, dstBucket, objectAPI)
if isDstCompressed {
compressMetadata = make(map[string]string, 2)
// Preserving the compression metadata.
@ -1625,7 +1621,10 @@ func (api objectAPIHandlers) CopyObjectHandler(w http.ResponseWriter, r *http.Re
// if encryption is enabled we do not need explicit "REPLACE" metadata to
// be enabled as well - this is to allow for key-rotation.
if !isDirectiveReplace(r.Header.Get(xhttp.AmzMetadataDirective)) && !isDirectiveReplace(r.Header.Get(xhttp.AmzTagDirective)) &&
srcInfo.metadataOnly && srcOpts.VersionID == "" && !objectEncryption {
srcInfo.metadataOnly && srcOpts.VersionID == "" &&
!crypto.Requested(r.Header) &&
!crypto.IsSourceEncrypted(srcInfo.UserDefined) {
// If x-amz-metadata-directive is not set to REPLACE then we need
// to error out if source and destination are same.
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrInvalidCopyDest), r.URL)

View File

@ -903,9 +903,9 @@ func testAPIGetObjectWithPartNumberHandler(obj ObjectLayer, instanceType, bucket
// SSEC can't be used with compression
globalCompressConfigMu.Lock()
globalCompressEnabled := globalCompressConfig.Enabled
compressEnabled := globalCompressConfig.Enabled
globalCompressConfigMu.Unlock()
if globalCompressEnabled {
if compressEnabled {
objectInputs = objectInputs[0:9]
}
@ -1680,7 +1680,7 @@ func testAPIPutObjectHandler(obj ObjectLayer, instanceType, bucketName string, a
// expected.
func TestAPICopyObjectPartHandlerSanity(t *testing.T) {
defer DetectTestLeak(t)()
ExecExtendedObjectLayerAPITest(t, testAPICopyObjectPartHandlerSanity, []string{"CopyObjectPart"})
ExecExtendedObjectLayerAPITest(t, testAPICopyObjectPartHandlerSanity, []string{"NewMultipart", "CompleteMultipart", "CopyObjectPart"})
}
func testAPICopyObjectPartHandlerSanity(obj ObjectLayer, instanceType, bucketName string, apiRouter http.Handler,
@ -1688,7 +1688,6 @@ func testAPICopyObjectPartHandlerSanity(obj ObjectLayer, instanceType, bucketNam
) {
objectName := "test-object"
var err error
opts := ObjectOptions{}
// set of byte data for PutObject.
// object has to be created before running tests for Copy Object.
// this is required even to assert the copied object,
@ -1720,18 +1719,26 @@ func testAPICopyObjectPartHandlerSanity(obj ObjectLayer, instanceType, bucketNam
}
}
// Initiate Multipart upload for testing PutObjectPartHandler.
testObject := "testobject"
// PutObjectPart API HTTP Handler has to be tested in isolation,
// that is without any other handler being registered,
// That's why NewMultipartUpload is initiated using ObjectLayer.
res, err := obj.NewMultipartUpload(context.Background(), bucketName, testObject, opts)
// Initiate Multipart upload for testing CopyObjectPartHandler.
rec := httptest.NewRecorder()
req, err := newTestSignedRequestV4(http.MethodPost, getNewMultipartURL("", bucketName, testObject),
0, nil, credentials.AccessKey, credentials.SecretKey, nil)
if err != nil {
// Failed to create NewMultipartUpload, abort.
t.Fatalf("MinIO %s : <ERROR> %s", instanceType, err)
t.Fatalf("Failed to create HTTP request for NewMultipart Request: <ERROR> %v", err)
}
uploadID := res.UploadID
apiRouter.ServeHTTP(rec, req)
if rec.Code != http.StatusOK {
t.Fatalf("%s: Expected the response status to be `%d`, but instead found `%d`", instanceType, http.StatusOK, rec.Code)
}
decoder := xml.NewDecoder(rec.Body)
multipartResponse := &InitiateMultipartUploadResponse{}
err = decoder.Decode(multipartResponse)
if err != nil {
t.Fatalf("Error decoding the recorded response Body")
}
uploadID := multipartResponse.UploadID
a := 0
b := globalMinPartSize
@ -1772,18 +1779,34 @@ func testAPICopyObjectPartHandlerSanity(obj ObjectLayer, instanceType, bucketNam
})
}
result, err := obj.CompleteMultipartUpload(context.Background(), bucketName, testObject, uploadID, parts, ObjectOptions{})
if err != nil {
t.Fatalf("Test: %s complete multipart upload failed: <ERROR> %v", instanceType, err)
var completeBytes []byte
// Complete multipart upload parts.
completeUploads := &CompleteMultipartUpload{
Parts: parts,
}
if result.Size != int64(len(bytesData[0].byteData)) {
t.Fatalf("Test: %s expected size not written: expected %d, got %d", instanceType, len(bytesData[0].byteData), result.Size)
completeBytes, err = xml.Marshal(completeUploads)
if err != nil {
t.Fatalf("Error XML encoding of parts: <ERROR> %s.", err)
}
// Indicating that all parts are uploaded and initiating CompleteMultipartUpload.
req, err = newTestSignedRequestV4(http.MethodPost, getCompleteMultipartUploadURL("", bucketName, testObject, uploadID),
int64(len(completeBytes)), bytes.NewReader(completeBytes), credentials.AccessKey, credentials.SecretKey, nil)
if err != nil {
t.Fatalf("Failed to create HTTP request for CompleteMultipartUpload: <ERROR> %v", err)
}
rec = httptest.NewRecorder()
apiRouter.ServeHTTP(rec, req)
// Assert the response code with the expected status.
if rec.Code != http.StatusOK {
t.Errorf("Test %s: Expected the response status to be `%d`, but instead found `%d`", instanceType, http.StatusOK, rec.Code)
}
var buf bytes.Buffer
r, err := obj.GetObjectNInfo(context.Background(), bucketName, testObject, nil, nil, ObjectOptions{})
if err != nil {
t.Fatalf("Test: %s reading completed file failed: <ERROR> %v", instanceType, err)
t.Fatalf("Test %s: reading completed file failed: <ERROR> %v", instanceType, err)
}
if _, err = io.Copy(&buf, r); err != nil {
r.Close()
@ -1791,7 +1814,14 @@ func testAPICopyObjectPartHandlerSanity(obj ObjectLayer, instanceType, bucketNam
}
r.Close()
if !bytes.Equal(buf.Bytes(), bytesData[0].byteData) {
t.Fatalf("Test: %s returned data is not expected corruption detected:", instanceType)
t.Fatalf("Test %s: returned data is not expected corruption detected:", instanceType)
}
globalCompressConfigMu.Lock()
compressEnabled := globalCompressConfig.Enabled
globalCompressConfigMu.Unlock()
if compressEnabled && !r.ObjInfo.IsCompressed() {
t.Errorf("Test %s: object found to be uncompressed though compression was enabled", instanceType)
}
}
@ -2548,6 +2578,13 @@ func testAPICopyObjectHandler(obj ObjectLayer, instanceType, bucketName string,
if !bytes.Equal(bytesData[0].byteData, buffers[0].Bytes()) {
t.Errorf("Test %d: %s: Data Mismatch: Data fetched back from the copied object doesn't match the original one.", i, instanceType)
}
globalCompressConfigMu.Lock()
compressEnabled := globalCompressConfig.Enabled
globalCompressConfigMu.Unlock()
if compressEnabled && !r.ObjInfo.IsCompressed() {
t.Errorf("Test %d %s: object found to be uncompressed though compression was enabled", i, instanceType)
}
}
// Verify response of the V2 signed HTTP request.