mirror of
https://github.com/minio/minio.git
synced 2025-11-07 04:42:56 -05:00
azure: Fix upload corruption with PutObject() on certain sizes (#8330)
On objects bigger than 100MiB can have a corrupted object stored due to partial blockListing attempted right after each blocks uploaded. Simplify this code to ensure that all the blocks successfully uploaded are committed right away. This PR also updates the azure-sdk-go to latest release.
This commit is contained in:
@@ -761,7 +761,7 @@ func (a *azureObjects) GetObjectInfo(ctx context.Context, bucket, object string,
|
||||
// uses Azure equivalent CreateBlockBlobFromReader.
|
||||
func (a *azureObjects) PutObject(ctx context.Context, bucket, object string, r *minio.PutObjReader, opts minio.ObjectOptions) (objInfo minio.ObjectInfo, err error) {
|
||||
data := r.Reader
|
||||
if data.Size() < azureBlockSize/10 {
|
||||
if data.Size() <= azureBlockSize/2 {
|
||||
blob := a.client.GetContainerReference(bucket).GetBlobReference(object)
|
||||
blob.Metadata, blob.Properties, err = s3MetaToAzureProperties(ctx, opts.UserDefined)
|
||||
if err != nil {
|
||||
@@ -773,9 +773,8 @@ func (a *azureObjects) PutObject(ctx context.Context, bucket, object string, r *
|
||||
return a.GetObjectInfo(ctx, bucket, object, opts)
|
||||
}
|
||||
|
||||
blockIDs := make(map[string]string)
|
||||
|
||||
blob := a.client.GetContainerReference(bucket).GetBlobReference(object)
|
||||
var blocks []storage.Block
|
||||
subPartSize, subPartNumber := int64(azureBlockSize), 1
|
||||
for remainingSize := data.Size(); remainingSize >= 0; remainingSize -= subPartSize {
|
||||
// Allow to create zero sized part.
|
||||
@@ -788,45 +787,18 @@ func (a *azureObjects) PutObject(ctx context.Context, bucket, object string, r *
|
||||
}
|
||||
|
||||
id := base64.StdEncoding.EncodeToString([]byte(minio.MustGetUUID()))
|
||||
blockIDs[id] = ""
|
||||
if err = blob.PutBlockWithLength(id, uint64(subPartSize), io.LimitReader(data, subPartSize), nil); err != nil {
|
||||
err = blob.PutBlockWithLength(id, uint64(subPartSize), io.LimitReader(data, subPartSize), nil)
|
||||
if err != nil {
|
||||
return objInfo, azureToObjectError(err, bucket, object)
|
||||
}
|
||||
blocks = append(blocks, storage.Block{
|
||||
ID: id,
|
||||
Status: storage.BlockStatusUncommitted,
|
||||
})
|
||||
subPartNumber++
|
||||
}
|
||||
|
||||
objBlob := a.client.GetContainerReference(bucket).GetBlobReference(object)
|
||||
resp, err := objBlob.GetBlockList(storage.BlockListTypeUncommitted, nil)
|
||||
if err != nil {
|
||||
return objInfo, azureToObjectError(err, bucket, object)
|
||||
}
|
||||
getBlocks := func(blocksMap map[string]string) (blocks []storage.Block, size int64, aerr error) {
|
||||
for _, part := range resp.UncommittedBlocks {
|
||||
if _, ok := blocksMap[part.Name]; ok {
|
||||
blocks = append(blocks, storage.Block{
|
||||
ID: part.Name,
|
||||
Status: storage.BlockStatusUncommitted,
|
||||
})
|
||||
|
||||
size += part.Size
|
||||
}
|
||||
}
|
||||
|
||||
if len(blocks) == 0 {
|
||||
return nil, 0, minio.InvalidPart{}
|
||||
}
|
||||
|
||||
return blocks, size, nil
|
||||
}
|
||||
|
||||
var blocks []storage.Block
|
||||
blocks, _, err = getBlocks(blockIDs)
|
||||
if err != nil {
|
||||
logger.LogIf(ctx, err)
|
||||
return objInfo, err
|
||||
}
|
||||
|
||||
if err = objBlob.PutBlockList(blocks, nil); err != nil {
|
||||
if err = blob.PutBlockList(blocks, nil); err != nil {
|
||||
return objInfo, azureToObjectError(err, bucket, object)
|
||||
}
|
||||
|
||||
@@ -836,14 +808,14 @@ func (a *azureObjects) PutObject(ctx context.Context, bucket, object string, r *
|
||||
|
||||
// Save md5sum for future processing on the object.
|
||||
opts.UserDefined["x-amz-meta-md5sum"] = r.MD5CurrentHexString()
|
||||
objBlob.Metadata, objBlob.Properties, err = s3MetaToAzureProperties(ctx, opts.UserDefined)
|
||||
blob.Metadata, blob.Properties, err = s3MetaToAzureProperties(ctx, opts.UserDefined)
|
||||
if err != nil {
|
||||
return objInfo, azureToObjectError(err, bucket, object)
|
||||
}
|
||||
if err = objBlob.SetProperties(nil); err != nil {
|
||||
if err = blob.SetProperties(nil); err != nil {
|
||||
return objInfo, azureToObjectError(err, bucket, object)
|
||||
}
|
||||
if err = objBlob.SetMetadata(nil); err != nil {
|
||||
if err = blob.SetMetadata(nil); err != nil {
|
||||
return objInfo, azureToObjectError(err, bucket, object)
|
||||
}
|
||||
|
||||
@@ -995,13 +967,12 @@ func (a *azureObjects) PutObjectPart(ctx context.Context, bucket, object, upload
|
||||
}
|
||||
|
||||
id := base64.StdEncoding.EncodeToString([]byte(minio.MustGetUUID()))
|
||||
partMetaV1.BlockIDs = append(partMetaV1.BlockIDs, id)
|
||||
|
||||
blob := a.client.GetContainerReference(bucket).GetBlobReference(object)
|
||||
err = blob.PutBlockWithLength(id, uint64(subPartSize), io.LimitReader(data, subPartSize), nil)
|
||||
if err != nil {
|
||||
return info, azureToObjectError(err, bucket, object)
|
||||
}
|
||||
partMetaV1.BlockIDs = append(partMetaV1.BlockIDs, id)
|
||||
subPartNumber++
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user