azure gateway: return MD5Sum as ETag for S3 API compatibility (#6884)

Fixes #6872.

This PR refactors multipart upload implementation to use a per
part metadata file which is cleaned up at the end of the upload
This commit is contained in:
poornas 2019-02-06 16:58:43 -08:00 committed by kannappanr
parent 4aa9ee153b
commit d203e7e1cc
2 changed files with 175 additions and 215 deletions

View File

@ -26,6 +26,7 @@ import (
"fmt"
"io"
"net/http"
"path"
"sort"
"strconv"
"strings"
@ -52,6 +53,8 @@ const (
metadataObjectNameTemplate = minio.GatewayMinioSysTmp + "multipart/v1/%s.%x/azure.json"
azureBackend = "azure"
azureMarkerPrefix = "{minio}"
metadataPartNamePrefix = minio.GatewayMinioSysTmp + "multipart/v1/%s.%x"
maxPartsCount = 10000
)
func init() {
@ -261,6 +264,26 @@ func s3MetaToAzureProperties(ctx context.Context, s3Metadata map[string]string)
return blobMeta, props, nil
}
const (
partMetaVersionV1 = "1"
)
// partMetadataV1 struct holds the part specific metadata for
// multipart operations.
type partMetadataV1 struct {
Version string `json:"version"`
Size int64 `json:"Size"`
BlockIDs []string `json:"blockIDs"`
ETag string `json:"etag"`
}
// Returns the initialized part metadata struct
func newPartMetaV1(uploadID string, partID int) (partMeta *partMetadataV1) {
p := &partMetadataV1{}
p.Version = partMetaVersionV1
return p
}
// azurePropertiesToS3Meta converts Azure metadata/properties to S3
// metadata. It is the reverse of s3MetaToAzureProperties. Azure's
// `.GetMetadata()` lower-cases all header keys, so this is taken into
@ -405,37 +428,13 @@ func checkAzureUploadID(ctx context.Context, uploadID string) (err error) {
return nil
}
// Encode partID, subPartNumber, uploadID and md5Hex to blockID.
func azureGetBlockID(partID, subPartNumber int, uploadID, md5Hex string) string {
return base64.StdEncoding.EncodeToString([]byte(fmt.Sprintf("%05d.%02d.%s.%s", partID, subPartNumber, uploadID, md5Hex)))
}
// Parse blockID into partID, subPartNumber and md5Hex.
func azureParseBlockID(blockID string) (partID, subPartNumber int, uploadID, md5Hex string, err error) {
var blockIDBytes []byte
if blockIDBytes, err = base64.StdEncoding.DecodeString(blockID); err != nil {
// parses partID from part metadata file name
func parseAzurePart(metaPartFileName, prefix string) (partID int, err error) {
partStr := strings.TrimPrefix(metaPartFileName, prefix+"/")
if partID, err = strconv.Atoi(partStr); err != nil || partID <= 0 {
err = fmt.Errorf("invalid part number in block id '%s'", string(partID))
return
}
tokens := strings.Split(string(blockIDBytes), ".")
if len(tokens) != 4 {
err = fmt.Errorf("invalid block id '%s'", string(blockIDBytes))
return
}
if partID, err = strconv.Atoi(tokens[0]); err != nil || partID <= 0 {
err = fmt.Errorf("invalid part number in block id '%s'", string(blockIDBytes))
return
}
if subPartNumber, err = strconv.Atoi(tokens[1]); err != nil || subPartNumber <= 0 {
err = fmt.Errorf("invalid sub-part number in block id '%s'", string(blockIDBytes))
return
}
uploadID = tokens[2]
md5Hex = tokens[3]
return
}
@ -747,14 +746,7 @@ func (a *azureObjects) PutObject(ctx context.Context, bucket, object string, r *
return a.GetObjectInfo(ctx, bucket, object, opts)
}
uuid, err := getAzureUploadID()
if err != nil {
return objInfo, err
}
etag := data.MD5HexString()
if etag == "" {
etag = minio.GenETag()
}
blockIDs := make(map[string]string)
blob := a.client.GetContainerReference(bucket).GetBlobReference(object)
subPartSize, subPartNumber := int64(azureBlockSize), 1
@ -768,7 +760,8 @@ func (a *azureObjects) PutObject(ctx context.Context, bucket, object string, r *
subPartSize = remainingSize
}
id := azureGetBlockID(1, subPartNumber, uuid, etag)
id := base64.StdEncoding.EncodeToString([]byte(minio.MustGetUUID()))
blockIDs[id] = ""
if err = blob.PutBlockWithLength(id, uint64(subPartSize), io.LimitReader(data, subPartSize), nil); err != nil {
return objInfo, azureToObjectError(err, bucket, object)
}
@ -780,17 +773,9 @@ func (a *azureObjects) PutObject(ctx context.Context, bucket, object string, r *
if err != nil {
return objInfo, azureToObjectError(err, bucket, object)
}
getBlocks := func(partNumber int, etag string) (blocks []storage.Block, size int64, aerr error) {
getBlocks := func(blocksMap map[string]string) (blocks []storage.Block, size int64, aerr error) {
for _, part := range resp.UncommittedBlocks {
var partID int
var readUploadID string
var md5Hex string
if partID, _, readUploadID, md5Hex, aerr = azureParseBlockID(part.Name); aerr != nil {
return nil, 0, aerr
}
if partNumber == partID && uuid == readUploadID && etag == md5Hex {
if _, ok := blocksMap[part.Name]; ok {
blocks = append(blocks, storage.Block{
ID: part.Name,
Status: storage.BlockStatusUncommitted,
@ -808,7 +793,7 @@ func (a *azureObjects) PutObject(ctx context.Context, bucket, object string, r *
}
var blocks []storage.Block
blocks, _, err = getBlocks(1, etag)
blocks, _, err = getBlocks(blockIDs)
if err != nil {
logger.LogIf(ctx, err)
return objInfo, err
@ -823,7 +808,7 @@ func (a *azureObjects) PutObject(ctx context.Context, bucket, object string, r *
}
// Save md5sum for future processing on the object.
metadata["x-amz-meta-md5sum"] = hex.EncodeToString(data.MD5Current())
metadata["x-amz-meta-md5sum"] = r.MD5CurrentHexString()
objBlob.Metadata, objBlob.Properties, err = s3MetaToAzureProperties(ctx, metadata)
if err != nil {
return objInfo, azureToObjectError(err, bucket, object)
@ -896,6 +881,17 @@ func getAzureMetadataObjectName(objectName, uploadID string) string {
return fmt.Sprintf(metadataObjectNameTemplate, uploadID, sha256.Sum256([]byte(objectName)))
}
// gets the name of part metadata file for multipart upload operations
func getAzureMetadataPartName(objectName, uploadID string, partID int) string {
partMetaPrefix := getAzureMetadataPartPrefix(uploadID, objectName)
return path.Join(partMetaPrefix, fmt.Sprintf("%d", partID))
}
// gets the prefix of part metadata file
func getAzureMetadataPartPrefix(uploadID, objectName string) string {
return fmt.Sprintf(metadataPartNamePrefix, uploadID, sha256.Sum256([]byte(objectName)))
}
func (a *azureObjects) checkUploadIDExists(ctx context.Context, bucketName, objectName, uploadID string) (err error) {
blob := a.client.GetContainerReference(bucketName).GetBlobReference(
getAzureMetadataObjectName(objectName, uploadID))
@ -948,11 +944,7 @@ func (a *azureObjects) PutObjectPart(ctx context.Context, bucket, object, upload
return info, err
}
etag := data.MD5HexString()
if etag == "" {
etag = minio.GenETag()
}
partMetaV1 := newPartMetaV1(uploadID, partID)
subPartSize, subPartNumber := int64(azureBlockSize), 1
for remainingSize := data.Size(); remainingSize >= 0; remainingSize -= subPartSize {
// Allow to create zero sized part.
@ -964,7 +956,9 @@ func (a *azureObjects) PutObjectPart(ctx context.Context, bucket, object, upload
subPartSize = remainingSize
}
id := azureGetBlockID(partID, subPartNumber, uploadID, etag)
id := base64.StdEncoding.EncodeToString([]byte(minio.MustGetUUID()))
partMetaV1.BlockIDs = append(partMetaV1.BlockIDs, id)
blob := a.client.GetContainerReference(bucket).GetBlobReference(object)
err = blob.PutBlockWithLength(id, uint64(subPartSize), io.LimitReader(data, subPartSize), nil)
if err != nil {
@ -973,8 +967,26 @@ func (a *azureObjects) PutObjectPart(ctx context.Context, bucket, object, upload
subPartNumber++
}
partMetaV1.ETag = r.MD5CurrentHexString()
partMetaV1.Size = data.Size()
// maintain per part md5sum in a temporary part metadata file until upload
// is finalized.
metadataObject := getAzureMetadataPartName(object, uploadID, partID)
var jsonData []byte
if jsonData, err = json.Marshal(partMetaV1); err != nil {
logger.LogIf(ctx, err)
return info, err
}
blob := a.client.GetContainerReference(bucket).GetBlobReference(metadataObject)
err = blob.CreateBlockBlobFromReader(bytes.NewBuffer(jsonData), nil)
if err != nil {
return info, azureToObjectError(err, bucket, metadataObject)
}
info.PartNumber = partID
info.ETag = etag
info.ETag = partMetaV1.ETag
info.LastModified = minio.UTCNow()
info.Size = data.Size()
return info, nil
@ -991,48 +1003,57 @@ func (a *azureObjects) ListObjectParts(ctx context.Context, bucket, object, uplo
result.UploadID = uploadID
result.MaxParts = maxParts
objBlob := a.client.GetContainerReference(bucket).GetBlobReference(object)
resp, err := objBlob.GetBlockList(storage.BlockListTypeUncommitted, nil)
azureErr, ok := err.(storage.AzureStorageServiceError)
if ok && azureErr.StatusCode == http.StatusNotFound {
// If no parts are uploaded yet then we return empty list.
return result, nil
}
if err != nil {
return result, azureToObjectError(err, bucket, object)
}
// Build a sorted list of parts and return the requested entries.
partsMap := make(map[int]minio.PartInfo)
for _, block := range resp.UncommittedBlocks {
var partNumber int
var parsedUploadID string
var md5Hex string
if partNumber, _, parsedUploadID, md5Hex, err = azureParseBlockID(block.Name); err != nil {
return result, azureToObjectError(fmt.Errorf("Unexpected error"), bucket, object)
}
if parsedUploadID != uploadID {
continue
}
part, ok := partsMap[partNumber]
if !ok {
partsMap[partNumber] = minio.PartInfo{
PartNumber: partNumber,
Size: block.Size,
ETag: md5Hex,
}
continue
}
if part.ETag != md5Hex {
// If two parts of same partNumber were uploaded with different contents
// return error as we won't be able to decide which the latest part is.
return result, azureToObjectError(fmt.Errorf("Unexpected error"), bucket, object)
}
part.Size += block.Size
partsMap[partNumber] = part
}
var parts []minio.PartInfo
for _, part := range partsMap {
parts = append(parts, part)
var marker, delimiter string
maxKeys := maxPartsCount
if partNumberMarker == 0 {
maxKeys = maxParts
}
prefix := getAzureMetadataPartPrefix(uploadID, object)
container := a.client.GetContainerReference(bucket)
resp, err := container.ListBlobs(storage.ListBlobsParameters{
Prefix: prefix,
Marker: marker,
Delimiter: delimiter,
MaxResults: uint(maxKeys),
})
if err != nil {
return result, azureToObjectError(err, bucket, prefix)
}
for _, blob := range resp.Blobs {
if delimiter == "" && !strings.HasPrefix(blob.Name, minio.GatewayMinioSysTmp) {
// We filter out non minio.GatewayMinioSysTmp entries in the recursive listing.
continue
}
// filter temporary metadata file for blob
if strings.HasSuffix(blob.Name, "azure.json") {
continue
}
if !isAzureMarker(marker) && blob.Name <= marker {
// If the application used ListObjectsV1 style marker then we
// skip all the entries till we reach the marker.
continue
}
partNumber, err := parseAzurePart(blob.Name, prefix)
if err != nil {
return result, azureToObjectError(fmt.Errorf("Unexpected error"), bucket, object)
}
var metadata partMetadataV1
var metadataReader io.Reader
blob := a.client.GetContainerReference(bucket).GetBlobReference(blob.Name)
if metadataReader, err = blob.Get(nil); err != nil {
return result, azureToObjectError(fmt.Errorf("Unexpected error"), bucket, object)
}
if err = json.NewDecoder(metadataReader).Decode(&metadata); err != nil {
logger.LogIf(ctx, err)
return result, azureToObjectError(err, bucket, object)
}
parts = append(parts, minio.PartInfo{
PartNumber: partNumber,
Size: metadata.Size,
ETag: metadata.ETag,
})
}
sort.Slice(parts, func(i int, j int) bool {
return parts[i].PartNumber < parts[j].PartNumber
@ -1071,6 +1092,22 @@ func (a *azureObjects) AbortMultipartUpload(ctx context.Context, bucket, object,
if err = a.checkUploadIDExists(ctx, bucket, object, uploadID); err != nil {
return err
}
var partNumberMarker int
for {
lpi, err := a.ListObjectParts(ctx, bucket, object, uploadID, partNumberMarker, maxPartsCount, minio.ObjectOptions{})
if err != nil {
break
}
for _, part := range lpi.Parts {
pblob := a.client.GetContainerReference(bucket).GetBlobReference(
getAzureMetadataPartName(object, uploadID, part.PartNumber))
pblob.Delete(nil)
}
partNumberMarker = lpi.NextPartNumberMarker
if !lpi.IsTruncated {
break
}
}
blob := a.client.GetContainerReference(bucket).GetBlobReference(
getAzureMetadataObjectName(object, uploadID))
@ -1100,66 +1137,33 @@ func (a *azureObjects) CompleteMultipartUpload(ctx context.Context, bucket, obje
return objInfo, azureToObjectError(err, bucket, metadataObject)
}
defer func() {
blob := a.client.GetContainerReference(bucket).GetBlobReference(metadataObject)
derr := blob.Delete(nil)
logger.GetReqInfo(ctx).AppendTags("uploadID", uploadID)
logger.LogIf(ctx, derr)
}()
objBlob := a.client.GetContainerReference(bucket).GetBlobReference(object)
resp, err := objBlob.GetBlockList(storage.BlockListTypeUncommitted, nil)
if err != nil {
return objInfo, azureToObjectError(err, bucket, object)
}
getBlocks := func(partNumber int, etag string) (blocks []storage.Block, size int64, err error) {
for _, part := range resp.UncommittedBlocks {
var partID int
var readUploadID string
var md5Hex string
if partID, _, readUploadID, md5Hex, err = azureParseBlockID(part.Name); err != nil {
return nil, 0, err
}
if partNumber == partID && uploadID == readUploadID && etag == md5Hex {
blocks = append(blocks, storage.Block{
ID: part.Name,
Status: storage.BlockStatusUncommitted,
})
size += part.Size
}
}
if len(blocks) == 0 {
return nil, 0, minio.InvalidPart{}
}
return blocks, size, nil
}
var allBlocks []storage.Block
partSizes := make([]int64, len(uploadedParts))
for i, part := range uploadedParts {
var blocks []storage.Block
var size int64
blocks, size, err = getBlocks(part.PartNumber, part.ETag)
if err != nil {
logger.LogIf(ctx, err)
return objInfo, err
var partMetadataReader io.Reader
var partMetadata partMetadataV1
partMetadataObject := getAzureMetadataPartName(object, uploadID, part.PartNumber)
pblob := a.client.GetContainerReference(bucket).GetBlobReference(partMetadataObject)
if partMetadataReader, err = pblob.Get(nil); err != nil {
return objInfo, azureToObjectError(err, bucket, partMetadataObject)
}
allBlocks = append(allBlocks, blocks...)
partSizes[i] = size
}
if err = json.NewDecoder(partMetadataReader).Decode(&partMetadata); err != nil {
logger.LogIf(ctx, err)
return objInfo, azureToObjectError(err, bucket, partMetadataObject)
}
// Error out if parts except last part sizing < 5MiB.
for i, size := range partSizes[:len(partSizes)-1] {
if size < azureS3MinPartSize {
if partMetadata.ETag != part.ETag {
return objInfo, minio.InvalidPart{}
}
for _, blockID := range partMetadata.BlockIDs {
allBlocks = append(allBlocks, storage.Block{ID: blockID, Status: storage.BlockStatusUncommitted})
}
if i < (len(uploadedParts)-1) && partMetadata.Size < azureS3MinPartSize {
return objInfo, minio.PartTooSmall{
PartNumber: uploadedParts[i].PartNumber,
PartSize: size,
PartSize: partMetadata.Size,
PartETag: uploadedParts[i].ETag,
}
}
@ -1183,6 +1187,28 @@ func (a *azureObjects) CompleteMultipartUpload(ctx context.Context, bucket, obje
return objInfo, azureToObjectError(err, bucket, object)
}
}
var partNumberMarker int
for {
lpi, err := a.ListObjectParts(ctx, bucket, object, uploadID, partNumberMarker, maxPartsCount, minio.ObjectOptions{})
if err != nil {
break
}
for _, part := range lpi.Parts {
pblob := a.client.GetContainerReference(bucket).GetBlobReference(
getAzureMetadataPartName(object, uploadID, part.PartNumber))
pblob.Delete(nil)
}
partNumberMarker = lpi.NextPartNumberMarker
if !lpi.IsTruncated {
break
}
}
blob = a.client.GetContainerReference(bucket).GetBlobReference(metadataObject)
derr := blob.Delete(nil)
logger.GetReqInfo(ctx).AppendTags("uploadID", uploadID)
logger.LogIf(ctx, derr)
return a.GetObjectInfo(ctx, bucket, object, minio.ObjectOptions{})
}

View File

@ -188,72 +188,6 @@ func TestAzureToObjectError(t *testing.T) {
}
}
// Test azureGetBlockID().
func TestAzureGetBlockID(t *testing.T) {
testCases := []struct {
partID int
subPartNumber int
uploadID string
md5 string
blockID string
}{
{1, 7, "f328c35cad938137", "d41d8cd98f00b204e9800998ecf8427e", "MDAwMDEuMDcuZjMyOGMzNWNhZDkzODEzNy5kNDFkOGNkOThmMDBiMjA0ZTk4MDA5OThlY2Y4NDI3ZQ=="},
{2, 19, "abcdc35cad938137", "a7fb6b7b36ee4ed66b5546fac4690273", "MDAwMDIuMTkuYWJjZGMzNWNhZDkzODEzNy5hN2ZiNmI3YjM2ZWU0ZWQ2NmI1NTQ2ZmFjNDY5MDI3Mw=="},
}
for _, test := range testCases {
blockID := azureGetBlockID(test.partID, test.subPartNumber, test.uploadID, test.md5)
if blockID != test.blockID {
t.Fatalf("%s is not equal to %s", blockID, test.blockID)
}
}
}
// Test azureParseBlockID().
func TestAzureParseBlockID(t *testing.T) {
testCases := []struct {
blockID string
partID int
subPartNumber int
uploadID string
md5 string
success bool
}{
// Invalid base64.
{"MDAwMDEuMDcuZjMyOGMzNWNhZDkzODEzNy5kNDFkOGNkOThmMDBiMjA0ZTk4MDA5OThlY2Y4NDI3ZQ=", 0, 0, "", "", false},
// Invalid number of tokens.
{"MDAwMDEuQUEuZjMyOGMzNWNhZDkzODEzNwo=", 0, 0, "", "", false},
// Invalid encoded part ID.
{"MDAwMGEuMDcuZjMyOGMzNWNhZDkzODEzNy5kNDFkOGNkOThmMDBiMjA0ZTk4MDA5OThlY2Y4NDI3ZQo=", 0, 0, "", "", false},
// Invalid sub part ID.
{"MDAwMDEuQUEuZjMyOGMzNWNhZDkzODEzNy5kNDFkOGNkOThmMDBiMjA0ZTk4MDA5OThlY2Y4NDI3ZQo=", 0, 0, "", "", false},
{"MDAwMDEuMDcuZjMyOGMzNWNhZDkzODEzNy5kNDFkOGNkOThmMDBiMjA0ZTk4MDA5OThlY2Y4NDI3ZQ==", 1, 7, "f328c35cad938137", "d41d8cd98f00b204e9800998ecf8427e", true},
{"MDAwMDIuMTkuYWJjZGMzNWNhZDkzODEzNy5hN2ZiNmI3YjM2ZWU0ZWQ2NmI1NTQ2ZmFjNDY5MDI3Mw==", 2, 19, "abcdc35cad938137", "a7fb6b7b36ee4ed66b5546fac4690273", true},
}
for i, test := range testCases {
partID, subPartNumber, uploadID, md5, err := azureParseBlockID(test.blockID)
if err != nil && test.success {
t.Errorf("Test %d: Expected success but failed %s", i+1, err)
}
if err == nil && !test.success {
t.Errorf("Test %d: Expected to fail but succeeeded insteadl", i+1)
}
if err == nil {
if partID != test.partID {
t.Errorf("Test %d: %d not equal to %d", i+1, partID, test.partID)
}
if subPartNumber != test.subPartNumber {
t.Errorf("Test %d: %d not equal to %d", i+1, subPartNumber, test.subPartNumber)
}
if uploadID != test.uploadID {
t.Errorf("Test %d: %s not equal to %s", i+1, uploadID, test.uploadID)
}
if md5 != test.md5 {
t.Errorf("Test %d: %s not equal to %s", i+1, md5, test.md5)
}
}
}
}
func TestAnonErrToObjectErr(t *testing.T) {
testCases := []struct {
name string