mirror of
https://github.com/minio/minio.git
synced 2025-11-07 04:42:56 -05:00
update deps and update azure WARM tier implementation (#20247)
This commit is contained in:
@@ -28,7 +28,7 @@ import (
|
||||
"strconv"
|
||||
"strings"
|
||||
|
||||
"github.com/Azure/azure-storage-blob-go/azblob"
|
||||
"github.com/Azure/azure-sdk-for-go/sdk/azcore"
|
||||
"github.com/minio/minio/internal/ioutil"
|
||||
"google.golang.org/api/googleapi"
|
||||
|
||||
@@ -2549,11 +2549,11 @@ func toAPIError(ctx context.Context, err error) APIError {
|
||||
if len(e.Errors) >= 1 {
|
||||
apiErr.Code = e.Errors[0].Reason
|
||||
}
|
||||
case azblob.StorageError:
|
||||
case *azcore.ResponseError:
|
||||
apiErr = APIError{
|
||||
Code: string(e.ServiceCode()),
|
||||
Code: e.ErrorCode,
|
||||
Description: e.Error(),
|
||||
HTTPStatusCode: e.Response().StatusCode,
|
||||
HTTPStatusCode: e.StatusCode,
|
||||
}
|
||||
// Add more other SDK related errors here if any in future.
|
||||
default:
|
||||
|
||||
@@ -19,28 +19,38 @@ package cmd
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/base64"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/Azure/azure-storage-blob-go/azblob"
|
||||
"github.com/Azure/go-autorest/autorest/adal"
|
||||
"github.com/Azure/go-autorest/autorest/azure"
|
||||
"github.com/Azure/azure-sdk-for-go/sdk/azcore"
|
||||
"github.com/Azure/azure-sdk-for-go/sdk/azidentity"
|
||||
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob"
|
||||
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blob"
|
||||
"github.com/minio/madmin-go/v3"
|
||||
)
|
||||
|
||||
type warmBackendAzure struct {
|
||||
serviceURL azblob.ServiceURL
|
||||
clnt *azblob.Client
|
||||
Bucket string
|
||||
Prefix string
|
||||
StorageClass string
|
||||
}
|
||||
|
||||
func (az *warmBackendAzure) tier() *blob.AccessTier {
|
||||
if az.StorageClass == "" {
|
||||
return nil
|
||||
}
|
||||
for _, t := range blob.PossibleAccessTierValues() {
|
||||
if strings.EqualFold(az.StorageClass, string(t)) {
|
||||
return &t
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (az *warmBackendAzure) getDest(object string) string {
|
||||
destObj := object
|
||||
if az.Prefix != "" {
|
||||
@@ -49,150 +59,114 @@ func (az *warmBackendAzure) getDest(object string) string {
|
||||
return destObj
|
||||
}
|
||||
|
||||
func (az *warmBackendAzure) tier() azblob.AccessTierType {
|
||||
for _, t := range azblob.PossibleAccessTierTypeValues() {
|
||||
if strings.EqualFold(az.StorageClass, string(t)) {
|
||||
return t
|
||||
}
|
||||
}
|
||||
return azblob.AccessTierType("")
|
||||
}
|
||||
|
||||
// FIXME: add support for remote version ID in Azure remote tier and remove
|
||||
// this. Currently it's a no-op.
|
||||
|
||||
func (az *warmBackendAzure) Put(ctx context.Context, object string, r io.Reader, length int64) (remoteVersionID, error) {
|
||||
blobURL := az.serviceURL.NewContainerURL(az.Bucket).NewBlockBlobURL(az.getDest(object))
|
||||
// set tier if specified -
|
||||
if az.StorageClass != "" {
|
||||
if _, err := blobURL.SetTier(ctx, az.tier(), azblob.LeaseAccessConditions{}, azblob.RehydratePriorityStandard); err != nil {
|
||||
return "", azureToObjectError(err, az.Bucket, object)
|
||||
}
|
||||
}
|
||||
res, err := azblob.UploadStreamToBlockBlob(ctx, r, blobURL, azblob.UploadStreamToBlockBlobOptions{})
|
||||
resp, err := az.clnt.UploadStream(ctx, az.Bucket, az.getDest(object), io.LimitReader(r, length), &azblob.UploadStreamOptions{
|
||||
Concurrency: 4,
|
||||
AccessTier: az.tier(), // set tier if specified
|
||||
})
|
||||
if err != nil {
|
||||
return "", azureToObjectError(err, az.Bucket, object)
|
||||
return "", azureToObjectError(err, az.Bucket, az.getDest(object))
|
||||
}
|
||||
return remoteVersionID(res.Version()), nil
|
||||
vid := ""
|
||||
if resp.VersionID != nil {
|
||||
vid = *resp.VersionID
|
||||
}
|
||||
return remoteVersionID(vid), nil
|
||||
}
|
||||
|
||||
func (az *warmBackendAzure) Get(ctx context.Context, object string, rv remoteVersionID, opts WarmBackendGetOpts) (r io.ReadCloser, err error) {
|
||||
if opts.startOffset < 0 {
|
||||
return nil, InvalidRange{}
|
||||
}
|
||||
blobURL := az.serviceURL.NewContainerURL(az.Bucket).NewBlobURL(az.getDest(object))
|
||||
blob, err := blobURL.Download(ctx, opts.startOffset, opts.length, azblob.BlobAccessConditions{}, false, azblob.ClientProvidedKeyOptions{})
|
||||
resp, err := az.clnt.DownloadStream(ctx, az.Bucket, az.getDest(object), &azblob.DownloadStreamOptions{
|
||||
Range: blob.HTTPRange{Offset: opts.startOffset, Count: opts.length},
|
||||
})
|
||||
if err != nil {
|
||||
return nil, azureToObjectError(err, az.Bucket, object)
|
||||
return nil, azureToObjectError(err, az.Bucket, az.getDest(object))
|
||||
}
|
||||
|
||||
rc := blob.Body(azblob.RetryReaderOptions{})
|
||||
return rc, nil
|
||||
return resp.Body, nil
|
||||
}
|
||||
|
||||
func (az *warmBackendAzure) Remove(ctx context.Context, object string, rv remoteVersionID) error {
|
||||
blob := az.serviceURL.NewContainerURL(az.Bucket).NewBlobURL(az.getDest(object))
|
||||
_, err := blob.Delete(ctx, azblob.DeleteSnapshotsOptionNone, azblob.BlobAccessConditions{})
|
||||
return azureToObjectError(err, az.Bucket, object)
|
||||
_, err := az.clnt.DeleteBlob(ctx, az.Bucket, az.getDest(object), &azblob.DeleteBlobOptions{})
|
||||
return azureToObjectError(err, az.Bucket, az.getDest(object))
|
||||
}
|
||||
|
||||
func (az *warmBackendAzure) InUse(ctx context.Context) (bool, error) {
|
||||
containerURL := az.serviceURL.NewContainerURL(az.Bucket)
|
||||
resp, err := containerURL.ListBlobsHierarchySegment(ctx, azblob.Marker{}, "/", azblob.ListBlobsSegmentOptions{
|
||||
Prefix: az.Prefix,
|
||||
MaxResults: int32(1),
|
||||
maxResults := int32(1)
|
||||
pager := az.clnt.NewListBlobsFlatPager(az.Bucket, &azblob.ListBlobsFlatOptions{
|
||||
Prefix: &az.Prefix,
|
||||
MaxResults: &maxResults,
|
||||
})
|
||||
if !pager.More() {
|
||||
return false, nil
|
||||
}
|
||||
|
||||
resp, err := pager.NextPage(ctx)
|
||||
if err != nil {
|
||||
if strings.Contains(err.Error(), "no more pages") {
|
||||
return false, nil
|
||||
}
|
||||
return false, azureToObjectError(err, az.Bucket, az.Prefix)
|
||||
}
|
||||
if len(resp.Segment.BlobPrefixes) > 0 || len(resp.Segment.BlobItems) > 0 {
|
||||
return true, nil
|
||||
}
|
||||
return false, nil
|
||||
|
||||
return len(resp.Segment.BlobItems) > 0, nil
|
||||
}
|
||||
|
||||
func newCredentialFromSP(conf madmin.TierAzure) (azblob.Credential, error) {
|
||||
oauthConfig, err := adal.NewOAuthConfig(azure.PublicCloud.ActiveDirectoryEndpoint, conf.SPAuth.TenantID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
spt, err := adal.NewServicePrincipalToken(*oauthConfig, conf.SPAuth.ClientID, conf.SPAuth.ClientSecret, azure.PublicCloud.ResourceIdentifiers.Storage)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Refresh obtains a fresh token
|
||||
err = spt.Refresh()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
tc := azblob.NewTokenCredential(spt.Token().AccessToken, func(tc azblob.TokenCredential) time.Duration {
|
||||
err := spt.Refresh()
|
||||
if err != nil {
|
||||
return 0
|
||||
}
|
||||
// set the new token value
|
||||
tc.SetToken(spt.Token().AccessToken)
|
||||
|
||||
// get the next token before the current one expires
|
||||
nextRenewal := float64(time.Until(spt.Token().Expires())) * 0.8
|
||||
if nextRenewal <= 0 {
|
||||
nextRenewal = float64(time.Second)
|
||||
}
|
||||
|
||||
return time.Duration(nextRenewal)
|
||||
})
|
||||
|
||||
return tc, nil
|
||||
type azureConf struct {
|
||||
madmin.TierAzure
|
||||
}
|
||||
|
||||
func newWarmBackendAzure(conf madmin.TierAzure, _ string) (*warmBackendAzure, error) {
|
||||
var (
|
||||
credential azblob.Credential
|
||||
err error
|
||||
)
|
||||
|
||||
func (conf azureConf) Validate() error {
|
||||
switch {
|
||||
case conf.AccountName == "":
|
||||
return nil, errors.New("the account name is required")
|
||||
return errors.New("the account name is required")
|
||||
case conf.AccountKey != "" && (conf.SPAuth.TenantID != "" || conf.SPAuth.ClientID != "" || conf.SPAuth.ClientSecret != ""):
|
||||
return nil, errors.New("multiple authentication mechanisms are provided")
|
||||
return errors.New("multiple authentication mechanisms are provided")
|
||||
case conf.AccountKey == "" && (conf.SPAuth.TenantID == "" || conf.SPAuth.ClientID == "" || conf.SPAuth.ClientSecret == ""):
|
||||
return nil, errors.New("no authentication mechanism was provided")
|
||||
return errors.New("no authentication mechanism was provided")
|
||||
}
|
||||
|
||||
if conf.Bucket == "" {
|
||||
return nil, errors.New("no bucket name was provided")
|
||||
return errors.New("no bucket name was provided")
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (conf azureConf) NewClient() (clnt *azblob.Client, clntErr error) {
|
||||
if err := conf.Validate(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
ep := conf.Endpoint
|
||||
if ep == "" {
|
||||
ep = fmt.Sprintf("https://%s.blob.core.windows.net", conf.AccountName)
|
||||
}
|
||||
|
||||
if conf.IsSPEnabled() {
|
||||
credential, err = newCredentialFromSP(conf)
|
||||
} else {
|
||||
credential, err = azblob.NewSharedKeyCredential(conf.AccountName, conf.AccountKey)
|
||||
}
|
||||
if err != nil {
|
||||
if _, ok := err.(base64.CorruptInputError); ok {
|
||||
return nil, errors.New("invalid Azure credentials")
|
||||
credential, err := azidentity.NewClientSecretCredential(conf.SPAuth.TenantID, conf.SPAuth.ClientID, conf.SPAuth.ClientSecret, &azidentity.ClientSecretCredentialOptions{})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return azblob.NewClient(ep, credential, &azblob.ClientOptions{})
|
||||
}
|
||||
credential, err := azblob.NewSharedKeyCredential(conf.AccountName, conf.AccountKey)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
p := azblob.NewPipeline(credential, azblob.PipelineOptions{})
|
||||
var u *url.URL
|
||||
if conf.Endpoint != "" {
|
||||
u, err = url.Parse(conf.Endpoint)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
} else {
|
||||
u, err = url.Parse(fmt.Sprintf("https://%s.blob.core.windows.net", conf.AccountName))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return azblob.NewClientWithSharedKeyCredential(ep, credential, &azblob.ClientOptions{})
|
||||
}
|
||||
|
||||
func newWarmBackendAzure(conf madmin.TierAzure, _ string) (*warmBackendAzure, error) {
|
||||
clnt, err := azureConf{conf}.NewClient()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
serviceURL := azblob.NewServiceURL(*u, p)
|
||||
|
||||
return &warmBackendAzure{
|
||||
serviceURL: serviceURL,
|
||||
clnt: clnt,
|
||||
Bucket: conf.Bucket,
|
||||
Prefix: strings.TrimSuffix(conf.Prefix, slashSeparator),
|
||||
StorageClass: conf.StorageClass,
|
||||
@@ -214,15 +188,15 @@ func azureToObjectError(err error, params ...string) error {
|
||||
object = params[1]
|
||||
}
|
||||
|
||||
azureErr, ok := err.(azblob.StorageError)
|
||||
azureErr, ok := err.(*azcore.ResponseError)
|
||||
if !ok {
|
||||
// We don't interpret non Azure errors. As azure errors will
|
||||
// have StatusCode to help to convert to object errors.
|
||||
return err
|
||||
}
|
||||
|
||||
serviceCode := string(azureErr.ServiceCode())
|
||||
statusCode := azureErr.Response().StatusCode
|
||||
serviceCode := azureErr.ErrorCode
|
||||
statusCode := azureErr.StatusCode
|
||||
|
||||
return azureCodesToObjectError(err, serviceCode, statusCode, bucket, object)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user