Add sufficient deadlines and countermeasures to handle hung node scenario (#19688)

Signed-off-by: Shubhendu Ram Tripathi <shubhendu@minio.io>
Signed-off-by: Harshavardhana <harsha@minio.io>
Shubhendu 2024-05-23 04:37:14 +05:30 committed by GitHub
parent ca80eced24
commit 7c7650b7c3
34 changed files with 292 additions and 133 deletions

View File

@@ -4,7 +4,6 @@ on:
   pull_request:
     branches:
       - master
-      - next
 # This ensures that previous jobs for the PR are canceled when the PR is
 # updated.

View File

@@ -3,8 +3,7 @@ name: FIPS Build Test
 on:
   pull_request:
     branches:
       - master
-      - next
 # This ensures that previous jobs for the PR are canceled when the PR is
 # updated.

View File

@@ -3,8 +3,7 @@ name: Healing Functional Tests
 on:
   pull_request:
     branches:
       - master
-      - next
 # This ensures that previous jobs for the PR are canceled when the PR is
 # updated.

View File

@@ -3,12 +3,11 @@ name: Linters and Tests
 on:
   pull_request:
     branches:
       - master
-      - next
 # This ensures that previous jobs for the PR are canceled when the PR is
 # updated.
 concurrency:
   group: ${{ github.workflow }}-${{ github.head_ref }}
   cancel-in-progress: true
@@ -35,9 +34,10 @@ jobs:
         CGO_ENABLED: 0
         GO111MODULE: on
       run: |
+        Set-MpPreference -DisableRealtimeMonitoring $true
         netsh int ipv4 set dynamicport tcp start=60000 num=61000
         go build --ldflags="-s -w" -o %GOPATH%\bin\minio.exe
-        go test -v --timeout 50m ./...
+        go test -v --timeout 120m ./...
     - name: Build on ${{ matrix.os }}
       if: matrix.os == 'ubuntu-latest'
       env:

View File

@@ -3,8 +3,7 @@ name: Functional Tests
 on:
   pull_request:
     branches:
       - master
-      - next
 # This ensures that previous jobs for the PR are canceled when the PR is
 # updated.

View File

@@ -3,8 +3,7 @@ name: Helm Chart linting
 on:
   pull_request:
     branches:
       - master
-      - next
 # This ensures that previous jobs for the PR are canceled when the PR is
 # updated.

View File

@@ -4,7 +4,6 @@ on:
   pull_request:
     branches:
       - master
-      - next
 # This ensures that previous jobs for the PR are canceled when the PR is
 # updated.

View File

@@ -4,7 +4,6 @@ on:
   pull_request:
     branches:
       - master
-      - next
 # This ensures that previous jobs for the PR are canceled when the PR is
 # updated.
@@ -56,6 +55,10 @@ jobs:
       run: |
         ${GITHUB_WORKSPACE}/.github/workflows/run-mint.sh "erasure" "minio" "minio123" "${{ steps.vars.outputs.sha_short }}"
+    - name: resiliency
+      run: |
+        ${GITHUB_WORKSPACE}/.github/workflows/run-mint.sh "resiliency" "minio" "minio123" "${{ steps.vars.outputs.sha_short }}"
     - name: The job must cleanup
       if: ${{ always() }}
       run: |

View File

@@ -0,0 +1,78 @@
version: '3.7'
# Settings and configurations that are common for all containers
x-minio-common: &minio-common
image: quay.io/minio/minio:${JOB_NAME}
command: server --console-address ":9001" http://minio{1...4}/rdata{1...2}
expose:
- "9000"
- "9001"
environment:
MINIO_CI_CD: "on"
MINIO_ROOT_USER: "minio"
MINIO_ROOT_PASSWORD: "minio123"
MINIO_KMS_SECRET_KEY: "my-minio-key:OSMM+vkKUTCvQs9YL/CVMIMt43HFhkUpqJxTmGl6rYw="
MINIO_DRIVE_MAX_TIMEOUT: "5s"
healthcheck:
test: ["CMD", "mc", "ready", "local"]
interval: 5s
timeout: 5s
retries: 5
# starts 4 docker containers running minio server instances.
# using nginx reverse proxy, load balancing, you can access
# it through port 9000.
services:
minio1:
<<: *minio-common
hostname: minio1
volumes:
- rdata1-1:/rdata1
- rdata1-2:/rdata2
minio2:
<<: *minio-common
hostname: minio2
volumes:
- rdata2-1:/rdata1
- rdata2-2:/rdata2
minio3:
<<: *minio-common
hostname: minio3
volumes:
- rdata3-1:/rdata1
- rdata3-2:/rdata2
minio4:
<<: *minio-common
hostname: minio4
volumes:
- rdata4-1:/rdata1
- rdata4-2:/rdata2
nginx:
image: nginx:1.19.2-alpine
hostname: nginx
volumes:
- ./nginx-4-node.conf:/etc/nginx/nginx.conf:ro
ports:
- "9000:9000"
- "9001:9001"
depends_on:
- minio1
- minio2
- minio3
- minio4
## By default this config uses default local driver,
## For custom volumes replace with volume driver configuration.
volumes:
rdata1-1:
rdata1-2:
rdata2-1:
rdata2-2:
rdata3-1:
rdata3-2:
rdata4-1:
rdata4-2:

View File

@@ -3,8 +3,7 @@ name: MinIO advanced tests
 on:
   pull_request:
     branches:
       - master
-      - next
 # This ensures that previous jobs for the PR are canceled when the PR is
 # updated.

View File

@@ -3,8 +3,7 @@ name: Root lockdown tests
 on:
   pull_request:
     branches:
       - master
-      - next
 # This ensures that previous jobs for the PR are canceled when the PR is
 # updated.

View File

@@ -16,7 +16,7 @@ docker volume rm $(docker volume ls -f dangling=true) || true
 cd .github/workflows/mint
 docker-compose -f minio-${MODE}.yaml up -d
-sleep 30s
+sleep 1m
 docker system prune -f || true
 docker volume prune -f || true
@@ -26,6 +26,9 @@ docker volume rm $(docker volume ls -q -f dangling=true) || true
 [ "${MODE}" == "pools" ] && docker-compose -f minio-${MODE}.yaml stop minio2
 [ "${MODE}" == "pools" ] && docker-compose -f minio-${MODE}.yaml stop minio6
+# Pause one node, to check that all S3 calls work while one node goes wrong
+[ "${MODE}" == "resiliency" ] && docker-compose -f minio-${MODE}.yaml pause minio4
 docker run --rm --net=mint_default \
   --name="mint-${MODE}-${JOB_NAME}" \
   -e SERVER_ENDPOINT="nginx:9000" \
@@ -35,6 +38,18 @@ docker run --rm --net=mint_default \
   -e MINT_MODE="${MINT_MODE}" \
   docker.io/minio/mint:edge
+# FIXME: enable this after fixing aws-sdk-java-v2 tests
+# # unpause the node, to check that all S3 calls work while one node goes wrong
+# [ "${MODE}" == "resiliency" ] && docker-compose -f minio-${MODE}.yaml unpause minio4
+# [ "${MODE}" == "resiliency" ] && docker run --rm --net=mint_default \
+#    --name="mint-${MODE}-${JOB_NAME}" \
+#    -e SERVER_ENDPOINT="nginx:9000" \
+#    -e ACCESS_KEY="${ACCESS_KEY}" \
+#    -e SECRET_KEY="${SECRET_KEY}" \
+#    -e ENABLE_HTTPS=0 \
+#    -e MINT_MODE="${MINT_MODE}" \
+#    docker.io/minio/mint:edge
 docker-compose -f minio-${MODE}.yaml down || true
 sleep 10s

View File

@@ -3,8 +3,7 @@ name: Shell formatting checks
 on:
   pull_request:
     branches:
       - master
-      - next
 permissions:
   contents: read

View File

@@ -3,8 +3,7 @@ name: Upgrade old version tests
 on:
   pull_request:
     branches:
       - master
-      - next
 # This ensures that previous jobs for the PR are canceled when the PR is
 # updated.

View File

@@ -57,7 +57,7 @@ done
 set +e
-sleep 10
+./mc ready minioadm/
 ./mc ls minioadm/
 if [ $? -ne 0 ]; then

View File

@@ -1,4 +1,4 @@
-// Copyright (c) 2015-2021 MinIO, Inc.
+// Copyright (c) 2015-2024 MinIO, Inc.
 //
 // This file is part of MinIO Object Storage stack
 //
@@ -18,7 +18,6 @@
 package cmd

 import (
-    "context"
     "math"
     "net/http"
     "os"
@@ -31,6 +30,7 @@ import (
     "github.com/minio/madmin-go/v3"
     "github.com/minio/minio/internal/config"
     "github.com/minio/minio/internal/kms"
+    xnet "github.com/minio/pkg/v2/net"
 )

 // getLocalServerProperty - returns madmin.ServerProperties for only the
@@ -64,9 +64,11 @@ func getLocalServerProperty(endpointServerPools EndpointServerPools, r *http.Req
         if err := isServerResolvable(endpoint, 5*time.Second); err == nil {
             network[nodeName] = string(madmin.ItemOnline)
         } else {
-            network[nodeName] = string(madmin.ItemOffline)
-            // log once the error
-            peersLogOnceIf(context.Background(), err, nodeName)
+            if xnet.IsNetworkOrHostDown(err, false) {
+                network[nodeName] = string(madmin.ItemOffline)
+            } else if xnet.IsNetworkOrHostDown(err, true) {
+                network[nodeName] = "connection attempt timedout"
+            }
         }
     }
 }
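The hunk above stops logging every unreachable peer and instead classifies the failure: a hard network/host-down error marks the node offline, while a timed-out connection attempt is reported separately so a hung node is distinguishable from a dead one. A standalone sketch of that classification using only the standard library (MinIO itself relies on xnet.IsNetworkOrHostDown from github.com/minio/pkg/v2/net; the probe address below is illustrative):

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"net"
	"os"
	"time"
)

// classifyPeer mirrors the idea in the hunk above: timeouts are reported as a
// hung/unresponsive peer, any other network failure as plainly offline.
func classifyPeer(err error) string {
	if err == nil {
		return "online"
	}
	var nerr net.Error
	if (errors.As(err, &nerr) && nerr.Timeout()) ||
		errors.Is(err, os.ErrDeadlineExceeded) ||
		errors.Is(err, context.DeadlineExceeded) {
		return "connection attempt timedout"
	}
	return "offline"
}

func main() {
	d := net.Dialer{Timeout: 2 * time.Second}
	// 203.0.113.1 is a TEST-NET-3 address, so this dial is expected to fail.
	_, err := d.Dial("tcp", "203.0.113.1:9000")
	fmt.Println(classifyPeer(err))
}
```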

View File

@@ -404,16 +404,13 @@ func buildServerCtxt(ctx *cli.Context, ctxt *serverCtxt) (err error) {
     ctxt.FTP = ctx.StringSlice("ftp")
     ctxt.SFTP = ctx.StringSlice("sftp")
     ctxt.Interface = ctx.String("interface")
     ctxt.UserTimeout = ctx.Duration("conn-user-timeout")
     ctxt.SendBufSize = ctx.Int("send-buf-size")
     ctxt.RecvBufSize = ctx.Int("recv-buf-size")
-    ctxt.ShutdownTimeout = ctx.Duration("shutdown-timeout")
     ctxt.IdleTimeout = ctx.Duration("idle-timeout")
-    ctxt.ReadHeaderTimeout = ctx.Duration("read-header-timeout")
-    ctxt.MaxIdleConnsPerHost = ctx.Int("max-idle-conns-per-host")
+    ctxt.UserTimeout = ctx.Duration("conn-user-timeout")
+    ctxt.ShutdownTimeout = ctx.Duration("shutdown-timeout")
     if conf := ctx.String("config"); len(conf) > 0 {
         err = mergeServerCtxtFromConfigFile(conf, ctxt)
View File

@@ -258,6 +258,7 @@ func (er *erasureObjects) healObject(ctx context.Context, bucket string, object
             healTrace(healingMetricObject, startTime, bucket, object, &opts, err, &result)
         }()
     }
     // Initialize heal result object
     result = madmin.HealResultItem{
         Type: madmin.HealItemObject,

View File

@@ -657,7 +657,8 @@ func (er erasureObjects) PutObjectPart(ctx context.Context, bucket, object, uplo
     if err != nil {
         return PartInfo{}, err
     }
-    pctx := plkctx.Context()
+    ctx = plkctx.Context()
     defer partIDLock.Unlock(plkctx)
     onlineDisks := er.getDisks()
@@ -689,7 +690,7 @@ func (er erasureObjects) PutObjectPart(ctx context.Context, bucket, object, uplo
         }
     }()
-    erasure, err := NewErasure(pctx, fi.Erasure.DataBlocks, fi.Erasure.ParityBlocks, fi.Erasure.BlockSize)
+    erasure, err := NewErasure(ctx, fi.Erasure.DataBlocks, fi.Erasure.ParityBlocks, fi.Erasure.BlockSize)
     if err != nil {
         return pi, toObjectErr(err, bucket, object)
     }
@@ -742,7 +743,7 @@ func (er erasureObjects) PutObjectPart(ctx context.Context, bucket, object, uplo
         }
     }
-    n, err := erasure.Encode(pctx, toEncode, writers, buffer, writeQuorum)
+    n, err := erasure.Encode(ctx, toEncode, writers, buffer, writeQuorum)
     closeBitrotWriters(writers)
     if err != nil {
         return pi, toObjectErr(err, bucket, object)

View File

@@ -2472,16 +2472,10 @@ func (z *erasureServerPools) Health(ctx context.Context, opts HealthOptions) Hea
     for _, disk := range storageInfo.Disks {
         if opts.Maintenance {
-            var skip bool
             globalLocalDrivesMu.RLock()
-            for _, drive := range globalLocalDrives {
-                if drive != nil && drive.Endpoint().String() == disk.Endpoint {
-                    skip = true
-                    break
-                }
-            }
+            _, ok := globalLocalDrivesMap[disk.Endpoint]
             globalLocalDrivesMu.RUnlock()
-            if skip {
+            if ok {
                 continue
             }
         }

View File

@@ -414,8 +414,9 @@ var (
     // List of local drives to this node, this is only set during server startup,
     // and is only mutated by HealFormat. Hold globalLocalDrivesMu to access.
     globalLocalDrives   []StorageAPI
-    globalLocalDrivesMu sync.RWMutex
+    globalLocalDrivesMap = make(map[string]StorageAPI)
+    globalLocalDrivesMu  sync.RWMutex

     globalDriveMonitoring = env.Get("_MINIO_DRIVE_ACTIVE_MONITORING", config.EnableOn) == config.EnableOn

View File

@@ -36,8 +36,12 @@ var globalGridStart = make(chan struct{})
 func initGlobalGrid(ctx context.Context, eps EndpointServerPools) error {
     hosts, local := eps.GridHosts()
+    lookupHost := globalDNSCache.LookupHost
     g, err := grid.NewManager(ctx, grid.ManagerOptions{
-        Dialer: grid.ContextDialer(xhttp.DialContextWithLookupHost(globalDNSCache.LookupHost, xhttp.NewInternodeDialContext(rest.DefaultTimeout, globalTCPOptions))),
+        // Pass Dialer for websocket grid, make sure we do not
+        // provide any DriveOPTimeout() function, as that is not
+        // useful over persistent connections.
+        Dialer: grid.ContextDialer(xhttp.DialContextWithLookupHost(lookupHost, xhttp.NewInternodeDialContext(rest.DefaultTimeout, globalTCPOptions.ForWebsocket()))),
         Local: local,
         Hosts: hosts,
         AddAuth: newCachedAuthToken(),

View File

@@ -320,6 +320,9 @@ func (sys *S3PeerSys) GetBucketInfo(ctx context.Context, bucket string, opts Buc
 }

 func (client *remotePeerS3Client) ListBuckets(ctx context.Context, opts BucketOptions) ([]BucketInfo, error) {
+    ctx, cancel := context.WithTimeout(ctx, globalDriveConfig.GetMaxTimeout())
+    defer cancel()
     bi, err := listBucketsRPC.Call(ctx, client.gridConn(), &opts)
     if err != nil {
         return nil, toStorageErr(err)
@@ -345,6 +348,9 @@ func (client *remotePeerS3Client) HealBucket(ctx context.Context, bucket string,
         peerS3BucketDeleted: strconv.FormatBool(opts.Remove),
     })
+    ctx, cancel := context.WithTimeout(ctx, globalDriveConfig.GetMaxTimeout())
+    defer cancel()
     _, err := healBucketRPC.Call(ctx, conn, mss)

     // Initialize heal result info
@@ -367,6 +373,9 @@ func (client *remotePeerS3Client) GetBucketInfo(ctx context.Context, bucket stri
         peerS3BucketDeleted: strconv.FormatBool(opts.Deleted),
     })
+    ctx, cancel := context.WithTimeout(ctx, globalDriveConfig.GetMaxTimeout())
+    defer cancel()
     volInfo, err := headBucketRPC.Call(ctx, conn, mss)
     if err != nil {
         return BucketInfo{}, toStorageErr(err)
@@ -418,6 +427,9 @@ func (client *remotePeerS3Client) MakeBucket(ctx context.Context, bucket string,
         peerS3BucketForceCreate: strconv.FormatBool(opts.ForceCreate),
     })
+    ctx, cancel := context.WithTimeout(ctx, globalDriveConfig.GetMaxTimeout())
+    defer cancel()
     _, err := makeBucketRPC.Call(ctx, conn, mss)
     return toStorageErr(err)
 }
@@ -467,6 +479,9 @@ func (client *remotePeerS3Client) DeleteBucket(ctx context.Context, bucket strin
         peerS3BucketForceDelete: strconv.FormatBool(opts.Force),
     })
+    ctx, cancel := context.WithTimeout(ctx, globalDriveConfig.GetMaxTimeout())
+    defer cancel()
     _, err := deleteBucketRPC.Call(ctx, conn, mss)
     return toStorageErr(err)
 }
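Every peer bucket RPC above now derives a bounded context from globalDriveConfig.GetMaxTimeout() before the call, so a hung remote can only stall a request for the configured drive timeout. The pattern in isolation (callWithTimeout and the simulated peer are illustrative, not MinIO APIs):

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// callWithTimeout bounds a single peer RPC with its own deadline so that one
// hung node cannot block the caller indefinitely.
func callWithTimeout[T any](ctx context.Context, max time.Duration, call func(context.Context) (T, error)) (T, error) {
	ctx, cancel := context.WithTimeout(ctx, max)
	defer cancel()
	return call(ctx)
}

func main() {
	// Simulated peer call that never returns before the deadline.
	slowPeer := func(ctx context.Context) (string, error) {
		select {
		case <-time.After(5 * time.Second):
			return "bucket-info", nil
		case <-ctx.Done():
			return "", ctx.Err()
		}
	}
	_, err := callWithTimeout(context.Background(), 100*time.Millisecond, slowPeer)
	fmt.Println(errors.Is(err, context.DeadlineExceeded)) // true
}
```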

View File

@@ -234,7 +234,7 @@ func testPostPolicyBucketHandler(obj ObjectLayer, instanceType string, t TestErr
         // Call the ServeHTTP to execute the handler.
         apiRouter.ServeHTTP(rec, req)
         if rec.Code != test.expectedStatus {
-            t.Fatalf("Test %d: %s: Expected the response status to be `%d`, but instead found `%d`", i+1, instanceType, test.expectedStatus, rec.Code)
+            t.Fatalf("Test %d: %s: Expected the response status to be `%d`, but instead found `%d`, Resp: %s", i+1, instanceType, test.expectedStatus, rec.Code, rec.Body)
         }
     }

View File

@@ -414,10 +414,11 @@ func serverHandleCmdArgs(ctxt serverCtxt) {
     setGlobalInternodeInterface(ctxt.Interface)
     globalTCPOptions = xhttp.TCPOptions{
         UserTimeout: int(ctxt.UserTimeout.Milliseconds()),
-        Interface:   ctxt.Interface,
-        SendBufSize: ctxt.SendBufSize,
-        RecvBufSize: ctxt.RecvBufSize,
+        DriveOPTimeout: globalDriveConfig.GetOPTimeout,
+        Interface:      ctxt.Interface,
+        SendBufSize:    ctxt.SendBufSize,
+        RecvBufSize:    ctxt.RecvBufSize,
     }
     // allow transport to be HTTP/1.1 for proxying.
@@ -816,6 +817,11 @@ func serverMain(ctx *cli.Context) {
         }
     }
+    var getCert certs.GetCertificateFunc
+    if globalTLSCerts != nil {
+        getCert = globalTLSCerts.GetCertificate
+    }
     // Check for updates in non-blocking manner.
     go func() {
         if !globalServerCtxt.Quiet && !globalInplaceUpdateDisabled {
@@ -842,12 +848,7 @@ func serverMain(ctx *cli.Context) {
         warnings = append(warnings, color.YellowBold("- Detected GOMAXPROCS(%d) < NumCPU(%d), please make sure to provide all PROCS to MinIO for optimal performance", maxProcs, cpuProcs))
     }
-    var getCert certs.GetCertificateFunc
-    if globalTLSCerts != nil {
-        getCert = globalTLSCerts.GetCertificate
-    }
-    // Initialize gridn
+    // Initialize grid
     bootstrapTrace("initGrid", func() {
         logger.FatalIf(initGlobalGrid(GlobalContext, globalEndpoints), "Unable to configure server grid RPC services")
     })
@@ -909,9 +910,6 @@ func serverMain(ctx *cli.Context) {
         }
     })
-    xhttp.SetDeploymentID(globalDeploymentID())
-    xhttp.SetMinIOVersion(Version)
     for _, n := range globalNodes {
         nodeName := n.Host
         if n.IsLocal {
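Note that DriveOPTimeout is wired into TCPOptions as a function, not a fixed duration, so a later change to the drive subsystem config is picked up by connections dialed afterwards. A small illustration of that getter pattern (names and values here are illustrative, not MinIO's):

```go
package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// options carries a getter, not a value, so whoever holds the options always
// observes the latest configured timeout.
type options struct {
	driveOPTimeout func() time.Duration
}

func main() {
	var timeoutNs atomic.Int64
	timeoutNs.Store(int64(30 * time.Second))

	opts := options{driveOPTimeout: func() time.Duration {
		return time.Duration(timeoutNs.Load())
	}}
	fmt.Println(opts.driveOPTimeout()) // 30s

	// Simulate an admin config update; connections dialed later see the new value.
	timeoutNs.Store(int64(5 * time.Second))
	fmt.Println(opts.driveOPTimeout()) // 5s
}
```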

View File

@@ -80,11 +80,7 @@ func getStorageViaEndpoint(endpoint Endpoint) StorageAPI {
     globalLocalDrivesMu.RLock()
     defer globalLocalDrivesMu.RUnlock()
     if len(globalLocalSetDrives) == 0 {
-        for _, drive := range globalLocalDrives {
-            if drive != nil && drive.Endpoint().Equal(endpoint) {
-                return drive
-            }
-        }
+        return globalLocalDrivesMap[endpoint.String()]
     }
     return globalLocalSetDrives[endpoint.PoolIdx][endpoint.SetIdx][endpoint.DiskIdx]
 }
@@ -1387,6 +1383,7 @@ func registerStorageRESTHandlers(router *mux.Router, endpointServerPools Endpoin
     defer globalLocalDrivesMu.Unlock()
     globalLocalDrives = append(globalLocalDrives, storage)
+    globalLocalDrivesMap[endpoint.String()] = storage
     globalLocalSetDrives[endpoint.PoolIdx][endpoint.SetIdx][endpoint.DiskIdx] = storage
     return true
 }
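Both this file and the Health() hunk earlier replace a linear scan over globalLocalDrives with a map keyed by the endpoint string, filled in at registration time. A reduced sketch of the same bookkeeping (plain strings stand in for StorageAPI and Endpoint):

```go
package main

import (
	"fmt"
	"sync"
)

// driveRegistry keeps both a slice (the original behaviour) and a map keyed by
// endpoint so lookups no longer walk every local drive.
type driveRegistry struct {
	mu     sync.RWMutex
	drives []string          // stand-in for []StorageAPI
	byEP   map[string]string // endpoint -> drive
}

func newDriveRegistry() *driveRegistry {
	return &driveRegistry{byEP: make(map[string]string)}
}

func (r *driveRegistry) register(endpoint, drive string) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.drives = append(r.drives, drive)
	r.byEP[endpoint] = drive
}

func (r *driveRegistry) lookup(endpoint string) (string, bool) {
	r.mu.RLock()
	defer r.mu.RUnlock()
	d, ok := r.byEP[endpoint]
	return d, ok
}

func main() {
	reg := newDriveRegistry()
	reg.register("http://minio1:9000/rdata1", "xl-storage-1")
	fmt.Println(reg.lookup("http://minio1:9000/rdata1"))
}
```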

View File

@@ -25,15 +25,18 @@ import (
     "github.com/minio/pkg/v2/env"
 )

+// Drive specific timeout environment variables
 const (
-    envMaxDriveTimeout = "MINIO_DRIVE_MAX_TIMEOUT"
+    EnvMaxDriveTimeout       = "MINIO_DRIVE_MAX_TIMEOUT"
+    EnvMaxDriveTimeoutLegacy = "_MINIO_DRIVE_MAX_TIMEOUT"
+    EnvMaxDiskTimeoutLegacy  = "_MINIO_DISK_MAX_TIMEOUT"
 )

 // DefaultKVS - default KVS for drive
 var DefaultKVS = config.KVS{
     config.KV{
         Key:   MaxTimeout,
-        Value: "",
+        Value: "30s",
     },
 }
@@ -53,8 +56,13 @@ func (c *Config) Update(new Config) error {
     return nil
 }

-// GetMaxTimeout - returns the max timeout value.
+// GetMaxTimeout - returns the per call drive operation timeout
 func (c *Config) GetMaxTimeout() time.Duration {
+    return c.GetOPTimeout()
+}
+
+// GetOPTimeout - returns the per call drive operation timeout
+func (c *Config) GetOPTimeout() time.Duration {
     configLk.RLock()
     defer configLk.RUnlock()
@@ -71,35 +79,32 @@ func LookupConfig(kvs config.KVS) (cfg Config, err error) {
     }

     // if not set. Get default value from environment
-    d := env.Get(envMaxDriveTimeout, kvs.GetWithDefault(MaxTimeout, DefaultKVS))
+    d := env.Get(EnvMaxDriveTimeout, env.Get(EnvMaxDriveTimeoutLegacy, env.Get(EnvMaxDiskTimeoutLegacy, kvs.GetWithDefault(MaxTimeout, DefaultKVS))))
     if d == "" {
-        d = env.Get("_MINIO_DRIVE_MAX_TIMEOUT", "")
-        if d == "" {
-            d = env.Get("_MINIO_DISK_MAX_TIMEOUT", "")
-        }
-    }
-
-    dur, _ := time.ParseDuration(d)
-    if dur < time.Second {
         cfg.MaxTimeout = 30 * time.Second
     } else {
-        cfg.MaxTimeout = getMaxTimeout(dur)
+        dur, _ := time.ParseDuration(d)
+        if dur < time.Second {
+            cfg.MaxTimeout = 30 * time.Second
+        } else {
+            cfg.MaxTimeout = getMaxTimeout(dur)
+        }
     }
     return cfg, err
 }

 func getMaxTimeout(t time.Duration) time.Duration {
-    if t < time.Second {
-        // get default value
-        d := env.Get("_MINIO_DRIVE_MAX_TIMEOUT", "")
-        if d == "" {
-            d = env.Get("_MINIO_DISK_MAX_TIMEOUT", "")
-        }
-        dur, _ := time.ParseDuration(d)
-        if dur < time.Second {
-            return 30 * time.Second
-        }
-        return dur
+    if t > time.Second {
+        return t
     }
-    return t
+    // get default value
+    d := env.Get(EnvMaxDriveTimeoutLegacy, env.Get(EnvMaxDiskTimeoutLegacy, ""))
+    if d == "" {
+        return 30 * time.Second
+    }
+    dur, _ := time.ParseDuration(d)
+    if dur < time.Second {
+        return 30 * time.Second
+    }
+    return dur
 }
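The resolution order above is: MINIO_DRIVE_MAX_TIMEOUT, then the legacy _MINIO_DRIVE_MAX_TIMEOUT and _MINIO_DISK_MAX_TIMEOUT variables, then the config default of 30s, with anything below one second clamped back to 30s. A compact standalone approximation of that logic, using os.Getenv instead of MinIO's env/config packages:

```go
package main

import (
	"fmt"
	"os"
	"time"
)

// driveMaxTimeout resolves the per-call drive timeout roughly the way the
// config above does: prefer MINIO_DRIVE_MAX_TIMEOUT, then the legacy
// variables, and fall back to 30s whenever the value is missing or below 1s.
func driveMaxTimeout() time.Duration {
	v := os.Getenv("MINIO_DRIVE_MAX_TIMEOUT")
	if v == "" {
		v = os.Getenv("_MINIO_DRIVE_MAX_TIMEOUT")
	}
	if v == "" {
		v = os.Getenv("_MINIO_DISK_MAX_TIMEOUT")
	}
	dur, err := time.ParseDuration(v)
	if err != nil || dur < time.Second {
		return 30 * time.Second
	}
	return dur
}

func main() {
	os.Setenv("MINIO_DRIVE_MAX_TIMEOUT", "5s")
	fmt.Println(driveMaxTimeout()) // 5s
	os.Setenv("MINIO_DRIVE_MAX_TIMEOUT", "100ms")
	fmt.Println(driveMaxTimeout()) // 30s (below the 1s floor)
}
```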

View File

@@ -22,12 +22,13 @@ import "github.com/minio/minio/internal/config"
 var (
     // MaxTimeout is the max timeout for drive
     MaxTimeout = "max_timeout"

     // HelpDrive is help for drive
     HelpDrive = config.HelpKVS{
         config.HelpKV{
             Key:         MaxTimeout,
             Type:        "string",
-            Description: "set per call max_timeout for the drive, defaults to 2 minutes",
+            Description: "set per call max_timeout for the drive, defaults to 30 seconds",
             Optional:    true,
         },
     }

View File

@@ -33,13 +33,13 @@ type DeadlineConn struct {
 // Sets read deadline
 func (c *DeadlineConn) setReadDeadline() {
     if c.readDeadline > 0 {
-        c.SetReadDeadline(time.Now().UTC().Add(c.readDeadline))
+        c.Conn.SetReadDeadline(time.Now().UTC().Add(c.readDeadline))
     }
 }

 func (c *DeadlineConn) setWriteDeadline() {
     if c.writeDeadline > 0 {
-        c.SetWriteDeadline(time.Now().UTC().Add(c.writeDeadline))
+        c.Conn.SetWriteDeadline(time.Now().UTC().Add(c.writeDeadline))
     }
 }
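The fix makes the helpers set deadlines on the embedded net.Conn explicitly. A minimal wrapper in the same spirit, pushing the read deadline forward before each Read so a hung peer eventually surfaces as an i/o timeout (this is a sketch, not MinIO's deadlineconn package):

```go
package main

import (
	"fmt"
	"net"
	"time"
)

// deadlineConn bumps the read/write deadline on the wrapped net.Conn before
// every operation, so an unresponsive remote turns into an i/o timeout
// instead of a goroutine blocked forever.
type deadlineConn struct {
	net.Conn
	readDeadline  time.Duration
	writeDeadline time.Duration
}

func (c *deadlineConn) Read(b []byte) (int, error) {
	if c.readDeadline > 0 {
		// Arm the deadline on the embedded Conn, as in the hunk above.
		if err := c.Conn.SetReadDeadline(time.Now().UTC().Add(c.readDeadline)); err != nil {
			return 0, err
		}
	}
	return c.Conn.Read(b)
}

func (c *deadlineConn) Write(b []byte) (int, error) {
	if c.writeDeadline > 0 {
		if err := c.Conn.SetWriteDeadline(time.Now().UTC().Add(c.writeDeadline)); err != nil {
			return 0, err
		}
	}
	return c.Conn.Write(b)
}

func main() {
	client, server := net.Pipe()
	defer server.Close()
	defer client.Close()

	dc := &deadlineConn{Conn: client, readDeadline: 100 * time.Millisecond}
	buf := make([]byte, 1)
	_, err := dc.Read(buf) // the other side never writes, so this times out
	fmt.Println(err)
}
```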

View File

@@ -42,6 +42,7 @@ import (
     xioutil "github.com/minio/minio/internal/ioutil"
     "github.com/minio/minio/internal/logger"
     "github.com/minio/minio/internal/pubsub"
+    xnet "github.com/minio/pkg/v2/net"
     "github.com/puzpuzpuz/xsync/v3"
     "github.com/tinylib/msgp/msgp"
     "github.com/zeebo/xxh3"
@@ -677,9 +678,8 @@ func (c *Connection) connect() {
             return
         }
         if gotState != StateConnecting {
-            // Don't print error on first attempt,
-            // and after that only once per hour.
-            gridLogOnceIf(c.ctx, fmt.Errorf("grid: %s connecting to %s: %w (%T) Sleeping %v (%v)", c.Local, toDial, err, err, sleep, gotState), toDial)
+            // Don't print error on first attempt, and after that only once per hour.
+            gridLogOnceIf(c.ctx, fmt.Errorf("grid: %s re-connecting to %s: %w (%T) Sleeping %v (%v)", c.Local, toDial, err, err, sleep, gotState), toDial)
         }
         c.updateState(StateConnectionError)
         time.Sleep(sleep)
@@ -972,7 +972,9 @@ func (c *Connection) handleMessages(ctx context.Context, conn net.Conn) {
         msg, err = readDataInto(msg, conn, c.side, ws.OpBinary)
         if err != nil {
             cancel(ErrDisconnected)
-            gridLogIfNot(ctx, fmt.Errorf("ws read: %w", err), net.ErrClosed, io.EOF)
+            if !xnet.IsNetworkOrHostDown(err, true) {
+                gridLogIfNot(ctx, fmt.Errorf("ws read: %w", err), net.ErrClosed, io.EOF)
+            }
             return
         }
         if c.incomingBytes != nil {
@@ -983,7 +985,9 @@ func (c *Connection) handleMessages(ctx context.Context, conn net.Conn) {
         var m message
         subID, remain, err := m.parse(msg)
         if err != nil {
-            gridLogIf(ctx, fmt.Errorf("ws parse package: %w", err))
+            if !xnet.IsNetworkOrHostDown(err, true) {
+                gridLogIf(ctx, fmt.Errorf("ws parse package: %w", err))
+            }
             cancel(ErrDisconnected)
             return
         }
@@ -1004,7 +1008,9 @@ func (c *Connection) handleMessages(ctx context.Context, conn net.Conn) {
             var next []byte
             next, remain, err = msgp.ReadBytesZC(remain)
             if err != nil {
-                gridLogIf(ctx, fmt.Errorf("ws read merged: %w", err))
+                if !xnet.IsNetworkOrHostDown(err, true) {
+                    gridLogIf(ctx, fmt.Errorf("ws read merged: %w", err))
+                }
                 cancel(ErrDisconnected)
                 return
             }
@@ -1012,7 +1018,9 @@ func (c *Connection) handleMessages(ctx context.Context, conn net.Conn) {
             m.Payload = nil
             subID, _, err = m.parse(next)
             if err != nil {
-                gridLogIf(ctx, fmt.Errorf("ws parse merged: %w", err))
+                if !xnet.IsNetworkOrHostDown(err, true) {
+                    gridLogIf(ctx, fmt.Errorf("ws parse merged: %w", err))
+                }
                 cancel(ErrDisconnected)
                 return
             }
@@ -1119,18 +1127,24 @@ func (c *Connection) handleMessages(ctx context.Context, conn net.Conn) {
             buf.Reset()
             err := wsw.writeMessage(&buf, c.side, ws.OpBinary, toSend)
             if err != nil {
-                gridLogIf(ctx, fmt.Errorf("ws writeMessage: %w", err))
+                if !xnet.IsNetworkOrHostDown(err, true) {
+                    gridLogIf(ctx, fmt.Errorf("ws writeMessage: %w", err))
+                }
                 return
             }
             PutByteBuffer(toSend)
             err = conn.SetWriteDeadline(time.Now().Add(connWriteTimeout))
             if err != nil {
                 gridLogIf(ctx, fmt.Errorf("conn.SetWriteDeadline: %w", err))
                 return
             }
             _, err = buf.WriteTo(conn)
             if err != nil {
-                gridLogIf(ctx, fmt.Errorf("ws write: %w", err))
+                if !xnet.IsNetworkOrHostDown(err, true) {
+                    gridLogIf(ctx, fmt.Errorf("ws write: %w", err))
+                }
                 return
             }
             continue
@@ -1163,18 +1177,24 @@ func (c *Connection) handleMessages(ctx context.Context, conn net.Conn) {
             buf.Reset()
             err = wsw.writeMessage(&buf, c.side, ws.OpBinary, toSend)
             if err != nil {
-                gridLogIf(ctx, fmt.Errorf("ws writeMessage: %w", err))
+                if !xnet.IsNetworkOrHostDown(err, true) {
+                    gridLogIf(ctx, fmt.Errorf("ws writeMessage: %w", err))
+                }
                 return
             }
             // buf is our local buffer, so we can reuse it.
             err = conn.SetWriteDeadline(time.Now().Add(connWriteTimeout))
             if err != nil {
                 gridLogIf(ctx, fmt.Errorf("conn.SetWriteDeadline: %w", err))
                 return
             }
             // buf is our local buffer, so we can reuse it.
             _, err = buf.WriteTo(conn)
             if err != nil {
-                gridLogIf(ctx, fmt.Errorf("ws write: %w", err))
+                if !xnet.IsNetworkOrHostDown(err, true) {
+                    gridLogIf(ctx, fmt.Errorf("ws write: %w", err))
+                }
                 return
             }
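Alongside the quieter logging, the write path keeps refreshing the connection's write deadline (connWriteTimeout) before flushing each message, so a peer that stops reading cannot wedge the writer goroutine. The same idea reduced to a few lines (the 200ms value is only for the demo):

```go
package main

import (
	"fmt"
	"net"
	"time"
)

// writeWithDeadline arms a fresh write deadline for every message, so a
// stalled receiver surfaces as an i/o timeout rather than blocking forever.
func writeWithDeadline(conn net.Conn, d time.Duration, msg []byte) error {
	if err := conn.SetWriteDeadline(time.Now().Add(d)); err != nil {
		return err
	}
	_, err := conn.Write(msg)
	return err
}

func main() {
	client, server := net.Pipe()
	defer server.Close()
	defer client.Close()

	// The server side never reads, so the deadline fires.
	err := writeWithDeadline(client, 200*time.Millisecond, []byte("ping"))
	fmt.Println(err)
}
```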

View File

@@ -26,6 +26,7 @@ import (
     "syscall"
     "time"

+    "github.com/minio/minio/internal/deadlineconn"
     "golang.org/x/sys/unix"
 )
@@ -39,8 +40,8 @@ func setTCPParametersFn(opts TCPOptions) func(network, address string, c syscall
         _ = unix.SetsockoptInt(fd, unix.SOL_SOCKET, unix.SO_REUSEPORT, 1)

+        // Enable custom socket send/recv buffers.
         if opts.SendBufSize > 0 {
-            // Enable big buffers
             _ = unix.SetsockoptInt(fd, unix.SOL_SOCKET, unix.SO_SNDBUF, opts.SendBufSize)
         }
@@ -84,7 +85,9 @@ func setTCPParametersFn(opts TCPOptions) func(network, address string, c syscall
         // https://blog.cloudflare.com/when-tcp-sockets-refuse-to-die/
         // This is a sensitive configuration, it is better to set it to high values, > 60 secs since it can
         // affect clients reading data with a very slow pace (disappropriate with socket buffer sizes)
-        _ = syscall.SetsockoptInt(fd, syscall.IPPROTO_TCP, unix.TCP_USER_TIMEOUT, opts.UserTimeout)
+        if opts.UserTimeout > 0 {
+            _ = syscall.SetsockoptInt(fd, syscall.IPPROTO_TCP, unix.TCP_USER_TIMEOUT, opts.UserTimeout)
+        }

         if opts.Interface != "" {
             if h, _, err := net.SplitHostPort(address); err == nil {
@@ -111,6 +114,16 @@ func NewInternodeDialContext(dialTimeout time.Duration, opts TCPOptions) DialCon
             Timeout: dialTimeout,
             Control: setTCPParametersFn(opts),
         }
-        return dialer.DialContext(ctx, network, addr)
+        conn, err := dialer.DialContext(ctx, network, addr)
+        if err != nil {
+            return nil, err
+        }
+        if opts.DriveOPTimeout != nil {
+            // Read deadlines are sufficient for now as per various
+            // scenarios of hung node detection, we may add Write deadlines
+            // if needed later on.
+            return deadlineconn.New(conn).WithReadDeadline(opts.DriveOPTimeout()), nil
+        }
+        return conn, nil
     }
 }

View File

@@ -22,6 +22,7 @@ import (
     "fmt"
     "net"
     "syscall"
+    "time"
 )

 type acceptResult struct {
@@ -117,13 +118,27 @@ func (listener *httpListener) Addrs() (addrs []net.Addr) {
 // TCPOptions specify customizable TCP optimizations on raw socket
 type TCPOptions struct {
     UserTimeout int // this value is expected to be in milliseconds
+    // When the net.Conn is a remote drive this value is honored, we close the connection to remote peer proactively.
+    DriveOPTimeout func() time.Duration
     SendBufSize int    // SO_SNDBUF size for the socket connection, NOTE: this sets server and client connection
     RecvBufSize int    // SO_RECVBUF size for the socket connection, NOTE: this sets server and client connection
     Interface   string // This is a VRF device passed via `--interface` flag
     Trace       func(msg string) // Trace when starting.
 }

+// ForWebsocket returns TCPOptions valid for websocket net.Conn
+func (t TCPOptions) ForWebsocket() TCPOptions {
+    return TCPOptions{
+        UserTimeout: t.UserTimeout,
+        Interface:   t.Interface,
+        SendBufSize: t.SendBufSize,
+        RecvBufSize: t.RecvBufSize,
+    }
+}
+
 // newHTTPListener - creates new httpListener object which is interface compatible to net.Listener.
 // httpListener is capable to
 // * listen to multiple addresses
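ForWebsocket deliberately drops DriveOPTimeout (and the trace hook) when the options are reused for the grid's persistent websockets, since a per-drive-call read deadline would tear down long-lived but idle connections. A sketch of the same idea on a cut-down options struct (field set taken from the diff, values illustrative):

```go
package main

import (
	"fmt"
	"time"
)

// tcpOptions is a cut-down stand-in for the TCPOptions above.
type tcpOptions struct {
	UserTimeout    int
	DriveOPTimeout func() time.Duration
	SendBufSize    int
	RecvBufSize    int
	Interface      string
}

// forWebsocket drops the per-operation drive timeout: grid connections are
// persistent and may legitimately sit idle, so a read deadline sized for a
// single drive call would close them spuriously.
func (t tcpOptions) forWebsocket() tcpOptions {
	return tcpOptions{
		UserTimeout: t.UserTimeout,
		Interface:   t.Interface,
		SendBufSize: t.SendBufSize,
		RecvBufSize: t.RecvBufSize,
		// DriveOPTimeout intentionally left nil.
	}
}

func main() {
	opts := tcpOptions{
		UserTimeout:    10000,
		DriveOPTimeout: func() time.Duration { return 30 * time.Second },
		SendBufSize:    4 << 20,
		RecvBufSize:    4 << 20,
	}
	ws := opts.forWebsocket()
	fmt.Println(ws.DriveOPTimeout == nil) // true
}
```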

View File

@@ -97,13 +97,6 @@ type ioret[V any] struct {
     err error
 }

-// DeadlineWriter deadline writer with timeout
-type DeadlineWriter struct {
-    io.WriteCloser
-    timeout time.Duration
-    err     error
-}
-
 // WithDeadline will execute a function with a deadline and return a value of a given type.
 // If the deadline/context passes before the function finishes executing,
 // the zero value and the context error is returned.
@@ -145,21 +138,17 @@ func NewDeadlineWorker(timeout time.Duration) *DeadlineWorker {
 // channel so that the work function can attempt to exit gracefully.
 // Multiple calls to Run will run independently of each other.
 func (d *DeadlineWorker) Run(work func() error) error {
-    c := make(chan ioret[struct{}], 1)
-    t := time.NewTimer(d.timeout)
-    go func() {
-        c <- ioret[struct{}]{val: struct{}{}, err: work()}
-    }()
-
-    select {
-    case r := <-c:
-        if !t.Stop() {
-            <-t.C
-        }
-        return r.err
-    case <-t.C:
-        return context.DeadlineExceeded
-    }
+    _, err := WithDeadline[struct{}](context.Background(), d.timeout, func(ctx context.Context) (struct{}, error) {
+        return struct{}{}, work()
+    })
+    return err
+}
+
+// DeadlineWriter deadline writer with timeout
+type DeadlineWriter struct {
+    io.WriteCloser
+    timeout time.Duration
+    err     error
 }

 // NewDeadlineWriter wraps a writer to make it respect given deadline
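DeadlineWorker.Run now delegates to the generic WithDeadline helper documented above. A self-contained sketch of such a helper: the work runs in its own goroutine and the caller gets context.DeadlineExceeded if the timer wins, while the abandoned goroutine is expected to notice ctx and exit (details differ from MinIO's actual implementation):

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// withDeadline runs work with a deadline; if the deadline passes first, the
// zero value and the context error are returned. The goroutine running work
// is not killed, only abandoned, so work should honor ctx where it can.
func withDeadline[V any](parent context.Context, timeout time.Duration, work func(ctx context.Context) (V, error)) (V, error) {
	ctx, cancel := context.WithTimeout(parent, timeout)
	defer cancel()

	type result struct {
		val V
		err error
	}
	ch := make(chan result, 1)
	go func() {
		v, err := work(ctx)
		ch <- result{v, err}
	}()

	select {
	case r := <-ch:
		return r.val, r.err
	case <-ctx.Done():
		var zero V
		return zero, ctx.Err()
	}
}

func main() {
	_, err := withDeadline[struct{}](context.Background(), 100*time.Millisecond, func(ctx context.Context) (struct{}, error) {
		time.Sleep(time.Second)
		return struct{}{}, nil
	})
	fmt.Println(errors.Is(err, context.DeadlineExceeded)) // true
}
```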

View File

@@ -41,6 +41,26 @@ func (w *sleepWriter) Close() error {
     return nil
 }

+func TestDeadlineWorker(t *testing.T) {
+    work := NewDeadlineWorker(500 * time.Millisecond)
+
+    err := work.Run(func() error {
+        time.Sleep(600 * time.Millisecond)
+        return nil
+    })
+    if err != context.DeadlineExceeded {
+        t.Error("DeadlineWorker shouldn't be successful - should return context.DeadlineExceeded")
+    }
+
+    err = work.Run(func() error {
+        time.Sleep(450 * time.Millisecond)
+        return nil
+    })
+    if err != nil {
+        t.Error("DeadlineWorker should succeed")
+    }
+}
+
 func TestDeadlineWriter(t *testing.T) {
     w := NewDeadlineWriter(&sleepWriter{timeout: 500 * time.Millisecond}, 450*time.Millisecond)
     _, err := w.Write([]byte("1"))