Bring etcd support for bucket DNS federation

- Supports centralized `config.json`
- Supports centralized `bucket` service records
  for client lookups
- implement a new proxy forwarder
This commit is contained in:
Harshavardhana
2018-02-02 18:18:52 -08:00
committed by kannappanr
parent 7872c192ec
commit 853ea371ce
144 changed files with 77684 additions and 81 deletions

View File

@@ -818,8 +818,8 @@ func (a adminAPIHandlers) UpdateCredentialsHandler(w http.ResponseWriter,
// Update local credentials in memory.
globalServerConfig.SetCredential(creds)
if err = globalServerConfig.Save(); err != nil {
writeErrorResponseJSON(w, ErrInternalError, r.URL)
if err = globalServerConfig.Save(getConfigFile()); err != nil {
writeErrorResponse(w, ErrInternalError, r.URL)
return
}

View File

@@ -17,10 +17,12 @@
package cmd
import (
"context"
"encoding/xml"
"fmt"
"net/http"
"github.com/coreos/etcd/client"
"github.com/minio/minio/pkg/auth"
"github.com/minio/minio/pkg/event"
"github.com/minio/minio/pkg/hash"
@@ -864,6 +866,29 @@ func toAPIErrorCode(err error) (apiErr APIErrorCode) {
apiErr = ErrAdminInvalidAccessKey
case auth.ErrInvalidSecretKeyLength:
apiErr = ErrAdminInvalidSecretKey
// SSE errors
case errInsecureSSERequest:
apiErr = ErrInsecureSSECustomerRequest
case errInvalidSSEAlgorithm:
apiErr = ErrInvalidSSECustomerAlgorithm
case errInvalidSSEKey:
apiErr = ErrInvalidSSECustomerKey
case errMissingSSEKey:
apiErr = ErrMissingSSECustomerKey
case errMissingSSEKeyMD5:
apiErr = ErrMissingSSECustomerKeyMD5
case errSSEKeyMD5Mismatch:
apiErr = ErrSSECustomerKeyMD5Mismatch
case errObjectTampered:
apiErr = ErrObjectTampered
case errEncryptedObject:
apiErr = ErrSSEEncryptedObject
case errInvalidSSEParameters:
apiErr = ErrInvalidSSECustomerParameters
case errSSEKeyMismatch:
apiErr = ErrAccessDenied // no access without correct key
case context.Canceled, context.DeadlineExceeded:
apiErr = ErrOperationTimedOut
}
if apiErr != ErrNone {
@@ -871,27 +896,12 @@ func toAPIErrorCode(err error) (apiErr APIErrorCode) {
return apiErr
}
switch err { // SSE errors
case errInsecureSSERequest:
return ErrInsecureSSECustomerRequest
case errInvalidSSEAlgorithm:
return ErrInvalidSSECustomerAlgorithm
case errInvalidSSEKey:
return ErrInvalidSSECustomerKey
case errMissingSSEKey:
return ErrMissingSSECustomerKey
case errMissingSSEKeyMD5:
return ErrMissingSSECustomerKeyMD5
case errSSEKeyMD5Mismatch:
return ErrSSECustomerKeyMD5Mismatch
case errObjectTampered:
return ErrObjectTampered
case errEncryptedObject:
return ErrSSEEncryptedObject
case errInvalidSSEParameters:
return ErrInvalidSSECustomerParameters
case errSSEKeyMismatch:
return ErrAccessDenied // no access without correct key
// etcd specific errors, a key is always a bucket for us return
// ErrNoSuchBucket in such a case.
if e, ok := err.(*client.Error); ok {
if e.Code == client.ErrorCodeKeyNotFound {
return ErrNoSuchBucket
}
}
switch err.(type) {

View File

@@ -28,6 +28,7 @@ import (
"strings"
"sync"
"github.com/coreos/etcd/client"
"github.com/gorilla/mux"
"github.com/minio/minio/cmd/logger"
"github.com/minio/minio/pkg/event"
@@ -158,11 +159,28 @@ func (api objectAPIHandlers) ListBucketsHandler(w http.ResponseWriter, r *http.R
return
}
// Invoke the list buckets.
bucketsInfo, err := listBuckets(ctx)
if err != nil {
writeErrorResponse(w, toAPIErrorCode(err), r.URL)
return
// If etcd, dns federation configured list buckets from etcd.
var bucketsInfo []BucketInfo
if globalDNSConfig != nil {
dnsBuckets, err := globalDNSConfig.List()
if err != nil {
writeErrorResponse(w, toAPIErrorCode(err), r.URL)
return
}
for _, dnsRecord := range dnsBuckets {
bucketsInfo = append(bucketsInfo, BucketInfo{
Name: dnsRecord.Key,
Created: dnsRecord.CreationDate,
})
}
} else {
// Invoke the list buckets.
var err error
bucketsInfo, err = listBuckets(ctx)
if err != nil {
writeErrorResponse(w, toAPIErrorCode(err), r.URL)
return
}
}
// Generate response.
@@ -353,12 +371,33 @@ func (api objectAPIHandlers) PutBucketHandler(w http.ResponseWriter, r *http.Req
return
}
bucketLock := globalNSMutex.NewNSLock(bucket, "")
if err := bucketLock.GetLock(globalObjectTimeout); err != nil {
writeErrorResponse(w, toAPIErrorCode(err), r.URL)
if globalDNSConfig != nil {
if _, err := globalDNSConfig.Get(bucket); err != nil {
if client.IsKeyNotFound(err) {
// Proceed to creating a bucket.
if err = objectAPI.MakeBucketWithLocation(ctx, bucket, location); err != nil {
writeErrorResponse(w, toAPIErrorCode(err), r.URL)
return
}
if err = globalDNSConfig.Put(bucket); err != nil {
objectAPI.DeleteBucket(ctx, bucket)
writeErrorResponse(w, toAPIErrorCode(err), r.URL)
return
}
// Make sure to add Location information here only for bucket
w.Header().Set("Location", getObjectLocation(r, globalDomainName, bucket, ""))
writeSuccessResponseHeadersOnly(w)
return
}
writeErrorResponse(w, toAPIErrorCode(err), r.URL)
return
}
writeErrorResponse(w, ErrBucketAlreadyOwnedByYou, r.URL)
return
}
defer bucketLock.Unlock()
// Proceed to creating a bucket.
err := objectAPI.MakeBucketWithLocation(ctx, bucket, location)
@@ -660,6 +699,15 @@ func (api objectAPIHandlers) DeleteBucketHandler(w http.ResponseWriter, r *http.
logger.LogIf(ctx, nerr.Err)
}
if globalDNSConfig != nil {
if err := globalDNSConfig.Delete(bucket); err != nil {
// Deleting DNS entry failed, attempt to create the bucket again.
objectAPI.MakeBucketWithLocation(ctx, bucket, "")
writeErrorResponse(w, toAPIErrorCode(err), r.URL)
return
}
}
// Write success response.
writeSuccessNoContent(w)
}

View File

@@ -18,15 +18,19 @@ package cmd
import (
"errors"
"net"
"net/http"
"os"
"path/filepath"
"strconv"
"strings"
"time"
etcdc "github.com/coreos/etcd/client"
"github.com/minio/cli"
"github.com/minio/minio/cmd/logger"
"github.com/minio/minio/pkg/auth"
"github.com/minio/minio/pkg/dns"
)
// Check for updates and print a notification message
@@ -42,6 +46,18 @@ func checkUpdate(mode string) {
}
func initConfig() {
if globalEtcdClient != nil {
if err := loadConfig(); err != nil {
if etcdc.IsKeyNotFound(err) {
logger.FatalIf(newConfig(), "Unable to initialize minio config for the first time.")
logger.Info("Created minio configuration file successfully at", globalEtcdClient.Endpoints())
} else {
logger.FatalIf(err, "Unable to load config version: '%s'.", serverConfigVersion)
}
}
return
}
// Config file does not exist, we create it fresh and return upon success.
if isFile(getConfigFile()) {
logger.FatalIf(migrateConfig(), "Config migration failed")
@@ -125,6 +141,31 @@ func handleCommonEnvVars() {
globalDomainName, globalIsEnvDomainName = os.LookupEnv("MINIO_DOMAIN")
etcdEndpointsEnv, ok := os.LookupEnv("MINIO_ETCD_ENDPOINTS")
if ok {
etcdEndpoints := strings.Split(etcdEndpointsEnv, ",")
var err error
globalEtcdClient, err = etcdc.New(etcdc.Config{
Endpoints: etcdEndpoints,
Transport: &http.Transport{
Proxy: http.ProxyFromEnvironment,
Dial: (&net.Dialer{
Timeout: 30 * time.Second,
KeepAlive: 30 * time.Second,
}).Dial,
TLSHandshakeTimeout: 10 * time.Second,
},
})
logger.FatalIf(err, "Unable to initialize etcd with %s", etcdEndpoints)
}
globalDomainIP = os.Getenv("MINIO_DOMAIN_IP")
if globalDomainName != "" && globalDomainIP != "" && globalEtcdClient != nil {
var err error
globalDNSConfig, err = dns.NewCoreDNS(globalDomainName, globalDomainIP, globalMinioPort, globalEtcdClient)
logger.FatalIf(err, "Unable to initialize DNS config for %s.", globalDomainName)
}
if drives := os.Getenv("MINIO_CACHE_DRIVES"); drives != "" {
driveList, err := parseCacheDrives(strings.Split(drives, cacheEnvDelimiter))
if err != nil {

View File

@@ -128,9 +128,9 @@ func (s *serverConfig) GetCacheConfig() CacheConfig {
}
// Save config.
func (s *serverConfig) Save() error {
func (s *serverConfig) Save(configFile string) error {
// Save config file.
return quick.Save(getConfigFile(), s)
return quick.Save(configFile, s)
}
// Returns the string describing a difference with the given
@@ -230,7 +230,10 @@ func newServerConfig() *serverConfig {
// found, otherwise use default parameters
func newConfig() error {
// Initialize server config.
srvCfg := newServerConfig()
srvCfg, err := newQuickConfig(newServerConfig())
if err != nil {
return err
}
// If env is set override the credentials from config file.
if globalIsEnvCreds {
@@ -269,7 +272,29 @@ func newConfig() error {
globalServerConfigMu.Unlock()
// Save config into file.
return globalServerConfig.Save()
return globalServerConfig.Save(getConfigFile())
}
// newQuickConfig - initialize a new server config, with an allocated
// quick.Config interface.
func newQuickConfig(srvCfg *serverConfig) (*serverConfig, error) {
if globalEtcdClient == nil {
qcfg, err := quick.NewLocalConfig(srvCfg)
if err != nil {
return nil, err
}
srvCfg.Config = qcfg
return srvCfg, nil
}
qcfg, err := quick.NewEtcdConfig(srvCfg, globalEtcdClient)
if err != nil {
return nil, err
}
srvCfg.Config = qcfg
return srvCfg, nil
}
// getValidConfig - returns valid server configuration
@@ -279,7 +304,14 @@ func getValidConfig() (*serverConfig, error) {
Browser: true,
}
if _, err := quick.Load(getConfigFile(), srvCfg); err != nil {
var err error
srvCfg, err = newQuickConfig(srvCfg)
if err != nil {
return nil, err
}
configFile := getConfigFile()
if err = srvCfg.Load(configFile); err != nil {
return nil, err
}

View File

@@ -50,7 +50,7 @@ func TestServerConfig(t *testing.T) {
}
// Attempt to save.
if err := globalServerConfig.Save(); err != nil {
if err := globalServerConfig.Save(getConfigFile()); err != nil {
t.Fatalf("Unable to save updated config file %s", err)
}

View File

@@ -21,6 +21,7 @@ import (
"github.com/minio/minio/pkg/auth"
"github.com/minio/minio/pkg/event/target"
"github.com/minio/minio/pkg/quick"
)
/////////////////// Config V1 ///////////////////
@@ -585,6 +586,8 @@ type serverConfigV22 struct {
// IMPORTANT NOTE: When updating this struct make sure that
// serverConfig.ConfigDiff() is updated as necessary.
type serverConfigV23 struct {
quick.Config `json:"-"` // ignore interfaces
Version string `json:"version"`
// S3 API configuration.

View File

@@ -19,13 +19,17 @@ package cmd
import (
"bufio"
"context"
"fmt"
"net"
"net/http"
"net/url"
"strings"
"time"
"github.com/coreos/etcd/client"
humanize "github.com/dustin/go-humanize"
"github.com/minio/minio/cmd/logger"
"github.com/minio/minio/pkg/handlers"
"github.com/minio/minio/pkg/sys"
"github.com/rs/cors"
"golang.org/x/time/rate"
@@ -611,6 +615,64 @@ func (h pathValidityHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
h.handler.ServeHTTP(w, r)
}
// To forward the path style requests on a bucket to the right
// configured server, bucket to IP configuration is obtained
// from centralized etcd configuration service.
type bucketForwardingHandler struct {
fwd *handlers.Forwarder
handler http.Handler
}
func (f bucketForwardingHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if globalDNSConfig == nil || globalDomainName == "" {
f.handler.ServeHTTP(w, r)
return
}
bucket, object := urlPath2BucketObjectName(r.URL.Path)
if r.Method == http.MethodPut && bucket != "" && object == "" {
f.handler.ServeHTTP(w, r)
return
}
sr, err := globalDNSConfig.Get(bucket)
if err != nil {
if client.IsKeyNotFound(err) {
writeErrorResponse(w, ErrNoSuchBucket, r.URL)
} else {
writeErrorResponse(w, toAPIErrorCode(err), r.URL)
}
return
}
if sr.Host != globalDomainIP {
backendURL := fmt.Sprintf("http://%s:%d", sr.Host, sr.Port)
if globalIsSSL {
backendURL = fmt.Sprintf("https://%s:%d", sr.Host, sr.Port)
}
r.URL, err = url.Parse(backendURL)
if err != nil {
writeErrorResponse(w, toAPIErrorCode(err), r.URL)
return
}
f.fwd.ServeHTTP(w, r)
return
}
f.handler.ServeHTTP(w, r)
}
// setBucketForwardingHandler middleware forwards the path style requests
// on a bucket to the right bucket location, bucket to IP configuration
// is obtained from centralized etcd configuration service.
func setBucketForwardingHandler(h http.Handler) http.Handler {
fwd := handlers.NewForwarder(&handlers.Forwarder{
PassHost: true,
RoundTripper: NewCustomHTTPTransport(),
})
return bucketForwardingHandler{fwd, h}
}
// setRateLimitHandler middleware limits the throughput to h using a
// rate.Limiter token bucket configured with maxOpenFileLimit and
// burst set to 1. The request will idle for up to 1*time.Second.

View File

@@ -22,11 +22,13 @@ import (
"runtime"
"time"
etcdc "github.com/coreos/etcd/client"
humanize "github.com/dustin/go-humanize"
"github.com/fatih/color"
xhttp "github.com/minio/minio/cmd/http"
"github.com/minio/minio/pkg/auth"
"github.com/minio/minio/pkg/certs"
"github.com/minio/minio/pkg/dns"
)
// minio configuration related constants.
@@ -160,6 +162,7 @@ var (
globalIsEnvDomainName bool
globalDomainName string // Root domain for virtual host style requests
globalDomainIP string // Root domain IP address
globalListingTimeout = newDynamicTimeout( /*30*/ 600*time.Second /*5*/, 600*time.Second) // timeout for listing related ops
globalObjectTimeout = newDynamicTimeout( /*1*/ 10*time.Minute /*10*/, 600*time.Second) // timeout for Object API related ops
@@ -174,15 +177,19 @@ var (
// Set to store standard storage class
globalStandardStorageClass storageClass
globalIsEnvWORM bool
globalIsEnvWORM bool
// Is worm enabled
globalWORMEnabled bool
// Is Disk Caching set up
globalIsDiskCacheEnabled bool
// Disk cache drives
globalCacheDrives []string
// Disk cache excludes
globalCacheExcludes []string
// Disk cache expiry
globalCacheExpiry = 90
@@ -192,12 +199,18 @@ var (
// Current RPC version
globalRPCAPIVersion = RPCVersion{3, 0, 0}
// Add new variable global values here.
// Allocated etcd endpoint for config and bucket DNS.
globalEtcdClient etcdc.Client
// Allocated DNS config wrapper over etcd client.
globalDNSConfig dns.Config
// Default usage check interval value.
globalDefaultUsageCheckInterval = 12 * time.Hour // 12 hours
// Usage check interval value.
globalUsageCheckInterval = globalDefaultUsageCheckInterval
// Add new variable global values here.
)
// global colors.

View File

@@ -49,6 +49,9 @@ func registerDistXLRouters(router *mux.Router, endpoints EndpointList) {
var globalHandlers = []HandlerFunc{
// set HTTP security headers such as Content-Security-Policy.
addSecurityHeaders,
// set Bucket forwarding handler to proxy path style requests
// when federated backend is enabled.
setBucketForwardingHandler,
// Ratelimit the incoming requests using a token bucket algorithm
setRateLimitHandler,
// Validate all the incoming paths.

View File

@@ -89,6 +89,13 @@ ENVIRONMENT VARIABLES:
WORM:
MINIO_WORM: To turn on Write-Once-Read-Many in server, set this value to "on".
BUCKET-DNS:
MINIO_DOMAIN: To enable virtual-host-style requests.
MINIO_DOMAIN_IP: To enable virtual-host-style requests.
ETCD:
MINIO_ETCD_ENDPOINTS: Comma separated list of etcd endpoints.
EXAMPLES:
1. Start minio server on "/home/shared" directory.
$ {{.HelpName}} /home/shared

View File

@@ -605,7 +605,7 @@ func newTestConfig(bucketLocation string) (rootPath string, err error) {
globalServerConfig.SetRegion(bucketLocation)
// Save config.
if err = globalServerConfig.Save(); err != nil {
if err = globalServerConfig.Save(getConfigFile()); err != nil {
return "", err
}

View File

@@ -480,9 +480,9 @@ func (web *webAPIHandlers) SetAuth(r *http.Request, args *SetAuthArgs, reply *Se
// Update credentials in memory
prevCred := globalServerConfig.SetCredential(creds)
// Save credentials to config file
if err := globalServerConfig.Save(); err != nil {
// As saving configurstion failed, restore previous credential in memory.
// Persist updated credentials.
if err = globalServerConfig.Save(getConfigFile()); err != nil {
// Save the current creds when failed to update.
globalServerConfig.SetCredential(prevCred)
logger.LogIf(context.Background(), err)
return toJSONError(err)