diff --git a/cmd/admin-handlers.go b/cmd/admin-handlers.go index 91c684aac..559d1990a 100644 --- a/cmd/admin-handlers.go +++ b/cmd/admin-handlers.go @@ -652,6 +652,20 @@ func (a adminAPIHandlers) HealHandler(w http.ResponseWriter, r *http.Request) { return } + if globalIsDistErasure { + // Analyze the heal token and route the request accordingly + _, nodeIndex, parsed := parseRequestToken(hip.clientToken) + if parsed { + if proxyRequestByNodeIndex(ctx, w, r, nodeIndex) { + return + } + } else { + apiErr := errorCodes.ToAPIErr(ErrHealInvalidClientToken) + writeErrorResponseJSON(ctx, w, apiErr, r.URL) + return + } + } + type healResp struct { respBytes []byte apiErr APIError diff --git a/cmd/admin-heal-ops.go b/cmd/admin-heal-ops.go index 1ba70bf46..9b234967e 100644 --- a/cmd/admin-heal-ops.go +++ b/cmd/admin-heal-ops.go @@ -370,13 +370,18 @@ func newHealSequence(ctx context.Context, bucket, objPrefix, clientAddr string, reqInfo.AppendTags("prefix", objPrefix) ctx, cancel := context.WithCancel(logger.SetReqInfo(ctx, reqInfo)) + clientToken := mustGetUUID() + if globalIsDistErasure { + clientToken = fmt.Sprintf("%s@%d", clientToken, GetProxyEndpointLocalIndex(globalProxyEndpoints)) + } + return &healSequence{ respCh: make(chan healResult), bucket: bucket, object: objPrefix, reportProgress: true, startTime: UTCNow(), - clientToken: mustGetUUID(), + clientToken: clientToken, clientAddress: clientAddr, forceStarted: forceStart, settings: hs, diff --git a/cmd/bucket-listobjects-handlers.go b/cmd/bucket-listobjects-handlers.go index 697e65c98..22c6e56d7 100644 --- a/cmd/bucket-listobjects-handlers.go +++ b/cmd/bucket-listobjects-handlers.go @@ -18,8 +18,9 @@ package cmd import ( "context" - "io" + "fmt" "net/http" + "strconv" "strings" "github.com/gorilla/mux" @@ -159,8 +160,13 @@ func (api objectAPIHandlers) ListObjectsV2MHandler(w http.ResponseWriter, r *htt return } - if proxyListRequest(ctx, w, r, bucket) { - return + // Analyze continuation token and route the request accordingly + subToken, nodeIndex, parsed := parseRequestToken(token) + if parsed { + if proxyRequestByNodeIndex(ctx, w, r, nodeIndex) { + return + } + token = subToken } listObjectsV2 := objectAPI.ListObjectsV2 @@ -185,8 +191,10 @@ func (api objectAPIHandlers) ListObjectsV2MHandler(w http.ResponseWriter, r *htt } } - response := generateListObjectsV2Response(bucket, prefix, token, - listObjectsV2Info.NextContinuationToken, startAfter, + // The next continuation token has id@node_index format to optimize paginated listing + nextContinuationToken := fmt.Sprintf("%s@%d", listObjectsV2Info.NextContinuationToken, getLocalNodeIndex()) + + response := generateListObjectsV2Response(bucket, prefix, token, nextContinuationToken, startAfter, delimiter, encodingType, fetchOwner, listObjectsV2Info.IsTruncated, maxKeys, listObjectsV2Info.Objects, listObjectsV2Info.Prefixes, true) @@ -237,8 +245,13 @@ func (api objectAPIHandlers) ListObjectsV2Handler(w http.ResponseWriter, r *http return } - if proxyListRequest(ctx, w, r, bucket) { - return + // Analyze continuation token and route the request accordingly + subToken, nodeIndex, parsed := parseRequestToken(token) + if parsed { + if proxyRequestByNodeIndex(ctx, w, r, nodeIndex) { + return + } + token = subToken } listObjectsV2 := objectAPI.ListObjectsV2 @@ -263,8 +276,10 @@ func (api objectAPIHandlers) ListObjectsV2Handler(w http.ResponseWriter, r *http } } - response := generateListObjectsV2Response(bucket, prefix, token, - listObjectsV2Info.NextContinuationToken, startAfter, + // The next continuation token has id@node_index format to optimize paginated listing + nextContinuationToken := fmt.Sprintf("%s@%d", listObjectsV2Info.NextContinuationToken, getLocalNodeIndex()) + + response := generateListObjectsV2Response(bucket, prefix, token, nextContinuationToken, startAfter, delimiter, encodingType, fetchOwner, listObjectsV2Info.IsTruncated, maxKeys, listObjectsV2Info.Objects, listObjectsV2Info.Prefixes, false) @@ -272,41 +287,47 @@ func (api objectAPIHandlers) ListObjectsV2Handler(w http.ResponseWriter, r *http writeSuccessResponseXML(w, encodeResponse(response)) } -func getListEndpoint(bucket string) ListEndpoint { - return globalListEndpoints[crcHashMod(bucket, len(globalListEndpoints))] -} - -// Proxy the list request to the right server. -func proxyListRequest(ctx context.Context, w http.ResponseWriter, r *http.Request, bucket string) (success bool) { - if len(globalListEndpoints) == 0 { - return false +func getLocalNodeIndex() int { + if len(globalProxyEndpoints) == 0 { + return -1 } - ep := getListEndpoint(bucket) - if ep.isLocal { - return false - } - ctx = r.Context() - outreq := r.Clone(ctx) - outreq.URL.Scheme = "http" - outreq.URL.Host = ep.host - outreq.URL.Path = r.URL.Path - outreq.Header.Add("Host", r.Host) - if globalIsSSL { - outreq.URL.Scheme = "https" - } - outreq.Host = r.Host - res, err := ep.t.RoundTrip(outreq) - if err != nil { - return false - } - for k, vv := range res.Header { - for _, v := range vv { - w.Header().Set(k, v) + for i, ep := range globalProxyEndpoints { + if ep.IsLocal { + return i } } - w.WriteHeader(res.StatusCode) - io.Copy(w, res.Body) - return true + return -1 +} + +func parseRequestToken(token string) (subToken string, nodeIndex int, success bool) { + i := strings.Index(token, "@") + if i < 0 { + return "", -1, false + } + nodeIndex, err := strconv.Atoi(token[i+1:]) + if err != nil { + return "", -1, false + } + subToken = token[:i] + return subToken, nodeIndex, true +} + +func proxyRequestByNodeIndex(ctx context.Context, w http.ResponseWriter, r *http.Request, index int) (success bool) { + if len(globalProxyEndpoints) == 0 { + return false + } + if index < 0 || index >= len(globalProxyEndpoints) { + return false + } + ep := globalProxyEndpoints[index] + if ep.IsLocal { + return false + } + return proxyRequest(ctx, w, r, ep) +} + +func proxyRequestByBucket(ctx context.Context, w http.ResponseWriter, r *http.Request, bucket string) (success bool) { + return proxyRequestByNodeIndex(ctx, w, r, crcHashMod(bucket, len(globalProxyEndpoints))) } // ListObjectsV1Handler - GET Bucket (List Objects) Version 1. @@ -347,7 +368,7 @@ func (api objectAPIHandlers) ListObjectsV1Handler(w http.ResponseWriter, r *http return } - if proxyListRequest(ctx, w, r, bucket) { + if proxyRequestByBucket(ctx, w, r, bucket) { return } diff --git a/cmd/endpoint.go b/cmd/endpoint.go index 79d2adc20..b3e6c0df8 100644 --- a/cmd/endpoint.go +++ b/cmd/endpoint.go @@ -49,12 +49,11 @@ const ( URLEndpointType ) -// ListEndpoint - endpoint used for list redirects -// See proxyListRequest() for details. -type ListEndpoint struct { - host string - t *http.Transport - isLocal bool +// ProxyEndpoint - endpoint used for proxy redirects +// See proxyRequest() for details. +type ProxyEndpoint struct { + Endpoint + Transport *http.Transport } // Endpoint - any type of endpoint. @@ -719,18 +718,21 @@ func GetRemotePeers(endpointZones EndpointZones) []string { return peerSet.ToSlice() } -// GetListEndpoints - get all endpoints that can be used to proxy list request. -func GetListEndpoints(endpointZones EndpointZones) ([]ListEndpoint, error) { - var listeps []ListEndpoint - - listepExists := func(host string) bool { - for _, listep := range listeps { - if listep.host == host { - return true - } +// GetProxyEndpointLocalIndex returns index of the local proxy endpoint +func GetProxyEndpointLocalIndex(proxyEps []ProxyEndpoint) int { + for i, pep := range proxyEps { + if pep.IsLocal { + return i } - return false } + return -1 +} + +// GetProxyEndpoints - get all endpoints that can be used to proxy list request. +func GetProxyEndpoints(endpointZones EndpointZones) ([]ProxyEndpoint, error) { + var proxyEps []ProxyEndpoint + + proxyEpSet := set.NewStringSet() for _, ep := range endpointZones { for _, endpoint := range ep.Endpoints { @@ -739,28 +741,25 @@ func GetListEndpoints(endpointZones EndpointZones) ([]ListEndpoint, error) { } host := endpoint.Host - if listepExists(host) { + if proxyEpSet.Contains(host) { continue } - hostName, _, err := net.SplitHostPort(host) - if err != nil { - return nil, err - } + proxyEpSet.Add(host) + var tlsConfig *tls.Config if globalIsSSL { tlsConfig = &tls.Config{ - ServerName: hostName, + ServerName: endpoint.Hostname(), RootCAs: globalRootCAs, } } - listeps = append(listeps, ListEndpoint{ - host, - newCustomHTTPTransport(tlsConfig, rest.DefaultRESTTimeout)(), - endpoint.IsLocal, + proxyEps = append(proxyEps, ProxyEndpoint{ + Endpoint: endpoint, + Transport: newCustomHTTPTransport(tlsConfig, rest.DefaultRESTTimeout)(), }) } } - return listeps, nil + return proxyEps, nil } func updateDomainIPs(endPoints set.StringSet) { diff --git a/cmd/globals.go b/cmd/globals.go index 87e7a3568..8baf5453a 100644 --- a/cmd/globals.go +++ b/cmd/globals.go @@ -279,7 +279,7 @@ var ( // If writes to FS backend should be O_SYNC. globalFSOSync bool - globalListEndpoints []ListEndpoint + globalProxyEndpoints []ProxyEndpoint // Add new variable global values here. ) diff --git a/cmd/handler-utils.go b/cmd/handler-utils.go index 52d9a1c19..c2a7643fd 100644 --- a/cmd/handler-utils.go +++ b/cmd/handler-utils.go @@ -1,5 +1,5 @@ /* - * MinIO Cloud Storage, (C) 2015, 2016, 2017 MinIO, Inc. + * MinIO Cloud Storage, (C) 2015-2020 MinIO, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -452,3 +452,29 @@ func getHostName(r *http.Request) (hostName string) { } return } + +// Proxy any request to an endpoint. +func proxyRequest(ctx context.Context, w http.ResponseWriter, r *http.Request, ep ProxyEndpoint) (success bool) { + success = true + + f := handlers.NewForwarder(&handlers.Forwarder{ + PassHost: true, + RoundTripper: ep.Transport, + ErrorHandler: func(w http.ResponseWriter, r *http.Request, err error) { + success = false + w.WriteHeader(http.StatusBadGateway) + }, + Logger: func(err error) { + logger.LogIf(GlobalContext, err) + }, + }) + + r.URL.Scheme = "http" + if globalIsSSL { + r.URL.Scheme = "https" + } + + r.URL.Host = ep.Host + f.ServeHTTP(w, r) + return +} diff --git a/cmd/server-main.go b/cmd/server-main.go index 6eafca93a..0ef7d8a75 100644 --- a/cmd/server-main.go +++ b/cmd/server-main.go @@ -396,7 +396,7 @@ func serverMain(ctx *cli.Context) { globalRootCAs, err = config.GetRootCAs(globalCertsCADir.Get()) logger.FatalIf(err, "Failed to read root CAs (%v)", err) - globalListEndpoints, err = GetListEndpoints(globalEndpoints) + globalProxyEndpoints, err = GetProxyEndpoints(globalEndpoints) logger.FatalIf(err, "Invalid command line arguments") globalMinioEndpoint = func() string { diff --git a/pkg/handlers/forwarder.go b/pkg/handlers/forwarder.go index 5c25f4652..b299115ed 100644 --- a/pkg/handlers/forwarder.go +++ b/pkg/handlers/forwarder.go @@ -33,6 +33,7 @@ type Forwarder struct { RoundTripper http.RoundTripper PassHost bool Logger func(error) + ErrorHandler func(http.ResponseWriter, *http.Request, error) // internal variables rewriter *headerRewriter @@ -61,6 +62,11 @@ func (f *Forwarder) ServeHTTP(w http.ResponseWriter, inReq *http.Request) { FlushInterval: defaultFlushInterval, ErrorHandler: f.customErrHandler, } + + if f.ErrorHandler != nil { + revproxy.ErrorHandler = f.ErrorHandler + } + revproxy.ServeHTTP(w, outReq) }