allow nodes offline in k8s setups when expanding pools (#17183)

Harshavardhana 2023-05-11 17:41:33 -07:00 committed by GitHub
parent ef7177ebbd
commit f5a20a5d06
4 changed files with 403 additions and 70 deletions

View File

@@ -355,7 +355,7 @@ func createServerEndpoints(serverAddr string, args ...string) (
if err != nil {
return nil, -1, err
}
endpointList, newSetupType, err := CreateEndpoints(serverAddr, false, setArgs...)
endpointList, newSetupType, err := CreateEndpoints(serverAddr, setArgs...)
if err != nil {
return nil, -1, err
}
@@ -371,37 +371,36 @@ func createServerEndpoints(serverAddr string, args ...string) (
return endpointServerPools, setupType, nil
}
var foundPrevLocal bool
var poolArgs [][][]string
for _, arg := range args {
if !ellipses.HasEllipses(arg) && len(args) > 1 {
// TODO: support SNSD deployments to be decommissioned in future
return nil, -1, fmt.Errorf("all args must have ellipses for pool expansion (%w) args: %s", errInvalidArgument, args)
}
setArgs, err := GetAllSets(arg)
if err != nil {
return nil, -1, err
}
endpointList, gotSetupType, err := CreateEndpoints(serverAddr, foundPrevLocal, setArgs...)
if err != nil {
return nil, -1, err
}
poolArgs = append(poolArgs, setArgs)
}
poolEndpoints, setupType, err := CreatePoolEndpoints(serverAddr, poolArgs...)
if err != nil {
return nil, -1, err
}
for i, endpointList := range poolEndpoints {
if err = endpointServerPools.Add(PoolEndpoints{
SetCount: len(setArgs),
DrivesPerSet: len(setArgs[0]),
SetCount: len(poolArgs[i]),
DrivesPerSet: len(poolArgs[i][0]),
Endpoints: endpointList,
CmdLine: arg,
Platform: fmt.Sprintf("OS: %s | Arch: %s", runtime.GOOS, runtime.GOARCH),
CmdLine: args[i],
}); err != nil {
return nil, -1, err
}
foundPrevLocal = endpointList.atleastOneEndpointLocal()
if setupType == UnknownSetupType {
setupType = gotSetupType
}
if setupType == ErasureSetupType && gotSetupType == DistErasureSetupType {
setupType = DistErasureSetupType
}
}
return endpointServerPools, setupType, nil
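
To make the triple-nested poolArgs shape above concrete, here is a minimal, self-contained sketch (hypothetical endpoint values, outside the cmd package) of how each pool's set layout maps to SetCount and DrivesPerSet:

package main

import "fmt"

func main() {
	// poolArgs mirrors the [][][]string built above: pools -> erasure sets -> drive endpoints.
	// The endpoints below are hypothetical placeholders, not taken from this commit.
	poolArgs := [][][]string{
		{ // pool 0: 2 sets of 4 drives each
			{"http://node1/d1", "http://node2/d1", "http://node3/d1", "http://node4/d1"},
			{"http://node1/d2", "http://node2/d2", "http://node3/d2", "http://node4/d2"},
		},
		{ // pool 1 (expansion): 1 set of 4 drives
			{"http://node5/d1", "http://node6/d1", "http://node7/d1", "http://node8/d1"},
		},
	}

	for i, setArgs := range poolArgs {
		// Matches the fields filled in above: SetCount is the number of sets in the
		// pool, DrivesPerSet is the size of the first set.
		fmt.Printf("pool %d: SetCount=%d DrivesPerSet=%d\n", i, len(setArgs), len(setArgs[0]))
	}
}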

View File

@@ -409,15 +409,6 @@ func (endpoints Endpoints) GetAllStrings() (all []string) {
return
}
func (endpoints Endpoints) atleastOneEndpointLocal() bool {
for _, endpoint := range endpoints {
if endpoint.IsLocal {
return true
}
}
return false
}
func hostResolveToLocalhost(endpoint Endpoint) bool {
hostIPs, err := getHostIP(endpoint.Hostname())
if err != nil {
@@ -440,7 +431,7 @@ func hostResolveToLocalhost(endpoint Endpoint) bool {
}
// UpdateIsLocal - resolves the host and discovers the local host.
func (endpoints Endpoints) UpdateIsLocal(foundPrevLocal bool) error {
func (endpoints Endpoints) UpdateIsLocal() error {
orchestrated := IsDocker() || IsKubernetes()
var epsResolved int
@@ -448,13 +439,14 @@ func (endpoints Endpoints) UpdateIsLocal(foundPrevLocal bool) error {
resolvedList := make([]bool, len(endpoints))
// Mark the starting time
startTime := time.Now()
keepAliveTicker := time.NewTicker(10 * time.Millisecond)
keepAliveTicker := time.NewTicker(500 * time.Millisecond)
defer keepAliveTicker.Stop()
for {
// Break if the local endpoint is found already or all the endpoints are resolved.
if foundLocal || (epsResolved == len(endpoints)) {
break
}
// Retry infinitely on Kubernetes and Docker swarm.
// This is needed as the remote hosts are sometimes
// not available immediately.
@@ -610,8 +602,353 @@ func checkCrossDeviceMounts(endpoints Endpoints) (err error) {
return mountinfo.CheckCrossDevice(absPaths)
}
// PoolEndpointList is a temporary type to hold the list of endpoints
type PoolEndpointList []Endpoints
// UpdateIsLocal - resolves all hosts and discovers which are local
func (p PoolEndpointList) UpdateIsLocal() error {
orchestrated := IsDocker() || IsKubernetes()
var epsResolved int
var epCount int
for _, endpoints := range p {
epCount += len(endpoints)
}
var foundLocal bool
resolvedList := make(map[Endpoint]bool)
// Mark the starting time
startTime := time.Now()
keepAliveTicker := time.NewTicker(1 * time.Second)
defer keepAliveTicker.Stop()
for {
// Break if the local endpoint is found already or all the endpoints are resolved.
if foundLocal || (epsResolved == epCount) {
break
}
// Retry infinitely on Kubernetes and Docker swarm.
// This is needed as the remote hosts are sometimes
// not available immediately.
select {
case <-globalOSSignalCh:
return fmt.Errorf("The endpoint resolution got interrupted")
default:
for i, endpoints := range p {
for j, endpoint := range endpoints {
if resolvedList[endpoint] {
// Continue if host is already resolved.
continue
}
// Log the message to console about the host resolving
reqInfo := (&logger.ReqInfo{}).AppendTags(
"host",
endpoint.Hostname(),
)
if orchestrated && hostResolveToLocalhost(endpoint) {
// time elapsed
timeElapsed := time.Since(startTime)
// log error only if more than a second has elapsed
if timeElapsed > time.Second {
reqInfo.AppendTags("elapsedTime",
humanize.RelTime(startTime,
startTime.Add(timeElapsed),
"elapsed",
"",
))
ctx := logger.SetReqInfo(GlobalContext,
reqInfo)
logger.LogOnceIf(ctx, fmt.Errorf("%s resolves to localhost in a containerized deployment, waiting for it to resolve to a valid IP",
endpoint.Hostname()), endpoint.Hostname(), logger.Application)
}
continue
}
// return err if not Docker or Kubernetes
// We use IsDocker() to check for Docker environment
// We use IsKubernetes() to check for Kubernetes environment
isLocal, err := isLocalHost(endpoint.Hostname(),
endpoint.Port(),
globalMinioPort,
)
if err != nil && !orchestrated {
return err
}
if err != nil {
// time elapsed
timeElapsed := time.Since(startTime)
// log error only if more than a second has elapsed
if timeElapsed > time.Second {
reqInfo.AppendTags("elapsedTime",
humanize.RelTime(startTime,
startTime.Add(timeElapsed),
"elapsed",
"",
))
ctx := logger.SetReqInfo(GlobalContext,
reqInfo)
logger.LogOnceIf(ctx, err, endpoint.Hostname(), logger.Application)
}
} else {
resolvedList[endpoint] = true
endpoint.IsLocal = isLocal
epsResolved++
if !foundLocal {
foundLocal = isLocal
}
endpoints[j] = endpoint
}
}
p[i] = endpoints
// Wait for the tick while the local endpoint is still being discovered.
// In non docker/kubernetes environments we do not need to wait.
if !foundLocal && orchestrated {
<-keepAliveTicker.C
}
}
}
}
// On Kubernetes/Docker setups DNS sometimes resolves inconsistently:
// endpoints of the same host, each backed by multiple disks, may come
// online with some marked local and others not. That state cannot
// actually occur and only shows up in orchestrated deployments with
// dynamic DNS. The following code therefore assumes that if any
// endpoint reports its host as local, the same is true for every
// endpoint of that host, and applies that flag consistently.
for i, endpoints := range p {
endpointLocalMap := make(map[string]bool)
for _, ep := range endpoints {
if ep.IsLocal {
endpointLocalMap[ep.Host] = ep.IsLocal
}
}
for i := range endpoints {
endpoints[i].IsLocal = endpointLocalMap[endpoints[i].Host]
}
p[i] = endpoints
}
return nil
}
// CreatePoolEndpoints creates a list of endpoints per pool, resolves their relevant hostnames and
// discovers whether they are local or remote.
func CreatePoolEndpoints(serverAddr string, poolArgs ...[][]string) ([]Endpoints, SetupType, error) {
var setupType SetupType
// Check whether serverAddr is valid for this host.
if err := CheckLocalServerAddr(serverAddr); err != nil {
return nil, setupType, err
}
_, serverAddrPort := mustSplitHostPort(serverAddr)
poolEndpoints := make(PoolEndpointList, len(poolArgs))
// For single arg, return single drive EC setup.
if len(poolArgs) == 1 && len(poolArgs[0]) == 1 && len(poolArgs[0][0]) == 1 && len(poolArgs[0][0][0]) == 1 {
endpoint, err := NewEndpoint(poolArgs[0][0][0])
if err != nil {
return nil, setupType, err
}
if err := endpoint.UpdateIsLocal(); err != nil {
return nil, setupType, err
}
if endpoint.Type() != PathEndpointType {
return nil, setupType, config.ErrInvalidEndpoint(nil).Msg("use path style endpoint for single node setup")
}
var endpoints Endpoints
endpoints = append(endpoints, endpoint)
setupType = ErasureSDSetupType
poolEndpoints[0] = endpoints
// Check for cross device mounts if any.
if err = checkCrossDeviceMounts(endpoints); err != nil {
return nil, setupType, config.ErrInvalidEndpoint(nil).Msg(err.Error())
}
return poolEndpoints, setupType, nil
}
for i, args := range poolArgs {
var endpoints Endpoints
for _, iargs := range args {
// Convert args to endpoints
eps, err := NewEndpoints(iargs...)
if err != nil {
return nil, setupType, config.ErrInvalidErasureEndpoints(nil).Msg(err.Error())
}
// Check for cross device mounts if any.
if err = checkCrossDeviceMounts(eps); err != nil {
return nil, setupType, config.ErrInvalidErasureEndpoints(nil).Msg(err.Error())
}
endpoints = append(endpoints, eps...)
}
if len(endpoints) == 0 {
return nil, setupType, config.ErrInvalidErasureEndpoints(nil).Msg("invalid number of endpoints")
}
poolEndpoints[i] = endpoints
}
for _, endpoints := range poolEndpoints {
// Return Erasure setup when all endpoints are path style.
if endpoints[0].Type() == PathEndpointType {
setupType = ErasureSetupType
}
if endpoints[0].Type() == URLEndpointType && setupType != DistErasureSetupType {
setupType = DistErasureSetupType
}
}
if setupType == ErasureSetupType {
return poolEndpoints, setupType, nil
}
if err := poolEndpoints.UpdateIsLocal(); err != nil {
return nil, setupType, config.ErrInvalidErasureEndpoints(nil).Msg(err.Error())
}
uniqueArgs := set.NewStringSet()
for i, endpoints := range poolEndpoints {
// Here all endpoints are URL style.
endpointPathSet := set.NewStringSet()
localEndpointCount := 0
localServerHostSet := set.NewStringSet()
localPortSet := set.NewStringSet()
for _, endpoint := range endpoints {
endpointPathSet.Add(endpoint.Path)
if endpoint.IsLocal {
localServerHostSet.Add(endpoint.Hostname())
_, port, err := net.SplitHostPort(endpoint.Host)
if err != nil {
port = serverAddrPort
}
localPortSet.Add(port)
localEndpointCount++
}
}
orchestrated := IsKubernetes() || IsDocker()
if !orchestrated {
// Check that the same path is not used by endpoints of a host on different ports.
// Only verify this on baremetal setups; DNS is not available in orchestrated
// environments so we can't do much here.
{
pathIPMap := make(map[string]set.StringSet)
hostIPCache := make(map[string]set.StringSet)
for _, endpoint := range endpoints {
host := endpoint.Hostname()
hostIPSet, ok := hostIPCache[host]
if !ok {
var err error
hostIPSet, err = getHostIP(host)
if err != nil {
return nil, setupType, config.ErrInvalidErasureEndpoints(nil).Msg(fmt.Sprintf("host '%s' cannot resolve: %s", host, err))
}
hostIPCache[host] = hostIPSet
}
if IPSet, ok := pathIPMap[endpoint.Path]; ok {
if !IPSet.Intersection(hostIPSet).IsEmpty() {
return nil, setupType,
config.ErrInvalidErasureEndpoints(nil).Msg(fmt.Sprintf("same path '%s' can not be served by different port on same address", endpoint.Path))
}
pathIPMap[endpoint.Path] = IPSet.Union(hostIPSet)
} else {
pathIPMap[endpoint.Path] = hostIPSet
}
}
}
}
// Check whether the same path is used for more than one local endpoint.
{
localPathSet := set.CreateStringSet()
for _, endpoint := range endpoints {
if !endpoint.IsLocal {
continue
}
if localPathSet.Contains(endpoint.Path) {
return nil, setupType,
config.ErrInvalidErasureEndpoints(nil).Msg(fmt.Sprintf("path '%s' cannot be served by different address on same server", endpoint.Path))
}
localPathSet.Add(endpoint.Path)
}
}
// Add missing port in all endpoints.
for i := range endpoints {
_, port, err := net.SplitHostPort(endpoints[i].Host)
if err != nil {
endpoints[i].Host = net.JoinHostPort(endpoints[i].Host, serverAddrPort)
} else if endpoints[i].IsLocal && serverAddrPort != port {
// If the endpoint is local but its port differs from serverAddrPort, mark it as remote.
endpoints[i].IsLocal = false
}
}
// All endpoints are pointing to local host
if len(endpoints) == localEndpointCount {
// If all endpoints have the same port number, just treat it as a local erasure setup
// using URL style endpoints.
if len(localPortSet) == 1 {
if len(localServerHostSet) > 1 {
return nil, setupType,
config.ErrInvalidErasureEndpoints(nil).Msg("all local endpoints should not have different hostnames/ips")
}
}
// Even though all endpoints are local, they use different ports.
// This means it is a DistErasure setup.
}
poolEndpoints[i] = endpoints
for _, endpoint := range endpoints {
uniqueArgs.Add(endpoint.Host)
}
poolEndpoints[i] = endpoints
}
publicIPs := env.Get(config.EnvPublicIPs, "")
if len(publicIPs) == 0 {
updateDomainIPs(uniqueArgs)
}
for _, endpoints := range poolEndpoints {
// Return Erasure setup when all endpoints are path style.
if endpoints[0].Type() == PathEndpointType {
setupType = ErasureSetupType
}
if endpoints[0].Type() == URLEndpointType && setupType != DistErasureSetupType {
setupType = DistErasureSetupType
}
}
return poolEndpoints, setupType, nil
}
// CreateEndpoints - validates and creates new endpoints for given args.
func CreateEndpoints(serverAddr string, foundLocal bool, args ...[]string) (Endpoints, SetupType, error) {
func CreateEndpoints(serverAddr string, args ...[]string) (Endpoints, SetupType, error) {
var endpoints Endpoints
var setupType SetupType
var err error
@@ -672,7 +1009,7 @@ func CreateEndpoints(serverAddr string, foundLocal bool, args ...[]string) (Endp
return endpoints, setupType, nil
}
if err = endpoints.UpdateIsLocal(foundLocal); err != nil {
if err = endpoints.UpdateIsLocal(); err != nil {
return endpoints, setupType, config.ErrInvalidErasureEndpoints(nil).Msg(err.Error())
}
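
As a standalone illustration of the host-consistency pass in PoolEndpointList.UpdateIsLocal above, here is a minimal sketch using a simplified endpoint type (not MinIO's Endpoint) showing that once any endpoint on a host resolves as local, every endpoint on that host is treated as local:

package main

import "fmt"

// simpleEndpoint is a stripped-down stand-in for the real Endpoint type.
type simpleEndpoint struct {
	Host    string
	Path    string
	IsLocal bool
}

func normalizeLocalFlags(endpoints []simpleEndpoint) {
	// First pass: remember every host that any endpoint reported as local.
	localHosts := make(map[string]bool)
	for _, ep := range endpoints {
		if ep.IsLocal {
			localHosts[ep.Host] = true
		}
	}
	// Second pass: apply the same answer to all endpoints of that host.
	for i := range endpoints {
		endpoints[i].IsLocal = localHosts[endpoints[i].Host]
	}
}

func main() {
	// Hypothetical state after DNS flapping in an orchestrated deployment:
	// the same host resolved as local for one drive but not the other.
	eps := []simpleEndpoint{
		{Host: "minio-0:9000", Path: "/data1", IsLocal: true},
		{Host: "minio-0:9000", Path: "/data2", IsLocal: false},
		{Host: "minio-1:9000", Path: "/data1", IsLocal: false},
	}
	normalizeLocalFlags(eps)
	for _, ep := range eps {
		fmt.Printf("%s%s local=%v\n", ep.Host, ep.Path, ep.IsLocal)
	}
}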

View File

@@ -314,7 +314,7 @@ func TestCreateEndpoints(t *testing.T) {
for _, testCase := range testCases {
testCase := testCase
t.Run("", func(t *testing.T) {
endpoints, setupType, err := CreateEndpoints(testCase.serverAddr, false, testCase.args...)
endpoints, setupType, err := CreateEndpoints(testCase.serverAddr, testCase.args...)
if err == nil && testCase.expectedErr != nil {
t.Errorf("error: expected = %v, got = <nil>", testCase.expectedErr)
}
@@ -374,7 +374,7 @@ func TestGetLocalPeer(t *testing.T) {
for i, testCase := range testCases {
zendpoints := mustGetPoolEndpoints(testCase.endpointArgs...)
if !zendpoints[0].Endpoints[0].IsLocal {
if err := zendpoints[0].Endpoints.UpdateIsLocal(false); err != nil {
if err := zendpoints[0].Endpoints.UpdateIsLocal(); err != nil {
t.Fatalf("error: expected = <nil>, got = %v", err)
}
}
@@ -407,7 +407,7 @@ func TestGetRemotePeers(t *testing.T) {
for _, testCase := range testCases {
zendpoints := mustGetPoolEndpoints(testCase.endpointArgs...)
if !zendpoints[0].Endpoints[0].IsLocal {
if err := zendpoints[0].Endpoints.UpdateIsLocal(false); err != nil {
if err := zendpoints[0].Endpoints.UpdateIsLocal(); err != nil {
t.Errorf("error: expected = <nil>, got = %v", err)
}
}

View File

@@ -291,49 +291,46 @@ func waitForFormatErasure(firstDisk bool, endpoints Endpoints, poolCount, setCou
tries++ // tried already once
// Wait on each try for an update.
ticker := time.NewTicker(150 * time.Millisecond)
ticker := time.NewTicker(1 * time.Second)
defer ticker.Stop()
for {
// Only log once every 10 iterations, then reset the tries count.
verboseLogging = tries >= 10
if verboseLogging {
tries = 1
}
storageDisks, format, err := connectLoadInitFormats(verboseLogging, firstDisk, endpoints, poolCount, setCount, setDriveCount, deploymentID, distributionAlgo)
if err == nil {
return storageDisks, format, nil
}
tries++
switch err {
case errNotFirstDisk:
// Fresh setup, wait for first server to be up.
logger.Info("Waiting for the first server to format the drives (elapsed %s)\n", getElapsedTime())
case errFirstDiskWait:
// Fresh setup, wait for other servers to come up.
logger.Info("Waiting for all other servers to be online to format the drives (elapses %s)\n", getElapsedTime())
case errErasureReadQuorum:
// no quorum available continue to wait for minimum number of servers.
logger.Info("Waiting for a minimum of %d drives to come online (elapsed %s)\n",
len(endpoints)/2, getElapsedTime())
case errErasureWriteQuorum:
// no quorum available continue to wait for minimum number of servers.
logger.Info("Waiting for a minimum of %d drives to come online (elapsed %s)\n",
(len(endpoints)/2)+1, getElapsedTime())
case errErasureV3ThisEmpty:
// need to wait for this error to be healed, so continue.
default:
// For all other unhandled errors we exit and fail.
return nil, nil, err
}
select {
case <-ticker.C:
// Only log once every 10 iterations, then reset the tries count.
verboseLogging = tries >= 10
if verboseLogging {
tries = 1
}
storageDisks, format, err := connectLoadInitFormats(verboseLogging, firstDisk, endpoints, poolCount, setCount, setDriveCount, deploymentID, distributionAlgo)
if err != nil {
tries++
switch err {
case errNotFirstDisk:
// Fresh setup, wait for first server to be up.
logger.Info("Waiting for the first server to format the drives (elapsed %s)\n", getElapsedTime())
continue
case errFirstDiskWait:
// Fresh setup, wait for other servers to come up.
logger.Info("Waiting for all other servers to be online to format the drives (elapses %s)\n", getElapsedTime())
continue
case errErasureReadQuorum:
// no quorum available continue to wait for minimum number of servers.
logger.Info("Waiting for a minimum of %d drives to come online (elapsed %s)\n",
len(endpoints)/2, getElapsedTime())
continue
case errErasureWriteQuorum:
// no quorum available continue to wait for minimum number of servers.
logger.Info("Waiting for a minimum of %d drives to come online (elapsed %s)\n",
(len(endpoints)/2)+1, getElapsedTime())
continue
case errErasureV3ThisEmpty:
// need to wait for this error to be healed, so continue.
continue
default:
// For all other unhandled errors we exit and fail.
return nil, nil, err
}
}
return storageDisks, format, nil
case <-globalOSSignalCh:
return nil, nil, fmt.Errorf("Initializing data volumes gracefully stopped")
}