From e2579b1f5a234eb695b33da879bc55ba1f6b98b5 Mon Sep 17 00:00:00 2001 From: Anis Elleuch Date: Tue, 12 Jan 2021 07:48:09 +0100 Subject: [PATCH] azure: Use default upload parameters to avoid consuming too much memory (#11251) A lot of memory is consumed when uploading small files in parallel, use the default upload parameters and add MINIO_AZURE_UPLOAD_CONCURRENCY for users to tweak. --- cmd/gateway/azure/gateway-azure.go | 76 ++++++++++--------------- cmd/gateway/azure/gateway-azure_test.go | 37 ------------ pkg/env/env.go | 11 ++++ 3 files changed, 40 insertions(+), 84 deletions(-) diff --git a/cmd/gateway/azure/gateway-azure.go b/cmd/gateway/azure/gateway-azure.go index 7d69ded46..d9f652caf 100644 --- a/cmd/gateway/azure/gateway-azure.go +++ b/cmd/gateway/azure/gateway-azure.go @@ -51,30 +51,19 @@ import ( minio "github.com/minio/minio/cmd" ) -var ( - azureUploadChunkSize = getUploadChunkSizeFromEnv(azureChunkSizeEnvVar, strconv.Itoa(azureDefaultUploadChunkSize/humanize.MiByte)) - azureSdkTimeout = time.Duration(azureUploadChunkSize/humanize.MiByte) * azureSdkTimeoutPerMb - azureUploadConcurrency = azureUploadMaxMemoryUsage / azureUploadChunkSize +const ( + azureDefaultUploadChunkSizeMB = 25 + azureDownloadRetryAttempts = 5 + azureS3MinPartSize = 5 * humanize.MiByte + metadataObjectNameTemplate = minio.GatewayMinioSysTmp + "multipart/v1/%s.%x/azure.json" + azureMarkerPrefix = "{minio}" + metadataPartNamePrefix = minio.GatewayMinioSysTmp + "multipart/v1/%s.%x" + maxPartsCount = 10000 ) -const ( - // The defaultDialTimeout for communicating with the cloud backends is set - // to 30 seconds in utils.go; the Azure SDK recommends to set a timeout of 60 - // seconds per MB of data a client expects to upload so we must transfer less - // than 0.5 MB per chunk to stay within the defaultDialTimeout tolerance. - // See https://github.com/Azure/azure-storage-blob-go/blob/fc70003/azblob/zc_policy_retry.go#L39-L44 for more details. 
- // To change the upload chunk size, set the environmental variable MINIO_AZURE_CHUNK_SIZE_MB with a (float) value between 0 and 100 - azureDefaultUploadChunkSize = 25 * humanize.MiByte - azureSdkTimeoutPerMb = 60 * time.Second - azureUploadMaxMemoryUsage = 100 * humanize.MiByte - azureChunkSizeEnvVar = "MINIO_AZURE_CHUNK_SIZE_MB" - - azureDownloadRetryAttempts = 5 - azureS3MinPartSize = 5 * humanize.MiByte - metadataObjectNameTemplate = minio.GatewayMinioSysTmp + "multipart/v1/%s.%x/azure.json" - azureMarkerPrefix = "{minio}" - metadataPartNamePrefix = minio.GatewayMinioSysTmp + "multipart/v1/%s.%x" - maxPartsCount = 10000 +var ( + azureUploadChunkSize int + azureUploadConcurrency int ) func init() { @@ -94,7 +83,6 @@ EXAMPLES: 1. Start minio gateway server for Azure Blob Storage backend on custom endpoint. {{.Prompt}} {{.EnvVarSetCommand}} MINIO_ROOT_USER{{.AssignmentOperator}}azureaccountname {{.Prompt}} {{.EnvVarSetCommand}} MINIO_ROOT_PASSWORD{{.AssignmentOperator}}azureaccountkey - {{.Prompt}} {{.EnvVarSetCommand}} MINIO_AZURE_CHUNK_SIZE_MB {{.AssignmentOperator}}0.25 {{.Prompt}} {{.HelpName}} https://azureaccountname.blob.custom.azure.endpoint 2. Start minio gateway server for Azure Blob Storage backend with edge caching enabled. 
@@ -106,8 +94,8 @@ EXAMPLES: {{.Prompt}} {{.EnvVarSetCommand}} MINIO_CACHE_AFTER{{.AssignmentOperator}}3 {{.Prompt}} {{.EnvVarSetCommand}} MINIO_CACHE_WATERMARK_LOW{{.AssignmentOperator}}75 {{.Prompt}} {{.EnvVarSetCommand}} MINIO_CACHE_WATERMARK_HIGH{{.AssignmentOperator}}85 - {{.Prompt}} {{.EnvVarSetCommand}} MINIO_AZURE_CHUNK_SIZE_MB {{.AssignmentOperator}}25 {{.Prompt}} {{.HelpName}} + ` minio.RegisterGatewayCommand(cli.Command{ @@ -140,27 +128,6 @@ func azureGatewayMain(ctx *cli.Context) { minio.StartGateway(ctx, &Azure{host}) } -// getUploadChunkSizeFromEnv returns the parsed chunk size from the environmental variable 'MINIO_AZURE_CHUNK_SIZE_MB' -// The environmental variable should be a floating point number between 0 and 100 representing the MegaBytes -// The returned value is an int representing the size in bytes -func getUploadChunkSizeFromEnv(envvar string, defaultValue string) int { - envChunkSize := env.Get(envvar, defaultValue) - - i, err := strconv.ParseFloat(envChunkSize, 64) - if err != nil { - logger.LogIf(context.Background(), err) - return azureDefaultUploadChunkSize - } - - if i <= 0 || i > 100 { - logger.LogIf(context.Background(), fmt.Errorf("ENV '%v' should be a floating point value between 0 and 100.\n"+ - "The upload chunk size is set to its default: %s\n", azureChunkSizeEnvVar, defaultValue)) - return azureDefaultUploadChunkSize - } - - return int(i * humanize.MiByte) -} - // Azure implements Gateway. 
type Azure struct { host string @@ -188,6 +155,20 @@ func (g *Azure) NewGatewayLayer(creds auth.Credentials) (minio.ObjectLayer, erro return nil, err } + azureUploadChunkSize, err = env.GetInt("MINIO_AZURE_CHUNK_SIZE_MB", azureDefaultUploadChunkSizeMB) + if err != nil { + return nil, err + } + azureUploadChunkSize *= humanize.MiByte + if azureUploadChunkSize <= 0 || azureUploadChunkSize > 100*humanize.MiByte { + return nil, fmt.Errorf("MINIO_AZURE_CHUNK_SIZE_MB should be an integer value between 0 and 100") + } + + azureUploadConcurrency, err = env.GetInt("MINIO_AZURE_UPLOAD_CONCURRENCY", 4) + if err != nil { + return nil, err + } + credential, err := azblob.NewSharedKeyCredential(creds.AccessKey, creds.SecretKey) if err != nil { if _, ok := err.(base64.CorruptInputError); ok { @@ -208,7 +189,9 @@ func (g *Azure) NewGatewayLayer(creds auth.Credentials) (minio.ObjectLayer, erro pipeline := azblob.NewPipeline(credential, azblob.PipelineOptions{ Retry: azblob.RetryOptions{ - TryTimeout: azureSdkTimeout, + // Azure SDK recommends to set a timeout of 60 seconds per MB of data so we + // calculate here the timeout for the configured upload chunk size. 
+ TryTimeout: time.Duration(azureUploadChunkSize/humanize.MiByte) * 60 * time.Second, }, HTTPSender: pipeline.FactoryFunc(func(next pipeline.Policy, po *pipeline.PolicyOptions) pipeline.PolicyFunc { return func(ctx context.Context, request pipeline.Request) (pipeline.Response, error) { @@ -914,7 +897,6 @@ func (a *azureObjects) PutObject(ctx context.Context, bucket, object string, r * blobURL := a.client.NewContainerURL(bucket).NewBlockBlobURL(object) _, err = azblob.UploadStreamToBlockBlob(ctx, data, blobURL, azblob.UploadStreamToBlockBlobOptions{ - BufferSize: azureUploadChunkSize, MaxBuffers: azureUploadConcurrency, BlobHTTPHeaders: properties, Metadata: metadata, diff --git a/cmd/gateway/azure/gateway-azure_test.go b/cmd/gateway/azure/gateway-azure_test.go index 3fd55e2df..7c3987ac9 100644 --- a/cmd/gateway/azure/gateway-azure_test.go +++ b/cmd/gateway/azure/gateway-azure_test.go @@ -20,13 +20,9 @@ import ( "encoding/base64" "fmt" "net/http" - "os" "reflect" - "strconv" "testing" - "github.com/dustin/go-humanize" - "github.com/Azure/azure-storage-blob-go/azblob" minio "github.com/minio/minio/cmd" ) @@ -268,36 +264,3 @@ func TestCheckAzureUploadID(t *testing.T) { } } } - -func TestParsingUploadChunkSize(t *testing.T) { - key := "MINIO_AZURE_CHUNK_SIZE_MB" - invalidValues := []string{ - "", - "0,3", - "100.1", - "-1", - } - - for i, chunkValue := range invalidValues { - os.Setenv(key, chunkValue) - result := getUploadChunkSizeFromEnv(key, strconv.Itoa(azureDefaultUploadChunkSize/humanize.MiByte)) - if result != azureDefaultUploadChunkSize { - t.Errorf("Test %d: expected: %d, got: %d", i+1, azureDefaultUploadChunkSize, result) - } - } - - validValues := []string{ - "1", - "1.25", - "50", - "99", - } - for i, chunkValue := range validValues { - os.Setenv(key, chunkValue) - result := getUploadChunkSizeFromEnv(key, strconv.Itoa(azureDefaultUploadChunkSize/humanize.MiByte)) - if result == azureDefaultUploadChunkSize { - t.Errorf("Test %d: expected: %d, got: %d", i+1, 
azureDefaultUploadChunkSize, result) - } - } - -} diff --git a/pkg/env/env.go b/pkg/env/env.go index 6210dab53..86ae8e05e 100644 --- a/pkg/env/env.go +++ b/pkg/env/env.go @@ -18,6 +18,7 @@ package env import ( + "strconv" "strings" "sync" ) @@ -75,6 +76,16 @@ func Get(key, defaultValue string) string { return defaultValue } +// GetInt returns an integer if found in the environment +// and returns the default value otherwise. +func GetInt(key string, defaultValue int) (int, error) { + v := Get(key, "") + if v == "" { + return defaultValue, nil + } + return strconv.Atoi(v) +} + // List all envs with a given prefix. func List(prefix string) (envs []string) { for _, env := range Environ() {