Add support of conf file to pass arguments and options (#18592)

This commit is contained in:
Anis Eleuch
2023-12-07 01:33:56 -08:00
committed by GitHub
parent 9cdf490bc5
commit 2e23e61a45
24 changed files with 742 additions and 376 deletions

View File

@@ -803,11 +803,23 @@ func (p PoolEndpointList) UpdateIsLocal() error {
return nil
}
func isEmptyLayout(poolsLayout ...poolDisksLayout) bool {
return len(poolsLayout) == 0 || len(poolsLayout[0].layout) == 0 || len(poolsLayout[0].layout[0]) == 0 || len(poolsLayout[0].layout[0][0]) == 0
}
func isSingleDriveLayout(poolsLayout ...poolDisksLayout) bool {
return len(poolsLayout) == 1 && len(poolsLayout[0].layout) == 1 && len(poolsLayout[0].layout[0]) == 1
}
// CreatePoolEndpoints creates a list of endpoints per pool, resolves their relevant hostnames and
// discovers those are local or remote.
func CreatePoolEndpoints(serverAddr string, poolArgs ...[][]string) ([]Endpoints, SetupType, error) {
func CreatePoolEndpoints(serverAddr string, poolsLayout ...poolDisksLayout) ([]Endpoints, SetupType, error) {
var setupType SetupType
if isEmptyLayout(poolsLayout...) {
return nil, setupType, config.ErrInvalidErasureEndpoints(nil).Msg("invalid number of endpoints")
}
// Check whether serverAddr is valid for this host.
if err := CheckLocalServerAddr(serverAddr); err != nil {
return nil, setupType, err
@@ -815,11 +827,11 @@ func CreatePoolEndpoints(serverAddr string, poolArgs ...[][]string) ([]Endpoints
_, serverAddrPort := mustSplitHostPort(serverAddr)
poolEndpoints := make(PoolEndpointList, len(poolArgs))
poolEndpoints := make(PoolEndpointList, len(poolsLayout))
// For single arg, return single drive EC setup.
if len(poolArgs) == 1 && len(poolArgs[0]) == 1 && len(poolArgs[0][0]) == 1 && len(poolArgs[0][0][0]) == 1 {
endpoint, err := NewEndpoint(poolArgs[0][0][0])
if isSingleDriveLayout(poolsLayout...) {
endpoint, err := NewEndpoint(poolsLayout[0].layout[0][0])
if err != nil {
return nil, setupType, err
}
@@ -847,11 +859,11 @@ func CreatePoolEndpoints(serverAddr string, poolArgs ...[][]string) ([]Endpoints
return poolEndpoints, setupType, nil
}
for poolIdx, args := range poolArgs {
for poolIdx, pool := range poolsLayout {
var endpoints Endpoints
for setIdx, iargs := range args {
for setIdx, setLayout := range pool.layout {
// Convert args to endpoints
eps, err := NewEndpoints(iargs...)
eps, err := NewEndpoints(setLayout...)
if err != nil {
return nil, setupType, config.ErrInvalidErasureEndpoints(nil).Msg(err.Error())
}
@@ -1003,6 +1015,8 @@ func CreatePoolEndpoints(serverAddr string, poolArgs ...[][]string) ([]Endpoints
poolEndpoints[i] = endpoints
}
// TODO: ensure that each pool has at least two nodes in a distributed setup
publicIPs := env.Get(config.EnvPublicIPs, "")
if len(publicIPs) == 0 {
updateDomainIPs(uniqueArgs)
@@ -1021,199 +1035,6 @@ func CreatePoolEndpoints(serverAddr string, poolArgs ...[][]string) ([]Endpoints
return poolEndpoints, setupType, nil
}
// CreateEndpoints - validates and creates new endpoints for given args.
func CreateEndpoints(serverAddr string, args ...[]string) (Endpoints, SetupType, error) {
var endpoints Endpoints
var setupType SetupType
var err error
// Check whether serverAddr is valid for this host.
if err = CheckLocalServerAddr(serverAddr); err != nil {
return endpoints, setupType, err
}
_, serverAddrPort := mustSplitHostPort(serverAddr)
// For single arg, return single drive setup.
if len(args) == 1 && len(args[0]) == 1 {
var endpoint Endpoint
endpoint, err = NewEndpoint(args[0][0])
if err != nil {
return endpoints, setupType, err
}
if err := endpoint.UpdateIsLocal(); err != nil {
return endpoints, setupType, err
}
if endpoint.Type() != PathEndpointType {
return endpoints, setupType, config.ErrInvalidEndpoint(nil).Msg("use path style endpoint for single node setup")
}
endpoint.SetPoolIndex(0)
endpoint.SetSetIndex(0)
endpoint.SetDiskIndex(0)
endpoints = append(endpoints, endpoint)
setupType = ErasureSDSetupType
// Check for cross device mounts if any.
if err = checkCrossDeviceMounts(endpoints); err != nil {
return endpoints, setupType, config.ErrInvalidEndpoint(nil).Msg(err.Error())
}
return endpoints, setupType, nil
}
for setIdx, iargs := range args {
// Convert args to endpoints
eps, err := NewEndpoints(iargs...)
if err != nil {
return endpoints, setupType, config.ErrInvalidErasureEndpoints(nil).Msg(err.Error())
}
// Check for cross device mounts if any.
if err = checkCrossDeviceMounts(eps); err != nil {
return endpoints, setupType, config.ErrInvalidErasureEndpoints(nil).Msg(err.Error())
}
for diskIdx := range eps {
eps[diskIdx].SetSetIndex(setIdx)
eps[diskIdx].SetDiskIndex(diskIdx)
}
endpoints = append(endpoints, eps...)
}
if len(endpoints) == 0 {
return endpoints, setupType, config.ErrInvalidErasureEndpoints(nil).Msg("invalid number of endpoints")
}
// Return Erasure setup when all endpoints are path style.
if endpoints[0].Type() == PathEndpointType {
setupType = ErasureSetupType
return endpoints, setupType, nil
}
if err = endpoints.UpdateIsLocal(); err != nil {
return endpoints, setupType, config.ErrInvalidErasureEndpoints(nil).Msg(err.Error())
}
// Here all endpoints are URL style.
endpointPathSet := set.NewStringSet()
localEndpointCount := 0
localServerHostSet := set.NewStringSet()
localPortSet := set.NewStringSet()
for _, endpoint := range endpoints {
endpointPathSet.Add(endpoint.Path)
if endpoint.IsLocal {
localServerHostSet.Add(endpoint.Hostname())
var port string
_, port, err = net.SplitHostPort(endpoint.Host)
if err != nil {
port = serverAddrPort
}
localPortSet.Add(port)
localEndpointCount++
}
}
orchestrated := IsKubernetes() || IsDocker()
reverseProxy := (env.Get("_MINIO_REVERSE_PROXY", "") != "") && ((env.Get("MINIO_CI_CD", "") != "") || (env.Get("CI", "") != ""))
// If not orchestrated
if !orchestrated &&
// and not setup in reverse proxy
!reverseProxy {
// Check whether same path is not used in endpoints of a host on different port.
// Only verify this on baremetal setups, DNS is not available in orchestrated
// environments so we can't do much here.
pathIPMap := make(map[string]set.StringSet)
hostIPCache := make(map[string]set.StringSet)
for _, endpoint := range endpoints {
host := endpoint.Hostname()
hostIPSet, ok := hostIPCache[host]
if !ok {
hostIPSet, err = getHostIP(host)
if err != nil {
return endpoints, setupType, config.ErrInvalidErasureEndpoints(nil).Msg(fmt.Sprintf("host '%s' cannot resolve: %s", host, err))
}
hostIPCache[host] = hostIPSet
}
if IPSet, ok := pathIPMap[endpoint.Path]; ok {
if !IPSet.Intersection(hostIPSet).IsEmpty() {
return endpoints, setupType,
config.ErrInvalidErasureEndpoints(nil).Msg(fmt.Sprintf("same path '%s' can not be served by different port on same address", endpoint.Path))
}
pathIPMap[endpoint.Path] = IPSet.Union(hostIPSet)
} else {
pathIPMap[endpoint.Path] = hostIPSet
}
}
}
// Check whether same path is used for more than 1 local endpoints.
{
localPathSet := set.CreateStringSet()
for _, endpoint := range endpoints {
if !endpoint.IsLocal {
continue
}
if localPathSet.Contains(endpoint.Path) {
return endpoints, setupType,
config.ErrInvalidErasureEndpoints(nil).Msg(fmt.Sprintf("path '%s' cannot be served by different address on same server", endpoint.Path))
}
localPathSet.Add(endpoint.Path)
}
}
// Add missing port in all endpoints.
for i := range endpoints {
_, port, err := net.SplitHostPort(endpoints[i].Host)
if err != nil {
endpoints[i].Host = net.JoinHostPort(endpoints[i].Host, serverAddrPort)
} else if endpoints[i].IsLocal && serverAddrPort != port {
// If endpoint is local, but port is different than serverAddrPort, then make it as remote.
endpoints[i].IsLocal = false
}
}
// All endpoints are pointing to local host
if len(endpoints) == localEndpointCount {
// If all endpoints have same port number, Just treat it as local erasure setup
// using URL style endpoints.
if len(localPortSet) == 1 {
if len(localServerHostSet) > 1 {
return endpoints, setupType,
config.ErrInvalidErasureEndpoints(nil).Msg("all local endpoints should not have different hostnames/ips")
}
return endpoints, ErasureSetupType, nil
}
// Even though all endpoints are local, but those endpoints use different ports.
// This means it is DistErasure setup.
}
uniqueArgs := set.NewStringSet()
for _, endpoint := range endpoints {
uniqueArgs.Add(endpoint.Host)
}
// Error out if we have less than 2 unique servers.
if len(uniqueArgs.ToSlice()) < 2 && setupType == DistErasureSetupType {
err := fmt.Errorf("Unsupported number of endpoints (%s), minimum number of servers cannot be less than 2 in distributed setup", endpoints)
return endpoints, setupType, err
}
publicIPs := env.Get(config.EnvPublicIPs, "")
if len(publicIPs) == 0 {
updateDomainIPs(uniqueArgs)
}
setupType = DistErasureSetupType
return endpoints, setupType, nil
}
// GetLocalPeer - returns local peer value, returns globalMinioAddr
// for FS and Erasure mode. In case of distributed server return
// the first element from the set of peers which indicate that