2021-04-18 12:41:13 -07:00
|
|
|
// Copyright (c) 2015-2021 MinIO, Inc.
|
|
|
|
//
|
|
|
|
// This file is part of MinIO Object Storage stack
|
|
|
|
//
|
|
|
|
// This program is free software: you can redistribute it and/or modify
|
|
|
|
// it under the terms of the GNU Affero General Public License as published by
|
|
|
|
// the Free Software Foundation, either version 3 of the License, or
|
|
|
|
// (at your option) any later version.
|
|
|
|
//
|
|
|
|
// This program is distributed in the hope that it will be useful
|
|
|
|
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
|
|
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
|
|
// GNU Affero General Public License for more details.
|
|
|
|
//
|
|
|
|
// You should have received a copy of the GNU Affero General Public License
|
|
|
|
// along with this program. If not, see <http://www.gnu.org/licenses/>.
|
2018-10-04 17:44:06 -07:00
|
|
|
|
|
|
|
package rest
|
|
|
|
|
|
|
|
import (
|
2022-07-27 09:44:59 -07:00
|
|
|
"bytes"
|
2018-10-04 17:44:06 -07:00
|
|
|
"context"
|
|
|
|
"errors"
|
2020-11-23 09:12:17 -08:00
|
|
|
"fmt"
|
2018-10-04 17:44:06 -07:00
|
|
|
"io"
|
2020-10-13 18:28:42 -07:00
|
|
|
"math/rand"
|
2018-10-04 17:44:06 -07:00
|
|
|
"net/http"
|
2023-06-08 08:39:47 -07:00
|
|
|
"net/http/httputil"
|
2018-10-04 17:44:06 -07:00
|
|
|
"net/url"
|
2022-07-27 09:44:59 -07:00
|
|
|
"path"
|
|
|
|
"strings"
|
|
|
|
"sync"
|
2020-06-16 18:59:32 -07:00
|
|
|
"sync/atomic"
|
2018-10-04 17:44:06 -07:00
|
|
|
"time"
|
|
|
|
|
2021-06-01 14:59:40 -07:00
|
|
|
xhttp "github.com/minio/minio/internal/http"
|
|
|
|
"github.com/minio/minio/internal/logger"
|
2022-12-06 18:27:26 +01:00
|
|
|
"github.com/minio/minio/internal/mcontext"
|
2023-09-04 12:57:37 -07:00
|
|
|
xnet "github.com/minio/pkg/v2/net"
|
2018-10-04 17:44:06 -07:00
|
|
|
)
|
|
|
|
|
2020-09-29 15:18:34 -07:00
|
|
|
// DefaultTimeout is the default timeout applied to REST calls: ten seconds.
const DefaultTimeout time.Duration = 10 * time.Second
|
2018-10-04 17:44:06 -07:00
|
|
|
|
2020-06-16 18:59:32 -07:00
|
|
|
// Connection states kept in Client.connected. They are untyped so they can
// be used directly with the int32 atomic helpers.
const (
	// offline: the last call failed; a health check may be trying to reconnect.
	offline = 0
	// online: calls are allowed to go out.
	online = 1
	// closed: Close was called; the reconnect loop stops when it sees this.
	closed = 2
)
|
|
|
|
|
2019-05-29 10:21:47 -07:00
|
|
|
// NetworkError - error type in case of errors related to http/transport
// for ex. connection refused, connection reset, dns resolution failure etc.
// All errors returned by storage-rest-server (ex errFileNotFound, errDiskNotFound)
// are not considered to be network errors.
type NetworkError struct {
	// Err is the underlying transport error being wrapped.
	Err error
}

// Error returns the message of the wrapped error.
//
// A nil receiver or a NetworkError without a wrapped error yields a fixed
// placeholder instead of panicking on a nil dereference.
func (n *NetworkError) Error() string {
	if n == nil || n.Err == nil {
		return "network error: <nil>"
	}
	return n.Err.Error()
}

// Unwrap returns the error wrapped in NetworkError, so errors.Is and
// errors.As can inspect it. It returns nil for a nil receiver.
func (n *NetworkError) Unwrap() error {
	if n == nil {
		return nil
	}
	return n.Err
}
|
|
|
|
|
2018-10-04 17:44:06 -07:00
|
|
|
// Client - http based RPC client.
//
// The first three fields are accessed with sync/atomic and must stay at the
// top of the struct, in this order, so that lastConn is 64-bit aligned.
type Client struct {
	// connected holds one of offline, online or closed.
	connected int32 // ref: https://golang.org/pkg/sync/atomic/#pkg-note-BUG
	_         int32 // For 64 bits alignment
	// lastConn is a Unix timestamp in nanoseconds, see LastConn.
	lastConn int64

	// HealthCheckFn is the function set to test for health.
	// If not set the client will not keep track of health.
	// Calling this returns true or false if the target
	// is online or offline.
	HealthCheckFn func() bool

	// HealthCheckReconnectUnit is the base unit meant for the exponential
	// backoff when trying to reconnect to an offline node.
	// NewClient initializes it to 200ms.
	HealthCheckReconnectUnit time.Duration

	// HealthCheckTimeout determines timeout for each call.
	HealthCheckTimeout time.Duration

	// MaxErrResponseSize is the maximum expected response size.
	// Call reads at most this many bytes of a non-200 response body.
	// Should only be modified before any calls are made.
	MaxErrResponseSize int64

	// Avoid metrics update if set to true
	NoMetrics bool

	// TraceOutput will print debug information on non-200 calls if set.
	TraceOutput io.Writer // Debug trace output

	httpClient *http.Client
	// url is the base URL; Call appends the method name to its path.
	url *url.URL
	// newAuthToken, when non-nil, produces the bearer token for a request.
	newAuthToken func(audience string) string

	sync.RWMutex // mutex for lastErr and lastErrTime
	lastErr      error
	lastErrTime  time.Time
}
|
2019-08-06 12:08:58 -07:00
|
|
|
|
2020-07-29 23:15:34 -07:00
|
|
|
// restError is a plain string error that always reports itself as a timeout.
type restError string

// Error returns the error text, i.e. the string value itself.
func (e restError) Error() string {
	msg := string(e)
	return msg
}

// Timeout reports true unconditionally: every restError counts as a timeout.
func (e restError) Timeout() bool {
	const isTimeout = true
	return isTimeout
}
|
|
|
|
|
2022-07-27 09:44:59 -07:00
|
|
|
// hasPort reports whether s, given as "host", "host:port" or
// "[ipv6::address]:port", carries a port part. It does so by checking that
// the last ':' comes after the last ']' (either index is -1 when absent).
func hasPort(s string) bool {
	lastColon := strings.LastIndex(s, ":")
	lastBracket := strings.LastIndex(s, "]")
	return lastColon > lastBracket
}
|
|
|
|
|
|
|
|
// removeEmptyPort strips the empty port in ":port" to ""
|
|
|
|
// as mandated by RFC 3986 Section 6.2.3.
|
|
|
|
func removeEmptyPort(host string) string {
|
|
|
|
if hasPort(host) {
|
|
|
|
return strings.TrimSuffix(host, ":")
|
2020-06-16 18:59:32 -07:00
|
|
|
}
|
2022-07-27 09:44:59 -07:00
|
|
|
return host
|
|
|
|
}
|
|
|
|
|
|
|
|
// Copied from http.NewRequest but implemented to ensure we re-use `url.URL` instance.
|
2023-07-07 22:07:30 -07:00
|
|
|
func (c *Client) newRequest(ctx context.Context, u url.URL, body io.Reader) (*http.Request, error) {
|
2022-07-27 09:44:59 -07:00
|
|
|
rc, ok := body.(io.ReadCloser)
|
|
|
|
if !ok && body != nil {
|
|
|
|
rc = io.NopCloser(body)
|
|
|
|
}
|
|
|
|
req := &http.Request{
|
|
|
|
Method: http.MethodPost,
|
2023-07-07 22:07:30 -07:00
|
|
|
URL: &u,
|
2022-07-27 09:44:59 -07:00
|
|
|
Proto: "HTTP/1.1",
|
|
|
|
ProtoMajor: 1,
|
|
|
|
ProtoMinor: 1,
|
|
|
|
Header: make(http.Header),
|
|
|
|
Body: rc,
|
|
|
|
Host: u.Host,
|
|
|
|
}
|
|
|
|
req = req.WithContext(ctx)
|
|
|
|
if body != nil {
|
|
|
|
switch v := body.(type) {
|
|
|
|
case *bytes.Buffer:
|
|
|
|
req.ContentLength = int64(v.Len())
|
|
|
|
buf := v.Bytes()
|
|
|
|
req.GetBody = func() (io.ReadCloser, error) {
|
|
|
|
r := bytes.NewReader(buf)
|
|
|
|
return io.NopCloser(r), nil
|
|
|
|
}
|
|
|
|
case *bytes.Reader:
|
|
|
|
req.ContentLength = int64(v.Len())
|
|
|
|
snapshot := *v
|
|
|
|
req.GetBody = func() (io.ReadCloser, error) {
|
|
|
|
r := snapshot
|
|
|
|
return io.NopCloser(&r), nil
|
|
|
|
}
|
|
|
|
case *strings.Reader:
|
|
|
|
req.ContentLength = int64(v.Len())
|
|
|
|
snapshot := *v
|
|
|
|
req.GetBody = func() (io.ReadCloser, error) {
|
|
|
|
r := snapshot
|
|
|
|
return io.NopCloser(&r), nil
|
|
|
|
}
|
|
|
|
default:
|
|
|
|
// This is where we'd set it to -1 (at least
|
|
|
|
// if body != NoBody) to mean unknown, but
|
|
|
|
// that broke people during the Go 1.8 testing
|
|
|
|
// period. People depend on it being 0 I
|
|
|
|
// guess. Maybe retry later. See Issue 18117.
|
|
|
|
}
|
|
|
|
// For client requests, Request.ContentLength of 0
|
|
|
|
// means either actually 0, or unknown. The only way
|
|
|
|
// to explicitly say that the ContentLength is zero is
|
|
|
|
// to set the Body to nil. But turns out too much code
|
|
|
|
// depends on NewRequest returning a non-nil Body,
|
|
|
|
// so we use a well-known ReadCloser variable instead
|
|
|
|
// and have the http package also treat that sentinel
|
|
|
|
// variable to mean explicitly zero.
|
|
|
|
if req.GetBody != nil && req.ContentLength == 0 {
|
|
|
|
req.Body = http.NoBody
|
|
|
|
req.GetBody = func() (io.ReadCloser, error) { return http.NoBody, nil }
|
|
|
|
}
|
2018-10-04 17:44:06 -07:00
|
|
|
}
|
2022-07-27 09:44:59 -07:00
|
|
|
|
2022-01-18 12:44:38 -08:00
|
|
|
if c.newAuthToken != nil {
|
2022-07-27 09:44:59 -07:00
|
|
|
req.Header.Set("Authorization", "Bearer "+c.newAuthToken(u.RawQuery))
|
2022-01-18 12:44:38 -08:00
|
|
|
}
|
2018-10-04 17:44:06 -07:00
|
|
|
req.Header.Set("X-Minio-Time", time.Now().UTC().Format(time.RFC3339))
|
2022-07-27 09:44:59 -07:00
|
|
|
|
2022-12-06 18:27:26 +01:00
|
|
|
if tc, ok := ctx.Value(mcontext.ContextTraceKey).(*mcontext.TraceCtxt); ok {
|
|
|
|
req.Header.Set(xhttp.AmzRequestID, tc.AmzReqID)
|
|
|
|
}
|
|
|
|
|
2022-07-27 09:44:59 -07:00
|
|
|
return req, nil
|
|
|
|
}
|
|
|
|
|
2022-09-12 20:40:51 +01:00
|
|
|
// respBodyMonitor wraps a response body and reports read/close errors to
// the error statistics, at most once per body.
type respBodyMonitor struct {
	io.ReadCloser

	// errorStatusOnce ensures the error counter is bumped only one time.
	errorStatusOnce sync.Once
	// expectTimeouts is handed to xnet.IsNetworkOrHostDown when classifying errors.
	expectTimeouts bool
}
|
|
|
|
|
2023-05-22 22:42:27 +08:00
|
|
|
func (r *respBodyMonitor) Read(p []byte) (n int, err error) {
|
2022-09-12 20:40:51 +01:00
|
|
|
n, err = r.ReadCloser.Read(p)
|
2023-05-22 22:42:27 +08:00
|
|
|
r.errorStatus(err)
|
2022-09-12 20:40:51 +01:00
|
|
|
return
|
|
|
|
}
|
|
|
|
|
2023-05-22 22:42:27 +08:00
|
|
|
func (r *respBodyMonitor) Close() (err error) {
|
2022-09-12 20:40:51 +01:00
|
|
|
err = r.ReadCloser.Close()
|
2023-05-22 22:42:27 +08:00
|
|
|
r.errorStatus(err)
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
func (r *respBodyMonitor) errorStatus(err error) {
|
2022-11-07 19:38:08 +01:00
|
|
|
if xnet.IsNetworkOrHostDown(err, r.expectTimeouts) {
|
2023-05-22 22:42:27 +08:00
|
|
|
r.errorStatusOnce.Do(func() {
|
|
|
|
atomic.AddUint64(&globalStats.errs, 1)
|
|
|
|
})
|
2022-09-12 20:40:51 +01:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2023-06-08 08:39:47 -07:00
|
|
|
// dumpHTTP - dump HTTP request and response.
|
|
|
|
func (c *Client) dumpHTTP(req *http.Request, resp *http.Response) {
|
|
|
|
// Starts http dump.
|
|
|
|
_, err := fmt.Fprintln(c.TraceOutput, "---------START-HTTP---------")
|
|
|
|
if err != nil {
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
// Filter out Signature field from Authorization header.
|
|
|
|
origAuth := req.Header.Get("Authorization")
|
|
|
|
if origAuth != "" {
|
|
|
|
req.Header.Set("Authorization", "**REDACTED**")
|
|
|
|
}
|
|
|
|
|
|
|
|
// Only display request header.
|
|
|
|
reqTrace, err := httputil.DumpRequestOut(req, false)
|
|
|
|
if err != nil {
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
// Write request to trace output.
|
|
|
|
_, err = fmt.Fprint(c.TraceOutput, string(reqTrace))
|
|
|
|
if err != nil {
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
// Only display response header.
|
|
|
|
var respTrace []byte
|
|
|
|
|
|
|
|
// For errors we make sure to dump response body as well.
|
|
|
|
if resp.StatusCode != http.StatusOK &&
|
|
|
|
resp.StatusCode != http.StatusPartialContent &&
|
|
|
|
resp.StatusCode != http.StatusNoContent {
|
|
|
|
respTrace, err = httputil.DumpResponse(resp, true)
|
|
|
|
if err != nil {
|
|
|
|
return
|
|
|
|
}
|
|
|
|
} else {
|
|
|
|
respTrace, err = httputil.DumpResponse(resp, false)
|
|
|
|
if err != nil {
|
|
|
|
return
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// Write response to trace output.
|
|
|
|
_, err = fmt.Fprint(c.TraceOutput, strings.TrimSuffix(string(respTrace), "\r\n"))
|
|
|
|
if err != nil {
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
// Ends the http dump.
|
|
|
|
_, err = fmt.Fprintln(c.TraceOutput, "---------END-HTTP---------")
|
|
|
|
if err != nil {
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
// Returns success.
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
2022-07-27 09:44:59 -07:00
|
|
|
// Call - make a REST call with context.
|
|
|
|
func (c *Client) Call(ctx context.Context, method string, values url.Values, body io.Reader, length int64) (reply io.ReadCloser, err error) {
|
|
|
|
if !c.IsOnline() {
|
2023-07-07 22:07:30 -07:00
|
|
|
return nil, &NetworkError{Err: c.LastError()}
|
2022-07-27 09:44:59 -07:00
|
|
|
}
|
|
|
|
|
2023-07-07 22:07:30 -07:00
|
|
|
// Shallow copy. We don't modify the *UserInfo, if set.
|
|
|
|
// All other fields are copied.
|
|
|
|
u := *c.url
|
2022-07-27 09:44:59 -07:00
|
|
|
u.Path = path.Join(u.Path, method)
|
|
|
|
u.RawQuery = values.Encode()
|
|
|
|
|
|
|
|
req, err := c.newRequest(ctx, u, body)
|
|
|
|
if err != nil {
|
2023-07-07 22:07:30 -07:00
|
|
|
return nil, &NetworkError{Err: err}
|
2022-07-27 09:44:59 -07:00
|
|
|
}
|
2019-01-17 04:58:18 -08:00
|
|
|
if length > 0 {
|
|
|
|
req.ContentLength = length
|
|
|
|
}
|
2022-09-26 17:04:26 +01:00
|
|
|
|
allow quota enforcement to rely on older values (#17351)
PUT calls cannot afford to have large latency build-ups due
to contentious usage.json, or worse letting them fail with
some unexpected error, this can happen when this file is
concurrently being updated via scanner or it is being
healed during a disk replacement heal.
However, these are fairly quick in theory, stressed clusters
can quickly show visible latency this can add up leading to
invalid errors returned during PUT.
It is perhaps okay for us to relax this error return requirement
instead, make sure that we log that we are proceeding to take in
the requests while the quota is using an older value for the quota
enforcement. These things will reconcile themselves eventually,
via scanner making sure to overwrite the usage.json.
Bonus: make sure that storage-rest-client sets ExpectTimeouts to
be 'true', such that DiskInfo() call with contextTimeout does
not prematurely disconnect the servers leading to a longer
healthCheck, back-off routine. This can easily pile up while also
causing active callers to disconnect, leading to quorum loss.
DiskInfo is actively used in the PUT, Multipart call path for
upgrading parity when disks are down, it in-turn shouldn't cause
more disks to go down.
2023-06-05 16:56:35 -07:00
|
|
|
_, expectTimeouts := ctx.Deadline()
|
|
|
|
|
2022-09-26 17:04:26 +01:00
|
|
|
req, update := setupReqStatsUpdate(req)
|
|
|
|
defer update()
|
|
|
|
|
2018-10-04 17:44:06 -07:00
|
|
|
resp, err := c.httpClient.Do(req)
|
|
|
|
if err != nil {
|
allow quota enforcement to rely on older values (#17351)
PUT calls cannot afford to have large latency build-ups due
to contentious usage.json, or worse letting them fail with
some unexpected error, this can happen when this file is
concurrently being updated via scanner or it is being
healed during a disk replacement heal.
However, these are fairly quick in theory, stressed clusters
can quickly show visible latency this can add up leading to
invalid errors returned during PUT.
It is perhaps okay for us to relax this error return requirement
instead, make sure that we log that we are proceeding to take in
the requests while the quota is using an older value for the quota
enforcement. These things will reconcile themselves eventually,
via scanner making sure to overwrite the usage.json.
Bonus: make sure that storage-rest-client sets ExpectTimeouts to
be 'true', such that DiskInfo() call with contextTimeout does
not prematurely disconnect the servers leading to a longer
healthCheck, back-off routine. This can easily pile up while also
causing active callers to disconnect, leading to quorum loss.
DiskInfo is actively used in the PUT, Multipart call path for
upgrading parity when disks are down, it in-turn shouldn't cause
more disks to go down.
2023-06-05 16:56:35 -07:00
|
|
|
if xnet.IsNetworkOrHostDown(err, expectTimeouts) {
|
2021-06-08 22:09:26 +01:00
|
|
|
if !c.NoMetrics {
|
2022-09-26 17:04:26 +01:00
|
|
|
atomic.AddUint64(&globalStats.errs, 1)
|
2021-06-08 22:09:26 +01:00
|
|
|
}
|
2022-07-27 09:44:59 -07:00
|
|
|
if c.MarkOffline(err) {
|
|
|
|
logger.LogOnceIf(ctx, fmt.Errorf("Marking %s offline temporarily; caused by %w", c.url.Host, err), c.url.Host)
|
2020-12-30 14:38:54 -08:00
|
|
|
}
|
2020-06-16 18:59:32 -07:00
|
|
|
}
|
2019-05-29 10:21:47 -07:00
|
|
|
return nil, &NetworkError{err}
|
2018-10-04 17:44:06 -07:00
|
|
|
}
|
|
|
|
|
2023-06-08 08:39:47 -07:00
|
|
|
// If trace is enabled, dump http request and response,
|
|
|
|
// except when the traceErrorsOnly enabled and the response's status code is ok
|
|
|
|
if c.TraceOutput != nil && resp.StatusCode != http.StatusOK {
|
|
|
|
c.dumpHTTP(req, resp)
|
|
|
|
}
|
|
|
|
|
2018-10-04 17:44:06 -07:00
|
|
|
if resp.StatusCode != http.StatusOK {
|
2020-06-17 14:49:26 -07:00
|
|
|
// If server returns 412 pre-condition failed, it would
|
|
|
|
// mean that authentication succeeded, but another
|
|
|
|
// side-channel check has failed, we shall take
|
|
|
|
// the client offline in such situations.
|
|
|
|
// generally all implementations should simply return
|
|
|
|
// 403, but in situations where there is a dependency
|
|
|
|
// with the caller to take the client offline purpose
|
|
|
|
// fully it should make sure to respond with '412'
|
|
|
|
// instead, see cmd/storage-rest-server.go for ideas.
|
2020-11-10 09:28:23 -08:00
|
|
|
if c.HealthCheckFn != nil && resp.StatusCode == http.StatusPreconditionFailed {
|
2022-08-04 16:10:08 -07:00
|
|
|
err = fmt.Errorf("Marking %s offline temporarily; caused by PreconditionFailed with drive ID mismatch", c.url.Host)
|
2022-07-27 09:44:59 -07:00
|
|
|
logger.LogOnceIf(ctx, err, c.url.Host)
|
|
|
|
c.MarkOffline(err)
|
2020-06-17 14:49:26 -07:00
|
|
|
}
|
2019-02-06 12:07:03 -08:00
|
|
|
defer xhttp.DrainBody(resp.Body)
|
2018-10-04 17:44:06 -07:00
|
|
|
// Limit the ReadAll(), just in case, because of a bug, the server responds with large data.
|
2022-09-19 20:05:16 +02:00
|
|
|
b, err := io.ReadAll(io.LimitReader(resp.Body, c.MaxErrResponseSize))
|
2018-10-04 17:44:06 -07:00
|
|
|
if err != nil {
|
allow quota enforcement to rely on older values (#17351)
PUT calls cannot afford to have large latency build-ups due
to contentious usage.json, or worse letting them fail with
some unexpected error, this can happen when this file is
concurrently being updated via scanner or it is being
healed during a disk replacement heal.
However, these are fairly quick in theory, stressed clusters
can quickly show visible latency this can add up leading to
invalid errors returned during PUT.
It is perhaps okay for us to relax this error return requirement
instead, make sure that we log that we are proceeding to take in
the requests while the quota is using an older value for the quota
enforcement. These things will reconcile themselves eventually,
via scanner making sure to overwrite the usage.json.
Bonus: make sure that storage-rest-client sets ExpectTimeouts to
be 'true', such that DiskInfo() call with contextTimeout does
not prematurely disconnect the servers leading to a longer
healthCheck, back-off routine. This can easily pile up while also
causing active callers to disconnect, leading to quorum loss.
DiskInfo is actively used in the PUT, Multipart call path for
upgrading parity when disks are down, it in-turn shouldn't cause
more disks to go down.
2023-06-05 16:56:35 -07:00
|
|
|
if xnet.IsNetworkOrHostDown(err, expectTimeouts) {
|
2021-06-08 22:09:26 +01:00
|
|
|
if !c.NoMetrics {
|
2022-09-26 17:04:26 +01:00
|
|
|
atomic.AddUint64(&globalStats.errs, 1)
|
2021-06-08 22:09:26 +01:00
|
|
|
}
|
2022-07-27 09:44:59 -07:00
|
|
|
if c.MarkOffline(err) {
|
|
|
|
logger.LogOnceIf(ctx, fmt.Errorf("Marking %s offline temporarily; caused by %w", c.url.Host, err), c.url.Host)
|
2020-12-30 14:38:54 -08:00
|
|
|
}
|
2020-09-25 14:35:47 -07:00
|
|
|
}
|
2018-10-04 17:44:06 -07:00
|
|
|
return nil, err
|
|
|
|
}
|
2019-04-02 20:25:34 +01:00
|
|
|
if len(b) > 0 {
|
|
|
|
return nil, errors.New(string(b))
|
|
|
|
}
|
|
|
|
return nil, errors.New(resp.Status)
|
2018-10-04 17:44:06 -07:00
|
|
|
}
|
allow quota enforcement to rely on older values (#17351)
PUT calls cannot afford to have large latency build-ups due
to contentious usage.json, or worse letting them fail with
some unexpected error, this can happen when this file is
concurrently being updated via scanner or it is being
healed during a disk replacement heal.
However, these are fairly quick in theory, stressed clusters
can quickly show visible latency this can add up leading to
invalid errors returned during PUT.
It is perhaps okay for us to relax this error return requirement
instead, make sure that we log that we are proceeding to take in
the requests while the quota is using an older value for the quota
enforcement. These things will reconcile themselves eventually,
via scanner making sure to overwrite the usage.json.
Bonus: make sure that storage-rest-client sets ExpectTimeouts to
be 'true', such that DiskInfo() call with contextTimeout does
not prematurely disconnect the servers leading to a longer
healthCheck, back-off routine. This can easily pile up while also
causing active callers to disconnect, leading to quorum loss.
DiskInfo is actively used in the PUT, Multipart call path for
upgrading parity when disks are down, it in-turn shouldn't cause
more disks to go down.
2023-06-05 16:56:35 -07:00
|
|
|
if !c.NoMetrics {
|
|
|
|
resp.Body = &respBodyMonitor{ReadCloser: resp.Body, expectTimeouts: expectTimeouts}
|
2022-09-12 20:40:51 +01:00
|
|
|
}
|
2018-10-04 17:44:06 -07:00
|
|
|
return resp.Body, nil
|
|
|
|
}
|
|
|
|
|
2018-11-20 20:07:19 +01:00
|
|
|
// Close marks the client as closed.
//
// It only stores the closed state: IsOnline reports false afterwards and a
// running reconnect loop started by MarkOffline exits the next time it
// checks the state. The underlying http client is not touched here.
func (c *Client) Close() {
	atomic.StoreInt32(&c.connected, closed)
}
|
|
|
|
|
|
|
|
// NewClient - returns new REST client.
|
2023-07-07 22:07:30 -07:00
|
|
|
func NewClient(uu *url.URL, tr http.RoundTripper, newAuthToken func(aud string) string) *Client {
|
|
|
|
connected := int32(online)
|
|
|
|
urlStr := uu.String()
|
|
|
|
u, err := url.Parse(urlStr)
|
|
|
|
if err != nil {
|
|
|
|
// Mark offline, with no reconnection attempts.
|
|
|
|
connected = int32(offline)
|
|
|
|
err = &url.Error{URL: urlStr, Err: err}
|
|
|
|
}
|
|
|
|
// The host's colon:port should be normalized. See Issue 14836.
|
|
|
|
u.Host = removeEmptyPort(u.Host)
|
|
|
|
|
2018-11-20 20:07:19 +01:00
|
|
|
// Transport is exactly same as Go default in https://golang.org/pkg/net/http/#RoundTripper
|
|
|
|
// except custom DialContext and TLSClientConfig.
|
2018-10-04 17:44:06 -07:00
|
|
|
return &Client{
|
2023-05-02 20:35:52 +01:00
|
|
|
httpClient: &http.Client{Transport: tr},
|
2023-07-07 22:07:30 -07:00
|
|
|
url: u,
|
|
|
|
lastErr: err,
|
|
|
|
lastErrTime: time.Now(),
|
2023-05-02 20:35:52 +01:00
|
|
|
newAuthToken: newAuthToken,
|
2023-07-07 22:07:30 -07:00
|
|
|
connected: connected,
|
2023-05-02 20:35:52 +01:00
|
|
|
lastConn: time.Now().UnixNano(),
|
|
|
|
MaxErrResponseSize: 4096,
|
|
|
|
HealthCheckReconnectUnit: 200 * time.Millisecond,
|
|
|
|
HealthCheckTimeout: time.Second,
|
2020-07-11 22:19:38 -07:00
|
|
|
}
|
2018-10-04 17:44:06 -07:00
|
|
|
}
|
2020-06-16 18:59:32 -07:00
|
|
|
|
|
|
|
// IsOnline returns whether the client is likely to be online.
|
|
|
|
func (c *Client) IsOnline() bool {
|
|
|
|
return atomic.LoadInt32(&c.connected) == online
|
|
|
|
}
|
|
|
|
|
2021-05-11 17:19:15 +01:00
|
|
|
// LastConn returns when the disk was (re-)connected
|
|
|
|
func (c *Client) LastConn() time.Time {
|
|
|
|
return time.Unix(0, atomic.LoadInt64(&c.lastConn))
|
|
|
|
}
|
|
|
|
|
2022-07-27 09:44:59 -07:00
|
|
|
// LastError returns previous error
|
|
|
|
func (c *Client) LastError() error {
|
|
|
|
c.RLock()
|
|
|
|
defer c.RUnlock()
|
2023-05-31 01:07:26 +08:00
|
|
|
return fmt.Errorf("[%s] %w", c.lastErrTime.Format(time.RFC3339), c.lastErr)
|
2022-07-27 09:44:59 -07:00
|
|
|
}
|
|
|
|
|
2023-05-02 20:35:52 +01:00
|
|
|
// exponentialBackoffWait returns a function computing the jittered
// exponential backoff duration for a given attempt number, following
// https://www.awsarchitectureblog.com/2015/03/backoff.html
//
// For each attempt the upper bound is unit * 2^attempt, limited to cap, and
// the result is picked at random between unit and that upper bound using r.
// Attempts above 16 are treated as 16. It panics if unit exceeds one hour;
// together with the attempt limit this protects against integer overflow.
func exponentialBackoffWait(r *rand.Rand, unit, cap time.Duration) func(uint) time.Duration {
	if unit > time.Hour {
		// Protect against integer overflow
		panic("unit cannot exceed one hour")
	}

	// Largest exponent used; protects against integer overflow.
	const maxExponent = 16

	return func(attempt uint) time.Duration {
		exponent := attempt
		if exponent > maxExponent {
			exponent = maxExponent
		}

		// upper = min(cap, unit * 2 ** attempt)
		upper := unit * (time.Duration(1) << exponent)
		if upper > cap {
			upper = cap
		}

		// Subtract a random share of the span between unit and upper.
		jitter := time.Duration(r.Float64() * float64(upper-unit))
		return upper - jitter
	}
}
|
|
|
|
|
2020-06-16 18:59:32 -07:00
|
|
|
// MarkOffline - will mark a client as being offline and spawns
|
2020-06-17 14:49:26 -07:00
|
|
|
// a goroutine that will attempt to reconnect if HealthCheckFn is set.
|
2020-12-30 14:38:54 -08:00
|
|
|
// returns true if the node changed state from online to offline
|
2022-07-27 09:44:59 -07:00
|
|
|
func (c *Client) MarkOffline(err error) bool {
|
|
|
|
c.Lock()
|
|
|
|
c.lastErr = err
|
2023-05-31 01:07:26 +08:00
|
|
|
c.lastErrTime = time.Now()
|
perf: websocket grid connectivity for all internode communication (#18461)
This PR adds a WebSocket grid feature that allows servers to communicate via
a single two-way connection.
There are two request types:
* Single requests, which are `[]byte => ([]byte, error)`. This is for efficient small
roundtrips with small payloads.
* Streaming requests which are `[]byte, chan []byte => chan []byte (and error)`,
which allows for different combinations of full two-way streams with an initial payload.
Only a single stream is created between two machines - and there is, as such, no
server/client relation since both sides can initiate and handle requests. Which server
initiates the request is decided deterministically on the server names.
Requests are made through a mux client and server, which handles message
passing, congestion, cancelation, timeouts, etc.
If a connection is lost, all requests are canceled, and the calling server will try
to reconnect. Registered handlers can operate directly on byte
slices or use a higher-level generics abstraction.
There is no versioning of handlers/clients, and incompatible changes should
be handled by adding new handlers.
The request path can be changed to a new one for any protocol changes.
First, all servers create a "Manager." The manager must know its address
as well as all remote addresses. This will manage all connections.
To get a connection to any remote, ask the manager to provide it given
the remote address using.
```
func (m *Manager) Connection(host string) *Connection
```
All serverside handlers must also be registered on the manager. This will
make sure that all incoming requests are served. The number of in-flight
requests and responses must also be given for streaming requests.
The "Connection" returned manages the mux-clients. Requests issued
to the connection will be sent to the remote.
* `func (c *Connection) Request(ctx context.Context, h HandlerID, req []byte) ([]byte, error)`
performs a single request and returns the result. Any deadline provided on the request is
forwarded to the server, and canceling the context will make the function return at once.
* `func (c *Connection) NewStream(ctx context.Context, h HandlerID, payload []byte) (st *Stream, err error)`
will initiate a remote call and send the initial payload.
```Go
// A Stream is a two-way stream.
// All responses *must* be read by the caller.
// If the call is canceled through the context,
//The appropriate error will be returned.
type Stream struct {
// Responses from the remote server.
// Channel will be closed after an error or when the remote closes.
// All responses *must* be read by the caller until either an error is returned or the channel is closed.
// Canceling the context will cause the context cancellation error to be returned.
Responses <-chan Response
// Requests sent to the server.
// If the handler is defined with 0 incoming capacity this will be nil.
// Channel *must* be closed to signal the end of the stream.
// If the request context is canceled, the stream will no longer process requests.
Requests chan<- []byte
}
type Response struct {
Msg []byte
Err error
}
```
There are generic versions of the server/client handlers that allow the use of type
safe implementations for data types that support msgpack marshal/unmarshal.
2023-11-20 17:09:35 -08:00
|
|
|
atomic.StoreInt64(&c.lastConn, time.Now().UnixNano())
|
2022-07-27 09:44:59 -07:00
|
|
|
c.Unlock()
|
2020-06-16 18:59:32 -07:00
|
|
|
// Start goroutine that will attempt to reconnect.
|
|
|
|
// If server is already trying to reconnect this will have no effect.
|
2020-06-17 14:49:26 -07:00
|
|
|
if c.HealthCheckFn != nil && atomic.CompareAndSwapInt32(&c.connected, online, offline) {
|
2020-10-13 18:28:42 -07:00
|
|
|
go func() {
|
2023-05-02 20:35:52 +01:00
|
|
|
backOff := exponentialBackoffWait(
|
|
|
|
rand.New(rand.NewSource(time.Now().UnixNano())),
|
|
|
|
200*time.Millisecond,
|
|
|
|
30*time.Second,
|
|
|
|
)
|
|
|
|
|
|
|
|
attempt := uint(0)
|
2020-10-13 18:28:42 -07:00
|
|
|
for {
|
2020-09-16 21:14:35 -07:00
|
|
|
if atomic.LoadInt32(&c.connected) == closed {
|
2020-06-16 18:59:32 -07:00
|
|
|
return
|
|
|
|
}
|
2020-12-08 09:23:35 -08:00
|
|
|
if c.HealthCheckFn() {
|
2020-12-30 14:38:54 -08:00
|
|
|
if atomic.CompareAndSwapInt32(&c.connected, offline, online) {
|
2022-07-15 14:41:24 -07:00
|
|
|
now := time.Now()
|
|
|
|
disconnected := now.Sub(c.LastConn())
|
|
|
|
logger.Info("Client '%s' re-connected in %s", c.url.String(), disconnected)
|
|
|
|
atomic.StoreInt64(&c.lastConn, now.UnixNano())
|
2020-12-30 14:38:54 -08:00
|
|
|
}
|
2020-12-08 09:23:35 -08:00
|
|
|
return
|
2020-06-16 18:59:32 -07:00
|
|
|
}
|
2023-05-02 20:35:52 +01:00
|
|
|
attempt++
|
|
|
|
time.Sleep(backOff(attempt))
|
2020-06-16 18:59:32 -07:00
|
|
|
}
|
2020-10-13 18:28:42 -07:00
|
|
|
}()
|
2020-12-30 14:38:54 -08:00
|
|
|
return true
|
2020-06-16 18:59:32 -07:00
|
|
|
}
|
2020-12-30 14:38:54 -08:00
|
|
|
return false
|
2020-06-16 18:59:32 -07:00
|
|
|
}
|