add support for configurable remote transport deadline (#10447)

configurable remote transport timeouts for some special cases
where this value needs to be bumped to a higher value when
transferring large data between federated instances.
This commit is contained in:
Harshavardhana 2020-09-11 23:03:08 -07:00 committed by GitHub
parent bda0fe3150
commit f355374962
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 61 additions and 56 deletions

View File

@ -398,6 +398,11 @@ func lookupConfigs(s config.Config, setDriveCount int) {
globalAPIConfig.init(apiConfig, setDriveCount) globalAPIConfig.init(apiConfig, setDriveCount)
// Initialize remote instance transport once.
getRemoteInstanceTransportOnce.Do(func() {
getRemoteInstanceTransport = newGatewayHTTPTransport(apiConfig.RemoteTransportDeadline)
})
if globalIsErasure { if globalIsErasure {
globalStorageClass, err = storageclass.LookupConfig(s[config.StorageClassSubSys][config.Default], setDriveCount) globalStorageClass, err = storageclass.LookupConfig(s[config.StorageClassSubSys][config.Default], setDriveCount)
if err != nil { if err != nil {

View File

@ -33,11 +33,13 @@ const (
apiRequestsDeadline = "requests_deadline" apiRequestsDeadline = "requests_deadline"
apiReadyDeadline = "ready_deadline" apiReadyDeadline = "ready_deadline"
apiCorsAllowOrigin = "cors_allow_origin" apiCorsAllowOrigin = "cors_allow_origin"
apiRemoteTransportDeadline = "remote_transport_deadline"
EnvAPIRequestsMax = "MINIO_API_REQUESTS_MAX" EnvAPIRequestsMax = "MINIO_API_REQUESTS_MAX"
EnvAPIRequestsDeadline = "MINIO_API_REQUESTS_DEADLINE" EnvAPIRequestsDeadline = "MINIO_API_REQUESTS_DEADLINE"
EnvAPIReadyDeadline = "MINIO_API_READY_DEADLINE" EnvAPIReadyDeadline = "MINIO_API_READY_DEADLINE"
EnvAPICorsAllowOrigin = "MINIO_API_CORS_ALLOW_ORIGIN" EnvAPICorsAllowOrigin = "MINIO_API_CORS_ALLOW_ORIGIN"
EnvAPIRemoteTransportDeadline = "MINIO_API_REMOTE_TRANSPORT_DEADLINE"
) )
// DefaultKVS - default storage class config // DefaultKVS - default storage class config
@ -59,15 +61,20 @@ var (
Key: apiCorsAllowOrigin, Key: apiCorsAllowOrigin,
Value: "*", Value: "*",
}, },
config.KV{
Key: apiRemoteTransportDeadline,
Value: "2h",
},
} }
) )
// Config storage class configuration // Config storage class configuration
type Config struct { type Config struct {
APIRequestsMax int `json:"requests_max"` RequestsMax int `json:"requests_max"`
APIRequestsDeadline time.Duration `json:"requests_deadline"` RequestsDeadline time.Duration `json:"requests_deadline"`
APIReadyDeadline time.Duration `json:"ready_deadline"` ReadyDeadline time.Duration `json:"ready_deadline"`
APICorsAllowOrigin []string `json:"cors_allow_origin"` CorsAllowOrigin []string `json:"cors_allow_origin"`
RemoteTransportDeadline time.Duration `json:"remote_transport_deadline"`
} }
// UnmarshalJSON - Validate SS and RRS parity when unmarshalling JSON. // UnmarshalJSON - Validate SS and RRS parity when unmarshalling JSON.
@ -108,10 +115,17 @@ func LookupConfig(kvs config.KVS) (cfg Config, err error) {
} }
corsAllowOrigin := strings.Split(env.Get(EnvAPICorsAllowOrigin, kvs.Get(apiCorsAllowOrigin)), ",") corsAllowOrigin := strings.Split(env.Get(EnvAPICorsAllowOrigin, kvs.Get(apiCorsAllowOrigin)), ",")
remoteTransportDeadline, err := time.ParseDuration(env.Get(EnvAPIRemoteTransportDeadline, kvs.Get(apiRemoteTransportDeadline)))
if err != nil {
return cfg, err
}
return Config{ return Config{
APIRequestsMax: requestsMax, RequestsMax: requestsMax,
APIRequestsDeadline: requestsDeadline, RequestsDeadline: requestsDeadline,
APIReadyDeadline: readyDeadline, ReadyDeadline: readyDeadline,
APICorsAllowOrigin: corsAllowOrigin, CorsAllowOrigin: corsAllowOrigin,
RemoteTransportDeadline: remoteTransportDeadline,
}, nil }, nil
} }

View File

@ -39,5 +39,11 @@ var (
Optional: true, Optional: true,
Type: "csv", Type: "csv",
}, },
config.HelpKV{
Key: apiRemoteTransportDeadline,
Description: `set the deadline for API requests on remote transports while proxying between federated instances e.g. "2h"`,
Optional: true,
Type: "duration",
},
} }
) )

View File

@ -38,10 +38,11 @@ func (t *apiConfig) init(cfg api.Config, setDriveCount int) {
t.mu.Lock() t.mu.Lock()
defer t.mu.Unlock() defer t.mu.Unlock()
t.readyDeadline = cfg.APIReadyDeadline t.readyDeadline = cfg.ReadyDeadline
t.corsAllowOrigins = cfg.APICorsAllowOrigin t.corsAllowOrigins = cfg.CorsAllowOrigin
var apiRequestsMaxPerNode int var apiRequestsMaxPerNode int
if cfg.APIRequestsMax <= 0 { if cfg.RequestsMax <= 0 {
stats, err := sys.GetStats() stats, err := sys.GetStats()
if err != nil { if err != nil {
return return
@ -51,21 +52,23 @@ func (t *apiConfig) init(cfg api.Config, setDriveCount int) {
// ram_per_request is 4MiB * setDriveCount + 2 * 10MiB (default erasure block size) // ram_per_request is 4MiB * setDriveCount + 2 * 10MiB (default erasure block size)
apiRequestsMaxPerNode = int(stats.TotalRAM / uint64(setDriveCount*readBlockSize+blockSizeV1*2)) apiRequestsMaxPerNode = int(stats.TotalRAM / uint64(setDriveCount*readBlockSize+blockSizeV1*2))
} else { } else {
apiRequestsMaxPerNode = cfg.APIRequestsMax apiRequestsMaxPerNode = cfg.RequestsMax
if len(globalEndpoints.Hostnames()) > 0 { if len(globalEndpoints.Hostnames()) > 0 {
apiRequestsMaxPerNode /= len(globalEndpoints.Hostnames()) apiRequestsMaxPerNode /= len(globalEndpoints.Hostnames())
} }
} }
t.requestsPool = make(chan struct{}, apiRequestsMaxPerNode) t.requestsPool = make(chan struct{}, apiRequestsMaxPerNode)
t.requestsDeadline = cfg.APIRequestsDeadline t.requestsDeadline = cfg.RequestsDeadline
} }
func (t *apiConfig) getCorsAllowOrigins() []string { func (t *apiConfig) getCorsAllowOrigins() []string {
t.mu.RLock() t.mu.RLock()
defer t.mu.RUnlock() defer t.mu.RUnlock()
return t.corsAllowOrigins corsAllowOrigins := make([]string, len(t.corsAllowOrigins))
copy(corsAllowOrigins, t.corsAllowOrigins)
return corsAllowOrigins
} }
func (t *apiConfig) getReadyDeadline() time.Duration { func (t *apiConfig) getReadyDeadline() time.Duration {

View File

@ -711,17 +711,17 @@ func getCpObjMetadataFromHeader(ctx context.Context, r *http.Request, userMeta m
} }
// getRemoteInstanceTransport contains a singleton roundtripper. // getRemoteInstanceTransport contains a singleton roundtripper.
var getRemoteInstanceTransport *http.Transport var (
var getRemoteInstanceTransportLongTO *http.Transport getRemoteInstanceTransport *http.Transport
var getRemoteInstanceTransportOnce sync.Once getRemoteInstanceTransportOnce sync.Once
)
// Returns a minio-go Client configured to access remote host described by destDNSRecord // Returns a minio-go Client configured to access remote host described by destDNSRecord
// Applicable only in a federated deployment // Applicable only in a federated deployment
var getRemoteInstanceClient = func(r *http.Request, host string) (*miniogo.Core, error) { var getRemoteInstanceClient = func(r *http.Request, host string) (*miniogo.Core, error) {
getRemoteInstanceTransportOnce.Do(func() { if newObjectLayerFn() == nil {
getRemoteInstanceTransport = NewGatewayHTTPTransport() return nil, errServerNotInitialized
getRemoteInstanceTransportLongTO = newGatewayHTTPTransport(time.Hour) }
})
cred := getReqAccessCred(r, globalServerRegion) cred := getReqAccessCred(r, globalServerRegion)
// In a federated deployment, all the instances share config files // In a federated deployment, all the instances share config files
@ -737,29 +737,6 @@ var getRemoteInstanceClient = func(r *http.Request, host string) (*miniogo.Core,
return core, nil return core, nil
} }
// Returns a minio-go Client configured to access remote host described by destDNSRecord
// Applicable only in a federated deployment.
// The transport does not contain any timeout except for dialing.
func getRemoteInstanceClientLongTimeout(r *http.Request, host string) (*miniogo.Core, error) {
getRemoteInstanceTransportOnce.Do(func() {
getRemoteInstanceTransport = NewGatewayHTTPTransport()
getRemoteInstanceTransportLongTO = newGatewayHTTPTransport(time.Hour)
})
cred := getReqAccessCred(r, globalServerRegion)
// In a federated deployment, all the instances share config files
// and hence expected to have same credentials.
core, err := miniogo.NewCore(host, &miniogo.Options{
Creds: credentials.NewStaticV4(cred.AccessKey, cred.SecretKey, ""),
Secure: globalIsSSL,
Transport: getRemoteInstanceTransportLongTO,
})
if err != nil {
return nil, err
}
return core, nil
}
// Check if the destination bucket is on a remote site, this code only gets executed // Check if the destination bucket is on a remote site, this code only gets executed
// when federation is enabled, ie when globalDNSConfig is non 'nil'. // when federation is enabled, ie when globalDNSConfig is non 'nil'.
// //
@ -1222,7 +1199,7 @@ func (api objectAPIHandlers) CopyObjectHandler(w http.ResponseWriter, r *http.Re
} }
// Send PutObject request to appropriate instance (in federated deployment) // Send PutObject request to appropriate instance (in federated deployment)
core, rerr := getRemoteInstanceClientLongTimeout(r, getHostFromSrv(dstRecords)) core, rerr := getRemoteInstanceClient(r, getHostFromSrv(dstRecords))
if rerr != nil { if rerr != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, rerr), r.URL, guessIsBrowserReq(r)) writeErrorResponse(ctx, w, toAPIError(ctx, rerr), r.URL, guessIsBrowserReq(r))
return return
@ -1932,7 +1909,7 @@ func (api objectAPIHandlers) CopyObjectPartHandler(w http.ResponseWriter, r *htt
} }
// Send PutObject request to appropriate instance (in federated deployment) // Send PutObject request to appropriate instance (in federated deployment)
core, rerr := getRemoteInstanceClientLongTimeout(r, getHostFromSrv(dstRecords)) core, rerr := getRemoteInstanceClient(r, getHostFromSrv(dstRecords))
if rerr != nil { if rerr != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, rerr), r.URL, guessIsBrowserReq(r)) writeErrorResponse(ctx, w, toAPIError(ctx, rerr), r.URL, guessIsBrowserReq(r))
return return

View File

@ -427,7 +427,7 @@ func (web *webAPIHandlers) ListObjects(r *http.Request, args *ListObjectsArgs, r
} }
return toJSONError(ctx, err, args.BucketName) return toJSONError(ctx, err, args.BucketName)
} }
core, err := getRemoteInstanceClientLongTimeout(r, getHostFromSrv(sr)) core, err := getRemoteInstanceClient(r, getHostFromSrv(sr))
if err != nil { if err != nil {
return toJSONError(ctx, err, args.BucketName) return toJSONError(ctx, err, args.BucketName)
} }
@ -653,7 +653,7 @@ func (web *webAPIHandlers) RemoveObject(r *http.Request, args *RemoveObjectArgs,
} }
return toJSONError(ctx, err, args.BucketName) return toJSONError(ctx, err, args.BucketName)
} }
core, err := getRemoteInstanceClientLongTimeout(r, getHostFromSrv(sr)) core, err := getRemoteInstanceClient(r, getHostFromSrv(sr))
if err != nil { if err != nil {
return toJSONError(ctx, err, args.BucketName) return toJSONError(ctx, err, args.BucketName)
} }