diff --git a/.github/workflows/go-cross.yml b/.github/workflows/go-cross.yml index b7a58584d..d1c40060a 100644 --- a/.github/workflows/go-cross.yml +++ b/.github/workflows/go-cross.yml @@ -4,7 +4,6 @@ on: pull_request: branches: - master - - next # This ensures that previous jobs for the PR are canceled when the PR is # updated. diff --git a/.github/workflows/go-fips.yml b/.github/workflows/go-fips.yml index 17e9cb8e1..7af00216f 100644 --- a/.github/workflows/go-fips.yml +++ b/.github/workflows/go-fips.yml @@ -3,8 +3,7 @@ name: FIPS Build Test on: pull_request: branches: - - master - - next + - master # This ensures that previous jobs for the PR are canceled when the PR is # updated. diff --git a/.github/workflows/go-healing.yml b/.github/workflows/go-healing.yml index f453c82fc..61c403bc9 100644 --- a/.github/workflows/go-healing.yml +++ b/.github/workflows/go-healing.yml @@ -3,8 +3,7 @@ name: Healing Functional Tests on: pull_request: branches: - - master - - next + - master # This ensures that previous jobs for the PR are canceled when the PR is # updated. diff --git a/.github/workflows/go-lint.yml b/.github/workflows/go-lint.yml index 5396c5d07..a14d42f32 100644 --- a/.github/workflows/go-lint.yml +++ b/.github/workflows/go-lint.yml @@ -3,12 +3,11 @@ name: Linters and Tests on: pull_request: branches: - - master - - next + - master # This ensures that previous jobs for the PR are canceled when the PR is # updated. -concurrency: +concurrency: group: ${{ github.workflow }}-${{ github.head_ref }} cancel-in-progress: true @@ -35,9 +34,10 @@ jobs: CGO_ENABLED: 0 GO111MODULE: on run: | + Set-MpPreference -DisableRealtimeMonitoring $true netsh int ipv4 set dynamicport tcp start=60000 num=61000 go build --ldflags="-s -w" -o %GOPATH%\bin\minio.exe - go test -v --timeout 50m ./... + go test -v --timeout 120m ./... - name: Build on ${{ matrix.os }} if: matrix.os == 'ubuntu-latest' env: diff --git a/.github/workflows/go.yml b/.github/workflows/go.yml index 0de28788d..2c3f08e75 100644 --- a/.github/workflows/go.yml +++ b/.github/workflows/go.yml @@ -3,8 +3,7 @@ name: Functional Tests on: pull_request: branches: - - master - - next + - master # This ensures that previous jobs for the PR are canceled when the PR is # updated. diff --git a/.github/workflows/helm-lint.yml b/.github/workflows/helm-lint.yml index 1933100fe..a30a1fb62 100644 --- a/.github/workflows/helm-lint.yml +++ b/.github/workflows/helm-lint.yml @@ -3,8 +3,7 @@ name: Helm Chart linting on: pull_request: branches: - - master - - next + - master # This ensures that previous jobs for the PR are canceled when the PR is # updated. diff --git a/.github/workflows/iam-integrations.yaml b/.github/workflows/iam-integrations.yaml index 7c49f9167..d659a0cf1 100644 --- a/.github/workflows/iam-integrations.yaml +++ b/.github/workflows/iam-integrations.yaml @@ -4,7 +4,6 @@ on: pull_request: branches: - master - - next # This ensures that previous jobs for the PR are canceled when the PR is # updated. diff --git a/.github/workflows/mint.yml b/.github/workflows/mint.yml index 33036f7ca..014673391 100644 --- a/.github/workflows/mint.yml +++ b/.github/workflows/mint.yml @@ -4,7 +4,6 @@ on: pull_request: branches: - master - - next # This ensures that previous jobs for the PR are canceled when the PR is # updated. 
@@ -56,6 +55,10 @@ jobs: run: | ${GITHUB_WORKSPACE}/.github/workflows/run-mint.sh "erasure" "minio" "minio123" "${{ steps.vars.outputs.sha_short }}" + - name: resiliency + run: | + ${GITHUB_WORKSPACE}/.github/workflows/run-mint.sh "resiliency" "minio" "minio123" "${{ steps.vars.outputs.sha_short }}" + - name: The job must cleanup if: ${{ always() }} run: | diff --git a/.github/workflows/mint/minio-resiliency.yaml b/.github/workflows/mint/minio-resiliency.yaml new file mode 100644 index 000000000..9d569c59e --- /dev/null +++ b/.github/workflows/mint/minio-resiliency.yaml @@ -0,0 +1,78 @@ +version: '3.7' + +# Settings and configurations that are common for all containers +x-minio-common: &minio-common + image: quay.io/minio/minio:${JOB_NAME} + command: server --console-address ":9001" http://minio{1...4}/rdata{1...2} + expose: + - "9000" + - "9001" + environment: + MINIO_CI_CD: "on" + MINIO_ROOT_USER: "minio" + MINIO_ROOT_PASSWORD: "minio123" + MINIO_KMS_SECRET_KEY: "my-minio-key:OSMM+vkKUTCvQs9YL/CVMIMt43HFhkUpqJxTmGl6rYw=" + MINIO_DRIVE_MAX_TIMEOUT: "5s" + healthcheck: + test: ["CMD", "mc", "ready", "local"] + interval: 5s + timeout: 5s + retries: 5 + +# starts 4 docker containers running minio server instances. +# using nginx reverse proxy, load balancing, you can access +# it through port 9000. +services: + minio1: + <<: *minio-common + hostname: minio1 + volumes: + - rdata1-1:/rdata1 + - rdata1-2:/rdata2 + + minio2: + <<: *minio-common + hostname: minio2 + volumes: + - rdata2-1:/rdata1 + - rdata2-2:/rdata2 + + minio3: + <<: *minio-common + hostname: minio3 + volumes: + - rdata3-1:/rdata1 + - rdata3-2:/rdata2 + + minio4: + <<: *minio-common + hostname: minio4 + volumes: + - rdata4-1:/rdata1 + - rdata4-2:/rdata2 + + nginx: + image: nginx:1.19.2-alpine + hostname: nginx + volumes: + - ./nginx-4-node.conf:/etc/nginx/nginx.conf:ro + ports: + - "9000:9000" + - "9001:9001" + depends_on: + - minio1 + - minio2 + - minio3 + - minio4 + +## By default this config uses default local driver, +## For custom volumes replace with volume driver configuration. +volumes: + rdata1-1: + rdata1-2: + rdata2-1: + rdata2-2: + rdata3-1: + rdata3-2: + rdata4-1: + rdata4-2: diff --git a/.github/workflows/replication.yaml b/.github/workflows/replication.yaml index 4ad56e7cc..b4917348f 100644 --- a/.github/workflows/replication.yaml +++ b/.github/workflows/replication.yaml @@ -3,8 +3,7 @@ name: MinIO advanced tests on: pull_request: branches: - - master - - next + - master # This ensures that previous jobs for the PR are canceled when the PR is # updated. diff --git a/.github/workflows/root-disable.yml b/.github/workflows/root-disable.yml index 57adb573d..4601469ae 100644 --- a/.github/workflows/root-disable.yml +++ b/.github/workflows/root-disable.yml @@ -3,8 +3,7 @@ name: Root lockdown tests on: pull_request: branches: - - master - - next + - master # This ensures that previous jobs for the PR are canceled when the PR is # updated. 
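The new resiliency mode exercised by run-mint.sh (below) amounts to bringing up the 4-node compose topology above, pausing one node, and running mint through the nginx load balancer while that node stays unresponsive. A minimal local sketch, assuming the compose file above is saved as minio-resiliency.yaml and the compose project is named mint (which is what yields the mint_default network the CI script attaches to):

# Sketch only: approximate the CI resiliency run on a workstation.
docker-compose -p mint -f minio-resiliency.yaml up -d
# Pause one node to simulate a hung server; the other three keep serving.
docker-compose -p mint -f minio-resiliency.yaml pause minio4
# Run mint against the nginx endpoint that fronts all four nodes.
docker run --rm --net=mint_default \
  -e SERVER_ENDPOINT="nginx:9000" \
  -e ACCESS_KEY="minio" \
  -e SECRET_KEY="minio123" \
  -e ENABLE_HTTPS=0 \
  docker.io/minio/mint:edge
docker-compose -p mint -f minio-resiliency.yaml down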
diff --git a/.github/workflows/run-mint.sh b/.github/workflows/run-mint.sh index 6aa2cd502..eafd69d03 100755 --- a/.github/workflows/run-mint.sh +++ b/.github/workflows/run-mint.sh @@ -16,7 +16,7 @@ docker volume rm $(docker volume ls -f dangling=true) || true cd .github/workflows/mint docker-compose -f minio-${MODE}.yaml up -d -sleep 30s +sleep 1m docker system prune -f || true docker volume prune -f || true @@ -26,6 +26,9 @@ docker volume rm $(docker volume ls -q -f dangling=true) || true [ "${MODE}" == "pools" ] && docker-compose -f minio-${MODE}.yaml stop minio2 [ "${MODE}" == "pools" ] && docker-compose -f minio-${MODE}.yaml stop minio6 +# Pause one node, to check that all S3 calls work while one node goes wrong +[ "${MODE}" == "resiliency" ] && docker-compose -f minio-${MODE}.yaml pause minio4 + docker run --rm --net=mint_default \ --name="mint-${MODE}-${JOB_NAME}" \ -e SERVER_ENDPOINT="nginx:9000" \ @@ -35,6 +38,18 @@ docker run --rm --net=mint_default \ -e MINT_MODE="${MINT_MODE}" \ docker.io/minio/mint:edge +# FIXME: enable this after fixing aws-sdk-java-v2 tests +# # unpause the node, to check that all S3 calls work while one node goes wrong +# [ "${MODE}" == "resiliency" ] && docker-compose -f minio-${MODE}.yaml unpause minio4 +# [ "${MODE}" == "resiliency" ] && docker run --rm --net=mint_default \ +# --name="mint-${MODE}-${JOB_NAME}" \ +# -e SERVER_ENDPOINT="nginx:9000" \ +# -e ACCESS_KEY="${ACCESS_KEY}" \ +# -e SECRET_KEY="${SECRET_KEY}" \ +# -e ENABLE_HTTPS=0 \ +# -e MINT_MODE="${MINT_MODE}" \ +# docker.io/minio/mint:edge + docker-compose -f minio-${MODE}.yaml down || true sleep 10s diff --git a/.github/workflows/shfmt.yml b/.github/workflows/shfmt.yml index 70c9d28e1..3e446306b 100644 --- a/.github/workflows/shfmt.yml +++ b/.github/workflows/shfmt.yml @@ -3,8 +3,7 @@ name: Shell formatting checks on: pull_request: branches: - - master - - next + - master permissions: contents: read diff --git a/.github/workflows/upgrade-ci-cd.yaml b/.github/workflows/upgrade-ci-cd.yaml index 5769826af..c2bc73c21 100644 --- a/.github/workflows/upgrade-ci-cd.yaml +++ b/.github/workflows/upgrade-ci-cd.yaml @@ -3,8 +3,7 @@ name: Upgrade old version tests on: pull_request: branches: - - master - - next + - master # This ensures that previous jobs for the PR are canceled when the PR is # updated. diff --git a/buildscripts/disable-root.sh b/buildscripts/disable-root.sh index bbd13836f..c35c769f0 100755 --- a/buildscripts/disable-root.sh +++ b/buildscripts/disable-root.sh @@ -57,7 +57,7 @@ done set +e -sleep 10 +./mc ready minioadm/ ./mc ls minioadm/ if [ $? -ne 0 ]; then diff --git a/cmd/admin-server-info.go b/cmd/admin-server-info.go index 33655bfa0..dbb28f47d 100644 --- a/cmd/admin-server-info.go +++ b/cmd/admin-server-info.go @@ -1,4 +1,4 @@ -// Copyright (c) 2015-2021 MinIO, Inc. +// Copyright (c) 2015-2024 MinIO, Inc. 
// // This file is part of MinIO Object Storage stack // @@ -18,7 +18,6 @@ package cmd import ( - "context" "math" "net/http" "os" @@ -31,6 +30,7 @@ import ( "github.com/minio/madmin-go/v3" "github.com/minio/minio/internal/config" "github.com/minio/minio/internal/kms" + xnet "github.com/minio/pkg/v2/net" ) // getLocalServerProperty - returns madmin.ServerProperties for only the @@ -64,9 +64,11 @@ func getLocalServerProperty(endpointServerPools EndpointServerPools, r *http.Req if err := isServerResolvable(endpoint, 5*time.Second); err == nil { network[nodeName] = string(madmin.ItemOnline) } else { - network[nodeName] = string(madmin.ItemOffline) - // log once the error - peersLogOnceIf(context.Background(), err, nodeName) + if xnet.IsNetworkOrHostDown(err, false) { + network[nodeName] = string(madmin.ItemOffline) + } else if xnet.IsNetworkOrHostDown(err, true) { + network[nodeName] = "connection attempt timedout" + } } } } diff --git a/cmd/common-main.go b/cmd/common-main.go index 8404156af..a1437c0d3 100644 --- a/cmd/common-main.go +++ b/cmd/common-main.go @@ -404,16 +404,13 @@ func buildServerCtxt(ctx *cli.Context, ctxt *serverCtxt) (err error) { ctxt.FTP = ctx.StringSlice("ftp") ctxt.SFTP = ctx.StringSlice("sftp") - ctxt.Interface = ctx.String("interface") ctxt.UserTimeout = ctx.Duration("conn-user-timeout") ctxt.SendBufSize = ctx.Int("send-buf-size") ctxt.RecvBufSize = ctx.Int("recv-buf-size") - - ctxt.ShutdownTimeout = ctx.Duration("shutdown-timeout") ctxt.IdleTimeout = ctx.Duration("idle-timeout") - ctxt.ReadHeaderTimeout = ctx.Duration("read-header-timeout") - ctxt.MaxIdleConnsPerHost = ctx.Int("max-idle-conns-per-host") + ctxt.UserTimeout = ctx.Duration("conn-user-timeout") + ctxt.ShutdownTimeout = ctx.Duration("shutdown-timeout") if conf := ctx.String("config"); len(conf) > 0 { err = mergeServerCtxtFromConfigFile(conf, ctxt) diff --git a/cmd/erasure-healing.go b/cmd/erasure-healing.go index 594893b83..5829f1a92 100644 --- a/cmd/erasure-healing.go +++ b/cmd/erasure-healing.go @@ -258,6 +258,7 @@ func (er *erasureObjects) healObject(ctx context.Context, bucket string, object healTrace(healingMetricObject, startTime, bucket, object, &opts, err, &result) }() } + // Initialize heal result object result = madmin.HealResultItem{ Type: madmin.HealItemObject, diff --git a/cmd/erasure-multipart.go b/cmd/erasure-multipart.go index 12e4bc48e..e81a3decc 100644 --- a/cmd/erasure-multipart.go +++ b/cmd/erasure-multipart.go @@ -657,7 +657,8 @@ func (er erasureObjects) PutObjectPart(ctx context.Context, bucket, object, uplo if err != nil { return PartInfo{}, err } - pctx := plkctx.Context() + + ctx = plkctx.Context() defer partIDLock.Unlock(plkctx) onlineDisks := er.getDisks() @@ -689,7 +690,7 @@ func (er erasureObjects) PutObjectPart(ctx context.Context, bucket, object, uplo } }() - erasure, err := NewErasure(pctx, fi.Erasure.DataBlocks, fi.Erasure.ParityBlocks, fi.Erasure.BlockSize) + erasure, err := NewErasure(ctx, fi.Erasure.DataBlocks, fi.Erasure.ParityBlocks, fi.Erasure.BlockSize) if err != nil { return pi, toObjectErr(err, bucket, object) } @@ -742,7 +743,7 @@ func (er erasureObjects) PutObjectPart(ctx context.Context, bucket, object, uplo } } - n, err := erasure.Encode(pctx, toEncode, writers, buffer, writeQuorum) + n, err := erasure.Encode(ctx, toEncode, writers, buffer, writeQuorum) closeBitrotWriters(writers) if err != nil { return pi, toObjectErr(err, bucket, object) diff --git a/cmd/erasure-server-pool.go b/cmd/erasure-server-pool.go index 91384f51e..cc21cee5c 100644 --- 
a/cmd/erasure-server-pool.go +++ b/cmd/erasure-server-pool.go @@ -2472,16 +2472,10 @@ func (z *erasureServerPools) Health(ctx context.Context, opts HealthOptions) Hea for _, disk := range storageInfo.Disks { if opts.Maintenance { - var skip bool globalLocalDrivesMu.RLock() - for _, drive := range globalLocalDrives { - if drive != nil && drive.Endpoint().String() == disk.Endpoint { - skip = true - break - } - } + _, ok := globalLocalDrivesMap[disk.Endpoint] globalLocalDrivesMu.RUnlock() - if skip { + if ok { continue } } diff --git a/cmd/globals.go b/cmd/globals.go index 5a498ca25..9ac5890c9 100644 --- a/cmd/globals.go +++ b/cmd/globals.go @@ -414,8 +414,9 @@ var ( // List of local drives to this node, this is only set during server startup, // and is only mutated by HealFormat. Hold globalLocalDrivesMu to access. - globalLocalDrives []StorageAPI - globalLocalDrivesMu sync.RWMutex + globalLocalDrives []StorageAPI + globalLocalDrivesMap = make(map[string]StorageAPI) + globalLocalDrivesMu sync.RWMutex globalDriveMonitoring = env.Get("_MINIO_DRIVE_ACTIVE_MONITORING", config.EnableOn) == config.EnableOn diff --git a/cmd/grid.go b/cmd/grid.go index 5e20797de..81c9d07fe 100644 --- a/cmd/grid.go +++ b/cmd/grid.go @@ -36,8 +36,12 @@ var globalGridStart = make(chan struct{}) func initGlobalGrid(ctx context.Context, eps EndpointServerPools) error { hosts, local := eps.GridHosts() + lookupHost := globalDNSCache.LookupHost g, err := grid.NewManager(ctx, grid.ManagerOptions{ - Dialer: grid.ContextDialer(xhttp.DialContextWithLookupHost(globalDNSCache.LookupHost, xhttp.NewInternodeDialContext(rest.DefaultTimeout, globalTCPOptions))), + // Pass Dialer for websocket grid, make sure we do not + // provide any DriveOPTimeout() function, as that is not + // useful over persistent connections. 
+ Dialer: grid.ContextDialer(xhttp.DialContextWithLookupHost(lookupHost, xhttp.NewInternodeDialContext(rest.DefaultTimeout, globalTCPOptions.ForWebsocket()))), Local: local, Hosts: hosts, AddAuth: newCachedAuthToken(), diff --git a/cmd/peer-s3-client.go b/cmd/peer-s3-client.go index aadbf2dce..d6cf8a06b 100644 --- a/cmd/peer-s3-client.go +++ b/cmd/peer-s3-client.go @@ -320,6 +320,9 @@ func (sys *S3PeerSys) GetBucketInfo(ctx context.Context, bucket string, opts Buc } func (client *remotePeerS3Client) ListBuckets(ctx context.Context, opts BucketOptions) ([]BucketInfo, error) { + ctx, cancel := context.WithTimeout(ctx, globalDriveConfig.GetMaxTimeout()) + defer cancel() + bi, err := listBucketsRPC.Call(ctx, client.gridConn(), &opts) if err != nil { return nil, toStorageErr(err) @@ -345,6 +348,9 @@ func (client *remotePeerS3Client) HealBucket(ctx context.Context, bucket string, peerS3BucketDeleted: strconv.FormatBool(opts.Remove), }) + ctx, cancel := context.WithTimeout(ctx, globalDriveConfig.GetMaxTimeout()) + defer cancel() + _, err := healBucketRPC.Call(ctx, conn, mss) // Initialize heal result info @@ -367,6 +373,9 @@ func (client *remotePeerS3Client) GetBucketInfo(ctx context.Context, bucket stri peerS3BucketDeleted: strconv.FormatBool(opts.Deleted), }) + ctx, cancel := context.WithTimeout(ctx, globalDriveConfig.GetMaxTimeout()) + defer cancel() + volInfo, err := headBucketRPC.Call(ctx, conn, mss) if err != nil { return BucketInfo{}, toStorageErr(err) @@ -418,6 +427,9 @@ func (client *remotePeerS3Client) MakeBucket(ctx context.Context, bucket string, peerS3BucketForceCreate: strconv.FormatBool(opts.ForceCreate), }) + ctx, cancel := context.WithTimeout(ctx, globalDriveConfig.GetMaxTimeout()) + defer cancel() + _, err := makeBucketRPC.Call(ctx, conn, mss) return toStorageErr(err) } @@ -467,6 +479,9 @@ func (client *remotePeerS3Client) DeleteBucket(ctx context.Context, bucket strin peerS3BucketForceDelete: strconv.FormatBool(opts.Force), }) + ctx, cancel := context.WithTimeout(ctx, globalDriveConfig.GetMaxTimeout()) + defer cancel() + _, err := deleteBucketRPC.Call(ctx, conn, mss) return toStorageErr(err) } diff --git a/cmd/post-policy_test.go b/cmd/post-policy_test.go index 186249e1c..e2f74bc72 100644 --- a/cmd/post-policy_test.go +++ b/cmd/post-policy_test.go @@ -234,7 +234,7 @@ func testPostPolicyBucketHandler(obj ObjectLayer, instanceType string, t TestErr // Call the ServeHTTP to execute the handler. apiRouter.ServeHTTP(rec, req) if rec.Code != test.expectedStatus { - t.Fatalf("Test %d: %s: Expected the response status to be `%d`, but instead found `%d`", i+1, instanceType, test.expectedStatus, rec.Code) + t.Fatalf("Test %d: %s: Expected the response status to be `%d`, but instead found `%d`, Resp: %s", i+1, instanceType, test.expectedStatus, rec.Code, rec.Body) } } diff --git a/cmd/server-main.go b/cmd/server-main.go index 5cf72b958..3a25428b0 100644 --- a/cmd/server-main.go +++ b/cmd/server-main.go @@ -414,10 +414,11 @@ func serverHandleCmdArgs(ctxt serverCtxt) { setGlobalInternodeInterface(ctxt.Interface) globalTCPOptions = xhttp.TCPOptions{ - UserTimeout: int(ctxt.UserTimeout.Milliseconds()), - Interface: ctxt.Interface, - SendBufSize: ctxt.SendBufSize, - RecvBufSize: ctxt.RecvBufSize, + UserTimeout: int(ctxt.UserTimeout.Milliseconds()), + DriveOPTimeout: globalDriveConfig.GetOPTimeout, + Interface: ctxt.Interface, + SendBufSize: ctxt.SendBufSize, + RecvBufSize: ctxt.RecvBufSize, } // allow transport to be HTTP/1.1 for proxying. 
@@ -816,6 +817,11 @@ func serverMain(ctx *cli.Context) { } } + var getCert certs.GetCertificateFunc + if globalTLSCerts != nil { + getCert = globalTLSCerts.GetCertificate + } + // Check for updates in non-blocking manner. go func() { if !globalServerCtxt.Quiet && !globalInplaceUpdateDisabled { @@ -842,12 +848,7 @@ func serverMain(ctx *cli.Context) { warnings = append(warnings, color.YellowBold("- Detected GOMAXPROCS(%d) < NumCPU(%d), please make sure to provide all PROCS to MinIO for optimal performance", maxProcs, cpuProcs)) } - var getCert certs.GetCertificateFunc - if globalTLSCerts != nil { - getCert = globalTLSCerts.GetCertificate - } - - // Initialize gridn + // Initialize grid bootstrapTrace("initGrid", func() { logger.FatalIf(initGlobalGrid(GlobalContext, globalEndpoints), "Unable to configure server grid RPC services") }) @@ -909,9 +910,6 @@ func serverMain(ctx *cli.Context) { } }) - xhttp.SetDeploymentID(globalDeploymentID()) - xhttp.SetMinIOVersion(Version) - for _, n := range globalNodes { nodeName := n.Host if n.IsLocal { diff --git a/cmd/storage-rest-server.go b/cmd/storage-rest-server.go index 18525aa98..49f7b52de 100644 --- a/cmd/storage-rest-server.go +++ b/cmd/storage-rest-server.go @@ -80,11 +80,7 @@ func getStorageViaEndpoint(endpoint Endpoint) StorageAPI { globalLocalDrivesMu.RLock() defer globalLocalDrivesMu.RUnlock() if len(globalLocalSetDrives) == 0 { - for _, drive := range globalLocalDrives { - if drive != nil && drive.Endpoint().Equal(endpoint) { - return drive - } - } + return globalLocalDrivesMap[endpoint.String()] } return globalLocalSetDrives[endpoint.PoolIdx][endpoint.SetIdx][endpoint.DiskIdx] } @@ -1387,6 +1383,7 @@ func registerStorageRESTHandlers(router *mux.Router, endpointServerPools Endpoin defer globalLocalDrivesMu.Unlock() globalLocalDrives = append(globalLocalDrives, storage) + globalLocalDrivesMap[endpoint.String()] = storage globalLocalSetDrives[endpoint.PoolIdx][endpoint.SetIdx][endpoint.DiskIdx] = storage return true } diff --git a/internal/config/drive/drive.go b/internal/config/drive/drive.go index ab4845090..6ac7b0de9 100644 --- a/internal/config/drive/drive.go +++ b/internal/config/drive/drive.go @@ -25,15 +25,18 @@ import ( "github.com/minio/pkg/v2/env" ) +// Drive specific timeout environment variables const ( - envMaxDriveTimeout = "MINIO_DRIVE_MAX_TIMEOUT" + EnvMaxDriveTimeout = "MINIO_DRIVE_MAX_TIMEOUT" + EnvMaxDriveTimeoutLegacy = "_MINIO_DRIVE_MAX_TIMEOUT" + EnvMaxDiskTimeoutLegacy = "_MINIO_DISK_MAX_TIMEOUT" ) // DefaultKVS - default KVS for drive var DefaultKVS = config.KVS{ config.KV{ Key: MaxTimeout, - Value: "", + Value: "30s", }, } @@ -53,8 +56,13 @@ func (c *Config) Update(new Config) error { return nil } -// GetMaxTimeout - returns the max timeout value. +// GetMaxTimeout - returns the per call drive operation timeout func (c *Config) GetMaxTimeout() time.Duration { + return c.GetOPTimeout() +} + +// GetOPTimeout - returns the per call drive operation timeout +func (c *Config) GetOPTimeout() time.Duration { configLk.RLock() defer configLk.RUnlock() @@ -71,35 +79,32 @@ func LookupConfig(kvs config.KVS) (cfg Config, err error) { } // if not set. 
Get default value from environment - d := env.Get(envMaxDriveTimeout, kvs.GetWithDefault(MaxTimeout, DefaultKVS)) + d := env.Get(EnvMaxDriveTimeout, env.Get(EnvMaxDriveTimeoutLegacy, env.Get(EnvMaxDiskTimeoutLegacy, kvs.GetWithDefault(MaxTimeout, DefaultKVS)))) if d == "" { - d = env.Get("_MINIO_DRIVE_MAX_TIMEOUT", "") - if d == "" { - d = env.Get("_MINIO_DISK_MAX_TIMEOUT", "") - } - } - - dur, _ := time.ParseDuration(d) - if dur < time.Second { cfg.MaxTimeout = 30 * time.Second } else { - cfg.MaxTimeout = getMaxTimeout(dur) + dur, _ := time.ParseDuration(d) + if dur < time.Second { + cfg.MaxTimeout = 30 * time.Second + } else { + cfg.MaxTimeout = getMaxTimeout(dur) + } } return cfg, err } func getMaxTimeout(t time.Duration) time.Duration { - if t < time.Second { - // get default value - d := env.Get("_MINIO_DRIVE_MAX_TIMEOUT", "") - if d == "" { - d = env.Get("_MINIO_DISK_MAX_TIMEOUT", "") - } - dur, _ := time.ParseDuration(d) - if dur < time.Second { - return 30 * time.Second - } - return dur + if t > time.Second { + return t } - return t + // get default value + d := env.Get(EnvMaxDriveTimeoutLegacy, env.Get(EnvMaxDiskTimeoutLegacy, "")) + if d == "" { + return 30 * time.Second + } + dur, _ := time.ParseDuration(d) + if dur < time.Second { + return 30 * time.Second + } + return dur } diff --git a/internal/config/drive/help.go b/internal/config/drive/help.go index 3ed68cbab..5964dcce4 100644 --- a/internal/config/drive/help.go +++ b/internal/config/drive/help.go @@ -22,12 +22,13 @@ import "github.com/minio/minio/internal/config" var ( // MaxTimeout is the max timeout for drive MaxTimeout = "max_timeout" + // HelpDrive is help for drive HelpDrive = config.HelpKVS{ config.HelpKV{ Key: MaxTimeout, Type: "string", - Description: "set per call max_timeout for the drive, defaults to 2 minutes", + Description: "set per call max_timeout for the drive, defaults to 30 seconds", Optional: true, }, } diff --git a/internal/deadlineconn/deadlineconn.go b/internal/deadlineconn/deadlineconn.go index 25b479ea3..7cb7b766e 100644 --- a/internal/deadlineconn/deadlineconn.go +++ b/internal/deadlineconn/deadlineconn.go @@ -33,13 +33,13 @@ type DeadlineConn struct { // Sets read deadline func (c *DeadlineConn) setReadDeadline() { if c.readDeadline > 0 { - c.SetReadDeadline(time.Now().UTC().Add(c.readDeadline)) + c.Conn.SetReadDeadline(time.Now().UTC().Add(c.readDeadline)) } } func (c *DeadlineConn) setWriteDeadline() { if c.writeDeadline > 0 { - c.SetWriteDeadline(time.Now().UTC().Add(c.writeDeadline)) + c.Conn.SetWriteDeadline(time.Now().UTC().Add(c.writeDeadline)) } } diff --git a/internal/grid/connection.go b/internal/grid/connection.go index 72925c5d2..5cfac573c 100644 --- a/internal/grid/connection.go +++ b/internal/grid/connection.go @@ -42,6 +42,7 @@ import ( xioutil "github.com/minio/minio/internal/ioutil" "github.com/minio/minio/internal/logger" "github.com/minio/minio/internal/pubsub" + xnet "github.com/minio/pkg/v2/net" "github.com/puzpuzpuz/xsync/v3" "github.com/tinylib/msgp/msgp" "github.com/zeebo/xxh3" @@ -677,9 +678,8 @@ func (c *Connection) connect() { return } if gotState != StateConnecting { - // Don't print error on first attempt, - // and after that only once per hour. - gridLogOnceIf(c.ctx, fmt.Errorf("grid: %s connecting to %s: %w (%T) Sleeping %v (%v)", c.Local, toDial, err, err, sleep, gotState), toDial) + // Don't print error on first attempt, and after that only once per hour. 
+ gridLogOnceIf(c.ctx, fmt.Errorf("grid: %s re-connecting to %s: %w (%T) Sleeping %v (%v)", c.Local, toDial, err, err, sleep, gotState), toDial) } c.updateState(StateConnectionError) time.Sleep(sleep) @@ -972,7 +972,9 @@ func (c *Connection) handleMessages(ctx context.Context, conn net.Conn) { msg, err = readDataInto(msg, conn, c.side, ws.OpBinary) if err != nil { cancel(ErrDisconnected) - gridLogIfNot(ctx, fmt.Errorf("ws read: %w", err), net.ErrClosed, io.EOF) + if !xnet.IsNetworkOrHostDown(err, true) { + gridLogIfNot(ctx, fmt.Errorf("ws read: %w", err), net.ErrClosed, io.EOF) + } return } if c.incomingBytes != nil { @@ -983,7 +985,9 @@ func (c *Connection) handleMessages(ctx context.Context, conn net.Conn) { var m message subID, remain, err := m.parse(msg) if err != nil { - gridLogIf(ctx, fmt.Errorf("ws parse package: %w", err)) + if !xnet.IsNetworkOrHostDown(err, true) { + gridLogIf(ctx, fmt.Errorf("ws parse package: %w", err)) + } cancel(ErrDisconnected) return } @@ -1004,7 +1008,9 @@ func (c *Connection) handleMessages(ctx context.Context, conn net.Conn) { var next []byte next, remain, err = msgp.ReadBytesZC(remain) if err != nil { - gridLogIf(ctx, fmt.Errorf("ws read merged: %w", err)) + if !xnet.IsNetworkOrHostDown(err, true) { + gridLogIf(ctx, fmt.Errorf("ws read merged: %w", err)) + } cancel(ErrDisconnected) return } @@ -1012,7 +1018,9 @@ func (c *Connection) handleMessages(ctx context.Context, conn net.Conn) { m.Payload = nil subID, _, err = m.parse(next) if err != nil { - gridLogIf(ctx, fmt.Errorf("ws parse merged: %w", err)) + if !xnet.IsNetworkOrHostDown(err, true) { + gridLogIf(ctx, fmt.Errorf("ws parse merged: %w", err)) + } cancel(ErrDisconnected) return } @@ -1119,18 +1127,24 @@ func (c *Connection) handleMessages(ctx context.Context, conn net.Conn) { buf.Reset() err := wsw.writeMessage(&buf, c.side, ws.OpBinary, toSend) if err != nil { - gridLogIf(ctx, fmt.Errorf("ws writeMessage: %w", err)) + if !xnet.IsNetworkOrHostDown(err, true) { + gridLogIf(ctx, fmt.Errorf("ws writeMessage: %w", err)) + } return } PutByteBuffer(toSend) + err = conn.SetWriteDeadline(time.Now().Add(connWriteTimeout)) if err != nil { gridLogIf(ctx, fmt.Errorf("conn.SetWriteDeadline: %w", err)) return } + _, err = buf.WriteTo(conn) if err != nil { - gridLogIf(ctx, fmt.Errorf("ws write: %w", err)) + if !xnet.IsNetworkOrHostDown(err, true) { + gridLogIf(ctx, fmt.Errorf("ws write: %w", err)) + } return } continue @@ -1163,18 +1177,24 @@ func (c *Connection) handleMessages(ctx context.Context, conn net.Conn) { buf.Reset() err = wsw.writeMessage(&buf, c.side, ws.OpBinary, toSend) if err != nil { - gridLogIf(ctx, fmt.Errorf("ws writeMessage: %w", err)) + if !xnet.IsNetworkOrHostDown(err, true) { + gridLogIf(ctx, fmt.Errorf("ws writeMessage: %w", err)) + } return } - // buf is our local buffer, so we can reuse it. + err = conn.SetWriteDeadline(time.Now().Add(connWriteTimeout)) if err != nil { gridLogIf(ctx, fmt.Errorf("conn.SetWriteDeadline: %w", err)) return } + + // buf is our local buffer, so we can reuse it. 
_, err = buf.WriteTo(conn) if err != nil { - gridLogIf(ctx, fmt.Errorf("ws write: %w", err)) + if !xnet.IsNetworkOrHostDown(err, true) { + gridLogIf(ctx, fmt.Errorf("ws write: %w", err)) + } return } diff --git a/internal/http/dial_linux.go b/internal/http/dial_linux.go index f210d4bde..f271e36c2 100644 --- a/internal/http/dial_linux.go +++ b/internal/http/dial_linux.go @@ -26,6 +26,7 @@ import ( "syscall" "time" + "github.com/minio/minio/internal/deadlineconn" "golang.org/x/sys/unix" ) @@ -39,8 +40,8 @@ func setTCPParametersFn(opts TCPOptions) func(network, address string, c syscall _ = unix.SetsockoptInt(fd, unix.SOL_SOCKET, unix.SO_REUSEPORT, 1) + // Enable custom socket send/recv buffers. if opts.SendBufSize > 0 { - // Enable big buffers _ = unix.SetsockoptInt(fd, unix.SOL_SOCKET, unix.SO_SNDBUF, opts.SendBufSize) } @@ -84,7 +85,9 @@ func setTCPParametersFn(opts TCPOptions) func(network, address string, c syscall // https://blog.cloudflare.com/when-tcp-sockets-refuse-to-die/ // This is a sensitive configuration, it is better to set it to high values, > 60 secs since it can // affect clients reading data with a very slow pace (disappropriate with socket buffer sizes) - _ = syscall.SetsockoptInt(fd, syscall.IPPROTO_TCP, unix.TCP_USER_TIMEOUT, opts.UserTimeout) + if opts.UserTimeout > 0 { + _ = syscall.SetsockoptInt(fd, syscall.IPPROTO_TCP, unix.TCP_USER_TIMEOUT, opts.UserTimeout) + } if opts.Interface != "" { if h, _, err := net.SplitHostPort(address); err == nil { @@ -111,6 +114,16 @@ func NewInternodeDialContext(dialTimeout time.Duration, opts TCPOptions) DialCon Timeout: dialTimeout, Control: setTCPParametersFn(opts), } - return dialer.DialContext(ctx, network, addr) + conn, err := dialer.DialContext(ctx, network, addr) + if err != nil { + return nil, err + } + if opts.DriveOPTimeout != nil { + // Read deadlines are sufficient for now as per various + // scenarios of hung node detection, we may add Write deadlines + // if needed later on. + return deadlineconn.New(conn).WithReadDeadline(opts.DriveOPTimeout()), nil + } + return conn, nil } } diff --git a/internal/http/listener.go b/internal/http/listener.go index 9cf6425e6..f7e7e65d4 100644 --- a/internal/http/listener.go +++ b/internal/http/listener.go @@ -22,6 +22,7 @@ import ( "fmt" "net" "syscall" + "time" ) type acceptResult struct { @@ -117,13 +118,27 @@ func (listener *httpListener) Addrs() (addrs []net.Addr) { // TCPOptions specify customizable TCP optimizations on raw socket type TCPOptions struct { - UserTimeout int // this value is expected to be in milliseconds + UserTimeout int // this value is expected to be in milliseconds + + // When the net.Conn is a remote drive this value is honored, we close the connection to remote peer proactively. + DriveOPTimeout func() time.Duration + SendBufSize int // SO_SNDBUF size for the socket connection, NOTE: this sets server and client connection RecvBufSize int // SO_RECVBUF size for the socket connection, NOTE: this sets server and client connection Interface string // This is a VRF device passed via `--interface` flag Trace func(msg string) // Trace when starting. } +// ForWebsocket returns TCPOptions valid for websocket net.Conn +func (t TCPOptions) ForWebsocket() TCPOptions { + return TCPOptions{ + UserTimeout: t.UserTimeout, + Interface: t.Interface, + SendBufSize: t.SendBufSize, + RecvBufSize: t.RecvBufSize, + } +} + // newHTTPListener - creates new httpListener object which is interface compatible to net.Listener. 
// httpListener is capable to // * listen to multiple addresses diff --git a/internal/ioutil/ioutil.go b/internal/ioutil/ioutil.go index ced8afcfc..deefd0866 100644 --- a/internal/ioutil/ioutil.go +++ b/internal/ioutil/ioutil.go @@ -97,13 +97,6 @@ type ioret[V any] struct { err error } -// DeadlineWriter deadline writer with timeout -type DeadlineWriter struct { - io.WriteCloser - timeout time.Duration - err error -} - // WithDeadline will execute a function with a deadline and return a value of a given type. // If the deadline/context passes before the function finishes executing, // the zero value and the context error is returned. @@ -145,21 +138,17 @@ func NewDeadlineWorker(timeout time.Duration) *DeadlineWorker { // channel so that the work function can attempt to exit gracefully. // Multiple calls to Run will run independently of each other. func (d *DeadlineWorker) Run(work func() error) error { - c := make(chan ioret[struct{}], 1) - t := time.NewTimer(d.timeout) - go func() { - c <- ioret[struct{}]{val: struct{}{}, err: work()} - }() + _, err := WithDeadline[struct{}](context.Background(), d.timeout, func(ctx context.Context) (struct{}, error) { + return struct{}{}, work() + }) + return err +} - select { - case r := <-c: - if !t.Stop() { - <-t.C - } - return r.err - case <-t.C: - return context.DeadlineExceeded - } +// DeadlineWriter deadline writer with timeout +type DeadlineWriter struct { + io.WriteCloser + timeout time.Duration + err error } // NewDeadlineWriter wraps a writer to make it respect given deadline diff --git a/internal/ioutil/ioutil_test.go b/internal/ioutil/ioutil_test.go index 6e332b3f5..587c72d62 100644 --- a/internal/ioutil/ioutil_test.go +++ b/internal/ioutil/ioutil_test.go @@ -41,6 +41,26 @@ func (w *sleepWriter) Close() error { return nil } +func TestDeadlineWorker(t *testing.T) { + work := NewDeadlineWorker(500 * time.Millisecond) + + err := work.Run(func() error { + time.Sleep(600 * time.Millisecond) + return nil + }) + if err != context.DeadlineExceeded { + t.Error("DeadlineWorker shouldn't be successful - should return context.DeadlineExceeded") + } + + err = work.Run(func() error { + time.Sleep(450 * time.Millisecond) + return nil + }) + if err != nil { + t.Error("DeadlineWorker should succeed") + } +} + func TestDeadlineWriter(t *testing.T) { w := NewDeadlineWriter(&sleepWriter{timeout: 500 * time.Millisecond}, 450*time.Millisecond) _, err := w.Write([]byte("1"))