Fix k8s replica set deployment (#8629)

In replica sets, hosts resolve to localhost
IP automatically until the deployment fully
comes up. To avoid this issue we need to
wait for such resolution.
This commit is contained in:
Harshavardhana 2019-12-10 20:28:22 -08:00 committed by GitHub
parent 46b4dd8e20
commit 3e9ab5f4a9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 189 additions and 51 deletions

View File

@ -26,6 +26,7 @@ matrix:
- ARCH=x86_64 - ARCH=x86_64
- CGO_ENABLED=0 - CGO_ENABLED=0
- GO111MODULE=on - GO111MODULE=on
- SIMPLE_CI=1
go: 1.13.x go: 1.13.x
script: script:
- make - make
@ -43,6 +44,7 @@ matrix:
- ARCH=x86_64 - ARCH=x86_64
- CGO_ENABLED=0 - CGO_ENABLED=0
- GO111MODULE=on - GO111MODULE=on
- SIMPLE_CI=1
go: 1.13.x go: 1.13.x
script: script:
- go build --ldflags="$(go run buildscripts/gen-ldflags.go)" -o %GOPATH%\bin\minio.exe - go build --ldflags="$(go run buildscripts/gen-ldflags.go)" -o %GOPATH%\bin\minio.exe

View File

@ -8,6 +8,7 @@ WORKDIR /go/src/github.com/minio/minio
RUN apt-get update && apt-get install -y jq RUN apt-get update && apt-get install -y jq
ENV GO111MODULE=on ENV GO111MODULE=on
ENV SIMPLE_CI 1
RUN git config --global http.cookiefile /gitcookie/.gitcookie RUN git config --global http.cookiefile /gitcookie/.gitcookie
@ -43,6 +44,8 @@ RUN make verify
#------------------------------------------------------------- #-------------------------------------------------------------
FROM node:10.15-stretch-slim FROM node:10.15-stretch-slim
ENV SIMPLE_CI 1
COPY browser /minio/browser COPY browser /minio/browser
WORKDIR /minio/browser WORKDIR /minio/browser
@ -63,6 +66,7 @@ ENV LANG C.UTF-8
ENV GOROOT /usr/local/go ENV GOROOT /usr/local/go
ENV GOPATH /usr/local ENV GOPATH /usr/local
ENV PATH $GOPATH/bin:$GOROOT/bin:$PATH ENV PATH $GOPATH/bin:$GOROOT/bin:$PATH
ENV SIMPLE_CI 1
ENV MINT_ROOT_DIR /mint ENV MINT_ROOT_DIR /mint
RUN apt-get --yes update && apt-get --yes upgrade && \ RUN apt-get --yes update && apt-get --yes upgrade && \

View File

@ -18,6 +18,7 @@ package cmd
import ( import (
"context" "context"
"errors"
"fmt" "fmt"
"net" "net"
"net/url" "net/url"
@ -229,8 +230,46 @@ func (endpoints Endpoints) GetString(i int) string {
return endpoints[i].String() return endpoints[i].String()
} }
func (endpoints Endpoints) atleastOneEndpointLocal() bool {
for _, endpoint := range endpoints {
if endpoint.IsLocal {
return true
}
}
return false
}
func (endpoints Endpoints) doAllHostsResolveToLocalhost() bool {
var endpointHosts = map[string]set.StringSet{}
for _, endpoint := range endpoints {
hostIPs, err := getHostIP(endpoint.Hostname())
if err != nil {
continue
}
endpointHosts[endpoint.Hostname()] = hostIPs
}
sameHosts := make(map[string]int)
for hostName, endpointIPs := range endpointHosts {
for _, endpointIP := range endpointIPs.ToSlice() {
if net.ParseIP(endpointIP).IsLoopback() {
sameHosts[hostName]++
}
}
}
ok := true
for _, localCount := range sameHosts {
ok = ok && localCount > 0
}
if len(sameHosts) == 0 {
return false
}
return ok
}
// UpdateIsLocal - resolves the host and discovers the local host. // UpdateIsLocal - resolves the host and discovers the local host.
func (endpoints Endpoints) UpdateIsLocal() error { func (endpoints Endpoints) UpdateIsLocal() error {
orchestrated := IsDocker() || IsKubernetes()
var epsResolved int var epsResolved int
var foundLocal bool var foundLocal bool
resolvedList := make([]bool, len(endpoints)) resolvedList := make([]bool, len(endpoints))
@ -256,29 +295,61 @@ func (endpoints Endpoints) UpdateIsLocal() error {
continue continue
} }
// return err if not Docker or Kubernetes // Log the message to console about the host resolving
// We use IsDocker() to check for Docker environment reqInfo := (&logger.ReqInfo{}).AppendTags(
// We use IsKubernetes() to check for Kubernetes environment "host",
isLocal, err := isLocalHost(endpoints[i].Hostname(), endpoints[i].Port(), globalMinioPort) endpoints[i].Hostname(),
if err != nil { )
if !IsDocker() && !IsKubernetes() {
return err if orchestrated && endpoints.doAllHostsResolveToLocalhost() {
} err := errors.New("hosts resolve to same IP, DNS not updated on k8s")
// time elapsed // time elapsed
timeElapsed := time.Since(startTime) timeElapsed := time.Since(startTime)
// log error only if more than 1s elapsed // log error only if more than 1s elapsed
if timeElapsed > time.Second { if timeElapsed > time.Second {
// Log the message to console about the host not being resolveable.
reqInfo := (&logger.ReqInfo{}).AppendTags("host", endpoints[i].Hostname())
reqInfo.AppendTags("elapsedTime", reqInfo.AppendTags("elapsedTime",
humanize.RelTime(startTime, startTime.Add(timeElapsed), humanize.RelTime(startTime,
"elapsed", "")) startTime.Add(timeElapsed),
"elapsed",
""))
ctx := logger.SetReqInfo(context.Background(), reqInfo) ctx := logger.SetReqInfo(context.Background(), reqInfo)
logger.LogIf(ctx, err, logger.Application) logger.LogIf(ctx, err, logger.Application)
} }
continue
}
// return err if not Docker or Kubernetes
// We use IsDocker() to check for Docker environment
// We use IsKubernetes() to check for Kubernetes environment
isLocal, err := isLocalHost(endpoints[i].Hostname(),
endpoints[i].Port(),
globalMinioPort,
)
if err != nil && !orchestrated {
return err
}
if err != nil {
// time elapsed
timeElapsed := time.Since(startTime)
// log error only if more than 1s elapsed
if timeElapsed > time.Second {
reqInfo.AppendTags("elapsedTime",
humanize.RelTime(startTime,
startTime.Add(timeElapsed),
"elapsed",
"",
))
ctx := logger.SetReqInfo(context.Background(),
reqInfo)
logger.LogIf(ctx, err, logger.Application)
}
} else { } else {
resolvedList[i] = true resolvedList[i] = true
endpoints[i].IsLocal = isLocal endpoints[i].IsLocal = isLocal
if orchestrated && !endpoints.atleastOneEndpointLocal() {
resolvedList[i] = false
continue
}
epsResolved++ epsResolved++
if !foundLocal { if !foundLocal {
foundLocal = isLocal foundLocal = isLocal
@ -288,7 +359,7 @@ func (endpoints Endpoints) UpdateIsLocal() error {
// Wait for the tick, if the there exist a local endpoint in discovery. // Wait for the tick, if the there exist a local endpoint in discovery.
// Non docker/kubernetes environment we do not need to wait. // Non docker/kubernetes environment we do not need to wait.
if !foundLocal && (IsDocker() || IsKubernetes()) { if !foundLocal && orchestrated {
<-keepAliveTicker.C <-keepAliveTicker.C
} }
} }

View File

@ -221,13 +221,13 @@ type IAMStorageAPI interface {
// simplifies the implementation for group removal. This is called // simplifies the implementation for group removal. This is called
// only via IAM notifications. // only via IAM notifications.
func (sys *IAMSys) LoadGroup(objAPI ObjectLayer, group string) error { func (sys *IAMSys) LoadGroup(objAPI ObjectLayer, group string) error {
if objAPI == nil {
return errInvalidArgument
}
sys.Lock() sys.Lock()
defer sys.Unlock() defer sys.Unlock()
if objAPI == nil || sys.store == nil {
return errServerNotInitialized
}
if globalEtcdClient != nil { if globalEtcdClient != nil {
// Watch APIs cover this case, so nothing to do. // Watch APIs cover this case, so nothing to do.
return nil return nil
@ -262,13 +262,13 @@ func (sys *IAMSys) LoadGroup(objAPI ObjectLayer, group string) error {
// LoadPolicy - reloads a specific canned policy from backend disks or etcd. // LoadPolicy - reloads a specific canned policy from backend disks or etcd.
func (sys *IAMSys) LoadPolicy(objAPI ObjectLayer, policyName string) error { func (sys *IAMSys) LoadPolicy(objAPI ObjectLayer, policyName string) error {
if objAPI == nil {
return errInvalidArgument
}
sys.Lock() sys.Lock()
defer sys.Unlock() defer sys.Unlock()
if objAPI == nil || sys.store == nil {
return errServerNotInitialized
}
if globalEtcdClient == nil { if globalEtcdClient == nil {
return sys.store.loadPolicyDoc(policyName, sys.iamPolicyDocsMap) return sys.store.loadPolicyDoc(policyName, sys.iamPolicyDocsMap)
} }
@ -280,13 +280,13 @@ func (sys *IAMSys) LoadPolicy(objAPI ObjectLayer, policyName string) error {
// LoadPolicyMapping - loads the mapped policy for a user or group // LoadPolicyMapping - loads the mapped policy for a user or group
// from storage into server memory. // from storage into server memory.
func (sys *IAMSys) LoadPolicyMapping(objAPI ObjectLayer, userOrGroup string, isGroup bool) error { func (sys *IAMSys) LoadPolicyMapping(objAPI ObjectLayer, userOrGroup string, isGroup bool) error {
if objAPI == nil {
return errInvalidArgument
}
sys.Lock() sys.Lock()
defer sys.Unlock() defer sys.Unlock()
if objAPI == nil || sys.store == nil {
return errServerNotInitialized
}
if globalEtcdClient == nil { if globalEtcdClient == nil {
var err error var err error
if isGroup { if isGroup {
@ -306,13 +306,13 @@ func (sys *IAMSys) LoadPolicyMapping(objAPI ObjectLayer, userOrGroup string, isG
// LoadUser - reloads a specific user from backend disks or etcd. // LoadUser - reloads a specific user from backend disks or etcd.
func (sys *IAMSys) LoadUser(objAPI ObjectLayer, accessKey string, isSTS bool) error { func (sys *IAMSys) LoadUser(objAPI ObjectLayer, accessKey string, isSTS bool) error {
if objAPI == nil {
return errInvalidArgument
}
sys.Lock() sys.Lock()
defer sys.Unlock() defer sys.Unlock()
if objAPI == nil || sys.store == nil {
return errServerNotInitialized
}
if globalEtcdClient == nil { if globalEtcdClient == nil {
err := sys.store.loadUser(accessKey, isSTS, sys.iamUsersMap) err := sys.store.loadUser(accessKey, isSTS, sys.iamUsersMap)
if err != nil { if err != nil {
@ -351,14 +351,16 @@ func (sys *IAMSys) doIAMConfigMigration(objAPI ObjectLayer) error {
// Init - initializes config system from iam.json // Init - initializes config system from iam.json
func (sys *IAMSys) Init(objAPI ObjectLayer) error { func (sys *IAMSys) Init(objAPI ObjectLayer) error {
if objAPI == nil { if objAPI == nil {
return errInvalidArgument return errServerNotInitialized
} }
sys.Lock()
if globalEtcdClient == nil { if globalEtcdClient == nil {
sys.store = newIAMObjectStore() sys.store = newIAMObjectStore()
} else { } else {
sys.store = newIAMEtcdStore() sys.store = newIAMEtcdStore()
} }
sys.Unlock()
doneCh := make(chan struct{}) doneCh := make(chan struct{})
defer close(doneCh) defer close(doneCh)
@ -416,6 +418,13 @@ func (sys *IAMSys) DeletePolicy(policyName string) error {
return errInvalidArgument return errInvalidArgument
} }
sys.Lock()
defer sys.Unlock()
if sys.store == nil {
return errServerNotInitialized
}
err := sys.store.deletePolicyDoc(policyName) err := sys.store.deletePolicyDoc(policyName)
switch err.(type) { switch err.(type) {
case ObjectNotFound: case ObjectNotFound:
@ -423,9 +432,6 @@ func (sys *IAMSys) DeletePolicy(policyName string) error {
err = nil err = nil
} }
sys.Lock()
defer sys.Unlock()
delete(sys.iamPolicyDocsMap, policyName) delete(sys.iamPolicyDocsMap, policyName)
return err return err
} }
@ -481,12 +487,17 @@ func (sys *IAMSys) SetPolicy(policyName string, p iampolicy.Policy) error {
return errInvalidArgument return errInvalidArgument
} }
sys.Lock()
defer sys.Unlock()
if sys.store == nil {
return errServerNotInitialized
}
if err := sys.store.savePolicyDoc(policyName, p); err != nil { if err := sys.store.savePolicyDoc(policyName, p); err != nil {
return err return err
} }
sys.Lock()
defer sys.Unlock()
sys.iamPolicyDocsMap[policyName] = p sys.iamPolicyDocsMap[policyName] = p
return nil return nil
} }
@ -505,6 +516,10 @@ func (sys *IAMSys) DeleteUser(accessKey string) error {
return errIAMActionNotAllowed return errIAMActionNotAllowed
} }
if sys.store == nil {
return errServerNotInitialized
}
// It is ok to ignore deletion error on the mapped policy // It is ok to ignore deletion error on the mapped policy
sys.store.deleteMappedPolicy(accessKey, false, false) sys.store.deleteMappedPolicy(accessKey, false, false)
err := sys.store.deleteUserIdentity(accessKey, false) err := sys.store.deleteUserIdentity(accessKey, false)
@ -543,6 +558,10 @@ func (sys *IAMSys) SetTempUser(accessKey string, cred auth.Credentials, policyNa
return nil return nil
} }
if sys.store == nil {
return errServerNotInitialized
}
mp := newMappedPolicy(policyName) mp := newMappedPolicy(policyName)
if err := sys.store.saveMappedPolicy(accessKey, true, false, mp); err != nil { if err := sys.store.saveMappedPolicy(accessKey, true, false, mp); err != nil {
return err return err
@ -551,6 +570,10 @@ func (sys *IAMSys) SetTempUser(accessKey string, cred auth.Credentials, policyNa
sys.iamUserPolicyMap[accessKey] = mp sys.iamUserPolicyMap[accessKey] = mp
} }
if sys.store == nil {
return errServerNotInitialized
}
u := newUserIdentity(cred) u := newUserIdentity(cred)
if err := sys.store.saveUserIdentity(accessKey, true, u); err != nil { if err := sys.store.saveUserIdentity(accessKey, true, u); err != nil {
return err return err
@ -659,6 +682,11 @@ func (sys *IAMSys) SetUserStatus(accessKey string, status madmin.AccountStatus)
return config.EnableOff return config.EnableOff
}(), }(),
}) })
if sys.store == nil {
return errServerNotInitialized
}
if err := sys.store.saveUserIdentity(accessKey, false, uinfo); err != nil { if err := sys.store.saveUserIdentity(accessKey, false, uinfo); err != nil {
return err return err
} }
@ -687,6 +715,10 @@ func (sys *IAMSys) SetUser(accessKey string, uinfo madmin.UserInfo) error {
return errIAMActionNotAllowed return errIAMActionNotAllowed
} }
if sys.store == nil {
return errServerNotInitialized
}
if err := sys.store.saveUserIdentity(accessKey, false, u); err != nil { if err := sys.store.saveUserIdentity(accessKey, false, u); err != nil {
return err return err
} }
@ -718,6 +750,10 @@ func (sys *IAMSys) SetUserSecretKey(accessKey string, secretKey string) error {
return errNoSuchUser return errNoSuchUser
} }
if sys.store == nil {
return errServerNotInitialized
}
cred.SecretKey = secretKey cred.SecretKey = secretKey
u := newUserIdentity(cred) u := newUserIdentity(cred)
if err := sys.store.saveUserIdentity(accessKey, false, u); err != nil { if err := sys.store.saveUserIdentity(accessKey, false, u); err != nil {
@ -775,6 +811,10 @@ func (sys *IAMSys) AddUsersToGroup(group string, members []string) error {
gi.Members = uniqMembers gi.Members = uniqMembers
} }
if sys.store == nil {
return errServerNotInitialized
}
if err := sys.store.saveGroupInfo(group, gi); err != nil { if err := sys.store.saveGroupInfo(group, gi); err != nil {
return err return err
} }
@ -832,6 +872,10 @@ func (sys *IAMSys) RemoveUsersFromGroup(group string, members []string) error {
return errGroupNotEmpty return errGroupNotEmpty
} }
if sys.store == nil {
return errServerNotInitialized
}
if len(members) == 0 { if len(members) == 0 {
// len(gi.Members) == 0 here. // len(gi.Members) == 0 here.
@ -887,6 +931,10 @@ func (sys *IAMSys) SetGroupStatus(group string, enabled bool) error {
sys.Lock() sys.Lock()
defer sys.Unlock() defer sys.Unlock()
if sys.store == nil {
return errServerNotInitialized
}
if sys.usersSysType != MinIOUsersSysType { if sys.usersSysType != MinIOUsersSysType {
return errIAMActionNotAllowed return errIAMActionNotAllowed
} }
@ -984,6 +1032,10 @@ func (sys *IAMSys) PolicyDBSet(name, policy string, isGroup bool) error {
// policyDBSet - sets a policy for user in the policy db. Assumes that // policyDBSet - sets a policy for user in the policy db. Assumes that
// caller has sys.Lock(). // caller has sys.Lock().
func (sys *IAMSys) policyDBSet(name, policy string, isSTS, isGroup bool) error { func (sys *IAMSys) policyDBSet(name, policy string, isSTS, isGroup bool) error {
if sys.store == nil {
return errServerNotInitialized
}
if name == "" || policy == "" { if name == "" || policy == "" {
return errInvalidArgument return errInvalidArgument
} }

View File

@ -130,31 +130,40 @@ func GetCurrentReleaseTime() (releaseTime time.Time, err error) {
// "/.dockerenv": "file", // "/.dockerenv": "file",
// //
func IsDocker() bool { func IsDocker() bool {
_, err := os.Stat("/.dockerenv") if env.Get("SIMPLE_CI", "") == "" {
if os.IsNotExist(err) { _, err := os.Stat("/.dockerenv")
return false if os.IsNotExist(err) {
return false
}
// Log error, as we will not propagate it to caller
logger.LogIf(context.Background(), err)
return err == nil
} }
return false
// Log error, as we will not propagate it to caller
logger.LogIf(context.Background(), err)
return err == nil
} }
// IsDCOS returns true if minio is running in DCOS. // IsDCOS returns true if minio is running in DCOS.
func IsDCOS() bool { func IsDCOS() bool {
// http://mesos.apache.org/documentation/latest/docker-containerizer/ if env.Get("SIMPLE_CI", "") == "" {
// Mesos docker containerizer sets this value // http://mesos.apache.org/documentation/latest/docker-containerizer/
return env.Get("MESOS_CONTAINER_NAME", "") != "" // Mesos docker containerizer sets this value
return env.Get("MESOS_CONTAINER_NAME", "") != ""
}
return false
} }
// IsKubernetes returns true if minio is running in kubernetes. // IsKubernetes returns true if minio is running in kubernetes.
func IsKubernetes() bool { func IsKubernetes() bool {
// Kubernetes env used to validate if we are if env.Get("SIMPLE_CI", "") == "" {
// indeed running inside a kubernetes pod // Kubernetes env used to validate if we are
// is KUBERNETES_SERVICE_HOST but in future // indeed running inside a kubernetes pod
// we might need to enhance this. // is KUBERNETES_SERVICE_HOST but in future
return env.Get("KUBERNETES_SERVICE_HOST", "") != "" // we might need to enhance this.
return env.Get("KUBERNETES_SERVICE_HOST", "") != ""
}
return false
} }
// IsBOSH returns true if minio is deployed from a bosh package // IsBOSH returns true if minio is deployed from a bosh package