minio/cmd/server-main.go
Harshavardhana 36e12a6038 Assume local endpoints appropriately in k8s deployments (#8375)
On Kubernetes/Docker setups DNS sometimes resolves inconsistently,
so that endpoints of the same host with multiple disks come online
with some of them reported as local and others as not local. This
situation can never legitimately happen; it is only possible in
orchestrated deployments with dynamic DNS. The following code
ensures that if one of the endpoints says it is local for a given
host, the same is treated as true for all endpoints of that host.
This assumption holds in all scenarios and is safe to make for a
given host.

This PR also adds validation such that we do not crash the
server if there are bugs in the endpoints list in dsync
initialization.

Thanks to Daniel Valdivia <hola@danielvaldivia.com> for
reproducing this, this fix is needed as part of the
https://github.com/minio/m3 project.
2019-10-10 10:14:17 +05:30

424 lines
14 KiB
Go

/*
* MinIO Cloud Storage, (C) 2015-2019 MinIO, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cmd
import (
"context"
"encoding/gob"
"fmt"
"net/http"
"os"
"os/signal"
"strings"
"syscall"
"github.com/minio/cli"
"github.com/minio/dsync/v2"
"github.com/minio/minio/cmd/config"
xhttp "github.com/minio/minio/cmd/http"
"github.com/minio/minio/cmd/logger"
"github.com/minio/minio/pkg/certs"
"github.com/minio/minio/pkg/color"
"github.com/minio/minio/pkg/env"
)
// init sets up the logger for this package and records the file error
// types with encoding/gob.
func init() {
	logger.Init(GOPATH, GOROOT)
	logger.RegisterError(config.FmtError)

	// gob identifies concrete types by a sample value, so an empty value
	// of each error type is enough to register it.
	for _, sample := range []interface{}{
		VerifyFileError(""),
		DeleteFileError(""),
	} {
		gob.Register(sample)
	}
}
// ServerFlags holds the command line flags that belong to the 'server'
// command only.
var ServerFlags = []cli.Flag{
	// --address: the default is an empty host followed by globalMinioDefaultPort.
	cli.StringFlag{
		Name:  "address",
		Usage: "bind to a specific ADDRESS:PORT, ADDRESS can be an IP or hostname",
		Value: ":" + globalMinioDefaultPort,
	},
}
// serverCmd describes the 'server' sub-command: its name and usage line,
// its flags (ServerFlags followed by GlobalFlags), the serverMain action
// and the custom help template printed for it. The template is a raw
// string, so every byte of it (including line breaks) is emitted as-is.
var serverCmd = cli.Command{
Name: "server",
Usage: "start object storage server",
Flags: append(ServerFlags, GlobalFlags...),
Action: serverMain,
CustomHelpTemplate: `NAME:
{{.HelpName}} - {{.Usage}}
USAGE:
{{.HelpName}} {{if .VisibleFlags}}[FLAGS] {{end}}DIR1 [DIR2..]
{{.HelpName}} {{if .VisibleFlags}}[FLAGS] {{end}}DIR{1...64}
DIR:
DIR points to a directory on a filesystem. When you want to combine
multiple drives into a single large system, pass one directory per
filesystem separated by space. You may also use a '...' convention
to abbreviate the directory arguments. Remote directories in a
distributed setup are encoded as HTTP(s) URIs.
{{if .VisibleFlags}}
FLAGS:
{{range .VisibleFlags}}{{.}}
{{end}}{{end}}
ENVIRONMENT VARIABLES:
ACCESS:
MINIO_ACCESS_KEY: Custom username or access key of minimum 3 characters in length.
MINIO_SECRET_KEY: Custom password or secret key of minimum 8 characters in length.
BROWSER:
MINIO_BROWSER: To disable web browser access, set this value to "off".
CACHE:
MINIO_CACHE_DRIVES: List of mounted drives or directories delimited by ";".
MINIO_CACHE_EXCLUDE: List of cache exclusion patterns delimited by ";".
MINIO_CACHE_EXPIRY: Cache expiry duration in days.
MINIO_CACHE_MAXUSE: Maximum permitted usage of the cache in percentage (0-100).
DOMAIN:
MINIO_DOMAIN: To enable virtual-host-style requests, set this value to MinIO host domain name.
WORM:
MINIO_WORM: To turn on Write-Once-Read-Many in server, set this value to "on".
BUCKET-DNS:
MINIO_DOMAIN: To enable bucket DNS requests, set this value to MinIO host domain name.
MINIO_PUBLIC_IPS: To enable bucket DNS requests, set this value to list of MinIO host public IP(s) delimited by ",".
MINIO_ETCD_ENDPOINTS: To enable bucket DNS requests, set this value to list of etcd endpoints delimited by ",".
KMS:
MINIO_SSE_VAULT_ENDPOINT: To enable Vault as KMS,set this value to Vault endpoint.
MINIO_SSE_VAULT_APPROLE_ID: To enable Vault as KMS,set this value to Vault AppRole ID.
MINIO_SSE_VAULT_APPROLE_SECRET: To enable Vault as KMS,set this value to Vault AppRole Secret ID.
MINIO_SSE_VAULT_KEY_NAME: To enable Vault as KMS,set this value to Vault encryption key-ring name.
EXAMPLES:
1. Start minio server on "/home/shared" directory.
{{.Prompt}} {{.HelpName}} /home/shared
2. Start minio server bound to a specific ADDRESS:PORT.
{{.Prompt}} {{.HelpName}} --address 192.168.1.101:9000 /home/shared
3. Start minio server and enable virtual-host-style requests.
{{.Prompt}} {{.EnvVarSetCommand}} MINIO_DOMAIN{{.AssignmentOperator}}mydomain.com
{{.Prompt}} {{.HelpName}} --address mydomain.com:9000 /mnt/export
4. Start erasure coded minio server on a node with 64 drives.
{{.Prompt}} {{.HelpName}} /mnt/export{1...64}
5. Start distributed minio server on an 32 node setup with 32 drives each. Run following command on all the 32 nodes.
{{.Prompt}} {{.EnvVarSetCommand}} MINIO_ACCESS_KEY{{.AssignmentOperator}}minio
{{.Prompt}} {{.EnvVarSetCommand}} MINIO_SECRET_KEY{{.AssignmentOperator}}miniostorage
{{.Prompt}} {{.HelpName}} http://node{1...32}.example.com/mnt/export/{1...32}
6. Start minio server with KMS enabled.
{{.Prompt}} {{.EnvVarSetCommand}} MINIO_SSE_VAULT_APPROLE_ID{{.AssignmentOperator}}9b56cc08-8258-45d5-24a3-679876769126
{{.Prompt}} {{.EnvVarSetCommand}} MINIO_SSE_VAULT_APPROLE_SECRET{{.AssignmentOperator}}4e30c52f-13e4-a6f5-0763-d50e8cb4321f
{{.Prompt}} {{.EnvVarSetCommand}} MINIO_SSE_VAULT_ENDPOINT{{.AssignmentOperator}}https://vault-endpoint-ip:8200
{{.Prompt}} {{.EnvVarSetCommand}} MINIO_SSE_VAULT_KEY_NAME{{.AssignmentOperator}}my-minio-key
{{.Prompt}} {{.HelpName}} /home/shared
`,
}
// endpointsPresent reports whether endpoints were supplied, either via
// the config.EnvEndpoints environment variable or as command line
// arguments. It returns false only when neither source has them.
func endpointsPresent(ctx *cli.Context) bool {
	// The environment is consulted first; the arguments are only
	// inspected when the variable is not set.
	if _, found := env.Lookup(config.EnvEndpoints); found {
		return true
	}
	return ctx.Args().Present()
}
// serverHandleCmdArgs validates the 'server' command arguments and fills
// in the endpoint related globals (globalMinioAddr, globalEndpoints,
// globalXLSetCount, globalXLSetDriveCount, globalMinioHost,
// globalMinioPort, globalIsXL and globalIsDistXL).
//
// Endpoints are read from the config.EnvEndpoints environment variable
// when it holds any, otherwise from the positional arguments. Validation
// failures are reported through logger.FatalIf.
func serverHandleCmdArgs(ctx *cli.Context) {
	// Handle common command args.
	handleCommonCmdArgs(ctx)

	logger.FatalIf(CheckLocalServerAddr(globalCLIContext.Addr), "Unable to validate passed arguments")

	if argCount := len(ctx.Args()); argCount > serverCommandLineArgsMax {
		// Report the limit that is actually enforced above instead of a
		// hard-coded number, so the message cannot drift from the check.
		uErr := config.ErrInvalidErasureEndpoints(nil).Msg(fmt.Sprintf("Invalid total number of endpoints (%d) passed, supported upto %d unique arguments",
			argCount, serverCommandLineArgsMax))
		logger.FatalIf(uErr, "Unable to validate passed endpoints")
	}

	// Endpoints from the environment win; fall back to the command line
	// arguments only when the environment provides none.
	endpoints := strings.Fields(env.Get(config.EnvEndpoints, ""))
	if len(endpoints) == 0 {
		endpoints = ctx.Args()
	}

	var setupType SetupType
	var err error
	globalMinioAddr, globalEndpoints, setupType, globalXLSetCount, globalXLSetDriveCount, err = createServerEndpoints(globalCLIContext.Addr, endpoints...)
	logger.FatalIf(err, "Invalid command line arguments")

	logger.LogIf(context.Background(), checkEndpointsSubOptimal(ctx, setupType, globalEndpoints))

	globalMinioHost, globalMinioPort = mustSplitHostPort(globalMinioAddr)

	// On macOS, if a process already listens on LOCALIPADDR:PORT, net.Listen() falls back
	// to IPv6 address ie minio will start listening on IPv6 address whereas another
	// (non-)minio process is listening on IPv4 of given port.
	// To avoid this error situation we check for port availability.
	logger.FatalIf(checkPortAvailability(globalMinioHost, globalMinioPort), "Unable to start the server")

	// A distributed XL setup also counts as an XL setup.
	globalIsDistXL = setupType == DistXLSetupType
	globalIsXL = globalIsDistXL || setupType == XLSetupType
}
// serverHandleEnvVars processes the environment variables used by the
// 'server' command: the common ones first, then MINIO_REGION.
func serverHandleEnvVars() {
	// Handle common environment variables.
	handleCommonEnvVars()

	region := env.Get("MINIO_REGION", "")
	if region == "" {
		// No region in the environment, leave the globals untouched.
		return
	}

	// Record globally that the region came from the environment.
	globalIsEnvRegion = true
	globalServerRegion = region
}
// serverMain is the handler called for the 'minio server' command.
//
// It shows the help and exits when asked for it or when no endpoints are
// present. Otherwise it handles arguments, TLS material and environment
// variables, starts the HTTP server, initializes the object layer and the
// config, IAM, policy, lifecycle and notification sub-systems, publishes
// the object layer globally and finally hands over to handleSignals.
// Initialization failures are reported through logger.Fatal/FatalIf.
func serverMain(ctx *cli.Context) {
	if ctx.Args().First() == "help" || !endpointsPresent(ctx) {
		cli.ShowCommandHelpAndExit(ctx, "server", 1)
	}

	signal.Notify(globalOSSignalCh, os.Interrupt, syscall.SIGTERM)

	// Disable logging until server initialization is complete, any
	// error during initialization will be shown as a fatal message
	logger.Disable = true

	// Handle all server command args.
	serverHandleCmdArgs(ctx)

	// Check and load TLS certificates.
	var err error
	globalPublicCerts, globalTLSCerts, globalIsSSL, err = getTLSConfig()
	logger.FatalIf(err, "Unable to load the TLS configuration")

	// Check and load Root CAs.
	globalRootCAs, err = config.GetRootCAs(globalCertsCADir.Get())
	logger.FatalIf(err, "Failed to read root CAs (%v)", err)

	// Handle all server environment vars.
	serverHandleEnvVars()

	// Is distributed setup, error out if no certificates are found for HTTPS endpoints.
	if globalIsDistXL {
		if globalEndpoints.IsHTTPS() && !globalIsSSL {
			logger.Fatal(config.ErrNoCertsAndHTTPSEndpoints(nil), "Unable to start the server")
		}
		if !globalEndpoints.IsHTTPS() && globalIsSSL {
			logger.Fatal(config.ErrCertsAndHTTPEndpoints(nil), "Unable to start the server")
		}
	}

	if !globalCLIContext.Quiet {
		// Check for new updates from dl.min.io.
		checkUpdate(getMinioMode())
	}

	if globalIsDiskCacheEnabled {
		logger.StartupMessage(color.Red(color.Bold("Disk caching is allowed only for gateway deployments")))
	}

	// FIXME: This code should be removed in future releases and we should have mandatory
	// check for ENVs credentials under distributed setup. Until all users migrate we
	// are intentionally providing backward compatibility.
	{
		// Check for backward compatibility and newer style.
		if !globalIsEnvCreds && globalIsDistXL {
			// Try to load old config file if any, for backward compatibility.
			var cfg = &serverConfig{}
			if _, err = Load(getConfigFile(), cfg); err == nil {
				globalActiveCred = cfg.Credential
			}

			// The regular config file does not exist, try the deprecated one.
			if os.IsNotExist(err) {
				if _, err = Load(getConfigFile()+".deprecated", cfg); err == nil {
					globalActiveCred = cfg.Credential
				}
			}

			if globalActiveCred.IsValid() {
				// Credential is valid don't throw an error instead print a message regarding deprecation of 'config.json'
				// based model and proceed to use it for now in distributed setup.
				logger.Info(`Supplying credentials from your 'config.json' is **DEPRECATED**, Access key and Secret key in distributed server mode is expected to be specified with environment variables MINIO_ACCESS_KEY and MINIO_SECRET_KEY. This approach will become mandatory in future releases, please migrate to this approach soon.`)
			} else {
				// Credential is not available anywhere by both means, we cannot start distributed setup anymore, fail eagerly.
				logger.Fatal(config.ErrEnvCredentialsMissingDistributed(nil),
					"Unable to initialize the server in distributed mode")
			}
		}
	}

	// Set system resources to maximum.
	logger.LogIf(context.Background(), setMaxResources())

	// Set nodes for dsync for distributed setup.
	if globalIsDistXL {
		clnts, myNode, err := newDsyncNodes(globalEndpoints)
		if err != nil {
			logger.Fatal(err, "Unable to initialize distributed locking on %s", globalEndpoints)
		}
		globalDsync, err = dsync.New(clnts, myNode)
		if err != nil {
			logger.Fatal(err, "Unable to initialize distributed locking on %s", globalEndpoints)
		}
	}

	// Initialize name space lock.
	initNSLock(globalIsDistXL)

	if globalIsXL {
		// Init global heal state
		globalAllHealState = initHealState()
		globalSweepHealState = initHealState()
	}

	// Initialize globalConsoleSys system
	globalConsoleSys = NewConsoleLogger(context.Background(), globalEndpoints)

	// Configure server.
	var handler http.Handler
	handler, err = configureServerHandler(globalEndpoints)
	if err != nil {
		logger.Fatal(config.ErrUnexpectedError(err), "Unable to configure one of server's RPC services")
	}

	// getCert stays nil when no TLS certificates were loaded.
	var getCert certs.GetCertificateFunc
	if globalTLSCerts != nil {
		getCert = globalTLSCerts.GetCertificate
	}

	globalHTTPServer = xhttp.NewServer([]string{globalMinioAddr}, criticalErrorHandler{handler}, getCert)
	globalHTTPServer.UpdateBytesReadFunc = globalConnStats.incInputBytes
	globalHTTPServer.UpdateBytesWrittenFunc = globalConnStats.incOutputBytes
	// Start the HTTP server in the background; the value returned by
	// Start is delivered on globalHTTPServerErrorCh.
	go func() {
		globalHTTPServerErrorCh <- globalHTTPServer.Start()
	}()

	newObject, err := newObjectLayer(globalEndpoints)
	logger.SetDeploymentID(globalDeploymentID)
	if err != nil {
		// Stop watching for any certificate changes. globalTLSCerts is
		// nil-checked before wiring getCert above, so guard it here as
		// well instead of relying on Stop accepting a nil receiver.
		if globalTLSCerts != nil {
			globalTLSCerts.Stop()
		}
		globalHTTPServer.Shutdown()
		logger.FatalIf(err, "Unable to initialize backend")
	}

	// Populate existing buckets to the etcd backend
	if globalDNSConfig != nil {
		initFederatorBackend(newObject)
	}

	// Re-enable logging
	logger.Disable = false

	// Create a new config system.
	globalConfigSys = NewConfigSys()

	// Initialize config system.
	if err = globalConfigSys.Init(newObject); err != nil {
		logger.Fatal(err, "Unable to initialize config system")
	}

	// Create new IAM system.
	globalIAMSys = NewIAMSys()
	if err = globalIAMSys.Init(newObject); err != nil {
		logger.Fatal(err, "Unable to initialize IAM system")
	}

	// The bucket list is shared by the policy, lifecycle and
	// notification systems initialized below.
	buckets, err := newObject.ListBuckets(context.Background())
	if err != nil {
		logger.Fatal(err, "Unable to list buckets on your backend")
	}

	// Create new policy system.
	globalPolicySys = NewPolicySys()

	// Initialize policy system.
	if err = globalPolicySys.Init(buckets, newObject); err != nil {
		logger.Fatal(err, "Unable to initialize policy system")
	}

	// Create new lifecycle system.
	globalLifecycleSys = NewLifecycleSys()

	// Initialize lifecycle system.
	if err = globalLifecycleSys.Init(buckets, newObject); err != nil {
		logger.Fatal(err, "Unable to initialize lifecycle system")
	}

	// Create new notification system.
	globalNotificationSys = NewNotificationSys(globalServerConfig, globalEndpoints)

	// Initialize notification system.
	if err = globalNotificationSys.Init(buckets, newObject); err != nil {
		logger.Fatal(err, "Unable to initialize notification system")
	}

	// Verify if object layer supports
	// - encryption
	// - compression
	verifyObjectLayerFeatures("server", newObject)

	initDailyLifecycle()
	if globalIsXL {
		initBackgroundHealing()
		initDailyHeal()
		initDailySweeper()
	}

	// Publish the object layer under the global mutex.
	globalObjLayerMutex.Lock()
	globalObjectAPI = newObject
	globalObjLayerMutex.Unlock()

	// Prints the formatted startup message once object layer is initialized.
	printStartupMessage(getAPIEndpoints())

	// Set uptime time after object layer has initialized.
	globalBootTime = UTCNow()

	handleSignals()
}
// newObjectLayer initializes the object layer for the supplied endpoints.
//
// A single endpoint selects the FS object layer on that endpoint's path.
// With more endpoints it waits for the XL format and builds the XL sets
// from it. An empty endpoint list is rejected with an error rather than
// panicking on endpoints[0]. Errors from the underlying constructors are
// returned unchanged.
func newObjectLayer(endpoints EndpointList) (newObject ObjectLayer, err error) {
	switch len(endpoints) {
	case 0:
		// Nothing to index below, fail cleanly so the caller can shut down.
		return nil, fmt.Errorf("no endpoints provided to initialize the object layer")
	case 1:
		// For FS only, directly use the disk.
		return NewFSObjectLayer(endpoints[0].Path)
	}

	// The first endpoint's IsLocal flag is what waitForFormatXL receives
	// as its second argument.
	format, err := waitForFormatXL(context.Background(), endpoints[0].IsLocal, endpoints, globalXLSetCount, globalXLSetDriveCount)
	if err != nil {
		return nil, err
	}

	// Set count and per-set drive count are taken from the format itself.
	return newXLSets(endpoints, format, len(format.XL.Sets), len(format.XL.Sets[0]))
}