mirror of https://github.com/minio/minio.git
azure: Use default upload parameters to avoid consuming too much memory (#11251)
A lot of memory is consumed when uploading small files in parallel, use the default upload parameters and add MINIO_AZURE_UPLOAD_CONCURRENCY for users to tweak.
This commit is contained in:
parent
7824e19d20
commit
e2579b1f5a
|
@ -51,30 +51,19 @@ import (
|
|||
minio "github.com/minio/minio/cmd"
|
||||
)
|
||||
|
||||
var (
|
||||
azureUploadChunkSize = getUploadChunkSizeFromEnv(azureChunkSizeEnvVar, strconv.Itoa(azureDefaultUploadChunkSize/humanize.MiByte))
|
||||
azureSdkTimeout = time.Duration(azureUploadChunkSize/humanize.MiByte) * azureSdkTimeoutPerMb
|
||||
azureUploadConcurrency = azureUploadMaxMemoryUsage / azureUploadChunkSize
|
||||
const (
|
||||
azureDefaultUploadChunkSizeMB = 25
|
||||
azureDownloadRetryAttempts = 5
|
||||
azureS3MinPartSize = 5 * humanize.MiByte
|
||||
metadataObjectNameTemplate = minio.GatewayMinioSysTmp + "multipart/v1/%s.%x/azure.json"
|
||||
azureMarkerPrefix = "{minio}"
|
||||
metadataPartNamePrefix = minio.GatewayMinioSysTmp + "multipart/v1/%s.%x"
|
||||
maxPartsCount = 10000
|
||||
)
|
||||
|
||||
const (
|
||||
// The defaultDialTimeout for communicating with the cloud backends is set
|
||||
// to 30 seconds in utils.go; the Azure SDK recommends to set a timeout of 60
|
||||
// seconds per MB of data a client expects to upload so we must transfer less
|
||||
// than 0.5 MB per chunk to stay within the defaultDialTimeout tolerance.
|
||||
// See https://github.com/Azure/azure-storage-blob-go/blob/fc70003/azblob/zc_policy_retry.go#L39-L44 for more details.
|
||||
// To change the upload chunk size, set the environmental variable MINIO_AZURE_CHUNK_SIZE_MB with a (float) value between 0 and 100
|
||||
azureDefaultUploadChunkSize = 25 * humanize.MiByte
|
||||
azureSdkTimeoutPerMb = 60 * time.Second
|
||||
azureUploadMaxMemoryUsage = 100 * humanize.MiByte
|
||||
azureChunkSizeEnvVar = "MINIO_AZURE_CHUNK_SIZE_MB"
|
||||
|
||||
azureDownloadRetryAttempts = 5
|
||||
azureS3MinPartSize = 5 * humanize.MiByte
|
||||
metadataObjectNameTemplate = minio.GatewayMinioSysTmp + "multipart/v1/%s.%x/azure.json"
|
||||
azureMarkerPrefix = "{minio}"
|
||||
metadataPartNamePrefix = minio.GatewayMinioSysTmp + "multipart/v1/%s.%x"
|
||||
maxPartsCount = 10000
|
||||
var (
|
||||
azureUploadChunkSize int
|
||||
azureUploadConcurrency int
|
||||
)
|
||||
|
||||
func init() {
|
||||
|
@ -94,7 +83,6 @@ EXAMPLES:
|
|||
1. Start minio gateway server for Azure Blob Storage backend on custom endpoint.
|
||||
{{.Prompt}} {{.EnvVarSetCommand}} MINIO_ROOT_USER{{.AssignmentOperator}}azureaccountname
|
||||
{{.Prompt}} {{.EnvVarSetCommand}} MINIO_ROOT_PASSWORD{{.AssignmentOperator}}azureaccountkey
|
||||
{{.Prompt}} {{.EnvVarSetCommand}} MINIO_AZURE_CHUNK_SIZE_MB {{.AssignmentOperator}}0.25
|
||||
{{.Prompt}} {{.HelpName}} https://azureaccountname.blob.custom.azure.endpoint
|
||||
|
||||
2. Start minio gateway server for Azure Blob Storage backend with edge caching enabled.
|
||||
|
@ -106,8 +94,8 @@ EXAMPLES:
|
|||
{{.Prompt}} {{.EnvVarSetCommand}} MINIO_CACHE_AFTER{{.AssignmentOperator}}3
|
||||
{{.Prompt}} {{.EnvVarSetCommand}} MINIO_CACHE_WATERMARK_LOW{{.AssignmentOperator}}75
|
||||
{{.Prompt}} {{.EnvVarSetCommand}} MINIO_CACHE_WATERMARK_HIGH{{.AssignmentOperator}}85
|
||||
{{.Prompt}} {{.EnvVarSetCommand}} MINIO_AZURE_CHUNK_SIZE_MB {{.AssignmentOperator}}25
|
||||
{{.Prompt}} {{.HelpName}}
|
||||
|
||||
`
|
||||
|
||||
minio.RegisterGatewayCommand(cli.Command{
|
||||
|
@ -140,27 +128,6 @@ func azureGatewayMain(ctx *cli.Context) {
|
|||
minio.StartGateway(ctx, &Azure{host})
|
||||
}
|
||||
|
||||
// getUploadChunkSizeFromEnv returns the parsed chunk size from the environmental variable 'MINIO_AZURE_CHUNK_SIZE_MB'
|
||||
// The environmental variable should be a floating point number between 0 and 100 representing the MegaBytes
|
||||
// The returned value is an int representing the size in bytes
|
||||
func getUploadChunkSizeFromEnv(envvar string, defaultValue string) int {
|
||||
envChunkSize := env.Get(envvar, defaultValue)
|
||||
|
||||
i, err := strconv.ParseFloat(envChunkSize, 64)
|
||||
if err != nil {
|
||||
logger.LogIf(context.Background(), err)
|
||||
return azureDefaultUploadChunkSize
|
||||
}
|
||||
|
||||
if i <= 0 || i > 100 {
|
||||
logger.LogIf(context.Background(), fmt.Errorf("ENV '%v' should be a floating point value between 0 and 100.\n"+
|
||||
"The upload chunk size is set to its default: %s\n", azureChunkSizeEnvVar, defaultValue))
|
||||
return azureDefaultUploadChunkSize
|
||||
}
|
||||
|
||||
return int(i * humanize.MiByte)
|
||||
}
|
||||
|
||||
// Azure implements Gateway.
|
||||
type Azure struct {
|
||||
host string
|
||||
|
@ -188,6 +155,20 @@ func (g *Azure) NewGatewayLayer(creds auth.Credentials) (minio.ObjectLayer, erro
|
|||
return nil, err
|
||||
}
|
||||
|
||||
azureUploadChunkSize, err = env.GetInt("MINIO_AZURE_CHUNK_SIZE_MB", azureDefaultUploadChunkSizeMB)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
azureUploadChunkSize *= humanize.MiByte
|
||||
if azureUploadChunkSize <= 0 || azureUploadChunkSize > 100*humanize.MiByte {
|
||||
return nil, fmt.Errorf("MINIO_AZURE_CHUNK_SIZE_MB should be an integer value between 0 and 100")
|
||||
}
|
||||
|
||||
azureUploadConcurrency, err = env.GetInt("MINIO_AZURE_UPLOAD_CONCURRENCY", 4)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
credential, err := azblob.NewSharedKeyCredential(creds.AccessKey, creds.SecretKey)
|
||||
if err != nil {
|
||||
if _, ok := err.(base64.CorruptInputError); ok {
|
||||
|
@ -208,7 +189,9 @@ func (g *Azure) NewGatewayLayer(creds auth.Credentials) (minio.ObjectLayer, erro
|
|||
|
||||
pipeline := azblob.NewPipeline(credential, azblob.PipelineOptions{
|
||||
Retry: azblob.RetryOptions{
|
||||
TryTimeout: azureSdkTimeout,
|
||||
// Azure SDK recommends to set a timeout of 60 seconds per MB of data so we
|
||||
// calculate here the timeout for the configured upload chunck size.
|
||||
TryTimeout: time.Duration(azureUploadChunkSize/humanize.MiByte) * 60 * time.Second,
|
||||
},
|
||||
HTTPSender: pipeline.FactoryFunc(func(next pipeline.Policy, po *pipeline.PolicyOptions) pipeline.PolicyFunc {
|
||||
return func(ctx context.Context, request pipeline.Request) (pipeline.Response, error) {
|
||||
|
@ -914,7 +897,6 @@ func (a *azureObjects) PutObject(ctx context.Context, bucket, object string, r *
|
|||
blobURL := a.client.NewContainerURL(bucket).NewBlockBlobURL(object)
|
||||
|
||||
_, err = azblob.UploadStreamToBlockBlob(ctx, data, blobURL, azblob.UploadStreamToBlockBlobOptions{
|
||||
BufferSize: azureUploadChunkSize,
|
||||
MaxBuffers: azureUploadConcurrency,
|
||||
BlobHTTPHeaders: properties,
|
||||
Metadata: metadata,
|
||||
|
|
|
@ -20,13 +20,9 @@ import (
|
|||
"encoding/base64"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"os"
|
||||
"reflect"
|
||||
"strconv"
|
||||
"testing"
|
||||
|
||||
"github.com/dustin/go-humanize"
|
||||
|
||||
"github.com/Azure/azure-storage-blob-go/azblob"
|
||||
minio "github.com/minio/minio/cmd"
|
||||
)
|
||||
|
@ -268,36 +264,3 @@ func TestCheckAzureUploadID(t *testing.T) {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestParsingUploadChunkSize(t *testing.T) {
|
||||
key := "MINIO_AZURE_CHUNK_SIZE_MB"
|
||||
invalidValues := []string{
|
||||
"",
|
||||
"0,3",
|
||||
"100.1",
|
||||
"-1",
|
||||
}
|
||||
|
||||
for i, chunkValue := range invalidValues {
|
||||
os.Setenv(key, chunkValue)
|
||||
result := getUploadChunkSizeFromEnv(key, strconv.Itoa(azureDefaultUploadChunkSize/humanize.MiByte))
|
||||
if result != azureDefaultUploadChunkSize {
|
||||
t.Errorf("Test %d: expected: %d, got: %d", i+1, azureDefaultUploadChunkSize, result)
|
||||
}
|
||||
}
|
||||
|
||||
validValues := []string{
|
||||
"1",
|
||||
"1.25",
|
||||
"50",
|
||||
"99",
|
||||
}
|
||||
for i, chunkValue := range validValues {
|
||||
os.Setenv(key, chunkValue)
|
||||
result := getUploadChunkSizeFromEnv(key, strconv.Itoa(azureDefaultUploadChunkSize/humanize.MiByte))
|
||||
if result == azureDefaultUploadChunkSize {
|
||||
t.Errorf("Test %d: expected: %d, got: %d", i+1, azureDefaultUploadChunkSize, result)
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -18,6 +18,7 @@
|
|||
package env
|
||||
|
||||
import (
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
)
|
||||
|
@ -75,6 +76,16 @@ func Get(key, defaultValue string) string {
|
|||
return defaultValue
|
||||
}
|
||||
|
||||
// GetInt returns an integer if found in the environment
|
||||
// and returns the default value otherwise.
|
||||
func GetInt(key string, defaultValue int) (int, error) {
|
||||
v := Get(key, "")
|
||||
if v == "" {
|
||||
return defaultValue, nil
|
||||
}
|
||||
return strconv.Atoi(v)
|
||||
}
|
||||
|
||||
// List all envs with a given prefix.
|
||||
func List(prefix string) (envs []string) {
|
||||
for _, env := range Environ() {
|
||||
|
|
Loading…
Reference in New Issue