From 189b6682d6ef1447135bc625fb2da89ba02ab340 Mon Sep 17 00:00:00 2001 From: Bala FA Date: Tue, 5 Sep 2017 16:56:23 -0700 Subject: [PATCH] azure: allow parts > 100MiB size to work properly (#4869) Previously if any multipart part size > 100MiB is uploaded, azure gateway returns error. This patch fixes the issue by creating sub parts sizing each 100MiB of given multipart part. On complete multipart, it fetches all uploaded azure block ids for each parts and performs completion. Fixes #4868 --- cmd/gateway-azure.go | 121 ++++++++++++++++++++++++++++---------- cmd/gateway-azure_test.go | 32 ++++++---- 2 files changed, 110 insertions(+), 43 deletions(-) diff --git a/cmd/gateway-azure.go b/cmd/gateway-azure.go index 3954703da..6f4812062 100644 --- a/cmd/gateway-azure.go +++ b/cmd/gateway-azure.go @@ -25,17 +25,18 @@ import ( "io" "net/http" "net/url" - "strconv" "strings" "sync" "time" "github.com/Azure/azure-sdk-for-go/storage" + humanize "github.com/dustin/go-humanize" "github.com/minio/minio-go/pkg/policy" "github.com/minio/sha256-simd" ) const globalAzureAPIVersion = "2016-05-31" +const azureBlockSize = 100 * humanize.MiByte // Canonicalize the metadata headers, without this azure-sdk calculates // incorrect signature. This attempt to canonicalize is to convert @@ -496,27 +497,23 @@ func (a *azureObjects) CopyObjectPart(srcBucket, srcObject, destBucket, destObje return info, traceError(NotImplemented{}) } -// Encode partID+md5Hex to a blockID. -func azureGetBlockID(partID int, md5Hex string) string { - return base64.StdEncoding.EncodeToString([]byte(fmt.Sprintf("%.5d.%s", partID, md5Hex))) +// Encode partID, subPartNumber and md5Hex to blockID. +func azureGetBlockID(partID, subPartNumber int, md5Hex string) string { + return base64.StdEncoding.EncodeToString([]byte(fmt.Sprintf("%05d.%02d.%s", partID, subPartNumber, md5Hex))) } -// Decode blockID to partID+md5Hex. -func azureParseBlockID(blockID string) (int, string, error) { - idByte, err := base64.StdEncoding.DecodeString(blockID) - if err != nil { - return 0, "", traceError(err) +// Parse blockID into partID, subPartNumber and md5Hex. +func azureParseBlockID(blockID string) (partID, subPartNumber int, md5Hex string, err error) { + var blockIDBytes []byte + if blockIDBytes, err = base64.StdEncoding.DecodeString(blockID); err != nil { + return } - idStr := string(idByte) - splitRes := strings.Split(idStr, ".") - if len(splitRes) != 2 { - return 0, "", traceError(errUnexpected) + + if _, err = fmt.Sscanf(string(blockIDBytes), "%05d.%02d.%s", &partID, &subPartNumber, &md5Hex); err != nil { + err = fmt.Errorf("invalid block id '%s'", string(blockIDBytes)) } - partID, err := strconv.Atoi(splitRes[0]) - if err != nil { - return 0, "", traceError(err) - } - return partID, splitRes[1], nil + + return } // PutObjectPart - Use Azure equivalent PutBlockWithLength. @@ -550,10 +547,25 @@ func (a *azureObjects) PutObjectPart(bucket, object, uploadID string, partID int teeReader = io.TeeReader(data, io.MultiWriter(writers...)) } - id := azureGetBlockID(partID, etag) - err = a.client.PutBlockWithLength(bucket, object, id, uint64(size), teeReader, nil) - if err != nil { - return info, azureToObjectError(traceError(err), bucket, object) + subPartSize := int64(azureBlockSize) + subPartNumber := 1 + for remainingSize := size; remainingSize >= 0; remainingSize -= subPartSize { + // Allow to create zero sized part. + if remainingSize == 0 && subPartNumber > 1 { + break + } + + if remainingSize < subPartSize { + subPartSize = remainingSize + } + + id := azureGetBlockID(partID, subPartNumber, etag) + err = a.client.PutBlockWithLength(bucket, object, id, uint64(subPartSize), io.LimitReader(teeReader, subPartSize), nil) + if err != nil { + return info, azureToObjectError(traceError(err), bucket, object) + } + + subPartNumber++ } if md5Hex != "" { @@ -601,7 +613,7 @@ func (a *azureObjects) ListObjectParts(bucket, object, uploadID string, partNumb break } partCount++ - partID, md5Hex, err := azureParseBlockID(part.Name) + partID, _, md5Hex, err := azureParseBlockID(part.Name) if err != nil { return result, err } @@ -639,14 +651,63 @@ func (a *azureObjects) CompleteMultipartUpload(bucket, object, uploadID string, if meta == nil { return objInfo, traceError(InvalidUploadID{uploadID}) } - var blocks []storage.Block - for _, part := range uploadedParts { - blocks = append(blocks, storage.Block{ - ID: azureGetBlockID(part.PartNumber, part.ETag), - Status: storage.BlockStatusUncommitted, - }) + + resp, err := a.client.GetBlockList(bucket, object, storage.BlockListTypeUncommitted) + if err != nil { + return objInfo, azureToObjectError(traceError(err), bucket, object) } - err = a.client.PutBlockList(bucket, object, blocks) + + getBlocks := func(partNumber int, etag string) (blocks []storage.Block, size int64, err error) { + for _, part := range resp.UncommittedBlocks { + var partID int + var md5Hex string + if partID, _, md5Hex, err = azureParseBlockID(part.Name); err != nil { + return nil, 0, err + } + + if partNumber == partID && etag == md5Hex { + blocks = append(blocks, storage.Block{ + ID: part.Name, + Status: storage.BlockStatusUncommitted, + }) + + size += part.Size + } + } + + if len(blocks) == 0 { + return nil, 0, InvalidPart{} + } + + return blocks, size, nil + } + + var allBlocks []storage.Block + partSizes := make([]int64, len(uploadedParts)) + for i, part := range uploadedParts { + var blocks []storage.Block + var size int64 + blocks, size, err = getBlocks(part.PartNumber, part.ETag) + if err != nil { + return objInfo, traceError(err) + } + + allBlocks = append(allBlocks, blocks...) + partSizes[i] = size + } + + // Error out if parts except last part sizing < 5MiB. + for i, size := range partSizes[:len(partSizes)-1] { + if size < globalMinPartSize { + return objInfo, traceError(PartTooSmall{ + PartNumber: uploadedParts[i].PartNumber, + PartSize: size, + PartETag: uploadedParts[i].ETag, + }) + } + } + + err = a.client.PutBlockList(bucket, object, allBlocks) if err != nil { return objInfo, azureToObjectError(traceError(err), bucket, object) } diff --git a/cmd/gateway-azure_test.go b/cmd/gateway-azure_test.go index 29b70889d..65008e88e 100644 --- a/cmd/gateway-azure_test.go +++ b/cmd/gateway-azure_test.go @@ -133,15 +133,16 @@ func TestAzureToObjectError(t *testing.T) { // Test azureGetBlockID(). func TestAzureGetBlockID(t *testing.T) { testCases := []struct { - partID int - md5 string - blockID string + partID int + subPartNumber int + md5 string + blockID string }{ - {1, "d41d8cd98f00b204e9800998ecf8427e", "MDAwMDEuZDQxZDhjZDk4ZjAwYjIwNGU5ODAwOTk4ZWNmODQyN2U="}, - {2, "a7fb6b7b36ee4ed66b5546fac4690273", "MDAwMDIuYTdmYjZiN2IzNmVlNGVkNjZiNTU0NmZhYzQ2OTAyNzM="}, + {1, 7, "d41d8cd98f00b204e9800998ecf8427e", "MDAwMDEuMDcuZDQxZDhjZDk4ZjAwYjIwNGU5ODAwOTk4ZWNmODQyN2U="}, + {2, 19, "a7fb6b7b36ee4ed66b5546fac4690273", "MDAwMDIuMTkuYTdmYjZiN2IzNmVlNGVkNjZiNTU0NmZhYzQ2OTAyNzM="}, } for _, test := range testCases { - blockID := azureGetBlockID(test.partID, test.md5) + blockID := azureGetBlockID(test.partID, test.subPartNumber, test.md5) if blockID != test.blockID { t.Fatalf("%s is not equal to %s", blockID, test.blockID) } @@ -151,26 +152,31 @@ func TestAzureGetBlockID(t *testing.T) { // Test azureParseBlockID(). func TestAzureParseBlockID(t *testing.T) { testCases := []struct { - partID int - md5 string - blockID string + blockID string + partID int + subPartNumber int + md5 string }{ - {1, "d41d8cd98f00b204e9800998ecf8427e", "MDAwMDEuZDQxZDhjZDk4ZjAwYjIwNGU5ODAwOTk4ZWNmODQyN2U="}, - {2, "a7fb6b7b36ee4ed66b5546fac4690273", "MDAwMDIuYTdmYjZiN2IzNmVlNGVkNjZiNTU0NmZhYzQ2OTAyNzM="}, + {"MDAwMDEuMDcuZDQxZDhjZDk4ZjAwYjIwNGU5ODAwOTk4ZWNmODQyN2U=", 1, 7, "d41d8cd98f00b204e9800998ecf8427e"}, + {"MDAwMDIuMTkuYTdmYjZiN2IzNmVlNGVkNjZiNTU0NmZhYzQ2OTAyNzM=", 2, 19, "a7fb6b7b36ee4ed66b5546fac4690273"}, } for _, test := range testCases { - partID, md5, err := azureParseBlockID(test.blockID) + partID, subPartNumber, md5, err := azureParseBlockID(test.blockID) if err != nil { t.Fatal(err) } if partID != test.partID { t.Fatalf("%d not equal to %d", partID, test.partID) } + if subPartNumber != test.subPartNumber { + t.Fatalf("%d not equal to %d", subPartNumber, test.subPartNumber) + } if md5 != test.md5 { t.Fatalf("%s not equal to %s", md5, test.md5) } } - _, _, err := azureParseBlockID("junk") + + _, _, _, err := azureParseBlockID("junk") if err == nil { t.Fatal("Expected azureParseBlockID() to return error") }