mirror of
https://github.com/minio/minio.git
synced 2025-04-04 11:50:36 -04:00
fix: avoid timed value for network calls (#11531)
additionally simply timedValue to have RWMutex to avoid concurrent calls to DiskInfo() getting serialized, this has an effect on all calls that use GetDiskInfo() on the same disks. Such as getOnlineDisks, getOnlineDisksWithoutHealing
This commit is contained in:
parent
928de04f7a
commit
79b6a43467
@ -18,6 +18,7 @@ package cmd
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"crypto/tls"
|
||||||
"fmt"
|
"fmt"
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
@ -295,7 +296,10 @@ func validateConfig(s config.Config, setDriveCounts []int) error {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
{
|
{
|
||||||
kmsCfg, err := crypto.LookupConfig(s, globalCertsCADir.Get(), NewGatewayHTTPTransport())
|
kmsCfg, err := crypto.LookupConfig(s, globalCertsCADir.Get(), newCustomHTTPTransportWithHTTP2(
|
||||||
|
&tls.Config{
|
||||||
|
RootCAs: globalRootCAs,
|
||||||
|
}, defaultDialTimeout)())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -471,7 +475,10 @@ func lookupConfigs(s config.Config, setDriveCounts []int) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
kmsCfg, err := crypto.LookupConfig(s, globalCertsCADir.Get(), NewGatewayHTTPTransport())
|
kmsCfg, err := crypto.LookupConfig(s, globalCertsCADir.Get(), newCustomHTTPTransportWithHTTP2(
|
||||||
|
&tls.Config{
|
||||||
|
RootCAs: globalRootCAs,
|
||||||
|
}, defaultDialTimeout)())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.LogIf(ctx, fmt.Errorf("Unable to setup KMS config: %w", err))
|
logger.LogIf(ctx, fmt.Errorf("Unable to setup KMS config: %w", err))
|
||||||
}
|
}
|
||||||
|
@ -29,7 +29,6 @@ import (
|
|||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/minio/minio/cmd/http"
|
"github.com/minio/minio/cmd/http"
|
||||||
xhttp "github.com/minio/minio/cmd/http"
|
xhttp "github.com/minio/minio/cmd/http"
|
||||||
@ -120,8 +119,6 @@ type storageRESTClient struct {
|
|||||||
endpoint Endpoint
|
endpoint Endpoint
|
||||||
restClient *rest.Client
|
restClient *rest.Client
|
||||||
diskID string
|
diskID string
|
||||||
|
|
||||||
diskInfoCache timedValue
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Wrapper to restClient.Call to handle network errors, in case of network error the connection is makred disconnected
|
// Wrapper to restClient.Call to handle network errors, in case of network error the connection is makred disconnected
|
||||||
@ -218,27 +215,18 @@ func (client *storageRESTClient) SetDiskID(id string) {
|
|||||||
|
|
||||||
// DiskInfo - fetch disk information for a remote disk.
|
// DiskInfo - fetch disk information for a remote disk.
|
||||||
func (client *storageRESTClient) DiskInfo(ctx context.Context) (info DiskInfo, err error) {
|
func (client *storageRESTClient) DiskInfo(ctx context.Context) (info DiskInfo, err error) {
|
||||||
client.diskInfoCache.Once.Do(func() {
|
respBody, err := client.call(ctx, storageRESTMethodDiskInfo, nil, nil, -1)
|
||||||
client.diskInfoCache.TTL = time.Second
|
if err != nil {
|
||||||
client.diskInfoCache.Update = func() (interface{}, error) {
|
return info, err
|
||||||
var info DiskInfo
|
}
|
||||||
respBody, err := client.call(ctx, storageRESTMethodDiskInfo, nil, nil, -1)
|
defer http.DrainBody(respBody)
|
||||||
if err != nil {
|
if err = msgp.Decode(respBody, &info); err != nil {
|
||||||
return info, err
|
return info, err
|
||||||
}
|
}
|
||||||
defer http.DrainBody(respBody)
|
if info.Error != "" {
|
||||||
if err = msgp.Decode(respBody, &info); err != nil {
|
return info, toStorageErr(errors.New(info.Error))
|
||||||
return info, err
|
}
|
||||||
}
|
return info, nil
|
||||||
if info.Error != "" {
|
|
||||||
return info, toStorageErr(errors.New(info.Error))
|
|
||||||
}
|
|
||||||
return info, nil
|
|
||||||
}
|
|
||||||
})
|
|
||||||
v, err := client.diskInfoCache.Get()
|
|
||||||
info = v.(DiskInfo)
|
|
||||||
return info, err
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// MakeVolBulk - create multiple volumes in a bulk operation.
|
// MakeVolBulk - create multiple volumes in a bulk operation.
|
||||||
|
83
cmd/utils.go
83
cmd/utils.go
@ -45,6 +45,7 @@ import (
|
|||||||
"github.com/minio/minio/cmd/logger"
|
"github.com/minio/minio/cmd/logger"
|
||||||
"github.com/minio/minio/pkg/handlers"
|
"github.com/minio/minio/pkg/handlers"
|
||||||
"github.com/minio/minio/pkg/madmin"
|
"github.com/minio/minio/pkg/madmin"
|
||||||
|
"golang.org/x/net/http2"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
@ -523,6 +524,45 @@ func newCustomHTTPProxyTransport(tlsConfig *tls.Config, dialTimeout time.Duratio
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func newCustomHTTPTransportWithHTTP2(tlsConfig *tls.Config, dialTimeout time.Duration) func() *http.Transport {
|
||||||
|
// For more details about various values used here refer
|
||||||
|
// https://golang.org/pkg/net/http/#Transport documentation
|
||||||
|
tr := &http.Transport{
|
||||||
|
Proxy: http.ProxyFromEnvironment,
|
||||||
|
DialContext: xhttp.DialContextWithDNSCache(globalDNSCache, xhttp.NewInternodeDialContext(dialTimeout)),
|
||||||
|
MaxIdleConnsPerHost: 1024,
|
||||||
|
IdleConnTimeout: 15 * time.Second,
|
||||||
|
ResponseHeaderTimeout: 1 * time.Minute,
|
||||||
|
TLSHandshakeTimeout: 10 * time.Second,
|
||||||
|
ExpectContinueTimeout: 10 * time.Second,
|
||||||
|
TLSClientConfig: tlsConfig,
|
||||||
|
// Go net/http automatically unzip if content-type is
|
||||||
|
// gzip disable this feature, as we are always interested
|
||||||
|
// in raw stream.
|
||||||
|
DisableCompression: true,
|
||||||
|
}
|
||||||
|
|
||||||
|
if tlsConfig != nil {
|
||||||
|
trhttp2, _ := http2.ConfigureTransports(tr)
|
||||||
|
if trhttp2 != nil {
|
||||||
|
// ReadIdleTimeout is the timeout after which a health check using ping
|
||||||
|
// frame will be carried out if no frame is received on the
|
||||||
|
// connection. 5 minutes is sufficient time for any idle connection.
|
||||||
|
trhttp2.ReadIdleTimeout = 5 * time.Minute
|
||||||
|
// PingTimeout is the timeout after which the connection will be closed
|
||||||
|
// if a response to Ping is not received.
|
||||||
|
trhttp2.PingTimeout = dialTimeout
|
||||||
|
// DisableCompression, if true, prevents the Transport from
|
||||||
|
// requesting compression with an "Accept-Encoding: gzip"
|
||||||
|
trhttp2.DisableCompression = true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return func() *http.Transport {
|
||||||
|
return tr
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func newCustomHTTPTransport(tlsConfig *tls.Config, dialTimeout time.Duration) func() *http.Transport {
|
func newCustomHTTPTransport(tlsConfig *tls.Config, dialTimeout time.Duration) func() *http.Transport {
|
||||||
// For more details about various values used here refer
|
// For more details about various values used here refer
|
||||||
// https://golang.org/pkg/net/http/#Transport documentation
|
// https://golang.org/pkg/net/http/#Transport documentation
|
||||||
@ -767,38 +807,45 @@ type timedValue struct {
|
|||||||
// Managed values.
|
// Managed values.
|
||||||
value interface{}
|
value interface{}
|
||||||
lastUpdate time.Time
|
lastUpdate time.Time
|
||||||
mu sync.Mutex
|
mu sync.RWMutex
|
||||||
}
|
}
|
||||||
|
|
||||||
// Get will return a cached value or fetch a new one.
|
// Get will return a cached value or fetch a new one.
|
||||||
// If the Update function returns an error the value is forwarded as is and not cached.
|
// If the Update function returns an error the value is forwarded as is and not cached.
|
||||||
func (t *timedValue) Get() (interface{}, error) {
|
func (t *timedValue) Get() (interface{}, error) {
|
||||||
t.mu.Lock()
|
v := t.get()
|
||||||
defer t.mu.Unlock()
|
if v != nil {
|
||||||
if t.TTL <= 0 {
|
return v, nil
|
||||||
t.TTL = time.Second
|
|
||||||
}
|
|
||||||
if t.value != nil {
|
|
||||||
if time.Since(t.lastUpdate) < t.TTL {
|
|
||||||
v := t.value
|
|
||||||
return v, nil
|
|
||||||
}
|
|
||||||
t.value = nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
v, err := t.Update()
|
v, err := t.Update()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return v, err
|
return v, err
|
||||||
}
|
}
|
||||||
t.value = v
|
|
||||||
t.lastUpdate = time.Now()
|
t.update(v)
|
||||||
return v, nil
|
return v, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Invalidate the value in the cache.
|
func (t *timedValue) get() (v interface{}) {
|
||||||
func (t *timedValue) Invalidate() {
|
ttl := t.TTL
|
||||||
|
if ttl <= 0 {
|
||||||
|
ttl = time.Second
|
||||||
|
}
|
||||||
|
t.mu.RLock()
|
||||||
|
defer t.mu.RUnlock()
|
||||||
|
v = t.value
|
||||||
|
if time.Since(t.lastUpdate) < ttl {
|
||||||
|
return v
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *timedValue) update(v interface{}) {
|
||||||
t.mu.Lock()
|
t.mu.Lock()
|
||||||
t.value = nil
|
defer t.mu.Unlock()
|
||||||
t.mu.Unlock()
|
t.value = v
|
||||||
|
t.lastUpdate = time.Now()
|
||||||
}
|
}
|
||||||
|
|
||||||
// On MinIO a directory object is stored as a regular object with "__XLDIR__" suffix.
|
// On MinIO a directory object is stored as a regular object with "__XLDIR__" suffix.
|
||||||
|
Loading…
x
Reference in New Issue
Block a user