From 3e9ab5f4a99da6bab09ce33fb1063ce7ea1f5201 Mon Sep 17 00:00:00 2001 From: Harshavardhana Date: Tue, 10 Dec 2019 20:28:22 -0800 Subject: [PATCH] Fix k8s replica set deployment (#8629) In replica sets, hosts resolve to localhost IP automatically until the deployment fully comes up. To avoid this issue we need to wait for such resolution. --- .travis.yml | 2 + Dockerfile.simpleci | 4 ++ cmd/endpoint.go | 97 +++++++++++++++++++++++++++++++++++++++------ cmd/iam.go | 96 ++++++++++++++++++++++++++++++++++---------- cmd/update.go | 41 +++++++++++-------- 5 files changed, 189 insertions(+), 51 deletions(-) diff --git a/.travis.yml b/.travis.yml index 4d5ad8a4d..e5a9d12bd 100644 --- a/.travis.yml +++ b/.travis.yml @@ -26,6 +26,7 @@ matrix: - ARCH=x86_64 - CGO_ENABLED=0 - GO111MODULE=on + - SIMPLE_CI=1 go: 1.13.x script: - make @@ -43,6 +44,7 @@ matrix: - ARCH=x86_64 - CGO_ENABLED=0 - GO111MODULE=on + - SIMPLE_CI=1 go: 1.13.x script: - go build --ldflags="$(go run buildscripts/gen-ldflags.go)" -o %GOPATH%\bin\minio.exe diff --git a/Dockerfile.simpleci b/Dockerfile.simpleci index 4e6f0291c..7fd5b4015 100644 --- a/Dockerfile.simpleci +++ b/Dockerfile.simpleci @@ -8,6 +8,7 @@ WORKDIR /go/src/github.com/minio/minio RUN apt-get update && apt-get install -y jq ENV GO111MODULE=on +ENV SIMPLE_CI 1 RUN git config --global http.cookiefile /gitcookie/.gitcookie @@ -43,6 +44,8 @@ RUN make verify #------------------------------------------------------------- FROM node:10.15-stretch-slim +ENV SIMPLE_CI 1 + COPY browser /minio/browser WORKDIR /minio/browser @@ -63,6 +66,7 @@ ENV LANG C.UTF-8 ENV GOROOT /usr/local/go ENV GOPATH /usr/local ENV PATH $GOPATH/bin:$GOROOT/bin:$PATH +ENV SIMPLE_CI 1 ENV MINT_ROOT_DIR /mint RUN apt-get --yes update && apt-get --yes upgrade && \ diff --git a/cmd/endpoint.go b/cmd/endpoint.go index 76a3ee876..be635a718 100644 --- a/cmd/endpoint.go +++ b/cmd/endpoint.go @@ -18,6 +18,7 @@ package cmd import ( "context" + "errors" "fmt" "net" "net/url" @@ -229,8 +230,46 @@ func (endpoints Endpoints) GetString(i int) string { return endpoints[i].String() } +func (endpoints Endpoints) atleastOneEndpointLocal() bool { + for _, endpoint := range endpoints { + if endpoint.IsLocal { + return true + } + } + return false +} + +func (endpoints Endpoints) doAllHostsResolveToLocalhost() bool { + var endpointHosts = map[string]set.StringSet{} + for _, endpoint := range endpoints { + hostIPs, err := getHostIP(endpoint.Hostname()) + if err != nil { + continue + } + endpointHosts[endpoint.Hostname()] = hostIPs + } + sameHosts := make(map[string]int) + for hostName, endpointIPs := range endpointHosts { + for _, endpointIP := range endpointIPs.ToSlice() { + if net.ParseIP(endpointIP).IsLoopback() { + sameHosts[hostName]++ + } + } + } + ok := true + for _, localCount := range sameHosts { + ok = ok && localCount > 0 + } + if len(sameHosts) == 0 { + return false + } + return ok +} + // UpdateIsLocal - resolves the host and discovers the local host. func (endpoints Endpoints) UpdateIsLocal() error { + orchestrated := IsDocker() || IsKubernetes() + var epsResolved int var foundLocal bool resolvedList := make([]bool, len(endpoints)) @@ -256,29 +295,61 @@ func (endpoints Endpoints) UpdateIsLocal() error { continue } - // return err if not Docker or Kubernetes - // We use IsDocker() to check for Docker environment - // We use IsKubernetes() to check for Kubernetes environment - isLocal, err := isLocalHost(endpoints[i].Hostname(), endpoints[i].Port(), globalMinioPort) - if err != nil { - if !IsDocker() && !IsKubernetes() { - return err - } + // Log the message to console about the host resolving + reqInfo := (&logger.ReqInfo{}).AppendTags( + "host", + endpoints[i].Hostname(), + ) + + if orchestrated && endpoints.doAllHostsResolveToLocalhost() { + err := errors.New("hosts resolve to same IP, DNS not updated on k8s") // time elapsed timeElapsed := time.Since(startTime) // log error only if more than 1s elapsed if timeElapsed > time.Second { - // Log the message to console about the host not being resolveable. - reqInfo := (&logger.ReqInfo{}).AppendTags("host", endpoints[i].Hostname()) reqInfo.AppendTags("elapsedTime", - humanize.RelTime(startTime, startTime.Add(timeElapsed), - "elapsed", "")) + humanize.RelTime(startTime, + startTime.Add(timeElapsed), + "elapsed", + "")) ctx := logger.SetReqInfo(context.Background(), reqInfo) logger.LogIf(ctx, err, logger.Application) } + continue + } + + // return err if not Docker or Kubernetes + // We use IsDocker() to check for Docker environment + // We use IsKubernetes() to check for Kubernetes environment + isLocal, err := isLocalHost(endpoints[i].Hostname(), + endpoints[i].Port(), + globalMinioPort, + ) + if err != nil && !orchestrated { + return err + } + if err != nil { + // time elapsed + timeElapsed := time.Since(startTime) + // log error only if more than 1s elapsed + if timeElapsed > time.Second { + reqInfo.AppendTags("elapsedTime", + humanize.RelTime(startTime, + startTime.Add(timeElapsed), + "elapsed", + "", + )) + ctx := logger.SetReqInfo(context.Background(), + reqInfo) + logger.LogIf(ctx, err, logger.Application) + } } else { resolvedList[i] = true endpoints[i].IsLocal = isLocal + if orchestrated && !endpoints.atleastOneEndpointLocal() { + resolvedList[i] = false + continue + } epsResolved++ if !foundLocal { foundLocal = isLocal @@ -288,7 +359,7 @@ func (endpoints Endpoints) UpdateIsLocal() error { // Wait for the tick, if the there exist a local endpoint in discovery. // Non docker/kubernetes environment we do not need to wait. - if !foundLocal && (IsDocker() || IsKubernetes()) { + if !foundLocal && orchestrated { <-keepAliveTicker.C } } diff --git a/cmd/iam.go b/cmd/iam.go index a097bffb7..c54620324 100644 --- a/cmd/iam.go +++ b/cmd/iam.go @@ -221,13 +221,13 @@ type IAMStorageAPI interface { // simplifies the implementation for group removal. This is called // only via IAM notifications. func (sys *IAMSys) LoadGroup(objAPI ObjectLayer, group string) error { - if objAPI == nil { - return errInvalidArgument - } - sys.Lock() defer sys.Unlock() + if objAPI == nil || sys.store == nil { + return errServerNotInitialized + } + if globalEtcdClient != nil { // Watch APIs cover this case, so nothing to do. return nil @@ -262,13 +262,13 @@ func (sys *IAMSys) LoadGroup(objAPI ObjectLayer, group string) error { // LoadPolicy - reloads a specific canned policy from backend disks or etcd. func (sys *IAMSys) LoadPolicy(objAPI ObjectLayer, policyName string) error { - if objAPI == nil { - return errInvalidArgument - } - sys.Lock() defer sys.Unlock() + if objAPI == nil || sys.store == nil { + return errServerNotInitialized + } + if globalEtcdClient == nil { return sys.store.loadPolicyDoc(policyName, sys.iamPolicyDocsMap) } @@ -280,13 +280,13 @@ func (sys *IAMSys) LoadPolicy(objAPI ObjectLayer, policyName string) error { // LoadPolicyMapping - loads the mapped policy for a user or group // from storage into server memory. func (sys *IAMSys) LoadPolicyMapping(objAPI ObjectLayer, userOrGroup string, isGroup bool) error { - if objAPI == nil { - return errInvalidArgument - } - sys.Lock() defer sys.Unlock() + if objAPI == nil || sys.store == nil { + return errServerNotInitialized + } + if globalEtcdClient == nil { var err error if isGroup { @@ -306,13 +306,13 @@ func (sys *IAMSys) LoadPolicyMapping(objAPI ObjectLayer, userOrGroup string, isG // LoadUser - reloads a specific user from backend disks or etcd. func (sys *IAMSys) LoadUser(objAPI ObjectLayer, accessKey string, isSTS bool) error { - if objAPI == nil { - return errInvalidArgument - } - sys.Lock() defer sys.Unlock() + if objAPI == nil || sys.store == nil { + return errServerNotInitialized + } + if globalEtcdClient == nil { err := sys.store.loadUser(accessKey, isSTS, sys.iamUsersMap) if err != nil { @@ -351,14 +351,16 @@ func (sys *IAMSys) doIAMConfigMigration(objAPI ObjectLayer) error { // Init - initializes config system from iam.json func (sys *IAMSys) Init(objAPI ObjectLayer) error { if objAPI == nil { - return errInvalidArgument + return errServerNotInitialized } + sys.Lock() if globalEtcdClient == nil { sys.store = newIAMObjectStore() } else { sys.store = newIAMEtcdStore() } + sys.Unlock() doneCh := make(chan struct{}) defer close(doneCh) @@ -416,6 +418,13 @@ func (sys *IAMSys) DeletePolicy(policyName string) error { return errInvalidArgument } + sys.Lock() + defer sys.Unlock() + + if sys.store == nil { + return errServerNotInitialized + } + err := sys.store.deletePolicyDoc(policyName) switch err.(type) { case ObjectNotFound: @@ -423,9 +432,6 @@ func (sys *IAMSys) DeletePolicy(policyName string) error { err = nil } - sys.Lock() - defer sys.Unlock() - delete(sys.iamPolicyDocsMap, policyName) return err } @@ -481,12 +487,17 @@ func (sys *IAMSys) SetPolicy(policyName string, p iampolicy.Policy) error { return errInvalidArgument } + sys.Lock() + defer sys.Unlock() + + if sys.store == nil { + return errServerNotInitialized + } + if err := sys.store.savePolicyDoc(policyName, p); err != nil { return err } - sys.Lock() - defer sys.Unlock() sys.iamPolicyDocsMap[policyName] = p return nil } @@ -505,6 +516,10 @@ func (sys *IAMSys) DeleteUser(accessKey string) error { return errIAMActionNotAllowed } + if sys.store == nil { + return errServerNotInitialized + } + // It is ok to ignore deletion error on the mapped policy sys.store.deleteMappedPolicy(accessKey, false, false) err := sys.store.deleteUserIdentity(accessKey, false) @@ -543,6 +558,10 @@ func (sys *IAMSys) SetTempUser(accessKey string, cred auth.Credentials, policyNa return nil } + if sys.store == nil { + return errServerNotInitialized + } + mp := newMappedPolicy(policyName) if err := sys.store.saveMappedPolicy(accessKey, true, false, mp); err != nil { return err @@ -551,6 +570,10 @@ func (sys *IAMSys) SetTempUser(accessKey string, cred auth.Credentials, policyNa sys.iamUserPolicyMap[accessKey] = mp } + if sys.store == nil { + return errServerNotInitialized + } + u := newUserIdentity(cred) if err := sys.store.saveUserIdentity(accessKey, true, u); err != nil { return err @@ -659,6 +682,11 @@ func (sys *IAMSys) SetUserStatus(accessKey string, status madmin.AccountStatus) return config.EnableOff }(), }) + + if sys.store == nil { + return errServerNotInitialized + } + if err := sys.store.saveUserIdentity(accessKey, false, uinfo); err != nil { return err } @@ -687,6 +715,10 @@ func (sys *IAMSys) SetUser(accessKey string, uinfo madmin.UserInfo) error { return errIAMActionNotAllowed } + if sys.store == nil { + return errServerNotInitialized + } + if err := sys.store.saveUserIdentity(accessKey, false, u); err != nil { return err } @@ -718,6 +750,10 @@ func (sys *IAMSys) SetUserSecretKey(accessKey string, secretKey string) error { return errNoSuchUser } + if sys.store == nil { + return errServerNotInitialized + } + cred.SecretKey = secretKey u := newUserIdentity(cred) if err := sys.store.saveUserIdentity(accessKey, false, u); err != nil { @@ -775,6 +811,10 @@ func (sys *IAMSys) AddUsersToGroup(group string, members []string) error { gi.Members = uniqMembers } + if sys.store == nil { + return errServerNotInitialized + } + if err := sys.store.saveGroupInfo(group, gi); err != nil { return err } @@ -832,6 +872,10 @@ func (sys *IAMSys) RemoveUsersFromGroup(group string, members []string) error { return errGroupNotEmpty } + if sys.store == nil { + return errServerNotInitialized + } + if len(members) == 0 { // len(gi.Members) == 0 here. @@ -887,6 +931,10 @@ func (sys *IAMSys) SetGroupStatus(group string, enabled bool) error { sys.Lock() defer sys.Unlock() + if sys.store == nil { + return errServerNotInitialized + } + if sys.usersSysType != MinIOUsersSysType { return errIAMActionNotAllowed } @@ -984,6 +1032,10 @@ func (sys *IAMSys) PolicyDBSet(name, policy string, isGroup bool) error { // policyDBSet - sets a policy for user in the policy db. Assumes that // caller has sys.Lock(). func (sys *IAMSys) policyDBSet(name, policy string, isSTS, isGroup bool) error { + if sys.store == nil { + return errServerNotInitialized + } + if name == "" || policy == "" { return errInvalidArgument } diff --git a/cmd/update.go b/cmd/update.go index d339afbe5..4854f3c5d 100644 --- a/cmd/update.go +++ b/cmd/update.go @@ -130,31 +130,40 @@ func GetCurrentReleaseTime() (releaseTime time.Time, err error) { // "/.dockerenv": "file", // func IsDocker() bool { - _, err := os.Stat("/.dockerenv") - if os.IsNotExist(err) { - return false + if env.Get("SIMPLE_CI", "") == "" { + _, err := os.Stat("/.dockerenv") + if os.IsNotExist(err) { + return false + } + + // Log error, as we will not propagate it to caller + logger.LogIf(context.Background(), err) + + return err == nil } - - // Log error, as we will not propagate it to caller - logger.LogIf(context.Background(), err) - - return err == nil + return false } // IsDCOS returns true if minio is running in DCOS. func IsDCOS() bool { - // http://mesos.apache.org/documentation/latest/docker-containerizer/ - // Mesos docker containerizer sets this value - return env.Get("MESOS_CONTAINER_NAME", "") != "" + if env.Get("SIMPLE_CI", "") == "" { + // http://mesos.apache.org/documentation/latest/docker-containerizer/ + // Mesos docker containerizer sets this value + return env.Get("MESOS_CONTAINER_NAME", "") != "" + } + return false } // IsKubernetes returns true if minio is running in kubernetes. func IsKubernetes() bool { - // Kubernetes env used to validate if we are - // indeed running inside a kubernetes pod - // is KUBERNETES_SERVICE_HOST but in future - // we might need to enhance this. - return env.Get("KUBERNETES_SERVICE_HOST", "") != "" + if env.Get("SIMPLE_CI", "") == "" { + // Kubernetes env used to validate if we are + // indeed running inside a kubernetes pod + // is KUBERNETES_SERVICE_HOST but in future + // we might need to enhance this. + return env.Get("KUBERNETES_SERVICE_HOST", "") != "" + } + return false } // IsBOSH returns true if minio is deployed from a bosh package