From f5a20a5d06b7980c85fd010a52a0f8b2ce9c8ed7 Mon Sep 17 00:00:00 2001 From: Harshavardhana Date: Thu, 11 May 2023 17:41:33 -0700 Subject: [PATCH] allow nodes offline in k8s setups when expanding pools (#17183) --- cmd/endpoint-ellipses.go | 31 ++-- cmd/endpoint.go | 363 +++++++++++++++++++++++++++++++++++++-- cmd/endpoint_test.go | 6 +- cmd/prepare-storage.go | 73 ++++---- 4 files changed, 403 insertions(+), 70 deletions(-) diff --git a/cmd/endpoint-ellipses.go b/cmd/endpoint-ellipses.go index c8f4a990a..ae5350faa 100644 --- a/cmd/endpoint-ellipses.go +++ b/cmd/endpoint-ellipses.go @@ -355,7 +355,7 @@ func createServerEndpoints(serverAddr string, args ...string) ( if err != nil { return nil, -1, err } - endpointList, newSetupType, err := CreateEndpoints(serverAddr, false, setArgs...) + endpointList, newSetupType, err := CreateEndpoints(serverAddr, setArgs...) if err != nil { return nil, -1, err } @@ -371,37 +371,36 @@ func createServerEndpoints(serverAddr string, args ...string) ( return endpointServerPools, setupType, nil } - var foundPrevLocal bool + var poolArgs [][][]string for _, arg := range args { if !ellipses.HasEllipses(arg) && len(args) > 1 { // TODO: support SNSD deployments to be decommissioned in future return nil, -1, fmt.Errorf("all args must have ellipses for pool expansion (%w) args: %s", errInvalidArgument, args) } + setArgs, err := GetAllSets(arg) if err != nil { return nil, -1, err } - endpointList, gotSetupType, err := CreateEndpoints(serverAddr, foundPrevLocal, setArgs...) - if err != nil { - return nil, -1, err - } + poolArgs = append(poolArgs, setArgs) + } + + poolEndpoints, setupType, err := CreatePoolEndpoints(serverAddr, poolArgs...) + if err != nil { + return nil, -1, err + } + + for i, endpointList := range poolEndpoints { if err = endpointServerPools.Add(PoolEndpoints{ - SetCount: len(setArgs), - DrivesPerSet: len(setArgs[0]), + SetCount: len(poolArgs[i]), + DrivesPerSet: len(poolArgs[i][0]), Endpoints: endpointList, - CmdLine: arg, Platform: fmt.Sprintf("OS: %s | Arch: %s", runtime.GOOS, runtime.GOARCH), + CmdLine: args[i], }); err != nil { return nil, -1, err } - foundPrevLocal = endpointList.atleastOneEndpointLocal() - if setupType == UnknownSetupType { - setupType = gotSetupType - } - if setupType == ErasureSetupType && gotSetupType == DistErasureSetupType { - setupType = DistErasureSetupType - } } return endpointServerPools, setupType, nil diff --git a/cmd/endpoint.go b/cmd/endpoint.go index 0ca2aba30..7b886d4a4 100644 --- a/cmd/endpoint.go +++ b/cmd/endpoint.go @@ -409,15 +409,6 @@ func (endpoints Endpoints) GetAllStrings() (all []string) { return } -func (endpoints Endpoints) atleastOneEndpointLocal() bool { - for _, endpoint := range endpoints { - if endpoint.IsLocal { - return true - } - } - return false -} - func hostResolveToLocalhost(endpoint Endpoint) bool { hostIPs, err := getHostIP(endpoint.Hostname()) if err != nil { @@ -440,7 +431,7 @@ func hostResolveToLocalhost(endpoint Endpoint) bool { } // UpdateIsLocal - resolves the host and discovers the local host. 
-func (endpoints Endpoints) UpdateIsLocal(foundPrevLocal bool) error { +func (endpoints Endpoints) UpdateIsLocal() error { orchestrated := IsDocker() || IsKubernetes() var epsResolved int @@ -448,13 +439,14 @@ func (endpoints Endpoints) UpdateIsLocal(foundPrevLocal bool) error { resolvedList := make([]bool, len(endpoints)) // Mark the starting time startTime := time.Now() - keepAliveTicker := time.NewTicker(10 * time.Millisecond) + keepAliveTicker := time.NewTicker(500 * time.Millisecond) defer keepAliveTicker.Stop() for { // Break if the local endpoint is found already Or all the endpoints are resolved. if foundLocal || (epsResolved == len(endpoints)) { break } + // Retry infinitely on Kubernetes and Docker swarm. // This is needed as the remote hosts are sometime // not available immediately. @@ -610,8 +602,353 @@ func checkCrossDeviceMounts(endpoints Endpoints) (err error) { return mountinfo.CheckCrossDevice(absPaths) } +// PoolEndpointList is a temporary type to holds the list of endpoints +type PoolEndpointList []Endpoints + +// UpdateIsLocal - resolves all hosts and discovers which are local +func (p PoolEndpointList) UpdateIsLocal() error { + orchestrated := IsDocker() || IsKubernetes() + + var epsResolved int + var epCount int + + for _, endpoints := range p { + epCount += len(endpoints) + } + + var foundLocal bool + resolvedList := make(map[Endpoint]bool) + + // Mark the starting time + startTime := time.Now() + keepAliveTicker := time.NewTicker(1 * time.Second) + defer keepAliveTicker.Stop() + for { + // Break if the local endpoint is found already Or all the endpoints are resolved. + if foundLocal || (epsResolved == epCount) { + break + } + + // Retry infinitely on Kubernetes and Docker swarm. + // This is needed as the remote hosts are sometime + // not available immediately. + select { + case <-globalOSSignalCh: + return fmt.Errorf("The endpoint resolution got interrupted") + default: + for i, endpoints := range p { + for j, endpoint := range endpoints { + if resolvedList[endpoint] { + // Continue if host is already resolved. 
+ continue + } + + // Log the message to console about the host resolving + reqInfo := (&logger.ReqInfo{}).AppendTags( + "host", + endpoint.Hostname(), + ) + + if orchestrated && hostResolveToLocalhost(endpoint) { + // time elapsed + timeElapsed := time.Since(startTime) + // log error only if more than a second has elapsed + if timeElapsed > time.Second { + reqInfo.AppendTags("elapsedTime", + humanize.RelTime(startTime, + startTime.Add(timeElapsed), + "elapsed", + "", + )) + ctx := logger.SetReqInfo(GlobalContext, + reqInfo) + logger.LogOnceIf(ctx, fmt.Errorf("%s resolves to localhost in a containerized deployment, waiting for it to resolve to a valid IP", + endpoint.Hostname()), endpoint.Hostname(), logger.Application) + } + continue + } + + // return err if not Docker or Kubernetes + // We use IsDocker() to check for Docker environment + // We use IsKubernetes() to check for Kubernetes environment + isLocal, err := isLocalHost(endpoint.Hostname(), + endpoint.Port(), + globalMinioPort, + ) + if err != nil && !orchestrated { + return err + } + if err != nil { + // time elapsed + timeElapsed := time.Since(startTime) + // log error only if more than a second has elapsed + if timeElapsed > time.Second { + reqInfo.AppendTags("elapsedTime", + humanize.RelTime(startTime, + startTime.Add(timeElapsed), + "elapsed", + "", + )) + ctx := logger.SetReqInfo(GlobalContext, + reqInfo) + logger.LogOnceIf(ctx, err, endpoint.Hostname(), logger.Application) + } + } else { + resolvedList[endpoint] = true + endpoint.IsLocal = isLocal + epsResolved++ + if !foundLocal { + foundLocal = isLocal + } + endpoints[j] = endpoint + } + } + + p[i] = endpoints + + // Wait for the tick, if the there exist a local endpoint in discovery. + // Non docker/kubernetes environment we do not need to wait. + if !foundLocal && orchestrated { + <-keepAliveTicker.C + } + } + } + } + + // On Kubernetes/Docker setups DNS resolves inappropriately sometimes + // where there are situations same endpoints with multiple disks + // come online indicating either one of them is local and some + // of them are not local. This situation can never happen and + // its only a possibility in orchestrated deployments with dynamic + // DNS. Following code ensures that we treat if one of the endpoint + // says its local for a given host - it is true for all endpoints + // for the same host. Following code ensures that this assumption + // is true and it works in all scenarios and it is safe to assume + // for a given host. + for i, endpoints := range p { + endpointLocalMap := make(map[string]bool) + for _, ep := range endpoints { + if ep.IsLocal { + endpointLocalMap[ep.Host] = ep.IsLocal + } + } + for i := range endpoints { + endpoints[i].IsLocal = endpointLocalMap[endpoints[i].Host] + } + p[i] = endpoints + } + + return nil +} + +// CreatePoolEndpoints creates a list of endpoints per pool, resolves their relevant hostnames and +// discovers those are local or remote. +func CreatePoolEndpoints(serverAddr string, poolArgs ...[][]string) ([]Endpoints, SetupType, error) { + var setupType SetupType + + // Check whether serverAddr is valid for this host. + if err := CheckLocalServerAddr(serverAddr); err != nil { + return nil, setupType, err + } + + _, serverAddrPort := mustSplitHostPort(serverAddr) + + poolEndpoints := make(PoolEndpointList, len(poolArgs)) + + // For single arg, return single drive EC setup. 
+ if len(poolArgs) == 1 && len(poolArgs[0]) == 1 && len(poolArgs[0][0]) == 1 && len(poolArgs[0][0][0]) == 1 { + endpoint, err := NewEndpoint(poolArgs[0][0][0]) + if err != nil { + return nil, setupType, err + } + if err := endpoint.UpdateIsLocal(); err != nil { + return nil, setupType, err + } + if endpoint.Type() != PathEndpointType { + return nil, setupType, config.ErrInvalidEndpoint(nil).Msg("use path style endpoint for single node setup") + } + + var endpoints Endpoints + endpoints = append(endpoints, endpoint) + setupType = ErasureSDSetupType + + poolEndpoints[0] = endpoints + // Check for cross device mounts if any. + if err = checkCrossDeviceMounts(endpoints); err != nil { + return nil, setupType, config.ErrInvalidEndpoint(nil).Msg(err.Error()) + } + + return poolEndpoints, setupType, nil + } + + for i, args := range poolArgs { + var endpoints Endpoints + for _, iargs := range args { + // Convert args to endpoints + eps, err := NewEndpoints(iargs...) + if err != nil { + return nil, setupType, config.ErrInvalidErasureEndpoints(nil).Msg(err.Error()) + } + + // Check for cross device mounts if any. + if err = checkCrossDeviceMounts(eps); err != nil { + return nil, setupType, config.ErrInvalidErasureEndpoints(nil).Msg(err.Error()) + } + + endpoints = append(endpoints, eps...) + } + + if len(endpoints) == 0 { + return nil, setupType, config.ErrInvalidErasureEndpoints(nil).Msg("invalid number of endpoints") + } + + poolEndpoints[i] = endpoints + } + + for _, endpoints := range poolEndpoints { + // Return Erasure setup when all endpoints are path style. + if endpoints[0].Type() == PathEndpointType { + setupType = ErasureSetupType + } + if endpoints[0].Type() == URLEndpointType && setupType != DistErasureSetupType { + setupType = DistErasureSetupType + } + } + + if setupType == ErasureSetupType { + return poolEndpoints, setupType, nil + } + + if err := poolEndpoints.UpdateIsLocal(); err != nil { + return nil, setupType, config.ErrInvalidErasureEndpoints(nil).Msg(err.Error()) + } + + uniqueArgs := set.NewStringSet() + + for i, endpoints := range poolEndpoints { + // Here all endpoints are URL style. + endpointPathSet := set.NewStringSet() + localEndpointCount := 0 + localServerHostSet := set.NewStringSet() + localPortSet := set.NewStringSet() + + for _, endpoint := range endpoints { + endpointPathSet.Add(endpoint.Path) + if endpoint.IsLocal { + localServerHostSet.Add(endpoint.Hostname()) + + _, port, err := net.SplitHostPort(endpoint.Host) + if err != nil { + port = serverAddrPort + } + localPortSet.Add(port) + + localEndpointCount++ + } + } + + orchestrated := IsKubernetes() || IsDocker() + if !orchestrated { + // Check whether same path is not used in endpoints of a host on different port. + // Only verify this on baremetal setups, DNS is not available in orchestrated + // environments so we can't do much here. 
+ { + pathIPMap := make(map[string]set.StringSet) + hostIPCache := make(map[string]set.StringSet) + for _, endpoint := range endpoints { + host := endpoint.Hostname() + hostIPSet, ok := hostIPCache[host] + if !ok { + var err error + hostIPSet, err = getHostIP(host) + if err != nil { + return nil, setupType, config.ErrInvalidErasureEndpoints(nil).Msg(fmt.Sprintf("host '%s' cannot resolve: %s", host, err)) + } + hostIPCache[host] = hostIPSet + } + if IPSet, ok := pathIPMap[endpoint.Path]; ok { + if !IPSet.Intersection(hostIPSet).IsEmpty() { + return nil, setupType, + config.ErrInvalidErasureEndpoints(nil).Msg(fmt.Sprintf("same path '%s' can not be served by different port on same address", endpoint.Path)) + } + pathIPMap[endpoint.Path] = IPSet.Union(hostIPSet) + } else { + pathIPMap[endpoint.Path] = hostIPSet + } + } + } + } + + // Check whether same path is used for more than 1 local endpoints. + { + localPathSet := set.CreateStringSet() + for _, endpoint := range endpoints { + if !endpoint.IsLocal { + continue + } + if localPathSet.Contains(endpoint.Path) { + return nil, setupType, + config.ErrInvalidErasureEndpoints(nil).Msg(fmt.Sprintf("path '%s' cannot be served by different address on same server", endpoint.Path)) + } + localPathSet.Add(endpoint.Path) + } + } + + // Add missing port in all endpoints. + for i := range endpoints { + _, port, err := net.SplitHostPort(endpoints[i].Host) + if err != nil { + endpoints[i].Host = net.JoinHostPort(endpoints[i].Host, serverAddrPort) + } else if endpoints[i].IsLocal && serverAddrPort != port { + // If endpoint is local, but port is different than serverAddrPort, then make it as remote. + endpoints[i].IsLocal = false + } + } + + // All endpoints are pointing to local host + if len(endpoints) == localEndpointCount { + // If all endpoints have same port number, Just treat it as local erasure setup + // using URL style endpoints. + if len(localPortSet) == 1 { + if len(localServerHostSet) > 1 { + return nil, setupType, + config.ErrInvalidErasureEndpoints(nil).Msg("all local endpoints should not have different hostnames/ips") + } + } + + // Even though all endpoints are local, but those endpoints use different ports. + // This means it is DistErasure setup. + } + + poolEndpoints[i] = endpoints + + for _, endpoint := range endpoints { + uniqueArgs.Add(endpoint.Host) + } + + poolEndpoints[i] = endpoints + } + + publicIPs := env.Get(config.EnvPublicIPs, "") + if len(publicIPs) == 0 { + updateDomainIPs(uniqueArgs) + } + + for _, endpoints := range poolEndpoints { + // Return Erasure setup when all endpoints are path style. + if endpoints[0].Type() == PathEndpointType { + setupType = ErasureSetupType + } + if endpoints[0].Type() == URLEndpointType && setupType != DistErasureSetupType { + setupType = DistErasureSetupType + } + } + + return poolEndpoints, setupType, nil +} + // CreateEndpoints - validates and creates new endpoints for given args. 
-func CreateEndpoints(serverAddr string, foundLocal bool, args ...[]string) (Endpoints, SetupType, error) { +func CreateEndpoints(serverAddr string, args ...[]string) (Endpoints, SetupType, error) { var endpoints Endpoints var setupType SetupType var err error @@ -672,7 +1009,7 @@ func CreateEndpoints(serverAddr string, foundLocal bool, args ...[]string) (Endp return endpoints, setupType, nil } - if err = endpoints.UpdateIsLocal(foundLocal); err != nil { + if err = endpoints.UpdateIsLocal(); err != nil { return endpoints, setupType, config.ErrInvalidErasureEndpoints(nil).Msg(err.Error()) } diff --git a/cmd/endpoint_test.go b/cmd/endpoint_test.go index 2f8a0a4e6..4be644afa 100644 --- a/cmd/endpoint_test.go +++ b/cmd/endpoint_test.go @@ -314,7 +314,7 @@ func TestCreateEndpoints(t *testing.T) { for _, testCase := range testCases { testCase := testCase t.Run("", func(t *testing.T) { - endpoints, setupType, err := CreateEndpoints(testCase.serverAddr, false, testCase.args...) + endpoints, setupType, err := CreateEndpoints(testCase.serverAddr, testCase.args...) if err == nil && testCase.expectedErr != nil { t.Errorf("error: expected = %v, got = ", testCase.expectedErr) } @@ -374,7 +374,7 @@ func TestGetLocalPeer(t *testing.T) { for i, testCase := range testCases { zendpoints := mustGetPoolEndpoints(testCase.endpointArgs...) if !zendpoints[0].Endpoints[0].IsLocal { - if err := zendpoints[0].Endpoints.UpdateIsLocal(false); err != nil { + if err := zendpoints[0].Endpoints.UpdateIsLocal(); err != nil { t.Fatalf("error: expected = , got = %v", err) } } @@ -407,7 +407,7 @@ func TestGetRemotePeers(t *testing.T) { for _, testCase := range testCases { zendpoints := mustGetPoolEndpoints(testCase.endpointArgs...) if !zendpoints[0].Endpoints[0].IsLocal { - if err := zendpoints[0].Endpoints.UpdateIsLocal(false); err != nil { + if err := zendpoints[0].Endpoints.UpdateIsLocal(); err != nil { t.Errorf("error: expected = , got = %v", err) } } diff --git a/cmd/prepare-storage.go b/cmd/prepare-storage.go index 3a602b769..6a137d22a 100644 --- a/cmd/prepare-storage.go +++ b/cmd/prepare-storage.go @@ -291,49 +291,46 @@ func waitForFormatErasure(firstDisk bool, endpoints Endpoints, poolCount, setCou tries++ // tried already once // Wait on each try for an update. - ticker := time.NewTicker(150 * time.Millisecond) + ticker := time.NewTicker(1 * time.Second) defer ticker.Stop() for { + // Only log once every 10 iterations, then reset the tries count. + verboseLogging = tries >= 10 + if verboseLogging { + tries = 1 + } + + storageDisks, format, err := connectLoadInitFormats(verboseLogging, firstDisk, endpoints, poolCount, setCount, setDriveCount, deploymentID, distributionAlgo) + if err == nil { + return storageDisks, format, nil + } + + tries++ + switch err { + case errNotFirstDisk: + // Fresh setup, wait for first server to be up. + logger.Info("Waiting for the first server to format the drives (elapsed %s)\n", getElapsedTime()) + case errFirstDiskWait: + // Fresh setup, wait for other servers to come up. + logger.Info("Waiting for all other servers to be online to format the drives (elapses %s)\n", getElapsedTime()) + case errErasureReadQuorum: + // no quorum available continue to wait for minimum number of servers. + logger.Info("Waiting for a minimum of %d drives to come online (elapsed %s)\n", + len(endpoints)/2, getElapsedTime()) + case errErasureWriteQuorum: + // no quorum available continue to wait for minimum number of servers. 
+ logger.Info("Waiting for a minimum of %d drives to come online (elapsed %s)\n", + (len(endpoints)/2)+1, getElapsedTime()) + case errErasureV3ThisEmpty: + // need to wait for this error to be healed, so continue. + default: + // For all other unhandled errors we exit and fail. + return nil, nil, err + } + select { case <-ticker.C: - // Only log once every 10 iterations, then reset the tries count. - verboseLogging = tries >= 10 - if verboseLogging { - tries = 1 - } - - storageDisks, format, err := connectLoadInitFormats(verboseLogging, firstDisk, endpoints, poolCount, setCount, setDriveCount, deploymentID, distributionAlgo) - if err != nil { - tries++ - switch err { - case errNotFirstDisk: - // Fresh setup, wait for first server to be up. - logger.Info("Waiting for the first server to format the drives (elapsed %s)\n", getElapsedTime()) - continue - case errFirstDiskWait: - // Fresh setup, wait for other servers to come up. - logger.Info("Waiting for all other servers to be online to format the drives (elapses %s)\n", getElapsedTime()) - continue - case errErasureReadQuorum: - // no quorum available continue to wait for minimum number of servers. - logger.Info("Waiting for a minimum of %d drives to come online (elapsed %s)\n", - len(endpoints)/2, getElapsedTime()) - continue - case errErasureWriteQuorum: - // no quorum available continue to wait for minimum number of servers. - logger.Info("Waiting for a minimum of %d drives to come online (elapsed %s)\n", - (len(endpoints)/2)+1, getElapsedTime()) - continue - case errErasureV3ThisEmpty: - // need to wait for this error to be healed, so continue. - continue - default: - // For all other unhandled errors we exit and fail. - return nil, nil, err - } - } - return storageDisks, format, nil case <-globalOSSignalCh: return nil, nil, fmt.Errorf("Initializing data volumes gracefully stopped") }
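
The main idea in this patch is that endpoint resolution now runs once across all pools (CreatePoolEndpoints / PoolEndpointList.UpdateIsLocal) instead of pool by pool, so a node that is still offline in one pool no longer blocks startup when another pool is being added. Below is a minimal, self-contained sketch of that single retry loop over all pools. The endpoint type, the lookupIsLocal resolver and resolvePools are illustrative stand-ins, not the code added by this patch; the real implementation additionally tracks ports, caches per endpoint, logs progress, and breaks out early once a local endpoint is found.

package main

import (
	"context"
	"fmt"
	"net"
	"time"
)

// endpoint is a simplified stand-in for an endpoint entry (host only;
// the real type also carries scheme, port and path).
type endpoint struct {
	Host    string
	IsLocal bool
}

// lookupIsLocal is an illustrative resolver: it reports whether the
// host currently resolves to a loopback address.
func lookupIsLocal(host string) (bool, error) {
	ips, err := net.LookupIP(host)
	if err != nil {
		return false, err
	}
	for _, ip := range ips {
		if ip.IsLoopback() {
			return true, nil
		}
	}
	return false, nil
}

// resolvePools walks every pool's endpoints in one retry loop. Hosts
// that fail to resolve (for example pods that are still starting in
// Kubernetes) are retried on the next tick instead of failing the
// whole startup, and resolution results are shared across pools.
func resolvePools(ctx context.Context, pools [][]endpoint) error {
	total := 0
	for _, eps := range pools {
		total += len(eps)
	}

	resolved := make(map[string]bool) // host -> IsLocal
	ticker := time.NewTicker(1 * time.Second)
	defer ticker.Stop()

	for {
		done := 0
		for pi := range pools {
			for ei := range pools[pi] {
				ep := &pools[pi][ei]
				if isLocal, ok := resolved[ep.Host]; ok {
					ep.IsLocal = isLocal
					done++
					continue
				}
				isLocal, err := lookupIsLocal(ep.Host)
				if err != nil {
					continue // not resolvable yet, keep waiting
				}
				resolved[ep.Host] = isLocal
				ep.IsLocal = isLocal
				done++
			}
		}
		if done == total {
			return nil
		}
		select {
		case <-ctx.Done():
			return fmt.Errorf("endpoint resolution interrupted: %w", ctx.Err())
		case <-ticker.C:
		}
	}
}

func main() {
	pools := [][]endpoint{
		{{Host: "localhost"}, {Host: "localhost"}},
	}
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()
	if err := resolvePools(ctx, pools); err != nil {
		fmt.Println("resolve failed:", err)
		return
	}
	fmt.Printf("%+v\n", pools)
}

Keying the cache by host is a simplification; the patch caches per endpoint and then normalizes the IsLocal flag per host afterwards, which the next sketch illustrates.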
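
The per-host normalization the patch keeps handles flaky DNS in containerized setups, where some endpoints on a host can briefly resolve as local while others on the same host resolve as remote. A small standalone sketch of that pass, with illustrative names rather than the patch's own types:

package main

import "fmt"

type endpoint struct {
	Host    string
	Path    string
	IsLocal bool
}

// normalizeLocalFlags makes the IsLocal flag consistent per host:
// if any endpoint on a host was marked local, every endpoint on that
// host is marked local. A host cannot genuinely be both local and
// remote; the split only shows up with dynamic DNS during startup.
func normalizeLocalFlags(endpoints []endpoint) {
	localHosts := make(map[string]bool)
	for _, ep := range endpoints {
		if ep.IsLocal {
			localHosts[ep.Host] = true
		}
	}
	for i := range endpoints {
		endpoints[i].IsLocal = localHosts[endpoints[i].Host]
	}
}

func main() {
	eps := []endpoint{
		{Host: "node1:9000", Path: "/disk1", IsLocal: true},
		{Host: "node1:9000", Path: "/disk2", IsLocal: false}, // flaky resolution
		{Host: "node2:9000", Path: "/disk1", IsLocal: false},
	}
	normalizeLocalFlags(eps)
	fmt.Printf("%+v\n", eps)
}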
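
In cmd/prepare-storage.go the wait loop is reshaped so the format attempt runs first, the resulting error is classified, and only then does the loop block on a slower 1-second ticker or on shutdown. A hedged sketch of that attempt-then-wait shape follows; tryFormat and the sentinel errors here are placeholders, not MinIO's connectLoadInitFormats or its error values.

package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// Placeholder sentinel errors standing in for the retryable
// conditions the real loop distinguishes.
var (
	errNotFirstDisk  = errors.New("not first disk")
	errFirstDiskWait = errors.New("first disk waiting for peers")
)

// tryFormat is a placeholder for the real connect/load/init step;
// it pretends peers need a couple of attempts to come online.
func tryFormat(attempt int) error {
	if attempt < 3 {
		return errFirstDiskWait
	}
	return nil
}

// waitForFormat attempts the format immediately, classifies any
// retryable error, and only then waits on the ticker or cancellation.
// Retryable errors loop; anything else fails fast.
func waitForFormat(ctx context.Context) error {
	ticker := time.NewTicker(1 * time.Second)
	defer ticker.Stop()

	for attempt := 1; ; attempt++ {
		err := tryFormat(attempt)
		if err == nil {
			return nil
		}
		switch {
		case errors.Is(err, errNotFirstDisk), errors.Is(err, errFirstDiskWait):
			fmt.Printf("attempt %d: waiting (%v)\n", attempt, err)
		default:
			return err // unhandled errors exit immediately
		}
		select {
		case <-ticker.C:
		case <-ctx.Done():
			return fmt.Errorf("initialization stopped: %w", ctx.Err())
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()
	if err := waitForFormat(ctx); err != nil {
		fmt.Println("failed:", err)
		return
	}
	fmt.Println("formatted")
}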