mirror of
https://github.com/minio/minio.git
synced 2025-01-11 23:13:23 -05:00
allow requests to be proxied when server is booting up (#10790)
when server is booting up there is a possibility that users might see '503' because object layer when not initialized, then the request is proxied to neighboring peers first one which is online.
This commit is contained in:
parent
3a2f89b3c0
commit
02cfa774be
@ -17,7 +17,9 @@
|
||||
package cmd
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/tls"
|
||||
"errors"
|
||||
"fmt"
|
||||
"net"
|
||||
"net/http"
|
||||
@ -34,6 +36,7 @@ import (
|
||||
"github.com/dustin/go-humanize"
|
||||
"github.com/minio/minio-go/v7/pkg/set"
|
||||
"github.com/minio/minio/cmd/config"
|
||||
xhttp "github.com/minio/minio/cmd/http"
|
||||
"github.com/minio/minio/cmd/logger"
|
||||
"github.com/minio/minio/cmd/rest"
|
||||
"github.com/minio/minio/pkg/env"
|
||||
@ -770,6 +773,72 @@ func GetProxyEndpointLocalIndex(proxyEps []ProxyEndpoint) int {
|
||||
return -1
|
||||
}
|
||||
|
||||
func httpDo(clnt *http.Client, req *http.Request, f func(*http.Response, error) error) error {
|
||||
ctx, cancel := context.WithTimeout(GlobalContext, 200*time.Millisecond)
|
||||
defer cancel()
|
||||
|
||||
// Run the HTTP request in a goroutine and pass the response to f.
|
||||
c := make(chan error, 1)
|
||||
req = req.WithContext(ctx)
|
||||
go func() { c <- f(clnt.Do(req)) }()
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
<-c // Wait for f to return.
|
||||
return ctx.Err()
|
||||
case err := <-c:
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
func getOnlineProxyEndpointIdx() int {
|
||||
type reqIndex struct {
|
||||
Request *http.Request
|
||||
Idx int
|
||||
}
|
||||
|
||||
proxyRequests := make(map[*http.Client]reqIndex, len(globalProxyEndpoints))
|
||||
for i, proxyEp := range globalProxyEndpoints {
|
||||
proxyEp := proxyEp
|
||||
serverURL := &url.URL{
|
||||
Scheme: proxyEp.Scheme,
|
||||
Host: proxyEp.Host,
|
||||
Path: pathJoin(healthCheckPathPrefix, healthCheckLivenessPath),
|
||||
}
|
||||
|
||||
req, err := http.NewRequest(http.MethodGet, serverURL.String(), nil)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
|
||||
proxyRequests[&http.Client{
|
||||
Transport: proxyEp.Transport,
|
||||
}] = reqIndex{
|
||||
Request: req,
|
||||
Idx: i,
|
||||
}
|
||||
}
|
||||
|
||||
for c, r := range proxyRequests {
|
||||
if err := httpDo(c, r.Request, func(resp *http.Response, err error) error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
xhttp.DrainBody(resp.Body)
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
return errors.New(resp.Status)
|
||||
}
|
||||
if v := resp.Header.Get(xhttp.MinIOServerStatus); v == unavailable {
|
||||
return errors.New(v)
|
||||
}
|
||||
return nil
|
||||
}); err != nil {
|
||||
continue
|
||||
}
|
||||
return r.Idx
|
||||
}
|
||||
return -1
|
||||
}
|
||||
|
||||
// GetProxyEndpoints - get all endpoints that can be used to proxy list request.
|
||||
func GetProxyEndpoints(endpointServerSets EndpointServerSets) ([]ProxyEndpoint, error) {
|
||||
var proxyEps []ProxyEndpoint
|
||||
|
@ -17,6 +17,7 @@
|
||||
package cmd
|
||||
|
||||
import (
|
||||
"context"
|
||||
"net/http"
|
||||
"strings"
|
||||
"time"
|
||||
@ -157,15 +158,40 @@ const (
|
||||
loginPathPrefix = SlashSeparator + "login"
|
||||
)
|
||||
|
||||
// Adds redirect rules for incoming requests.
|
||||
type redirectHandler struct {
|
||||
handler http.Handler
|
||||
}
|
||||
|
||||
func setBrowserRedirectHandler(h http.Handler) http.Handler {
|
||||
func setRedirectHandler(h http.Handler) http.Handler {
|
||||
return redirectHandler{handler: h}
|
||||
}
|
||||
|
||||
// Adds redirect rules for incoming requests.
|
||||
type browserRedirectHandler struct {
|
||||
handler http.Handler
|
||||
}
|
||||
|
||||
func setBrowserRedirectHandler(h http.Handler) http.Handler {
|
||||
return browserRedirectHandler{handler: h}
|
||||
}
|
||||
|
||||
func (h redirectHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||
switch {
|
||||
case guessIsRPCReq(r), guessIsBrowserReq(r), guessIsHealthCheckReq(r), guessIsMetricsReq(r), isAdminReq(r):
|
||||
h.handler.ServeHTTP(w, r)
|
||||
return
|
||||
case newObjectLayerFn() == nil:
|
||||
// if this server is still initializing, proxy the request
|
||||
// to any other online servers to avoid 503 for any incoming
|
||||
// API calls.
|
||||
if idx := getOnlineProxyEndpointIdx(); idx >= 0 {
|
||||
proxyRequest(context.TODO(), w, r, globalProxyEndpoints[idx])
|
||||
return
|
||||
}
|
||||
}
|
||||
h.handler.ServeHTTP(w, r)
|
||||
}
|
||||
|
||||
// Fetch redirect location if urlPath satisfies certain
|
||||
// criteria. Some special names are considered to be
|
||||
// redirectable, this is purely internal function and
|
||||
@ -236,7 +262,7 @@ func guessIsRPCReq(req *http.Request) bool {
|
||||
strings.HasPrefix(req.URL.Path, minioReservedBucketPath+SlashSeparator)
|
||||
}
|
||||
|
||||
func (h redirectHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||
func (h browserRedirectHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||
// Re-direction is handled specifically for browser requests.
|
||||
if guessIsBrowserReq(r) {
|
||||
// Fetch the redirect location if any.
|
||||
|
@ -20,8 +20,12 @@ import (
|
||||
"context"
|
||||
"net/http"
|
||||
"strconv"
|
||||
|
||||
xhttp "github.com/minio/minio/cmd/http"
|
||||
)
|
||||
|
||||
const unavailable = "offline"
|
||||
|
||||
// ClusterCheckHandler returns if the server is ready for requests.
|
||||
func ClusterCheckHandler(w http.ResponseWriter, r *http.Request) {
|
||||
ctx := newContext(r, w, "ClusterCheckHandler")
|
||||
@ -29,6 +33,7 @@ func ClusterCheckHandler(w http.ResponseWriter, r *http.Request) {
|
||||
objLayer := newObjectLayerFn()
|
||||
// Service not initialized yet
|
||||
if objLayer == nil {
|
||||
w.Header().Set(xhttp.MinIOServerStatus, unavailable)
|
||||
writeResponse(w, http.StatusServiceUnavailable, nil, mimeNone)
|
||||
return
|
||||
}
|
||||
@ -39,12 +44,12 @@ func ClusterCheckHandler(w http.ResponseWriter, r *http.Request) {
|
||||
opts := HealthOptions{Maintenance: r.URL.Query().Get("maintenance") == "true"}
|
||||
result := objLayer.Health(ctx, opts)
|
||||
if result.WriteQuorum > 0 {
|
||||
w.Header().Set("X-Minio-Write-Quorum", strconv.Itoa(result.WriteQuorum))
|
||||
w.Header().Set(xhttp.MinIOWriteQuorum, strconv.Itoa(result.WriteQuorum))
|
||||
}
|
||||
if !result.Healthy {
|
||||
// return how many drives are being healed if any
|
||||
if result.HealingDrives > 0 {
|
||||
w.Header().Set("X-Minio-Healing-Drives", strconv.Itoa(result.HealingDrives))
|
||||
w.Header().Set(xhttp.MinIOHealingDrives, strconv.Itoa(result.HealingDrives))
|
||||
}
|
||||
// As a maintenance call we are purposefully asked to be taken
|
||||
// down, this is for orchestrators to know if we can safely
|
||||
@ -61,12 +66,19 @@ func ClusterCheckHandler(w http.ResponseWriter, r *http.Request) {
|
||||
|
||||
// ReadinessCheckHandler Checks if the process is up. Always returns success.
|
||||
func ReadinessCheckHandler(w http.ResponseWriter, r *http.Request) {
|
||||
// TODO: only implement this function to notify that this pod is
|
||||
// busy, at a local scope in future, for now '200 OK'.
|
||||
if newObjectLayerFn() == nil {
|
||||
// Service not initialized yet
|
||||
w.Header().Set(xhttp.MinIOServerStatus, unavailable)
|
||||
}
|
||||
|
||||
writeResponse(w, http.StatusOK, nil, mimeNone)
|
||||
}
|
||||
|
||||
// LivenessCheckHandler - Checks if the process is up. Always returns success.
|
||||
func LivenessCheckHandler(w http.ResponseWriter, r *http.Request) {
|
||||
if newObjectLayerFn() == nil {
|
||||
// Service not initialized yet
|
||||
w.Header().Set(xhttp.MinIOServerStatus, unavailable)
|
||||
}
|
||||
writeResponse(w, http.StatusOK, nil, mimeNone)
|
||||
}
|
||||
|
@ -126,6 +126,12 @@ const (
|
||||
|
||||
// Header indicates if the etag should be preserved by client
|
||||
MinIOSourceETag = "x-minio-source-etag"
|
||||
|
||||
// Writes expected write quorum
|
||||
MinIOWriteQuorum = "x-minio-write-quorum"
|
||||
|
||||
// Reports number of drives currently healing
|
||||
MinIOHealingDrives = "x-minio-healing-drives"
|
||||
)
|
||||
|
||||
// Common http query params S3 API
|
||||
|
@ -23,7 +23,6 @@ import (
|
||||
"net/http"
|
||||
"net/url"
|
||||
"os"
|
||||
"path"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
@ -177,7 +176,7 @@ func IsServerResolvable(endpoint Endpoint) error {
|
||||
serverURL := &url.URL{
|
||||
Scheme: endpoint.Scheme,
|
||||
Host: endpoint.Host,
|
||||
Path: path.Join(healthCheckPathPrefix, healthCheckLivenessPath),
|
||||
Path: pathJoin(healthCheckPathPrefix, healthCheckLivenessPath),
|
||||
}
|
||||
|
||||
var tlsConfig *tls.Config
|
||||
@ -195,9 +194,9 @@ func IsServerResolvable(endpoint Endpoint) error {
|
||||
&http.Transport{
|
||||
Proxy: http.ProxyFromEnvironment,
|
||||
DialContext: xhttp.NewCustomDialContext(3 * time.Second),
|
||||
ResponseHeaderTimeout: 5 * time.Second,
|
||||
TLSHandshakeTimeout: 5 * time.Second,
|
||||
ExpectContinueTimeout: 5 * time.Second,
|
||||
ResponseHeaderTimeout: 3 * time.Second,
|
||||
TLSHandshakeTimeout: 3 * time.Second,
|
||||
ExpectContinueTimeout: 3 * time.Second,
|
||||
TLSClientConfig: tlsConfig,
|
||||
// Go net/http automatically unzip if content-type is
|
||||
// gzip disable this feature, as we are always interested
|
||||
@ -207,23 +206,29 @@ func IsServerResolvable(endpoint Endpoint) error {
|
||||
}
|
||||
defer httpClient.CloseIdleConnections()
|
||||
|
||||
ctx, cancel := context.WithTimeout(GlobalContext, 5*time.Second)
|
||||
defer cancel()
|
||||
ctx, cancel := context.WithTimeout(GlobalContext, 3*time.Second)
|
||||
|
||||
req, err := http.NewRequestWithContext(ctx, http.MethodGet, serverURL.String(), nil)
|
||||
if err != nil {
|
||||
cancel()
|
||||
return err
|
||||
}
|
||||
|
||||
resp, err := httpClient.Do(req)
|
||||
cancel()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer xhttp.DrainBody(resp.Body)
|
||||
xhttp.DrainBody(resp.Body)
|
||||
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
return StorageErr(resp.Status)
|
||||
}
|
||||
|
||||
if resp.Header.Get(xhttp.MinIOServerStatus) == unavailable {
|
||||
return StorageErr(unavailable)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -39,6 +39,10 @@ func registerDistErasureRouters(router *mux.Router, endpointServerSets EndpointS
|
||||
|
||||
// List of some generic handlers which are applied for all incoming requests.
|
||||
var globalHandlers = []MiddlewareFunc{
|
||||
// add redirect handler to redirect
|
||||
// requests when object layer is not
|
||||
// initialized.
|
||||
setRedirectHandler,
|
||||
// set x-amz-request-id header.
|
||||
addCustomHeaders,
|
||||
// set HTTP security headers such as Content-Security-Policy.
|
||||
|
Loading…
Reference in New Issue
Block a user