api: CopyObjectPart was copying wrong offsets due to shadowing. (#3838)

startOffset was re-assigned to '0' so it would end up
copying wrong content ignoring the requested startOffset.

This also fixes the corruption issue we observed while
using docker registry.

Fixes https://github.com/docker/distribution/issues/2205

Also fixes #3842 - incorrect routing.
This commit is contained in:
Harshavardhana
2017-03-03 16:32:04 -08:00
committed by GitHub
parent 0c8c463a63
commit 05e53f1b34
9 changed files with 360 additions and 22 deletions

View File

@@ -26,6 +26,7 @@ import (
"net/http/httptest"
"net/url"
"strconv"
"strings"
"sync"
"testing"
@@ -939,6 +940,124 @@ func testAPIPutObjectHandler(obj ObjectLayer, instanceType, bucketName string, a
}
// Tests sanity of attempting to copying each parts at offsets from an existing
// file and create a new object. Also validates if the written is same as what we
// expected.
func TestAPICopyObjectPartHandlerSanity(t *testing.T) {
defer DetectTestLeak(t)()
ExecObjectLayerAPITest(t, testAPICopyObjectPartHandlerSanity, []string{"CopyObjectPart"})
}
func testAPICopyObjectPartHandlerSanity(obj ObjectLayer, instanceType, bucketName string, apiRouter http.Handler,
credentials credential, t *testing.T) {
objectName := "test-object"
// register event notifier.
err := initEventNotifier(obj)
if err != nil {
t.Fatalf("Initializing event notifiers failed")
}
// set of byte data for PutObject.
// object has to be created before running tests for Copy Object.
// this is required even to assert the copied object,
bytesData := []struct {
byteData []byte
}{
{generateBytesData(6 * humanize.MiByte)},
}
// set of inputs for uploading the objects before tests for downloading is done.
putObjectInputs := []struct {
bucketName string
objectName string
contentLength int64
textData []byte
metaData map[string]string
}{
// case - 1.
{bucketName, objectName, int64(len(bytesData[0].byteData)), bytesData[0].byteData, make(map[string]string)},
}
sha256sum := ""
// iterate through the above set of inputs and upload the object.
for i, input := range putObjectInputs {
// uploading the object.
_, err = obj.PutObject(input.bucketName, input.objectName, input.contentLength,
bytes.NewBuffer(input.textData), input.metaData, sha256sum)
// if object upload fails stop the test.
if err != nil {
t.Fatalf("Put Object case %d: Error uploading object: <ERROR> %v", i+1, err)
}
}
// Initiate Multipart upload for testing PutObjectPartHandler.
testObject := "testobject"
// PutObjectPart API HTTP Handler has to be tested in isolation,
// that is without any other handler being registered,
// That's why NewMultipartUpload is initiated using ObjectLayer.
uploadID, err := obj.NewMultipartUpload(bucketName, testObject, nil)
if err != nil {
// Failed to create NewMultipartUpload, abort.
t.Fatalf("Minio %s : <ERROR> %s", instanceType, err)
}
a := 0
b := globalMinPartSize - 1
var parts []completePart
for partNumber := 1; partNumber <= 2; partNumber++ {
// initialize HTTP NewRecorder, this records any mutations to response writer inside the handler.
rec := httptest.NewRecorder()
cpPartURL := getCopyObjectPartURL("", bucketName, testObject, uploadID, fmt.Sprintf("%d", partNumber))
// construct HTTP request for copy object.
var req *http.Request
req, err = newTestSignedRequestV4("PUT", cpPartURL, 0, nil, credentials.AccessKey, credentials.SecretKey)
if err != nil {
t.Fatalf("Test failed to create HTTP request for copy object part: <ERROR> %v", err)
}
// "X-Amz-Copy-Source" header contains the information about the source bucket and the object to copied.
req.Header.Set("X-Amz-Copy-Source", url.QueryEscape(pathJoin(bucketName, objectName)))
req.Header.Set("X-Amz-Copy-Source-Range", fmt.Sprintf("bytes=%d-%d", a, b))
// Since `apiRouter` satisfies `http.Handler` it has a ServeHTTP to execute the logic of the handler.
// Call the ServeHTTP to execute the handler, `func (api objectAPIHandlers) CopyObjectHandler` handles the request.
a = globalMinPartSize
b = len(bytesData[0].byteData) - 1
apiRouter.ServeHTTP(rec, req)
if rec.Code != http.StatusOK {
t.Fatalf("Test failed to create HTTP request for copy %d", rec.Code)
}
resp := &CopyObjectPartResponse{}
if err = xmlDecoder(rec.Body, resp, rec.Result().ContentLength); err != nil {
t.Fatalf("Test failed to decode XML response: <ERROR> %v", err)
}
parts = append(parts, completePart{
PartNumber: partNumber,
ETag: strings.Trim(resp.ETag, "\""),
})
}
result, err := obj.CompleteMultipartUpload(bucketName, testObject, uploadID, parts)
if err != nil {
t.Fatalf("Test: %s complete multipart upload failed: <ERROR> %v", instanceType, err)
}
if result.Size != int64(len(bytesData[0].byteData)) {
t.Fatalf("Test: %s expected size not written: expected %d, got %d", instanceType, len(bytesData[0].byteData), result.Size)
}
var buf bytes.Buffer
if err = obj.GetObject(bucketName, testObject, 0, int64(len(bytesData[0].byteData)), &buf); err != nil {
t.Fatalf("Test: %s reading completed file failed: <ERROR> %v", instanceType, err)
}
if !bytes.Equal(buf.Bytes(), bytesData[0].byteData) {
t.Fatalf("Test: %s returned data is not expected corruption detected:", instanceType)
}
}
// Wrapper for calling Copy Object Part API handler tests for both XL multiple disks and single node setup.
func TestAPICopyObjectPartHandler(t *testing.T) {
defer DetectTestLeak(t)()
@@ -1069,10 +1188,23 @@ func testAPICopyObjectPartHandler(obj ObjectLayer, instanceType, bucketName stri
accessKey: credentials.AccessKey,
secretKey: credentials.SecretKey,
expectedRespStatus: http.StatusRequestedRangeNotSatisfiable,
expectedRespStatus: http.StatusBadRequest,
},
// Test case - 6.
// Test case with ivalid byte range for exceeding source size boundaries.
{
bucketName: bucketName,
uploadID: uploadID,
copySourceHeader: url.QueryEscape("/" + bucketName + "/" + objectName),
copySourceRange: "bytes=0-6144",
accessKey: credentials.AccessKey,
secretKey: credentials.SecretKey,
expectedRespStatus: http.StatusBadRequest,
},
// Test case - 7.
// Test case with object name missing from source.
// fail with BadRequest.
{
@@ -1085,7 +1217,7 @@ func testAPICopyObjectPartHandler(obj ObjectLayer, instanceType, bucketName stri
expectedRespStatus: http.StatusBadRequest,
},
// Test case - 7.
// Test case - 8.
// Test case with non-existent source file.
// Case for the purpose of failing `api.ObjectAPI.GetObjectInfo`.
// Expecting the response status code to http.StatusNotFound (404).
@@ -1099,7 +1231,7 @@ func testAPICopyObjectPartHandler(obj ObjectLayer, instanceType, bucketName stri
expectedRespStatus: http.StatusNotFound,
},
// Test case - 8.
// Test case - 9.
// Test case with non-existent source file.
// Case for the purpose of failing `api.ObjectAPI.PutObjectPart`.
// Expecting the response status code to http.StatusNotFound (404).
@@ -1113,7 +1245,7 @@ func testAPICopyObjectPartHandler(obj ObjectLayer, instanceType, bucketName stri
expectedRespStatus: http.StatusNotFound,
},
// Test case - 9.
// Test case - 10.
// Case with invalid AccessKey.
{
bucketName: bucketName,
@@ -1125,7 +1257,7 @@ func testAPICopyObjectPartHandler(obj ObjectLayer, instanceType, bucketName stri
expectedRespStatus: http.StatusForbidden,
},
// Test case - 10.
// Test case - 11.
// Case with non-existent upload id.
{
bucketName: bucketName,
@@ -1136,7 +1268,7 @@ func testAPICopyObjectPartHandler(obj ObjectLayer, instanceType, bucketName stri
expectedRespStatus: http.StatusNotFound,
},
// Test case - 11.
// Test case - 12.
// invalid part number.
{
bucketName: bucketName,
@@ -1147,7 +1279,7 @@ func testAPICopyObjectPartHandler(obj ObjectLayer, instanceType, bucketName stri
secretKey: credentials.SecretKey,
expectedRespStatus: http.StatusOK,
},
// Test case - 12.
// Test case - 13.
// maximum part number.
{
bucketName: bucketName,