// Copyright (c) 2015-2021 MinIO, Inc. // // This file is part of MinIO Object Storage stack // // This program is free software: you can redistribute it and/or modify // it under the terms of the GNU Affero General Public License as published by // the Free Software Foundation, either version 3 of the License, or // (at your option) any later version. // // This program is distributed in the hope that it will be useful // but WITHOUT ANY WARRANTY; without even the implied warranty of // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // GNU Affero General Public License for more details. // // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . package rest import ( "bytes" "context" "errors" "fmt" "io" "math/rand" "net/http" "net/http/httputil" "net/url" "path" "strings" "sync" "sync/atomic" "time" xhttp "github.com/minio/minio/internal/http" "github.com/minio/minio/internal/logger" "github.com/minio/minio/internal/mcontext" xnet "github.com/minio/pkg/v2/net" ) // DefaultTimeout - default REST timeout is 10 seconds. const DefaultTimeout = 10 * time.Second const ( offline = iota online closed ) // NetworkError - error type in case of errors related to http/transport // for ex. connection refused, connection reset, dns resolution failure etc. // All errors returned by storage-rest-server (ex errFileNotFound, errDiskNotFound) are not considered to be network errors. type NetworkError struct { Err error } func (n *NetworkError) Error() string { return n.Err.Error() } // Unwrap returns the error wrapped in NetworkError. func (n *NetworkError) Unwrap() error { return n.Err } // Client - http based RPC client. type Client struct { connected int32 // ref: https://golang.org/pkg/sync/atomic/#pkg-note-BUG _ int32 // For 64 bits alignment lastConn int64 // HealthCheckFn is the function set to test for health. // If not set the client will not keep track of health. // Calling this returns true or false if the target // is online or offline. HealthCheckFn func() bool // HealthCheckRetryUnit will be used to calculate the exponential // backoff when trying to reconnect to an offline node HealthCheckReconnectUnit time.Duration // HealthCheckTimeout determines timeout for each call. HealthCheckTimeout time.Duration // MaxErrResponseSize is the maximum expected response size. // Should only be modified before any calls are made. MaxErrResponseSize int64 // Avoid metrics update if set to true NoMetrics bool // TraceOutput will print debug information on non-200 calls if set. TraceOutput io.Writer // Debug trace output httpClient *http.Client url *url.URL newAuthToken func(audience string) string sync.RWMutex // mutex for lastErr lastErr error lastErrTime time.Time } type restError string func (e restError) Error() string { return string(e) } func (e restError) Timeout() bool { return true } // Given a string of the form "host", "host:port", or "[ipv6::address]:port", // return true if the string includes a port. func hasPort(s string) bool { return strings.LastIndex(s, ":") > strings.LastIndex(s, "]") } // removeEmptyPort strips the empty port in ":port" to "" // as mandated by RFC 3986 Section 6.2.3. func removeEmptyPort(host string) string { if hasPort(host) { return strings.TrimSuffix(host, ":") } return host } // Copied from http.NewRequest but implemented to ensure we reuse `url.URL` instance. func (c *Client) newRequest(ctx context.Context, u url.URL, body io.Reader) (*http.Request, error) { rc, ok := body.(io.ReadCloser) if !ok && body != nil { rc = io.NopCloser(body) } req := &http.Request{ Method: http.MethodPost, URL: &u, Proto: "HTTP/1.1", ProtoMajor: 1, ProtoMinor: 1, Header: make(http.Header), Body: rc, Host: u.Host, } req = req.WithContext(ctx) if body != nil { switch v := body.(type) { case *bytes.Buffer: req.ContentLength = int64(v.Len()) buf := v.Bytes() req.GetBody = func() (io.ReadCloser, error) { r := bytes.NewReader(buf) return io.NopCloser(r), nil } case *bytes.Reader: req.ContentLength = int64(v.Len()) snapshot := *v req.GetBody = func() (io.ReadCloser, error) { r := snapshot return io.NopCloser(&r), nil } case *strings.Reader: req.ContentLength = int64(v.Len()) snapshot := *v req.GetBody = func() (io.ReadCloser, error) { r := snapshot return io.NopCloser(&r), nil } default: // This is where we'd set it to -1 (at least // if body != NoBody) to mean unknown, but // that broke people during the Go 1.8 testing // period. People depend on it being 0 I // guess. Maybe retry later. See Issue 18117. } // For client requests, Request.ContentLength of 0 // means either actually 0, or unknown. The only way // to explicitly say that the ContentLength is zero is // to set the Body to nil. But turns out too much code // depends on NewRequest returning a non-nil Body, // so we use a well-known ReadCloser variable instead // and have the http package also treat that sentinel // variable to mean explicitly zero. if req.GetBody != nil && req.ContentLength == 0 { req.Body = http.NoBody req.GetBody = func() (io.ReadCloser, error) { return http.NoBody, nil } } } if c.newAuthToken != nil { req.Header.Set("Authorization", "Bearer "+c.newAuthToken(u.RawQuery)) } req.Header.Set("X-Minio-Time", time.Now().UTC().Format(time.RFC3339)) if tc, ok := ctx.Value(mcontext.ContextTraceKey).(*mcontext.TraceCtxt); ok { req.Header.Set(xhttp.AmzRequestID, tc.AmzReqID) } return req, nil } type respBodyMonitor struct { io.ReadCloser expectTimeouts bool errorStatusOnce sync.Once } func (r *respBodyMonitor) Read(p []byte) (n int, err error) { n, err = r.ReadCloser.Read(p) r.errorStatus(err) return } func (r *respBodyMonitor) Close() (err error) { err = r.ReadCloser.Close() r.errorStatus(err) return } func (r *respBodyMonitor) errorStatus(err error) { if xnet.IsNetworkOrHostDown(err, r.expectTimeouts) { r.errorStatusOnce.Do(func() { atomic.AddUint64(&globalStats.errs, 1) }) } } // dumpHTTP - dump HTTP request and response. func (c *Client) dumpHTTP(req *http.Request, resp *http.Response) { // Starts http dump. _, err := fmt.Fprintln(c.TraceOutput, "---------START-HTTP---------") if err != nil { return } // Filter out Signature field from Authorization header. origAuth := req.Header.Get("Authorization") if origAuth != "" { req.Header.Set("Authorization", "**REDACTED**") } // Only display request header. reqTrace, err := httputil.DumpRequestOut(req, false) if err != nil { return } // Write request to trace output. _, err = fmt.Fprint(c.TraceOutput, string(reqTrace)) if err != nil { return } // Only display response header. var respTrace []byte // For errors we make sure to dump response body as well. if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusPartialContent && resp.StatusCode != http.StatusNoContent { respTrace, err = httputil.DumpResponse(resp, true) if err != nil { return } } else { respTrace, err = httputil.DumpResponse(resp, false) if err != nil { return } } // Write response to trace output. _, err = fmt.Fprint(c.TraceOutput, strings.TrimSuffix(string(respTrace), "\r\n")) if err != nil { return } // Ends the http dump. _, err = fmt.Fprintln(c.TraceOutput, "---------END-HTTP---------") if err != nil { return } // Returns success. return } // Call - make a REST call with context. func (c *Client) Call(ctx context.Context, method string, values url.Values, body io.Reader, length int64) (reply io.ReadCloser, err error) { if !c.IsOnline() { return nil, &NetworkError{Err: c.LastError()} } // Shallow copy. We don't modify the *UserInfo, if set. // All other fields are copied. u := *c.url u.Path = path.Join(u.Path, method) u.RawQuery = values.Encode() req, err := c.newRequest(ctx, u, body) if err != nil { return nil, &NetworkError{Err: err} } if length > 0 { req.ContentLength = length } _, expectTimeouts := ctx.Deadline() req, update := setupReqStatsUpdate(req) defer update() resp, err := c.httpClient.Do(req) if err != nil { if xnet.IsNetworkOrHostDown(err, expectTimeouts) { if !c.NoMetrics { atomic.AddUint64(&globalStats.errs, 1) } if c.MarkOffline(err) { logger.LogOnceIf(ctx, fmt.Errorf("Marking %s offline temporarily; caused by %w", c.url.Host, err), c.url.Host) } } return nil, &NetworkError{err} } // If trace is enabled, dump http request and response, // except when the traceErrorsOnly enabled and the response's status code is ok if c.TraceOutput != nil && resp.StatusCode != http.StatusOK { c.dumpHTTP(req, resp) } if resp.StatusCode != http.StatusOK { // If server returns 412 pre-condition failed, it would // mean that authentication succeeded, but another // side-channel check has failed, we shall take // the client offline in such situations. // generally all implementations should simply return // 403, but in situations where there is a dependency // with the caller to take the client offline purpose // fully it should make sure to respond with '412' // instead, see cmd/storage-rest-server.go for ideas. if c.HealthCheckFn != nil && resp.StatusCode == http.StatusPreconditionFailed { err = fmt.Errorf("Marking %s offline temporarily; caused by PreconditionFailed with drive ID mismatch", c.url.Host) logger.LogOnceIf(ctx, err, c.url.Host) c.MarkOffline(err) } defer xhttp.DrainBody(resp.Body) // Limit the ReadAll(), just in case, because of a bug, the server responds with large data. b, err := io.ReadAll(io.LimitReader(resp.Body, c.MaxErrResponseSize)) if err != nil { if xnet.IsNetworkOrHostDown(err, expectTimeouts) { if !c.NoMetrics { atomic.AddUint64(&globalStats.errs, 1) } if c.MarkOffline(err) { logger.LogOnceIf(ctx, fmt.Errorf("Marking %s offline temporarily; caused by %w", c.url.Host, err), c.url.Host) } } return nil, err } if len(b) > 0 { return nil, errors.New(string(b)) } return nil, errors.New(resp.Status) } if !c.NoMetrics { resp.Body = &respBodyMonitor{ReadCloser: resp.Body, expectTimeouts: expectTimeouts} } return resp.Body, nil } // Close closes all idle connections of the underlying http client func (c *Client) Close() { atomic.StoreInt32(&c.connected, closed) } // NewClient - returns new REST client. func NewClient(uu *url.URL, tr http.RoundTripper, newAuthToken func(aud string) string) *Client { connected := int32(online) urlStr := uu.String() u, err := url.Parse(urlStr) if err != nil { // Mark offline, with no reconnection attempts. connected = int32(offline) err = &url.Error{URL: urlStr, Err: err} } // The host's colon:port should be normalized. See Issue 14836. u.Host = removeEmptyPort(u.Host) // Transport is exactly same as Go default in https://golang.org/pkg/net/http/#RoundTripper // except custom DialContext and TLSClientConfig. return &Client{ httpClient: &http.Client{Transport: tr}, url: u, lastErr: err, lastErrTime: time.Now(), newAuthToken: newAuthToken, connected: connected, lastConn: time.Now().UnixNano(), MaxErrResponseSize: 4096, HealthCheckReconnectUnit: 200 * time.Millisecond, HealthCheckTimeout: time.Second, } } // IsOnline returns whether the client is likely to be online. func (c *Client) IsOnline() bool { return atomic.LoadInt32(&c.connected) == online } // LastConn returns when the disk was (re-)connected func (c *Client) LastConn() time.Time { return time.Unix(0, atomic.LoadInt64(&c.lastConn)) } // LastError returns previous error func (c *Client) LastError() error { c.RLock() defer c.RUnlock() return fmt.Errorf("[%s] %w", c.lastErrTime.Format(time.RFC3339), c.lastErr) } // computes the exponential backoff duration according to // https://www.awsarchitectureblog.com/2015/03/backoff.html func exponentialBackoffWait(r *rand.Rand, unit, cap time.Duration) func(uint) time.Duration { if unit > time.Hour { // Protect against integer overflow panic("unit cannot exceed one hour") } return func(attempt uint) time.Duration { if attempt > 16 { // Protect against integer overflow attempt = 16 } // sleep = random_between(unit, min(cap, base * 2 ** attempt)) sleep := unit * time.Duration(1< cap { sleep = cap } sleep -= time.Duration(r.Float64() * float64(sleep-unit)) return sleep } } // MarkOffline - will mark a client as being offline and spawns // a goroutine that will attempt to reconnect if HealthCheckFn is set. // returns true if the node changed state from online to offline func (c *Client) MarkOffline(err error) bool { c.Lock() c.lastErr = err c.lastErrTime = time.Now() atomic.StoreInt64(&c.lastConn, time.Now().UnixNano()) c.Unlock() // Start goroutine that will attempt to reconnect. // If server is already trying to reconnect this will have no effect. if c.HealthCheckFn != nil && atomic.CompareAndSwapInt32(&c.connected, online, offline) { go func() { backOff := exponentialBackoffWait( rand.New(rand.NewSource(time.Now().UnixNano())), 200*time.Millisecond, 30*time.Second, ) attempt := uint(0) for { if atomic.LoadInt32(&c.connected) == closed { return } if c.HealthCheckFn() { if atomic.CompareAndSwapInt32(&c.connected, offline, online) { now := time.Now() disconnected := now.Sub(c.LastConn()) logger.Event(context.Background(), "Client '%s' re-connected in %s", c.url.String(), disconnected) atomic.StoreInt64(&c.lastConn, now.UnixNano()) } return } attempt++ time.Sleep(backOff(attempt)) } }() return true } return false }