Update Azure SDK (#4985)
Commit 94670a387e (parent 789270af3c), committed by Dee Koder.
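The diff below migrates minio's Azure gateway from the SDK's old flat-client calls (GetBlob, DeleteBlob, CreateContainer, ...) to the reference-based API (GetContainerReference/GetBlobReference). A minimal sketch of the two call shapes, assuming the usual storage.NewBasicClient construction; the account name/key and the client setup are placeholders, not part of this commit:

    // Sketch only: illustrates the API shapes this commit migrates between.
    package main

    import (
        "fmt"

        "github.com/Azure/azure-sdk-for-go/storage"
    )

    func main() {
        // Assumption: NewBasicClient is used to build the client; the diff
        // itself only shows the resulting blob-service client in use.
        client, err := storage.NewBasicClient("myaccount", "bXlrZXk=")
        if err != nil {
            fmt.Println(err)
            return
        }
        blobSvc := client.GetBlobService()

        // Old style (removed throughout this diff):
        //   rc, err := blobSvc.GetBlob("bucket", "object")
        //   err = blobSvc.DeleteBlob("bucket", "object", nil)

        // New style (added throughout this diff): navigate to a blob
        // reference, then call methods on the reference.
        blob := blobSvc.GetContainerReference("bucket").GetBlobReference("object")
        if rc, err := blob.Get(nil); err == nil {
            defer rc.Close()
        }
        _ = blob.Delete(nil)
    }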
@@ -24,11 +24,51 @@ import (
 	"net/http"
 	"net/url"
 	"strconv"
 	"strings"
 	"time"

 	"github.com/Azure/azure-sdk-for-go/storage"
 )

+// Copied from github.com/Azure/azure-sdk-for-go/storage/container.go
+func azureListBlobsGetParameters(p storage.ListBlobsParameters) url.Values {
+	out := url.Values{}
+
+	if p.Prefix != "" {
+		out.Set("prefix", p.Prefix)
+	}
+	if p.Delimiter != "" {
+		out.Set("delimiter", p.Delimiter)
+	}
+	if p.Marker != "" {
+		out.Set("marker", p.Marker)
+	}
+	if p.Include != nil {
+		addString := func(datasets []string, include bool, text string) []string {
+			if include {
+				datasets = append(datasets, text)
+			}
+			return datasets
+		}
+
+		include := []string{}
+		include = addString(include, p.Include.Snapshots, "snapshots")
+		include = addString(include, p.Include.Metadata, "metadata")
+		include = addString(include, p.Include.UncommittedBlobs, "uncommittedblobs")
+		include = addString(include, p.Include.Copy, "copy")
+		fullInclude := strings.Join(include, ",")
+		out.Set("include", fullInclude)
+	}
+	if p.MaxResults != 0 {
+		out.Set("maxresults", fmt.Sprintf("%v", p.MaxResults))
+	}
+	if p.Timeout != 0 {
+		out.Set("timeout", fmt.Sprintf("%v", p.Timeout))
+	}
+
+	return out
+}
+
 // Make anonymous HTTP request to azure endpoint.
 func azureAnonRequest(verb, urlStr string, header http.Header) (*http.Response, error) {
 	req, err := http.NewRequest(verb, urlStr, nil)
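The updated SDK changes ListBlobsParameters.Include from a plain string to a *storage.IncludeBlobDataset, which is why the helper above (copied from the SDK) now joins the selected datasets into a single comma-separated include parameter. A self-contained sketch of that serialization, with illustrative parameter values:

    // Sketch: how an IncludeBlobDataset flattens into the `include`
    // query parameter, mirroring azureListBlobsGetParameters above.
    package main

    import (
        "fmt"
        "net/url"
        "strings"

        "github.com/Azure/azure-sdk-for-go/storage"
    )

    func main() {
        p := storage.ListBlobsParameters{
            Prefix:  "photos/",
            Include: &storage.IncludeBlobDataset{Snapshots: true, Metadata: true},
        }

        out := url.Values{}
        if p.Prefix != "" {
            out.Set("prefix", p.Prefix)
        }
        if p.Include != nil {
            var include []string
            if p.Include.Snapshots {
                include = append(include, "snapshots")
            }
            if p.Include.Metadata {
                include = append(include, "metadata")
            }
            out.Set("include", strings.Join(include, ","))
        }
        fmt.Println(out.Encode()) // include=snapshots%2Cmetadata&prefix=photos%2F
    }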
@@ -73,7 +113,8 @@ func azureAnonRequest(verb, urlStr string, header http.Header) (*http.Response,

 // AnonGetBucketInfo - Get bucket metadata from azure anonymously.
 func (a *azureObjects) AnonGetBucketInfo(bucket string) (bucketInfo BucketInfo, err error) {
-	url, err := url.Parse(a.client.GetBlobURL(bucket, ""))
+	blobURL := a.client.GetContainerReference(bucket).GetBlobReference("").GetURL()
+	url, err := url.Parse(blobURL)
 	if err != nil {
 		return bucketInfo, azureToObjectError(traceError(err))
 	}
@@ -116,7 +157,8 @@ func (a *azureObjects) AnonGetObject(bucket, object string, startOffset int64, l
 		h.Add("Range", fmt.Sprintf("bytes=%d-", startOffset))
 	}

-	resp, err := azureAnonRequest(httpGET, a.client.GetBlobURL(bucket, object), h)
+	blobURL := a.client.GetContainerReference(bucket).GetBlobReference(object).GetURL()
+	resp, err := azureAnonRequest(httpGET, blobURL, h)
 	if err != nil {
 		return azureToObjectError(traceError(err), bucket, object)
 	}
@@ -133,7 +175,8 @@ func (a *azureObjects) AnonGetObject(bucket, object string, startOffset int64, l
 // AnonGetObjectInfo - Send HEAD request without authentication and convert the
 // result to ObjectInfo.
 func (a *azureObjects) AnonGetObjectInfo(bucket, object string) (objInfo ObjectInfo, err error) {
-	resp, err := azureAnonRequest(httpHEAD, a.client.GetBlobURL(bucket, object), nil)
+	blobURL := a.client.GetContainerReference(bucket).GetBlobReference(object).GetURL()
+	resp, err := azureAnonRequest(httpHEAD, blobURL, nil)
 	if err != nil {
 		return objInfo, azureToObjectError(traceError(err), bucket, object)
 	}
@@ -184,7 +227,8 @@ func (a *azureObjects) AnonListObjects(bucket, prefix, marker, delimiter string,
 	q.Set("restype", "container")
 	q.Set("comp", "list")

-	url, err := url.Parse(a.client.GetBlobURL(bucket, ""))
+	blobURL := a.client.GetContainerReference(bucket).GetBlobReference("").GetURL()
+	url, err := url.Parse(blobURL)
 	if err != nil {
 		return result, azureToObjectError(traceError(err))
 	}
@@ -210,14 +254,10 @@ func (a *azureObjects) AnonListObjects(bucket, prefix, marker, delimiter string,
 	result.IsTruncated = listResp.NextMarker != ""
 	result.NextMarker = listResp.NextMarker
 	for _, object := range listResp.Blobs {
-		t, e := time.Parse(time.RFC1123, object.Properties.LastModified)
-		if e != nil {
-			continue
-		}
 		result.Objects = append(result.Objects, ObjectInfo{
 			Bucket:      bucket,
 			Name:        object.Name,
-			ModTime:     t,
+			ModTime:     time.Time(object.Properties.LastModified),
 			Size:        object.Properties.ContentLength,
 			ETag:        object.Properties.Etag,
 			ContentType: object.Properties.ContentType,
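In the updated SDK, LastModified is no longer a string, so the time.Parse round trip (and the silent `continue` on parse failure) drops out; a plain type conversion suffices. A sketch, assuming the field follows the SDK's TimeRFC1123 type (the type name is inferred from the `time.Time(...)` conversions in this diff, not shown in it):

    // Sketch: TimeRFC1123 behaves as a time.Time with RFC 1123
    // (un)marshalling, so recovering a time.Time is a conversion, not a parse.
    package main

    import (
        "fmt"
        "time"

        "github.com/Azure/azure-sdk-for-go/storage"
    )

    func main() {
        // The SDK populates LastModified while decoding list responses;
        // constructed directly here for illustration.
        lm := storage.TimeRFC1123(time.Date(2017, 9, 1, 12, 0, 0, 0, time.UTC))

        modTime := time.Time(lm) // what the diff now stores in ObjectInfo.ModTime
        fmt.Println(modTime.Format(time.RFC1123))
    }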
@@ -241,7 +281,8 @@ func (a *azureObjects) AnonListObjectsV2(bucket, prefix, continuationToken strin
 	q.Set("restype", "container")
 	q.Set("comp", "list")

-	url, err := url.Parse(a.client.GetBlobURL(bucket, ""))
+	blobURL := a.client.GetContainerReference(bucket).GetBlobReference("").GetURL()
+	url, err := url.Parse(blobURL)
 	if err != nil {
 		return result, azureToObjectError(traceError(err))
 	}
@@ -270,14 +311,10 @@ func (a *azureObjects) AnonListObjectsV2(bucket, prefix, continuationToken strin
 		result.NextContinuationToken = listResp.NextMarker
 	}
 	for _, object := range listResp.Blobs {
-		t, e := time.Parse(time.RFC1123, object.Properties.LastModified)
-		if e != nil {
-			continue
-		}
 		result.Objects = append(result.Objects, ObjectInfo{
 			Bucket:      bucket,
 			Name:        object.Name,
-			ModTime:     t,
+			ModTime:     time.Time(object.Properties.LastModified),
 			Size:        object.Properties.ContentLength,
 			ETag:        canonicalizeETag(object.Properties.Etag),
 			ContentType: object.Properties.ContentType,
@@ -27,7 +27,6 @@ import (
 	"fmt"
 	"io"
 	"net/http"
-	"net/url"
 	"strconv"
 	"strings"
 	"time"
@@ -41,52 +40,115 @@ const globalAzureAPIVersion = "2016-05-31"
 const azureBlockSize = 100 * humanize.MiByte
 const metadataObjectNameTemplate = globalMinioSysTmp + "multipart/v1/%s.%x/azure.json"

-// Canonicalize the metadata headers, without this azure-sdk calculates
-// incorrect signature. This attempt to canonicalize is to convert
-// any HTTP header which is of form say `accept-encoding` should be
-// converted to `Accept-Encoding` in its canonical form.
-// Also replaces X-Amz-Meta prefix with X-Ms-Meta as Azure expects user
-// defined metadata to have X-Ms-Meta prefix.
-func s3ToAzureHeaders(headers map[string]string) (newHeaders map[string]string) {
+// s3MetaToAzureProperties converts metadata meant for S3 PUT/COPY
+// object into Azure data structures - BlobMetadata and
+// BlobProperties.
+//
+// BlobMetadata contains user defined key-value pairs and each key is
+// automatically prefixed with `X-Ms-Meta-` by the Azure SDK. S3
+// user-metadata is translated to Azure metadata by removing the
+// `X-Amz-Meta-` prefix.
+//
+// BlobProperties contains commonly set metadata for objects such as
+// Content-Encoding, etc. Such metadata that is accepted by S3 is
+// copied into BlobProperties.
+//
+// Header names are canonicalized as in http.Header.
+func s3MetaToAzureProperties(s3Metadata map[string]string) (storage.BlobMetadata,
+	storage.BlobProperties) {
 	// Azure does not permit user-defined metadata key names to
 	// contain hyphens. So we map hyphens to underscores for
 	// encryption headers. More such headers may be added in the
 	// future.
 	gatewayHeaders := map[string]string{
 		"X-Amz-Meta-X-Amz-Key":     "X-Amz-Meta-x_minio_key",
 		"X-Amz-Meta-X-Amz-Matdesc": "X-Amz-Meta-x_minio_matdesc",
 		"X-Amz-Meta-X-Amz-Iv":      "X-Amz-Meta-x_minio_iv",
 	}

-	newHeaders = make(map[string]string)
-	for k, v := range headers {
+	var blobMeta storage.BlobMetadata = make(map[string]string)
+	var props storage.BlobProperties
+	for k, v := range s3Metadata {
 		k = http.CanonicalHeaderKey(k)
 		if nk, ok := gatewayHeaders[k]; ok {
 			k = nk
 		}
-		if strings.HasPrefix(k, "X-Amz-Meta") {
-			k = strings.Replace(k, "X-Amz-Meta", "X-Ms-Meta", -1)
+		switch {
+		case strings.HasPrefix(k, "X-Amz-Meta-"):
+			// Strip header prefix, to let Azure SDK
+			// handle it for storage.
+			k = strings.Replace(k, "X-Amz-Meta-", "", 1)
+			blobMeta[k] = v
+
+		// All cases below, extract common metadata that is
+		// accepted by S3 into BlobProperties for setting on
+		// Azure - see
+		// https://docs.aws.amazon.com/AmazonS3/latest/API/RESTObjectPUT.html
+		case k == "Cache-Control":
+			props.CacheControl = v
+		case k == "Content-Disposition":
+			props.ContentDisposition = v
+		case k == "Content-Encoding":
+			props.ContentEncoding = v
+		case k == "Content-Length":
+			// assume this doesn't fail
+			props.ContentLength, _ = strconv.ParseInt(v, 10, 64)
+		case k == "Content-MD5":
+			props.ContentMD5 = v
+		case k == "Content-Type":
+			props.ContentType = v
 		}
-		newHeaders[k] = v
 	}
-	return newHeaders
+	return blobMeta, props
 }

-// Prefix user metadata with "X-Amz-Meta-".
-// client.GetBlobMetadata() already strips "X-Ms-Meta-"
-func azureToS3Metadata(meta map[string]string) (newMeta map[string]string) {
+// azurePropertiesToS3Meta converts Azure metadata/properties to S3
+// metadata. It is the reverse of s3MetaToAzureProperties. Azure's
+// `.GetMetadata()` lower-cases all header keys, so this is taken into
+// account by this function.
+func azurePropertiesToS3Meta(meta storage.BlobMetadata, props storage.BlobProperties) map[string]string {
 	// Remap underscores to hyphens to restore encryption
 	// headers. See s3MetaToAzureProperties for details.
 	gatewayHeaders := map[string]string{
 		"X-Amz-Meta-x_minio_key":     "X-Amz-Meta-X-Amz-Key",
 		"X-Amz-Meta-x_minio_matdesc": "X-Amz-Meta-X-Amz-Matdesc",
 		"X-Amz-Meta-x_minio_iv":      "X-Amz-Meta-X-Amz-Iv",
 	}

-	newMeta = make(map[string]string)
-
+	s3Metadata := make(map[string]string)
 	for k, v := range meta {
+		// k's `x-ms-meta-` prefix is already stripped by
+		// Azure SDK, so we add the AMZ prefix.
 		k = "X-Amz-Meta-" + k
 		if nk, ok := gatewayHeaders[k]; ok {
 			k = nk
 		}
-		newMeta[k] = v
+		k = http.CanonicalHeaderKey(k)
+		s3Metadata[k] = v
 	}
-	return newMeta
+
+	// Add each property from BlobProperties that is supported by
+	// S3 PUT/COPY common metadata.
+	if props.CacheControl != "" {
+		s3Metadata["Cache-Control"] = props.CacheControl
+	}
+	if props.ContentDisposition != "" {
+		s3Metadata["Content-Disposition"] = props.ContentDisposition
+	}
+	if props.ContentEncoding != "" {
+		s3Metadata["Content-Encoding"] = props.ContentEncoding
+	}
+	if props.ContentLength != 0 {
+		s3Metadata["Content-Length"] = fmt.Sprintf("%d", props.ContentLength)
+	}
+	if props.ContentMD5 != "" {
+		s3Metadata["Content-MD5"] = props.ContentMD5
+	}
+	if props.ContentType != "" {
+		s3Metadata["Content-Type"] = props.ContentType
+	}
+	return s3Metadata
 }

 // Append "-1" to etag so that clients do not interpret it as MD5.
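A standalone sketch of the split that the new s3MetaToAzureProperties performs: user metadata (minus the X-Amz-Meta- prefix) lands in BlobMetadata, while common headers land in BlobProperties. Keys and values here are illustrative only:

    // Sketch: splitting S3-style metadata the way s3MetaToAzureProperties
    // does above (only two cases shown).
    package main

    import (
        "fmt"
        "net/http"
        "strings"

        "github.com/Azure/azure-sdk-for-go/storage"
    )

    func main() {
        s3Meta := map[string]string{
            "x-amz-meta-owner": "alice",
            "content-type":     "image/png",
        }

        blobMeta := storage.BlobMetadata{}
        var props storage.BlobProperties
        for k, v := range s3Meta {
            k = http.CanonicalHeaderKey(k)
            switch {
            case strings.HasPrefix(k, "X-Amz-Meta-"):
                // The Azure SDK re-adds the X-Ms-Meta- prefix on the wire.
                blobMeta[strings.TrimPrefix(k, "X-Amz-Meta-")] = v
            case k == "Content-Type":
                props.ContentType = v
            }
        }
        fmt.Println(blobMeta)          // map[Owner:alice]
        fmt.Println(props.ContentType) // image/png
    }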
@@ -256,13 +318,17 @@ func (a *azureObjects) StorageInfo() (si StorageInfo) {

 // MakeBucketWithLocation - Create a new container on azure backend.
 func (a *azureObjects) MakeBucketWithLocation(bucket, location string) error {
-	err := a.client.CreateContainer(bucket, storage.ContainerAccessTypePrivate)
+	container := a.client.GetContainerReference(bucket)
+	err := container.Create(&storage.CreateContainerOptions{
+		Access: storage.ContainerAccessTypePrivate,
+	})
 	return azureToObjectError(traceError(err), bucket)
 }

 // GetBucketInfo - Get bucket metadata.
 func (a *azureObjects) GetBucketInfo(bucket string) (bi BucketInfo, e error) {
-	// Azure does not have an equivalent call, hence use ListContainers.
+	// Azure does not have an equivalent call, hence use
+	// ListContainers with prefix
 	resp, err := a.client.ListContainers(storage.ListContainersParameters{
 		Prefix: bucket,
 	})
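Container-level calls follow the same reference pattern; a sketch of creating a private container with the new options struct (client setup assumed as in the first sketch):

    // Sketch: creating a container via the reference API, as
    // MakeBucketWithLocation now does.
    package main

    import (
        "fmt"

        "github.com/Azure/azure-sdk-for-go/storage"
    )

    func main() {
        client, err := storage.NewBasicClient("myaccount", "bXlrZXk=") // placeholders
        if err != nil {
            fmt.Println(err)
            return
        }
        container := client.GetBlobService().GetContainerReference("bucket")
        err = container.Create(&storage.CreateContainerOptions{
            Access: storage.ContainerAccessTypePrivate,
        })
        fmt.Println(err)
    }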
@@ -304,13 +370,15 @@ func (a *azureObjects) ListBuckets() (buckets []BucketInfo, err error) {

 // DeleteBucket - delete a container on azure, uses Azure equivalent DeleteContainer.
 func (a *azureObjects) DeleteBucket(bucket string) error {
-	return azureToObjectError(traceError(a.client.DeleteContainer(bucket)), bucket)
+	container := a.client.GetContainerReference(bucket)
+	return azureToObjectError(traceError(container.Delete(nil)), bucket)
 }

 // ListObjects - lists all blobs on azure within a container filtered by prefix
 // and marker, uses Azure equivalent ListBlobs.
 func (a *azureObjects) ListObjects(bucket, prefix, marker, delimiter string, maxKeys int) (result ListObjectsInfo, err error) {
-	resp, err := a.client.ListBlobs(bucket, storage.ListBlobsParameters{
+	container := a.client.GetContainerReference(bucket)
+	resp, err := container.ListBlobs(storage.ListBlobsParameters{
 		Prefix:    prefix,
 		Marker:    marker,
 		Delimiter: delimiter,
@@ -325,14 +393,10 @@ func (a *azureObjects) ListObjects(bucket, prefix, marker, delimiter string, max
 		if strings.HasPrefix(object.Name, globalMinioSysTmp) {
 			continue
 		}
-		t, e := time.Parse(time.RFC1123, object.Properties.LastModified)
-		if e != nil {
-			continue
-		}
 		result.Objects = append(result.Objects, ObjectInfo{
 			Bucket:      bucket,
 			Name:        object.Name,
-			ModTime:     t,
+			ModTime:     time.Time(object.Properties.LastModified),
 			Size:        object.Properties.ContentLength,
 			ETag:        azureToS3ETag(object.Properties.Etag),
 			ContentType: object.Properties.ContentType,
@@ -353,7 +417,8 @@ func (a *azureObjects) ListObjects(bucket, prefix, marker, delimiter string, max

 // ListObjectsV2 - list all blobs in Azure bucket filtered by prefix
 func (a *azureObjects) ListObjectsV2(bucket, prefix, continuationToken string, fetchOwner bool, delimiter string, maxKeys int) (result ListObjectsV2Info, err error) {
-	resp, err := a.client.ListBlobs(bucket, storage.ListBlobsParameters{
+	container := a.client.GetContainerReference(bucket)
+	resp, err := container.ListBlobs(storage.ListBlobsParameters{
 		Prefix:    prefix,
 		Marker:    continuationToken,
 		Delimiter: delimiter,
@@ -371,14 +436,10 @@ func (a *azureObjects) ListObjectsV2(bucket, prefix, continuationToken string, f
 		if strings.HasPrefix(object.Name, globalMinioSysTmp) {
 			continue
 		}
-		t, e := time.Parse(time.RFC1123, object.Properties.LastModified)
-		if e != nil {
-			continue
-		}
 		result.Objects = append(result.Objects, ObjectInfo{
 			Bucket:      bucket,
 			Name:        object.Name,
-			ModTime:     t,
+			ModTime:     time.Time(object.Properties.LastModified),
 			Size:        object.Properties.ContentLength,
 			ETag:        azureToS3ETag(object.Properties.Etag),
 			ContentType: object.Properties.ContentType,
@@ -404,17 +465,20 @@ func (a *azureObjects) ListObjectsV2(bucket, prefix, continuationToken string, f
 // startOffset indicates the starting read location of the object.
 // length indicates the total length of the object.
 func (a *azureObjects) GetObject(bucket, object string, startOffset int64, length int64, writer io.Writer) error {
-	byteRange := fmt.Sprintf("%d-", startOffset)
+	blobRange := &storage.BlobRange{Start: uint64(startOffset)}
 	if length > 0 && startOffset > 0 {
-		byteRange = fmt.Sprintf("%d-%d", startOffset, startOffset+length-1)
+		blobRange.End = uint64(startOffset + length - 1)
 	}

+	blob := a.client.GetContainerReference(bucket).GetBlobReference(object)
 	var rc io.ReadCloser
 	var err error
 	if startOffset == 0 && length == 0 {
-		rc, err = a.client.GetBlob(bucket, object)
+		rc, err = blob.Get(nil)
 	} else {
-		rc, err = a.client.GetBlobRange(bucket, object, byteRange, nil)
+		rc, err = blob.GetRange(&storage.GetBlobRangeOptions{
+			Range: blobRange,
+		})
 	}
 	if err != nil {
 		return azureToObjectError(traceError(err), bucket, object)
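Range reads no longer format a "bytes=start-end" string; they pass a BlobRange. A sketch of reading bytes 100-199 of a blob (client setup assumed as in the first sketch):

    // Sketch: ranged blob read with GetBlobRangeOptions, mirroring the
    // GetObject change above.
    package main

    import (
        "fmt"
        "io"
        "os"

        "github.com/Azure/azure-sdk-for-go/storage"
    )

    func main() {
        client, err := storage.NewBasicClient("myaccount", "bXlrZXk=") // placeholders
        if err != nil {
            fmt.Println(err)
            return
        }
        blob := client.GetBlobService().GetContainerReference("bucket").GetBlobReference("object")
        rc, err := blob.GetRange(&storage.GetBlobRangeOptions{
            Range: &storage.BlobRange{Start: 100, End: 199}, // bytes 100-199, inclusive
        })
        if err != nil {
            fmt.Println(err)
            return
        }
        defer rc.Close()
        io.Copy(os.Stdout, rc)
    }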
@@ -427,36 +491,23 @@ func (a *azureObjects) GetObject(bucket, object string, startOffset int64, lengt
 // GetObjectInfo - reads blob metadata properties and replies back ObjectInfo,
 // uses Azure equivalent GetBlobProperties.
 func (a *azureObjects) GetObjectInfo(bucket, object string) (objInfo ObjectInfo, err error) {
-	blobMeta, err := a.client.GetBlobMetadata(bucket, object)
+	blob := a.client.GetContainerReference(bucket).GetBlobReference(object)
+	err = blob.GetProperties(nil)
 	if err != nil {
 		return objInfo, azureToObjectError(traceError(err), bucket, object)
 	}

-	meta := azureToS3Metadata(blobMeta)
-
-	prop, err := a.client.GetBlobProperties(bucket, object)
-	if err != nil {
-		return objInfo, azureToObjectError(traceError(err), bucket, object)
-	}
-	t, err := time.Parse(time.RFC1123, prop.LastModified)
-	if err != nil {
-		return objInfo, traceError(err)
-	}
-
-	if prop.ContentEncoding != "" {
-		meta["Content-Encoding"] = prop.ContentEncoding
-	}
-	meta["Content-Type"] = prop.ContentType
-
+	meta := azurePropertiesToS3Meta(blob.Metadata, blob.Properties)
 	objInfo = ObjectInfo{
-		Bucket:      bucket,
-		UserDefined: meta,
-		ETag:        azureToS3ETag(prop.Etag),
-		ModTime:     t,
-		Name:        object,
-		Size:        prop.ContentLength,
+		Bucket:          bucket,
+		UserDefined:     meta,
+		ETag:            azureToS3ETag(blob.Properties.Etag),
+		ModTime:         time.Time(blob.Properties.LastModified),
+		Name:            object,
+		Size:            blob.Properties.ContentLength,
+		ContentType:     blob.Properties.ContentType,
+		ContentEncoding: blob.Properties.ContentEncoding,
 	}

 	return objInfo, nil
 }

@@ -464,12 +515,16 @@ func (a *azureObjects) GetObjectInfo(bucket, object string) (objInfo ObjectInfo,
 // uses Azure equivalent CreateBlockBlobFromReader.
 func (a *azureObjects) PutObject(bucket, object string, data *HashReader, metadata map[string]string) (objInfo ObjectInfo, err error) {
 	delete(metadata, "etag")
-	err = a.client.CreateBlockBlobFromReader(bucket, object, uint64(data.Size()), data, s3ToAzureHeaders(metadata))
+	blob := a.client.GetContainerReference(bucket).GetBlobReference(object)
+	blob.Metadata, blob.Properties = s3MetaToAzureProperties(metadata)
+	err = blob.CreateBlockBlobFromReader(data, nil)
 	if err != nil {
 		return objInfo, azureToObjectError(traceError(err), bucket, object)
 	}
 	if err = data.Verify(); err != nil {
-		a.client.DeleteBlob(bucket, object, nil)
 		errorIf(err, "Verification of uploaded object data failed against client provided checksums.")
+		derr := blob.Delete(nil)
+		errorIf(derr, "Failed to delete blob when cleaning up a bad blob upload.")
 		return ObjectInfo{}, azureToObjectError(traceError(err))
 	}
 	return a.GetObjectInfo(bucket, object)
@@ -478,7 +533,9 @@ func (a *azureObjects) PutObject(bucket, object string, data *HashReader, metada
 // CopyObject - Copies a blob from source container to destination container.
 // Uses Azure equivalent CopyBlob API.
 func (a *azureObjects) CopyObject(srcBucket, srcObject, destBucket, destObject string, metadata map[string]string) (objInfo ObjectInfo, err error) {
-	err = a.client.CopyBlob(destBucket, destObject, a.client.GetBlobURL(srcBucket, srcObject))
+	srcBlobURL := a.client.GetContainerReference(srcBucket).GetBlobReference(srcObject).GetURL()
+	destBlob := a.client.GetContainerReference(destBucket).GetBlobReference(destObject)
+	err = destBlob.Copy(srcBlobURL, nil)
 	if err != nil {
 		return objInfo, azureToObjectError(traceError(err), srcBucket, srcObject)
 	}
@@ -488,7 +545,8 @@ func (a *azureObjects) CopyObject(srcBucket, srcObject, destBucket, destObject s
 // DeleteObject - Deletes a blob on azure container, uses Azure
 // equivalent DeleteBlob API.
 func (a *azureObjects) DeleteObject(bucket, object string) error {
-	err := a.client.DeleteBlob(bucket, object, nil)
+	blob := a.client.GetContainerReference(bucket).GetBlobReference(object)
+	err := blob.Delete(nil)
 	if err != nil {
 		return azureToObjectError(traceError(err), bucket, object)
 	}
@@ -511,7 +569,9 @@ func getAzureMetadataObjectName(objectName, uploadID string) string {
 }

 func (a *azureObjects) checkUploadIDExists(bucketName, objectName, uploadID string) (err error) {
-	_, err = a.client.GetBlobMetadata(bucketName, getAzureMetadataObjectName(objectName, uploadID))
+	blob := a.client.GetContainerReference(bucketName).GetBlobReference(
+		getAzureMetadataObjectName(objectName, uploadID))
+	err = blob.GetMetadata(nil)
 	err = azureToObjectError(traceError(err), bucketName, objectName)
 	oerr := ObjectNotFound{bucketName, objectName}
 	if errorCause(err) == oerr {
@@ -531,14 +591,14 @@ func (a *azureObjects) NewMultipartUpload(bucket, object string, metadata map[st
 		return "", traceError(errors.New("Upload ID name collision"))
 	}
 	metadataObject := getAzureMetadataObjectName(object, uploadID)
-	metadata = s3ToAzureHeaders(metadata)

 	var jsonData []byte
 	if jsonData, err = json.Marshal(azureMultipartMetadata{Name: object, Metadata: metadata}); err != nil {
 		return "", traceError(err)
 	}

-	err = a.client.CreateBlockBlobFromReader(bucket, metadataObject, uint64(len(jsonData)), bytes.NewBuffer(jsonData), nil)
+	blob := a.client.GetContainerReference(bucket).GetBlobReference(metadataObject)
+	err = blob.CreateBlockBlobFromReader(bytes.NewBuffer(jsonData), nil)
 	if err != nil {
 		return "", azureToObjectError(traceError(err), bucket, metadataObject)
 	}
@@ -579,14 +639,18 @@ func (a *azureObjects) PutObjectPart(bucket, object, uploadID string, partID int
 		}

 		id := azureGetBlockID(partID, subPartNumber, uploadID, etag)
-		err = a.client.PutBlockWithLength(bucket, object, id, uint64(subPartSize), io.LimitReader(data, subPartSize), nil)
+		blob := a.client.GetContainerReference(bucket).GetBlobReference(object)
+		err = blob.PutBlockWithLength(id, uint64(subPartSize), io.LimitReader(data, subPartSize), nil)
 		if err != nil {
 			return info, azureToObjectError(traceError(err), bucket, object)
 		}
 		subPartNumber++
 	}
 	if err = data.Verify(); err != nil {
-		a.client.DeleteBlob(bucket, object, nil)
 		errorIf(err, "Verification of uploaded object data failed against client provided checksums.")
+		blob := a.client.GetContainerReference(bucket).GetBlobReference(object)
+		derr := blob.Delete(nil)
+		errorIf(derr, "Failed to delete blob when cleaning up a bad blob upload.")
 		return info, azureToObjectError(traceError(err), bucket, object)
 	}

@@ -615,7 +679,9 @@ func (a *azureObjects) AbortMultipartUpload(bucket, object, uploadID string) (er
 		return err
 	}

-	return a.client.DeleteBlob(bucket, getAzureMetadataObjectName(object, uploadID), nil)
+	blob := a.client.GetContainerReference(bucket).GetBlobReference(
+		getAzureMetadataObjectName(object, uploadID))
+	return blob.Delete(nil)
 }

 // CompleteMultipartUpload - Use Azure equivalent PutBlockList.
@@ -630,7 +696,8 @@ func (a *azureObjects) CompleteMultipartUpload(bucket, object, uploadID string,
 	}

 	var metadataReader io.Reader
-	if metadataReader, err = a.client.GetBlob(bucket, metadataObject); err != nil {
+	blob := a.client.GetContainerReference(bucket).GetBlobReference(metadataObject)
+	if metadataReader, err = blob.Get(nil); err != nil {
 		return objInfo, azureToObjectError(traceError(err), bucket, metadataObject)
 	}

@@ -639,17 +706,18 @@ func (a *azureObjects) CompleteMultipartUpload(bucket, object, uploadID string,
 		return objInfo, azureToObjectError(traceError(err), bucket, metadataObject)
 	}

-	meta := metadata.Metadata
 	defer func() {
 		if err != nil {
 			return
 		}

-		derr := a.client.DeleteBlob(bucket, metadataObject, nil)
+		blob := a.client.GetContainerReference(bucket).GetBlobReference(metadataObject)
+		derr := blob.Delete(nil)
 		errorIf(derr, "unable to remove meta data object for upload ID %s", uploadID)
 	}()

-	resp, err := a.client.GetBlockList(bucket, object, storage.BlockListTypeUncommitted)
+	objBlob := a.client.GetContainerReference(bucket).GetBlobReference(object)
+	resp, err := objBlob.GetBlockList(storage.BlockListTypeUncommitted, nil)
 	if err != nil {
 		return objInfo, azureToObjectError(traceError(err), bucket, object)
 	}
@@ -705,23 +773,17 @@ func (a *azureObjects) CompleteMultipartUpload(bucket, object, uploadID string,
 		}
 	}

-	err = a.client.PutBlockList(bucket, object, allBlocks)
+	err = objBlob.PutBlockList(allBlocks, nil)
 	if err != nil {
 		return objInfo, azureToObjectError(traceError(err), bucket, object)
 	}
-	if len(meta) > 0 {
-		prop := storage.BlobHeaders{
-			ContentMD5:      meta["Content-Md5"],
-			ContentLanguage: meta["Content-Language"],
-			ContentEncoding: meta["Content-Encoding"],
-			ContentType:     meta["Content-Type"],
-			CacheControl:    meta["Cache-Control"],
-		}
-		err = a.client.SetBlobProperties(bucket, object, prop)
+	if len(metadata.Metadata) > 0 {
+		objBlob.Metadata, objBlob.Properties = s3MetaToAzureProperties(metadata.Metadata)
+		err = objBlob.SetProperties(nil)
 		if err != nil {
 			return objInfo, azureToObjectError(traceError(err), bucket, object)
 		}
-		err = a.client.SetBlobMetadata(bucket, object, nil, meta)
+		err = objBlob.SetMetadata(nil)
 		if err != nil {
 			return objInfo, azureToObjectError(traceError(err), bucket, object)
 		}
@@ -729,32 +791,6 @@ func (a *azureObjects) CompleteMultipartUpload(bucket, object, uploadID string,
 	return a.GetObjectInfo(bucket, object)
 }

-// Copied from github.com/Azure/azure-sdk-for-go/storage/blob.go
-func azureListBlobsGetParameters(p storage.ListBlobsParameters) url.Values {
-	out := url.Values{}
-
-	if p.Prefix != "" {
-		out.Set("prefix", p.Prefix)
-	}
-	if p.Delimiter != "" {
-		out.Set("delimiter", p.Delimiter)
-	}
-	if p.Marker != "" {
-		out.Set("marker", p.Marker)
-	}
-	if p.Include != "" {
-		out.Set("include", p.Include)
-	}
-	if p.MaxResults != 0 {
-		out.Set("maxresults", fmt.Sprintf("%v", p.MaxResults))
-	}
-	if p.Timeout != 0 {
-		out.Set("timeout", fmt.Sprintf("%v", p.Timeout))
-	}
-
-	return out
-}
-
 // SetBucketPolicies - Azure supports three types of container policies:
 // storage.ContainerAccessTypeContainer - readonly in minio terminology
 // storage.ContainerAccessTypeBlob - readonly without listing in minio terminology
@@ -784,14 +820,16 @@ func (a *azureObjects) SetBucketPolicies(bucket string, policyInfo policy.Bucket
 		AccessType:     storage.ContainerAccessTypeContainer,
 		AccessPolicies: nil,
 	}
-	err := a.client.SetContainerPermissions(bucket, perm, 0, "")
+	container := a.client.GetContainerReference(bucket)
+	err := container.SetPermissions(perm, nil)
 	return azureToObjectError(traceError(err), bucket)
 }

 // GetBucketPolicies - Get the container ACL and convert it to canonical []bucketAccessPolicy
 func (a *azureObjects) GetBucketPolicies(bucket string) (policy.BucketAccessPolicy, error) {
 	policyInfo := policy.BucketAccessPolicy{Version: "2012-10-17"}
-	perm, err := a.client.GetContainerPermissions(bucket, 0, "")
+	container := a.client.GetContainerReference(bucket)
+	perm, err := container.GetPermissions(nil)
 	if err != nil {
 		return policy.BucketAccessPolicy{}, azureToObjectError(traceError(err), bucket)
 	}
@@ -812,6 +850,7 @@ func (a *azureObjects) DeleteBucketPolicies(bucket string) error {
 		AccessType:     storage.ContainerAccessTypePrivate,
 		AccessPolicies: nil,
 	}
-	err := a.client.SetContainerPermissions(bucket, perm, 0, "")
+	container := a.client.GetContainerReference(bucket)
+	err := container.SetPermissions(perm, nil)
 	return azureToObjectError(traceError(err))
 }
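Bucket policy mapping reduces to setting the container's access type. A sketch of making a container public-read (the ContainerPermissions struct name is assumed from the SDK; the diff only shows its fields; client setup as in the first sketch):

    // Sketch: container permissions via the reference API, as the
    // policy methods above now do.
    package main

    import (
        "fmt"

        "github.com/Azure/azure-sdk-for-go/storage"
    )

    func main() {
        client, err := storage.NewBasicClient("myaccount", "bXlrZXk=") // placeholders
        if err != nil {
            fmt.Println(err)
            return
        }
        container := client.GetBlobService().GetContainerReference("bucket")

        // Equivalent of a readonly bucket policy in minio terms.
        perm := storage.ContainerPermissions{
            AccessType: storage.ContainerAccessTypeContainer,
        }
        err = container.SetPermissions(perm, nil)
        fmt.Println(err)
    }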
@@ -43,7 +43,7 @@ func TestAzureToS3ETag(t *testing.T) {
 }

 // Test canonical metadata.
-func TestS3ToAzureHeaders(t *testing.T) {
+func TestS3MetaToAzureProperties(t *testing.T) {
 	headers := map[string]string{
 		"accept-encoding":  "gzip",
 		"content-encoding": "gzip",
@@ -52,21 +52,21 @@ func TestS3ToAzureHeaders(t *testing.T) {
 		"X-Amz-Meta-X-Amz-Matdesc": "{}",
 		"X-Amz-Meta-X-Amz-Iv":      "eWmyryl8kq+EVnnsE7jpOg==",
 	}
+	// Only X-Amz-Meta- prefixed entries will be returned in
+	// Metadata (without the prefix!)
 	expectedHeaders := map[string]string{
-		"Accept-Encoding":           "gzip",
-		"Content-Encoding":          "gzip",
-		"X-Ms-Meta-Hdr":             "value",
-		"X-Ms-Meta-x_minio_key":     "hu3ZSqtqwn+aL4V2VhAeov4i+bG3KyCtRMSXQFRHXOk=",
-		"X-Ms-Meta-x_minio_matdesc": "{}",
-		"X-Ms-Meta-x_minio_iv":      "eWmyryl8kq+EVnnsE7jpOg==",
+		"Hdr":             "value",
+		"x_minio_key":     "hu3ZSqtqwn+aL4V2VhAeov4i+bG3KyCtRMSXQFRHXOk=",
+		"x_minio_matdesc": "{}",
+		"x_minio_iv":      "eWmyryl8kq+EVnnsE7jpOg==",
 	}
-	actualHeaders := s3ToAzureHeaders(headers)
-	if !reflect.DeepEqual(actualHeaders, expectedHeaders) {
-		t.Fatalf("Test failed, expected %#v, got %#v", expectedHeaders, actualHeaders)
+	meta, _ := s3MetaToAzureProperties(headers)
+	if !reflect.DeepEqual(map[string]string(meta), expectedHeaders) {
+		t.Fatalf("Test failed, expected %#v, got %#v", expectedHeaders, meta)
 	}
 }

-func TestAzureToS3Metadata(t *testing.T) {
+func TestAzurePropertiesToS3Meta(t *testing.T) {
 	// Just one testcase. Adding more test cases does not add value to the testcase
 	// as azureToS3Metadata() just adds a prefix.
 	metadata := map[string]string{
@@ -81,7 +81,7 @@ func TestAzureToS3Metadata(t *testing.T) {
 		"X-Amz-Meta-X-Amz-Matdesc": "{}",
 		"X-Amz-Meta-X-Amz-Iv":      "eWmyryl8kq+EVnnsE7jpOg==",
 	}
-	actualMeta := azureToS3Metadata(metadata)
+	actualMeta := azurePropertiesToS3Meta(metadata, storage.BlobProperties{})
 	if !reflect.DeepEqual(actualMeta, expectedMeta) {
 		t.Fatalf("Test failed, expected %#v, got %#v", expectedMeta, actualMeta)
 	}
@@ -207,16 +207,30 @@ func TestAzureListBlobsGetParameters(t *testing.T) {
 	expectedURLValues.Set("prefix", "test")
 	expectedURLValues.Set("delimiter", "_")
 	expectedURLValues.Set("marker", "marker")
-	expectedURLValues.Set("include", "hello")
+	expectedURLValues.Set("include", "metadata")
 	expectedURLValues.Set("maxresults", "20")
 	expectedURLValues.Set("timeout", "10")

-	setBlobParameters := storage.ListBlobsParameters{"test", "_", "marker", "hello", 20, 10}
+	setBlobParameters := storage.ListBlobsParameters{
+		Prefix:     "test",
+		Delimiter:  "_",
+		Marker:     "marker",
+		Include:    &storage.IncludeBlobDataset{Metadata: true},
+		MaxResults: 20,
+		Timeout:    10,
+	}

 	// Test values set 2
 	expectedURLValues1 := url.Values{}

-	setBlobParameters1 := storage.ListBlobsParameters{"", "", "", "", 0, 0}
+	setBlobParameters1 := storage.ListBlobsParameters{
+		Prefix:     "",
+		Delimiter:  "",
+		Marker:     "",
+		Include:    nil,
+		MaxResults: 0,
+		Timeout:    0,
+	}

 	testCases := []struct {
 		name string