mirror of
https://github.com/minio/minio.git
synced 2025-01-12 15:33:22 -05:00
azure: add stateless gateway support (#4874)
Previously init multipart upload stores metadata of an object which is used for complete multipart. This patch makes azure gateway to store metadata information of init multipart object in azure in the name of 'minio.sys.tmp/multipart/v1/<UPLOAD-ID>/meta.json' and uses this information on complete multipart.
This commit is contained in:
parent
fba1669966
commit
70fec0a53f
@ -17,13 +17,19 @@
|
|||||||
package cmd
|
package cmd
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"bytes"
|
||||||
|
"crypto/rand"
|
||||||
|
"crypto/sha256"
|
||||||
"encoding/base64"
|
"encoding/base64"
|
||||||
|
"encoding/hex"
|
||||||
|
"encoding/json"
|
||||||
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
"net/http"
|
"net/http"
|
||||||
"net/url"
|
"net/url"
|
||||||
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/Azure/azure-sdk-for-go/storage"
|
"github.com/Azure/azure-sdk-for-go/storage"
|
||||||
@ -33,6 +39,7 @@ import (
|
|||||||
|
|
||||||
const globalAzureAPIVersion = "2016-05-31"
|
const globalAzureAPIVersion = "2016-05-31"
|
||||||
const azureBlockSize = 100 * humanize.MiByte
|
const azureBlockSize = 100 * humanize.MiByte
|
||||||
|
const metadataObjectNameTemplate = globalMinioSysTmp + "multipart/v1/%s.%x/azure.json"
|
||||||
|
|
||||||
// Canonicalize the metadata headers, without this azure-sdk calculates
|
// Canonicalize the metadata headers, without this azure-sdk calculates
|
||||||
// incorrect signature. This attempt to canonicalize is to convert
|
// incorrect signature. This attempt to canonicalize is to convert
|
||||||
@ -87,38 +94,9 @@ func azureToS3ETag(etag string) string {
|
|||||||
return canonicalizeETag(etag) + "-1"
|
return canonicalizeETag(etag) + "-1"
|
||||||
}
|
}
|
||||||
|
|
||||||
// To store metadata during NewMultipartUpload which will be used after
|
|
||||||
// CompleteMultipartUpload to call SetBlobMetadata.
|
|
||||||
type azureMultipartMetaInfo struct {
|
|
||||||
meta map[string]map[string]string
|
|
||||||
*sync.Mutex
|
|
||||||
}
|
|
||||||
|
|
||||||
// Return metadata map of the multipart object.
|
|
||||||
func (a *azureMultipartMetaInfo) get(key string) map[string]string {
|
|
||||||
a.Lock()
|
|
||||||
defer a.Unlock()
|
|
||||||
return a.meta[key]
|
|
||||||
}
|
|
||||||
|
|
||||||
// Set metadata map for the multipart object.
|
|
||||||
func (a *azureMultipartMetaInfo) set(key string, value map[string]string) {
|
|
||||||
a.Lock()
|
|
||||||
defer a.Unlock()
|
|
||||||
a.meta[key] = value
|
|
||||||
}
|
|
||||||
|
|
||||||
// Delete metadata map for the multipart object.
|
|
||||||
func (a *azureMultipartMetaInfo) del(key string) {
|
|
||||||
a.Lock()
|
|
||||||
defer a.Unlock()
|
|
||||||
delete(a.meta, key)
|
|
||||||
}
|
|
||||||
|
|
||||||
// azureObjects - Implements Object layer for Azure blob storage.
|
// azureObjects - Implements Object layer for Azure blob storage.
|
||||||
type azureObjects struct {
|
type azureObjects struct {
|
||||||
client storage.BlobStorageClient // Azure sdk client
|
client storage.BlobStorageClient // Azure sdk client
|
||||||
metaInfo azureMultipartMetaInfo
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Convert azure errors to minio object layer errors.
|
// Convert azure errors to minio object layer errors.
|
||||||
@ -177,6 +155,68 @@ func azureToObjectError(err error, params ...string) error {
|
|||||||
return e
|
return e
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// mustGetAzureUploadID - returns new upload ID which is hex encoded 8 bytes random value.
|
||||||
|
func mustGetAzureUploadID() string {
|
||||||
|
var id [8]byte
|
||||||
|
|
||||||
|
n, err := io.ReadFull(rand.Reader, id[:])
|
||||||
|
if err != nil {
|
||||||
|
panic(fmt.Errorf("unable to generate upload ID for azure. %s", err))
|
||||||
|
}
|
||||||
|
if n != len(id) {
|
||||||
|
panic(fmt.Errorf("insufficient random data (expected: %d, read: %d)", len(id), n))
|
||||||
|
}
|
||||||
|
|
||||||
|
return fmt.Sprintf("%x", id[:])
|
||||||
|
}
|
||||||
|
|
||||||
|
// checkAzureUploadID - returns error in case of given string is upload ID.
|
||||||
|
func checkAzureUploadID(uploadID string) (err error) {
|
||||||
|
if len(uploadID) != 16 {
|
||||||
|
return traceError(MalformedUploadID{uploadID})
|
||||||
|
}
|
||||||
|
|
||||||
|
if _, err = hex.DecodeString(uploadID); err != nil {
|
||||||
|
return traceError(MalformedUploadID{uploadID})
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Encode partID, subPartNumber, uploadID and md5Hex to blockID.
|
||||||
|
func azureGetBlockID(partID, subPartNumber int, uploadID, md5Hex string) string {
|
||||||
|
return base64.StdEncoding.EncodeToString([]byte(fmt.Sprintf("%05d.%02d.%s.%s", partID, subPartNumber, uploadID, md5Hex)))
|
||||||
|
}
|
||||||
|
|
||||||
|
// Parse blockID into partID, subPartNumber and md5Hex.
|
||||||
|
func azureParseBlockID(blockID string) (partID, subPartNumber int, uploadID, md5Hex string, err error) {
|
||||||
|
var blockIDBytes []byte
|
||||||
|
if blockIDBytes, err = base64.StdEncoding.DecodeString(blockID); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
tokens := strings.Split(string(blockIDBytes), ".")
|
||||||
|
if len(tokens) != 4 {
|
||||||
|
err = fmt.Errorf("invalid block id '%s'", string(blockIDBytes))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if partID, err = strconv.Atoi(tokens[0]); err != nil || partID <= 0 {
|
||||||
|
err = fmt.Errorf("invalid part number in block id '%s'", string(blockIDBytes))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if subPartNumber, err = strconv.Atoi(tokens[1]); err != nil || subPartNumber <= 0 {
|
||||||
|
err = fmt.Errorf("invalid sub-part number in block id '%s'", string(blockIDBytes))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
uploadID = tokens[2]
|
||||||
|
md5Hex = tokens[3]
|
||||||
|
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
// Inits azure blob storage client and returns AzureObjects.
|
// Inits azure blob storage client and returns AzureObjects.
|
||||||
func newAzureLayer(host string) (GatewayLayer, error) {
|
func newAzureLayer(host string) (GatewayLayer, error) {
|
||||||
var err error
|
var err error
|
||||||
@ -200,10 +240,6 @@ func newAzureLayer(host string) (GatewayLayer, error) {
|
|||||||
|
|
||||||
return &azureObjects{
|
return &azureObjects{
|
||||||
client: c.GetBlobService(),
|
client: c.GetBlobService(),
|
||||||
metaInfo: azureMultipartMetaInfo{
|
|
||||||
meta: make(map[string]map[string]string),
|
|
||||||
Mutex: &sync.Mutex{},
|
|
||||||
},
|
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -286,6 +322,9 @@ func (a *azureObjects) ListObjects(bucket, prefix, marker, delimiter string, max
|
|||||||
result.IsTruncated = resp.NextMarker != ""
|
result.IsTruncated = resp.NextMarker != ""
|
||||||
result.NextMarker = resp.NextMarker
|
result.NextMarker = resp.NextMarker
|
||||||
for _, object := range resp.Blobs {
|
for _, object := range resp.Blobs {
|
||||||
|
if strings.HasPrefix(object.Name, globalMinioSysTmp) {
|
||||||
|
continue
|
||||||
|
}
|
||||||
t, e := time.Parse(time.RFC1123, object.Properties.LastModified)
|
t, e := time.Parse(time.RFC1123, object.Properties.LastModified)
|
||||||
if e != nil {
|
if e != nil {
|
||||||
continue
|
continue
|
||||||
@ -300,6 +339,14 @@ func (a *azureObjects) ListObjects(bucket, prefix, marker, delimiter string, max
|
|||||||
ContentEncoding: object.Properties.ContentEncoding,
|
ContentEncoding: object.Properties.ContentEncoding,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Remove minio.sys.tmp prefix.
|
||||||
|
for i, prefix := range resp.BlobPrefixes {
|
||||||
|
if prefix == globalMinioSysTmp {
|
||||||
|
resp.BlobPrefixes = append(resp.BlobPrefixes[:i], resp.BlobPrefixes[i+1:]...)
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
result.Prefixes = resp.BlobPrefixes
|
result.Prefixes = resp.BlobPrefixes
|
||||||
return result, nil
|
return result, nil
|
||||||
}
|
}
|
||||||
@ -321,6 +368,9 @@ func (a *azureObjects) ListObjectsV2(bucket, prefix, continuationToken string, f
|
|||||||
result.NextContinuationToken = resp.NextMarker
|
result.NextContinuationToken = resp.NextMarker
|
||||||
}
|
}
|
||||||
for _, object := range resp.Blobs {
|
for _, object := range resp.Blobs {
|
||||||
|
if strings.HasPrefix(object.Name, globalMinioSysTmp) {
|
||||||
|
continue
|
||||||
|
}
|
||||||
t, e := time.Parse(time.RFC1123, object.Properties.LastModified)
|
t, e := time.Parse(time.RFC1123, object.Properties.LastModified)
|
||||||
if e != nil {
|
if e != nil {
|
||||||
continue
|
continue
|
||||||
@ -335,6 +385,14 @@ func (a *azureObjects) ListObjectsV2(bucket, prefix, continuationToken string, f
|
|||||||
ContentEncoding: object.Properties.ContentEncoding,
|
ContentEncoding: object.Properties.ContentEncoding,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Remove minio.sys.tmp prefix.
|
||||||
|
for i, prefix := range resp.BlobPrefixes {
|
||||||
|
if prefix == globalMinioSysTmp {
|
||||||
|
resp.BlobPrefixes = append(resp.BlobPrefixes[:i], resp.BlobPrefixes[i+1:]...)
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
result.Prefixes = resp.BlobPrefixes
|
result.Prefixes = resp.BlobPrefixes
|
||||||
return result, nil
|
return result, nil
|
||||||
}
|
}
|
||||||
@ -437,38 +495,54 @@ func (a *azureObjects) DeleteObject(bucket, object string) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// ListMultipartUploads - Incomplete implementation, for now just return the prefix if it is an incomplete upload.
|
// ListMultipartUploads - It's decided not to support List Multipart Uploads, hence returning empty result.
|
||||||
// FIXME: Full ListMultipartUploads is not supported yet. It is supported just enough to help our client libs to
|
|
||||||
// support re-uploads. a.client.ListBlobs() can be made to return entries which include uncommitted blobs using
|
|
||||||
// which we need to filter out the committed blobs to get the list of uncommitted blobs.
|
|
||||||
func (a *azureObjects) ListMultipartUploads(bucket, prefix, keyMarker, uploadIDMarker, delimiter string, maxUploads int) (result ListMultipartsInfo, err error) {
|
func (a *azureObjects) ListMultipartUploads(bucket, prefix, keyMarker, uploadIDMarker, delimiter string, maxUploads int) (result ListMultipartsInfo, err error) {
|
||||||
result.MaxUploads = maxUploads
|
// It's decided not to support List Multipart Uploads, hence returning empty result.
|
||||||
result.Prefix = prefix
|
|
||||||
result.Delimiter = delimiter
|
|
||||||
meta := a.metaInfo.get(prefix)
|
|
||||||
if meta == nil {
|
|
||||||
// In case minio was restarted after NewMultipartUpload and before CompleteMultipartUpload we expect
|
|
||||||
// the client to do a fresh upload so that any metadata like content-type are sent again in the
|
|
||||||
// NewMultipartUpload.
|
|
||||||
return result, nil
|
return result, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
type azureMultipartMetadata struct {
|
||||||
|
Name string `json:"name"`
|
||||||
|
Metadata map[string]string `json:"metadata"`
|
||||||
|
}
|
||||||
|
|
||||||
|
func getAzureMetadataObjectName(objectName, uploadID string) string {
|
||||||
|
return fmt.Sprintf(metadataObjectNameTemplate, uploadID, sha256.Sum256([]byte(objectName)))
|
||||||
|
}
|
||||||
|
|
||||||
|
func (a *azureObjects) checkUploadIDExists(bucketName, objectName, uploadID string) (err error) {
|
||||||
|
_, err = a.client.GetBlobMetadata(bucketName, getAzureMetadataObjectName(objectName, uploadID))
|
||||||
|
err = azureToObjectError(traceError(err), bucketName, objectName)
|
||||||
|
oerr := ObjectNotFound{bucketName, objectName}
|
||||||
|
if errorCause(err) == oerr {
|
||||||
|
err = traceError(InvalidUploadID{})
|
||||||
}
|
}
|
||||||
result.Uploads = []uploadMetadata{{prefix, prefix, UTCNow(), "", nil}}
|
return err
|
||||||
return result, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewMultipartUpload - Use Azure equivalent CreateBlockBlob.
|
// NewMultipartUpload - Use Azure equivalent CreateBlockBlob.
|
||||||
func (a *azureObjects) NewMultipartUpload(bucket, object string, metadata map[string]string) (uploadID string, err error) {
|
func (a *azureObjects) NewMultipartUpload(bucket, object string, metadata map[string]string) (uploadID string, err error) {
|
||||||
// Azure doesn't return a unique upload ID and we use object name in place of it. Azure allows multiple uploads to
|
|
||||||
// co-exist as long as the user keeps the blocks uploaded (in block blobs) unique amongst concurrent upload attempts.
|
|
||||||
// Each concurrent client, keeps its own blockID list which it can commit.
|
|
||||||
uploadID = object
|
|
||||||
if metadata == nil {
|
if metadata == nil {
|
||||||
// Store an empty map as a placeholder else ListObjectParts/PutObjectPart will not work properly.
|
|
||||||
metadata = make(map[string]string)
|
metadata = make(map[string]string)
|
||||||
} else {
|
|
||||||
metadata = s3ToAzureHeaders(metadata)
|
|
||||||
}
|
}
|
||||||
a.metaInfo.set(uploadID, metadata)
|
|
||||||
|
uploadID = mustGetAzureUploadID()
|
||||||
|
if err = a.checkUploadIDExists(bucket, object, uploadID); err == nil {
|
||||||
|
return "", traceError(errors.New("Upload ID name collision"))
|
||||||
|
}
|
||||||
|
metadataObject := getAzureMetadataObjectName(object, uploadID)
|
||||||
|
metadata = s3ToAzureHeaders(metadata)
|
||||||
|
|
||||||
|
var jsonData []byte
|
||||||
|
if jsonData, err = json.Marshal(azureMultipartMetadata{Name: object, Metadata: metadata}); err != nil {
|
||||||
|
return "", traceError(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
err = a.client.CreateBlockBlobFromReader(bucket, metadataObject, uint64(len(jsonData)), bytes.NewBuffer(jsonData), nil)
|
||||||
|
if err != nil {
|
||||||
|
return "", azureToObjectError(traceError(err), bucket, metadataObject)
|
||||||
|
}
|
||||||
|
|
||||||
return uploadID, nil
|
return uploadID, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -477,29 +551,14 @@ func (a *azureObjects) CopyObjectPart(srcBucket, srcObject, destBucket, destObje
|
|||||||
return info, traceError(NotImplemented{})
|
return info, traceError(NotImplemented{})
|
||||||
}
|
}
|
||||||
|
|
||||||
// Encode partID, subPartNumber and md5Hex to blockID.
|
|
||||||
func azureGetBlockID(partID, subPartNumber int, md5Hex string) string {
|
|
||||||
return base64.StdEncoding.EncodeToString([]byte(fmt.Sprintf("%05d.%02d.%s", partID, subPartNumber, md5Hex)))
|
|
||||||
}
|
|
||||||
|
|
||||||
// Parse blockID into partID, subPartNumber and md5Hex.
|
|
||||||
func azureParseBlockID(blockID string) (partID, subPartNumber int, md5Hex string, err error) {
|
|
||||||
var blockIDBytes []byte
|
|
||||||
if blockIDBytes, err = base64.StdEncoding.DecodeString(blockID); err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
if _, err = fmt.Sscanf(string(blockIDBytes), "%05d.%02d.%s", &partID, &subPartNumber, &md5Hex); err != nil {
|
|
||||||
err = fmt.Errorf("invalid block id '%s'", string(blockIDBytes))
|
|
||||||
}
|
|
||||||
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// PutObjectPart - Use Azure equivalent PutBlockWithLength.
|
// PutObjectPart - Use Azure equivalent PutBlockWithLength.
|
||||||
func (a *azureObjects) PutObjectPart(bucket, object, uploadID string, partID int, data *HashReader) (info PartInfo, err error) {
|
func (a *azureObjects) PutObjectPart(bucket, object, uploadID string, partID int, data *HashReader) (info PartInfo, err error) {
|
||||||
if meta := a.metaInfo.get(uploadID); meta == nil {
|
if err = a.checkUploadIDExists(bucket, object, uploadID); err != nil {
|
||||||
return info, traceError(InvalidUploadID{})
|
return info, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if err = checkAzureUploadID(uploadID); err != nil {
|
||||||
|
return info, err
|
||||||
}
|
}
|
||||||
|
|
||||||
etag := data.md5Sum
|
etag := data.md5Sum
|
||||||
@ -519,7 +578,7 @@ func (a *azureObjects) PutObjectPart(bucket, object, uploadID string, partID int
|
|||||||
subPartSize = remainingSize
|
subPartSize = remainingSize
|
||||||
}
|
}
|
||||||
|
|
||||||
id := azureGetBlockID(partID, subPartNumber, etag)
|
id := azureGetBlockID(partID, subPartNumber, uploadID, etag)
|
||||||
err = a.client.PutBlockWithLength(bucket, object, id, uint64(subPartSize), io.LimitReader(data, subPartSize), nil)
|
err = a.client.PutBlockWithLength(bucket, object, id, uint64(subPartSize), io.LimitReader(data, subPartSize), nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return info, azureToObjectError(traceError(err), bucket, object)
|
return info, azureToObjectError(traceError(err), bucket, object)
|
||||||
@ -540,66 +599,56 @@ func (a *azureObjects) PutObjectPart(bucket, object, uploadID string, partID int
|
|||||||
|
|
||||||
// ListObjectParts - Use Azure equivalent GetBlockList.
|
// ListObjectParts - Use Azure equivalent GetBlockList.
|
||||||
func (a *azureObjects) ListObjectParts(bucket, object, uploadID string, partNumberMarker int, maxParts int) (result ListPartsInfo, err error) {
|
func (a *azureObjects) ListObjectParts(bucket, object, uploadID string, partNumberMarker int, maxParts int) (result ListPartsInfo, err error) {
|
||||||
result.Bucket = bucket
|
if err = a.checkUploadIDExists(bucket, object, uploadID); err != nil {
|
||||||
result.Object = object
|
|
||||||
result.UploadID = uploadID
|
|
||||||
result.MaxParts = maxParts
|
|
||||||
|
|
||||||
if meta := a.metaInfo.get(uploadID); meta == nil {
|
|
||||||
return result, nil
|
|
||||||
}
|
|
||||||
resp, err := a.client.GetBlockList(bucket, object, storage.BlockListTypeUncommitted)
|
|
||||||
if err != nil {
|
|
||||||
return result, azureToObjectError(traceError(err), bucket, object)
|
|
||||||
}
|
|
||||||
tmpMaxParts := 0
|
|
||||||
partCount := 0 // Used for figuring out IsTruncated.
|
|
||||||
nextPartNumberMarker := 0
|
|
||||||
for _, part := range resp.UncommittedBlocks {
|
|
||||||
if tmpMaxParts == maxParts {
|
|
||||||
// Also takes care of the case if maxParts = 0
|
|
||||||
break
|
|
||||||
}
|
|
||||||
partCount++
|
|
||||||
partID, _, md5Hex, err := azureParseBlockID(part.Name)
|
|
||||||
if err != nil {
|
|
||||||
return result, err
|
return result, err
|
||||||
}
|
}
|
||||||
if partID <= partNumberMarker {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
result.Parts = append(result.Parts, PartInfo{
|
|
||||||
partID,
|
|
||||||
UTCNow(),
|
|
||||||
md5Hex,
|
|
||||||
part.Size,
|
|
||||||
})
|
|
||||||
tmpMaxParts++
|
|
||||||
nextPartNumberMarker = partID
|
|
||||||
}
|
|
||||||
if partCount < len(resp.UncommittedBlocks) {
|
|
||||||
result.IsTruncated = true
|
|
||||||
result.NextPartNumberMarker = nextPartNumberMarker
|
|
||||||
}
|
|
||||||
|
|
||||||
|
// It's decided not to support List Object Parts, hence returning empty result.
|
||||||
return result, nil
|
return result, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// AbortMultipartUpload - Not Implemented.
|
// AbortMultipartUpload - Not Implemented.
|
||||||
// There is no corresponding API in azure to abort an incomplete upload. The uncommmitted blocks
|
// There is no corresponding API in azure to abort an incomplete upload. The uncommmitted blocks
|
||||||
// gets deleted after one week.
|
// gets deleted after one week.
|
||||||
func (a *azureObjects) AbortMultipartUpload(bucket, object, uploadID string) error {
|
func (a *azureObjects) AbortMultipartUpload(bucket, object, uploadID string) (err error) {
|
||||||
a.metaInfo.del(uploadID)
|
if err = a.checkUploadIDExists(bucket, object, uploadID); err != nil {
|
||||||
return nil
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return a.client.DeleteBlob(bucket, getAzureMetadataObjectName(object, uploadID), nil)
|
||||||
}
|
}
|
||||||
|
|
||||||
// CompleteMultipartUpload - Use Azure equivalent PutBlockList.
|
// CompleteMultipartUpload - Use Azure equivalent PutBlockList.
|
||||||
func (a *azureObjects) CompleteMultipartUpload(bucket, object, uploadID string, uploadedParts []completePart) (objInfo ObjectInfo, err error) {
|
func (a *azureObjects) CompleteMultipartUpload(bucket, object, uploadID string, uploadedParts []completePart) (objInfo ObjectInfo, err error) {
|
||||||
meta := a.metaInfo.get(uploadID)
|
metadataObject := getAzureMetadataObjectName(object, uploadID)
|
||||||
if meta == nil {
|
if err = a.checkUploadIDExists(bucket, object, uploadID); err != nil {
|
||||||
return objInfo, traceError(InvalidUploadID{uploadID})
|
return objInfo, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if err = checkAzureUploadID(uploadID); err != nil {
|
||||||
|
return objInfo, err
|
||||||
|
}
|
||||||
|
|
||||||
|
var metadataReader io.Reader
|
||||||
|
if metadataReader, err = a.client.GetBlob(bucket, metadataObject); err != nil {
|
||||||
|
return objInfo, azureToObjectError(traceError(err), bucket, metadataObject)
|
||||||
|
}
|
||||||
|
|
||||||
|
var metadata azureMultipartMetadata
|
||||||
|
if err = json.NewDecoder(metadataReader).Decode(&metadata); err != nil {
|
||||||
|
return objInfo, azureToObjectError(traceError(err), bucket, metadataObject)
|
||||||
|
}
|
||||||
|
|
||||||
|
meta := metadata.Metadata
|
||||||
|
defer func() {
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
derr := a.client.DeleteBlob(bucket, metadataObject, nil)
|
||||||
|
errorIf(derr, "unable to remove meta data object for upload ID %s", uploadID)
|
||||||
|
}()
|
||||||
|
|
||||||
resp, err := a.client.GetBlockList(bucket, object, storage.BlockListTypeUncommitted)
|
resp, err := a.client.GetBlockList(bucket, object, storage.BlockListTypeUncommitted)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return objInfo, azureToObjectError(traceError(err), bucket, object)
|
return objInfo, azureToObjectError(traceError(err), bucket, object)
|
||||||
@ -608,12 +657,13 @@ func (a *azureObjects) CompleteMultipartUpload(bucket, object, uploadID string,
|
|||||||
getBlocks := func(partNumber int, etag string) (blocks []storage.Block, size int64, err error) {
|
getBlocks := func(partNumber int, etag string) (blocks []storage.Block, size int64, err error) {
|
||||||
for _, part := range resp.UncommittedBlocks {
|
for _, part := range resp.UncommittedBlocks {
|
||||||
var partID int
|
var partID int
|
||||||
|
var readUploadID string
|
||||||
var md5Hex string
|
var md5Hex string
|
||||||
if partID, _, md5Hex, err = azureParseBlockID(part.Name); err != nil {
|
if partID, _, readUploadID, md5Hex, err = azureParseBlockID(part.Name); err != nil {
|
||||||
return nil, 0, err
|
return nil, 0, err
|
||||||
}
|
}
|
||||||
|
|
||||||
if partNumber == partID && etag == md5Hex {
|
if partNumber == partID && uploadID == readUploadID && etag == md5Hex {
|
||||||
blocks = append(blocks, storage.Block{
|
blocks = append(blocks, storage.Block{
|
||||||
ID: part.Name,
|
ID: part.Name,
|
||||||
Status: storage.BlockStatusUncommitted,
|
Status: storage.BlockStatusUncommitted,
|
||||||
@ -676,7 +726,6 @@ func (a *azureObjects) CompleteMultipartUpload(bucket, object, uploadID string,
|
|||||||
return objInfo, azureToObjectError(traceError(err), bucket, object)
|
return objInfo, azureToObjectError(traceError(err), bucket, object)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
a.metaInfo.del(uploadID)
|
|
||||||
return a.GetObjectInfo(bucket, object)
|
return a.GetObjectInfo(bucket, object)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -147,14 +147,15 @@ func TestAzureGetBlockID(t *testing.T) {
|
|||||||
testCases := []struct {
|
testCases := []struct {
|
||||||
partID int
|
partID int
|
||||||
subPartNumber int
|
subPartNumber int
|
||||||
|
uploadID string
|
||||||
md5 string
|
md5 string
|
||||||
blockID string
|
blockID string
|
||||||
}{
|
}{
|
||||||
{1, 7, "d41d8cd98f00b204e9800998ecf8427e", "MDAwMDEuMDcuZDQxZDhjZDk4ZjAwYjIwNGU5ODAwOTk4ZWNmODQyN2U="},
|
{1, 7, "f328c35cad938137", "d41d8cd98f00b204e9800998ecf8427e", "MDAwMDEuMDcuZjMyOGMzNWNhZDkzODEzNy5kNDFkOGNkOThmMDBiMjA0ZTk4MDA5OThlY2Y4NDI3ZQ=="},
|
||||||
{2, 19, "a7fb6b7b36ee4ed66b5546fac4690273", "MDAwMDIuMTkuYTdmYjZiN2IzNmVlNGVkNjZiNTU0NmZhYzQ2OTAyNzM="},
|
{2, 19, "abcdc35cad938137", "a7fb6b7b36ee4ed66b5546fac4690273", "MDAwMDIuMTkuYWJjZGMzNWNhZDkzODEzNy5hN2ZiNmI3YjM2ZWU0ZWQ2NmI1NTQ2ZmFjNDY5MDI3Mw=="},
|
||||||
}
|
}
|
||||||
for _, test := range testCases {
|
for _, test := range testCases {
|
||||||
blockID := azureGetBlockID(test.partID, test.subPartNumber, test.md5)
|
blockID := azureGetBlockID(test.partID, test.subPartNumber, test.uploadID, test.md5)
|
||||||
if blockID != test.blockID {
|
if blockID != test.blockID {
|
||||||
t.Fatalf("%s is not equal to %s", blockID, test.blockID)
|
t.Fatalf("%s is not equal to %s", blockID, test.blockID)
|
||||||
}
|
}
|
||||||
@ -167,13 +168,14 @@ func TestAzureParseBlockID(t *testing.T) {
|
|||||||
blockID string
|
blockID string
|
||||||
partID int
|
partID int
|
||||||
subPartNumber int
|
subPartNumber int
|
||||||
|
uploadID string
|
||||||
md5 string
|
md5 string
|
||||||
}{
|
}{
|
||||||
{"MDAwMDEuMDcuZDQxZDhjZDk4ZjAwYjIwNGU5ODAwOTk4ZWNmODQyN2U=", 1, 7, "d41d8cd98f00b204e9800998ecf8427e"},
|
{"MDAwMDEuMDcuZjMyOGMzNWNhZDkzODEzNy5kNDFkOGNkOThmMDBiMjA0ZTk4MDA5OThlY2Y4NDI3ZQ==", 1, 7, "f328c35cad938137", "d41d8cd98f00b204e9800998ecf8427e"},
|
||||||
{"MDAwMDIuMTkuYTdmYjZiN2IzNmVlNGVkNjZiNTU0NmZhYzQ2OTAyNzM=", 2, 19, "a7fb6b7b36ee4ed66b5546fac4690273"},
|
{"MDAwMDIuMTkuYWJjZGMzNWNhZDkzODEzNy5hN2ZiNmI3YjM2ZWU0ZWQ2NmI1NTQ2ZmFjNDY5MDI3Mw==", 2, 19, "abcdc35cad938137", "a7fb6b7b36ee4ed66b5546fac4690273"},
|
||||||
}
|
}
|
||||||
for _, test := range testCases {
|
for _, test := range testCases {
|
||||||
partID, subPartNumber, md5, err := azureParseBlockID(test.blockID)
|
partID, subPartNumber, uploadID, md5, err := azureParseBlockID(test.blockID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
@ -183,12 +185,15 @@ func TestAzureParseBlockID(t *testing.T) {
|
|||||||
if subPartNumber != test.subPartNumber {
|
if subPartNumber != test.subPartNumber {
|
||||||
t.Fatalf("%d not equal to %d", subPartNumber, test.subPartNumber)
|
t.Fatalf("%d not equal to %d", subPartNumber, test.subPartNumber)
|
||||||
}
|
}
|
||||||
|
if uploadID != test.uploadID {
|
||||||
|
t.Fatalf("%s not equal to %s", uploadID, test.uploadID)
|
||||||
|
}
|
||||||
if md5 != test.md5 {
|
if md5 != test.md5 {
|
||||||
t.Fatalf("%s not equal to %s", md5, test.md5)
|
t.Fatalf("%s not equal to %s", md5, test.md5)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
_, _, _, err := azureParseBlockID("junk")
|
_, _, _, _, err := azureParseBlockID("junk")
|
||||||
if err == nil {
|
if err == nil {
|
||||||
t.Fatal("Expected azureParseBlockID() to return error")
|
t.Fatal("Expected azureParseBlockID() to return error")
|
||||||
}
|
}
|
||||||
@ -271,3 +276,29 @@ func TestAnonErrToObjectErr(t *testing.T) {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestCheckAzureUploadID(t *testing.T) {
|
||||||
|
invalidUploadIDs := []string{
|
||||||
|
"123456789abcdefg",
|
||||||
|
"hello world",
|
||||||
|
"0x1234567890",
|
||||||
|
"1234567890abcdef1234567890abcdef",
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, uploadID := range invalidUploadIDs {
|
||||||
|
if err := checkAzureUploadID(uploadID); err == nil {
|
||||||
|
t.Fatalf("%s: expected: <error>, got: <nil>", uploadID)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
validUploadIDs := []string{
|
||||||
|
"1234567890abcdef",
|
||||||
|
"1122334455667788",
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, uploadID := range validUploadIDs {
|
||||||
|
if err := checkAzureUploadID(uploadID); err != nil {
|
||||||
|
t.Fatalf("%s: expected: <nil>, got: %s", uploadID, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
@ -40,15 +40,10 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
// gcsMinioSysTmp is used for multiparts. We have "minio.sys.tmp" prefix so that
|
|
||||||
// listing on the GCS lists this entry in the end. Also in the gateway
|
|
||||||
// ListObjects we filter out this entry.
|
|
||||||
gcsMinioSysTmp = "minio.sys.tmp/"
|
|
||||||
|
|
||||||
// Path where multipart objects are saved.
|
// Path where multipart objects are saved.
|
||||||
// If we change the backend format we will use a different url path like /multipart/v2
|
// If we change the backend format we will use a different url path like /multipart/v2
|
||||||
// but we will not migrate old data.
|
// but we will not migrate old data.
|
||||||
gcsMinioMultipartPathV1 = gcsMinioSysTmp + "multipart/v1"
|
gcsMinioMultipartPathV1 = globalMinioSysTmp + "multipart/v1"
|
||||||
|
|
||||||
// Multipart meta file.
|
// Multipart meta file.
|
||||||
gcsMinioMultipartMeta = "gcs.json"
|
gcsMinioMultipartMeta = "gcs.json"
|
||||||
@ -290,7 +285,7 @@ func newGCSGateway(projectID string) (GatewayLayer, error) {
|
|||||||
|
|
||||||
// Cleanup old files in minio.sys.tmp of the given bucket.
|
// Cleanup old files in minio.sys.tmp of the given bucket.
|
||||||
func (l *gcsGateway) CleanupGCSMinioSysTmpBucket(bucket string) {
|
func (l *gcsGateway) CleanupGCSMinioSysTmpBucket(bucket string) {
|
||||||
it := l.client.Bucket(bucket).Objects(l.ctx, &storage.Query{Prefix: gcsMinioSysTmp, Versions: false})
|
it := l.client.Bucket(bucket).Objects(l.ctx, &storage.Query{Prefix: globalMinioSysTmp, Versions: false})
|
||||||
for {
|
for {
|
||||||
attrs, err := it.Next()
|
attrs, err := it.Next()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -408,7 +403,7 @@ func (l *gcsGateway) DeleteBucket(bucket string) error {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return gcsToObjectError(traceError(err))
|
return gcsToObjectError(traceError(err))
|
||||||
}
|
}
|
||||||
if objAttrs.Prefix == gcsMinioSysTmp {
|
if objAttrs.Prefix == globalMinioSysTmp {
|
||||||
gcsMinioPathFound = true
|
gcsMinioPathFound = true
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
@ -420,7 +415,7 @@ func (l *gcsGateway) DeleteBucket(bucket string) error {
|
|||||||
}
|
}
|
||||||
if gcsMinioPathFound {
|
if gcsMinioPathFound {
|
||||||
// Remove minio.sys.tmp before deleting the bucket.
|
// Remove minio.sys.tmp before deleting the bucket.
|
||||||
itObject = l.client.Bucket(bucket).Objects(l.ctx, &storage.Query{Versions: false, Prefix: gcsMinioSysTmp})
|
itObject = l.client.Bucket(bucket).Objects(l.ctx, &storage.Query{Versions: false, Prefix: globalMinioSysTmp})
|
||||||
for {
|
for {
|
||||||
objAttrs, err := itObject.Next()
|
objAttrs, err := itObject.Next()
|
||||||
if err == iterator.Done {
|
if err == iterator.Done {
|
||||||
@ -505,7 +500,7 @@ func (l *gcsGateway) ListObjects(bucket string, prefix string, marker string, de
|
|||||||
// metadata folder, then just break
|
// metadata folder, then just break
|
||||||
// otherwise we've truncated the output
|
// otherwise we've truncated the output
|
||||||
attrs, _ := it.Next()
|
attrs, _ := it.Next()
|
||||||
if attrs != nil && attrs.Prefix == gcsMinioSysTmp {
|
if attrs != nil && attrs.Prefix == globalMinioSysTmp {
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -523,16 +518,16 @@ func (l *gcsGateway) ListObjects(bucket string, prefix string, marker string, de
|
|||||||
|
|
||||||
nextMarker = toGCSPageToken(attrs.Name)
|
nextMarker = toGCSPageToken(attrs.Name)
|
||||||
|
|
||||||
if attrs.Prefix == gcsMinioSysTmp {
|
if attrs.Prefix == globalMinioSysTmp {
|
||||||
// We don't return our metadata prefix.
|
// We don't return our metadata prefix.
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
if !strings.HasPrefix(prefix, gcsMinioSysTmp) {
|
if !strings.HasPrefix(prefix, globalMinioSysTmp) {
|
||||||
// If client lists outside gcsMinioPath then we filter out gcsMinioPath/* entries.
|
// If client lists outside gcsMinioPath then we filter out gcsMinioPath/* entries.
|
||||||
// But if the client lists inside gcsMinioPath then we return the entries in gcsMinioPath/
|
// But if the client lists inside gcsMinioPath then we return the entries in gcsMinioPath/
|
||||||
// which will be helpful to observe the "directory structure" for debugging purposes.
|
// which will be helpful to observe the "directory structure" for debugging purposes.
|
||||||
if strings.HasPrefix(attrs.Prefix, gcsMinioSysTmp) ||
|
if strings.HasPrefix(attrs.Prefix, globalMinioSysTmp) ||
|
||||||
strings.HasPrefix(attrs.Name, gcsMinioSysTmp) {
|
strings.HasPrefix(attrs.Name, globalMinioSysTmp) {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -605,16 +600,16 @@ func (l *gcsGateway) ListObjectsV2(bucket, prefix, continuationToken string, fet
|
|||||||
return ListObjectsV2Info{}, gcsToObjectError(traceError(err), bucket, prefix)
|
return ListObjectsV2Info{}, gcsToObjectError(traceError(err), bucket, prefix)
|
||||||
}
|
}
|
||||||
|
|
||||||
if attrs.Prefix == gcsMinioSysTmp {
|
if attrs.Prefix == globalMinioSysTmp {
|
||||||
// We don't return our metadata prefix.
|
// We don't return our metadata prefix.
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
if !strings.HasPrefix(prefix, gcsMinioSysTmp) {
|
if !strings.HasPrefix(prefix, globalMinioSysTmp) {
|
||||||
// If client lists outside gcsMinioPath then we filter out gcsMinioPath/* entries.
|
// If client lists outside gcsMinioPath then we filter out gcsMinioPath/* entries.
|
||||||
// But if the client lists inside gcsMinioPath then we return the entries in gcsMinioPath/
|
// But if the client lists inside gcsMinioPath then we return the entries in gcsMinioPath/
|
||||||
// which will be helpful to observe the "directory structure" for debugging purposes.
|
// which will be helpful to observe the "directory structure" for debugging purposes.
|
||||||
if strings.HasPrefix(attrs.Prefix, gcsMinioSysTmp) ||
|
if strings.HasPrefix(attrs.Prefix, globalMinioSysTmp) ||
|
||||||
strings.HasPrefix(attrs.Name, gcsMinioSysTmp) {
|
strings.HasPrefix(attrs.Name, globalMinioSysTmp) {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -954,7 +949,7 @@ func (l *gcsGateway) CompleteMultipartUpload(bucket string, key string, uploadID
|
|||||||
|
|
||||||
// Returns name of the composed object.
|
// Returns name of the composed object.
|
||||||
gcsMultipartComposeName := func(uploadID string, composeNumber int) string {
|
gcsMultipartComposeName := func(uploadID string, composeNumber int) string {
|
||||||
return fmt.Sprintf("%s/tmp/%s/composed-object-%05d", gcsMinioSysTmp, uploadID, composeNumber)
|
return fmt.Sprintf("%s/tmp/%s/composed-object-%05d", globalMinioSysTmp, uploadID, composeNumber)
|
||||||
}
|
}
|
||||||
|
|
||||||
composeCount := int(math.Ceil(float64(len(parts)) / float64(gcsMaxComponents)))
|
composeCount := int(math.Ceil(float64(len(parts)) / float64(gcsMaxComponents)))
|
||||||
|
@ -53,6 +53,10 @@ const (
|
|||||||
globalMinioModeGatewayAzure = "mode-gateway-azure"
|
globalMinioModeGatewayAzure = "mode-gateway-azure"
|
||||||
globalMinioModeGatewayS3 = "mode-gateway-s3"
|
globalMinioModeGatewayS3 = "mode-gateway-s3"
|
||||||
globalMinioModeGatewayGCS = "mode-gateway-gcs"
|
globalMinioModeGatewayGCS = "mode-gateway-gcs"
|
||||||
|
|
||||||
|
// globalMinioSysTmp prefix is used in Azure/GCS gateway for save metadata sent by Initialize Multipart Upload API.
|
||||||
|
globalMinioSysTmp = "minio.sys.tmp/"
|
||||||
|
|
||||||
// Add new global values here.
|
// Add new global values here.
|
||||||
)
|
)
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user