mirror of
https://github.com/minio/minio.git
synced 2025-02-09 12:48:08 -05:00
fix: proxy requests to honor global transport
* fix: proxy requests to honor global transport Load the globalProxyEndpoint properly also, currently, the proxy requests will fail silently for batch cancel even if the proxy fails; instead,d properly send the corresponding error back for such proxy failures if opted * pass the transport to the GetProxyEnpoints function --------- Co-authored-by: Praveen raj Mani <praveen@minio.io>
This commit is contained in:
parent
4a319bedc9
commit
712fe1a8df
@ -259,7 +259,7 @@ func (a adminAPIHandlers) RebalanceStart(w http.ResponseWriter, r *http.Request)
|
|||||||
if ep := globalEndpoints[0].Endpoints[0]; !ep.IsLocal {
|
if ep := globalEndpoints[0].Endpoints[0]; !ep.IsLocal {
|
||||||
for nodeIdx, proxyEp := range globalProxyEndpoints {
|
for nodeIdx, proxyEp := range globalProxyEndpoints {
|
||||||
if proxyEp.Endpoint.Host == ep.Host {
|
if proxyEp.Endpoint.Host == ep.Host {
|
||||||
if proxyRequestByNodeIndex(ctx, w, r, nodeIdx) {
|
if proxied, success := proxyRequestByNodeIndex(ctx, w, r, nodeIdx, false); proxied && success {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -330,7 +330,7 @@ func (a adminAPIHandlers) RebalanceStatus(w http.ResponseWriter, r *http.Request
|
|||||||
if ep := globalEndpoints[0].Endpoints[0]; !ep.IsLocal {
|
if ep := globalEndpoints[0].Endpoints[0]; !ep.IsLocal {
|
||||||
for nodeIdx, proxyEp := range globalProxyEndpoints {
|
for nodeIdx, proxyEp := range globalProxyEndpoints {
|
||||||
if proxyEp.Endpoint.Host == ep.Host {
|
if proxyEp.Endpoint.Host == ep.Host {
|
||||||
if proxyRequestByNodeIndex(ctx, w, r, nodeIdx) {
|
if proxied, success := proxyRequestByNodeIndex(ctx, w, r, nodeIdx, false); proxied && success {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -384,7 +384,7 @@ func proxyDecommissionRequest(ctx context.Context, defaultEndPoint Endpoint, w h
|
|||||||
}
|
}
|
||||||
for nodeIdx, proxyEp := range globalProxyEndpoints {
|
for nodeIdx, proxyEp := range globalProxyEndpoints {
|
||||||
if proxyEp.Endpoint.Host == host && !proxyEp.IsLocal {
|
if proxyEp.Endpoint.Host == host && !proxyEp.IsLocal {
|
||||||
if proxyRequestByNodeIndex(ctx, w, r, nodeIdx) {
|
if proxied, success := proxyRequestByNodeIndex(ctx, w, r, nodeIdx, false); proxied && success {
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1320,7 +1320,7 @@ func (a adminAPIHandlers) HealHandler(w http.ResponseWriter, r *http.Request) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Analyze the heal token and route the request accordingly
|
// Analyze the heal token and route the request accordingly
|
||||||
token, success := proxyRequestByToken(ctx, w, r, hip.clientToken)
|
token, _, success := proxyRequestByToken(ctx, w, r, hip.clientToken, false)
|
||||||
if success {
|
if success {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
@ -445,14 +445,14 @@ func batchObjsForDelete(ctx context.Context, r *BatchJobExpire, ri *batchJobInfo
|
|||||||
oiCache.Add(od, &exp.ObjectInfo)
|
oiCache.Add(od, &exp.ObjectInfo)
|
||||||
}
|
}
|
||||||
|
|
||||||
var done bool
|
|
||||||
// DeleteObject(deletePrefix: true) to expire all versions of an object
|
// DeleteObject(deletePrefix: true) to expire all versions of an object
|
||||||
for _, exp := range toExpireAll {
|
for _, exp := range toExpireAll {
|
||||||
var success bool
|
var success bool
|
||||||
for attempts := 1; attempts <= retryAttempts; attempts++ {
|
for attempts := 1; attempts <= retryAttempts; attempts++ {
|
||||||
select {
|
select {
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
done = true
|
ri.trackMultipleObjectVersions(exp, success)
|
||||||
|
return
|
||||||
default:
|
default:
|
||||||
}
|
}
|
||||||
stopFn := globalBatchJobsMetrics.trace(batchJobMetricExpire, ri.JobID, attempts)
|
stopFn := globalBatchJobsMetrics.trace(batchJobMetricExpire, ri.JobID, attempts)
|
||||||
@ -469,14 +469,7 @@ func batchObjsForDelete(ctx context.Context, r *BatchJobExpire, ri *batchJobInfo
|
|||||||
break
|
break
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
ri.trackMultipleObjectVersions(r.Bucket, exp, success)
|
ri.trackMultipleObjectVersions(exp, success)
|
||||||
if done {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if done {
|
|
||||||
return
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// DeleteMultiple objects
|
// DeleteMultiple objects
|
||||||
|
@ -996,7 +996,7 @@ func (ri *batchJobInfo) updateAfter(ctx context.Context, api ObjectLayer, durati
|
|||||||
// Note: to be used only with batch jobs that affect multiple versions through
|
// Note: to be used only with batch jobs that affect multiple versions through
|
||||||
// a single action. e.g batch-expire has an option to expire all versions of an
|
// a single action. e.g batch-expire has an option to expire all versions of an
|
||||||
// object which matches the given filters.
|
// object which matches the given filters.
|
||||||
func (ri *batchJobInfo) trackMultipleObjectVersions(bucket string, info ObjectInfo, success bool) {
|
func (ri *batchJobInfo) trackMultipleObjectVersions(info ObjectInfo, success bool) {
|
||||||
if success {
|
if success {
|
||||||
ri.Objects += int64(info.NumVersions)
|
ri.Objects += int64(info.NumVersions)
|
||||||
} else {
|
} else {
|
||||||
@ -1829,7 +1829,7 @@ func (a adminAPIHandlers) CancelBatchJob(w http.ResponseWriter, r *http.Request)
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
if _, success := proxyRequestByToken(ctx, w, r, jobID); success {
|
if _, proxied, _ := proxyRequestByToken(ctx, w, r, jobID, true); proxied {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -243,26 +243,26 @@ func parseRequestToken(token string) (subToken string, nodeIndex int) {
|
|||||||
return subToken, nodeIndex
|
return subToken, nodeIndex
|
||||||
}
|
}
|
||||||
|
|
||||||
func proxyRequestByToken(ctx context.Context, w http.ResponseWriter, r *http.Request, token string) (string, bool) {
|
func proxyRequestByToken(ctx context.Context, w http.ResponseWriter, r *http.Request, token string, returnErr bool) (subToken string, proxied bool, success bool) {
|
||||||
subToken, nodeIndex := parseRequestToken(token)
|
var nodeIndex int
|
||||||
if nodeIndex >= 0 {
|
if subToken, nodeIndex = parseRequestToken(token); nodeIndex >= 0 {
|
||||||
return subToken, proxyRequestByNodeIndex(ctx, w, r, nodeIndex)
|
proxied, success = proxyRequestByNodeIndex(ctx, w, r, nodeIndex, returnErr)
|
||||||
}
|
}
|
||||||
return subToken, false
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
func proxyRequestByNodeIndex(ctx context.Context, w http.ResponseWriter, r *http.Request, index int) (success bool) {
|
func proxyRequestByNodeIndex(ctx context.Context, w http.ResponseWriter, r *http.Request, index int, returnErr bool) (proxied, success bool) {
|
||||||
if len(globalProxyEndpoints) == 0 {
|
if len(globalProxyEndpoints) == 0 {
|
||||||
return false
|
return
|
||||||
}
|
}
|
||||||
if index < 0 || index >= len(globalProxyEndpoints) {
|
if index < 0 || index >= len(globalProxyEndpoints) {
|
||||||
return false
|
return
|
||||||
}
|
}
|
||||||
ep := globalProxyEndpoints[index]
|
ep := globalProxyEndpoints[index]
|
||||||
if ep.IsLocal {
|
if ep.IsLocal {
|
||||||
return false
|
return
|
||||||
}
|
}
|
||||||
return proxyRequest(ctx, w, r, ep)
|
return true, proxyRequest(ctx, w, r, ep, returnErr)
|
||||||
}
|
}
|
||||||
|
|
||||||
// ListObjectsV1Handler - GET Bucket (List Objects) Version 1.
|
// ListObjectsV1Handler - GET Bucket (List Objects) Version 1.
|
||||||
|
@ -1194,7 +1194,7 @@ func GetProxyEndpointLocalIndex(proxyEps []ProxyEndpoint) int {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// GetProxyEndpoints - get all endpoints that can be used to proxy list request.
|
// GetProxyEndpoints - get all endpoints that can be used to proxy list request.
|
||||||
func GetProxyEndpoints(endpointServerPools EndpointServerPools) []ProxyEndpoint {
|
func GetProxyEndpoints(endpointServerPools EndpointServerPools, transport http.RoundTripper) []ProxyEndpoint {
|
||||||
var proxyEps []ProxyEndpoint
|
var proxyEps []ProxyEndpoint
|
||||||
|
|
||||||
proxyEpSet := set.NewStringSet()
|
proxyEpSet := set.NewStringSet()
|
||||||
@ -1213,7 +1213,7 @@ func GetProxyEndpoints(endpointServerPools EndpointServerPools) []ProxyEndpoint
|
|||||||
|
|
||||||
proxyEps = append(proxyEps, ProxyEndpoint{
|
proxyEps = append(proxyEps, ProxyEndpoint{
|
||||||
Endpoint: endpoint,
|
Endpoint: endpoint,
|
||||||
Transport: globalRemoteTargetTransport,
|
Transport: transport,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -447,7 +447,7 @@ func getHostName(r *http.Request) (hostName string) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Proxy any request to an endpoint.
|
// Proxy any request to an endpoint.
|
||||||
func proxyRequest(ctx context.Context, w http.ResponseWriter, r *http.Request, ep ProxyEndpoint) (success bool) {
|
func proxyRequest(ctx context.Context, w http.ResponseWriter, r *http.Request, ep ProxyEndpoint, returnErr bool) (success bool) {
|
||||||
success = true
|
success = true
|
||||||
|
|
||||||
// Make sure we remove any existing headers before
|
// Make sure we remove any existing headers before
|
||||||
@ -462,7 +462,10 @@ func proxyRequest(ctx context.Context, w http.ResponseWriter, r *http.Request, e
|
|||||||
ErrorHandler: func(w http.ResponseWriter, r *http.Request, err error) {
|
ErrorHandler: func(w http.ResponseWriter, r *http.Request, err error) {
|
||||||
success = false
|
success = false
|
||||||
if err != nil && !errors.Is(err, context.Canceled) {
|
if err != nil && !errors.Is(err, context.Canceled) {
|
||||||
replLogIf(GlobalContext, err)
|
proxyLogIf(GlobalContext, err)
|
||||||
|
}
|
||||||
|
if returnErr {
|
||||||
|
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
})
|
})
|
||||||
|
@ -425,9 +425,10 @@ func serverHandleCmdArgs(ctxt serverCtxt) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// allow transport to be HTTP/1.1 for proxying.
|
// allow transport to be HTTP/1.1 for proxying.
|
||||||
globalProxyEndpoints = GetProxyEndpoints(globalEndpoints)
|
|
||||||
globalInternodeTransport = NewInternodeHTTPTransport(ctxt.MaxIdleConnsPerHost)()
|
globalInternodeTransport = NewInternodeHTTPTransport(ctxt.MaxIdleConnsPerHost)()
|
||||||
globalRemoteTargetTransport = NewRemoteTargetHTTPTransport(false)()
|
globalRemoteTargetTransport = NewRemoteTargetHTTPTransport(false)()
|
||||||
|
globalProxyEndpoints = GetProxyEndpoints(globalEndpoints, globalRemoteTargetTransport)
|
||||||
|
|
||||||
globalForwarder = handlers.NewForwarder(&handlers.Forwarder{
|
globalForwarder = handlers.NewForwarder(&handlers.Forwarder{
|
||||||
PassHost: true,
|
PassHost: true,
|
||||||
RoundTripper: globalRemoteTargetTransport,
|
RoundTripper: globalRemoteTargetTransport,
|
||||||
|
Loading…
x
Reference in New Issue
Block a user