Use random host from among multiple hosts to create requests

Also use hosts passed to Minio startup command to populate IP
addresses if MINIO_PUBLIC_IPS is not set.
This commit is contained in:
Nitish Tiwari
2018-05-16 06:50:22 +05:30
committed by kannappanr
parent 6ce7265c8c
commit 3dc13323e5
15 changed files with 164 additions and 119 deletions

View File

@@ -819,7 +819,7 @@ func (a adminAPIHandlers) UpdateCredentialsHandler(w http.ResponseWriter,
// Update local credentials in memory.
globalServerConfig.SetCredential(creds)
if err = globalServerConfig.Save(getConfigFile()); err != nil {
writeErrorResponse(w, ErrInternalError, r.URL)
writeErrorResponseJSON(w, ErrInternalError, r.URL)
return
}

View File

@@ -21,6 +21,7 @@ import (
"net/http"
"net/url"
"path"
"strings"
"time"
"github.com/minio/minio/pkg/handlers"
@@ -293,8 +294,10 @@ func getObjectLocation(r *http.Request, domain, bucket, object string) string {
}
// If domain is set then we need to use bucket DNS style.
if domain != "" {
u.Host = bucket + "." + domain
u.Path = path.Join(slashSeparator, object)
if strings.Contains(r.Host, domain) {
u.Host = bucket + "." + r.Host
u.Path = path.Join(slashSeparator, object)
}
}
return u.String()
}

View File

@@ -30,7 +30,7 @@ import (
"strings"
"sync"
"github.com/coreos/etcd/client"
etcd "github.com/coreos/etcd/client"
"github.com/gorilla/mux"
@@ -64,7 +64,7 @@ func initFederatorBackend(objLayer ObjectLayer) {
g.Go(func() error {
r, gerr := globalDNSConfig.Get(b[index].Name)
if gerr != nil {
if client.IsKeyNotFound(gerr) || gerr == dns.ErrNoEntriesFound {
if etcd.IsKeyNotFound(gerr) || gerr == dns.ErrNoEntriesFound {
return globalDNSConfig.Put(b[index].Name)
}
return gerr
@@ -211,15 +211,20 @@ func (api objectAPIHandlers) ListBucketsHandler(w http.ResponseWriter, r *http.R
var bucketsInfo []BucketInfo
if globalDNSConfig != nil {
dnsBuckets, err := globalDNSConfig.List()
if err != nil {
if err != nil && !etcd.IsKeyNotFound(err) && err != dns.ErrNoEntriesFound {
writeErrorResponse(w, toAPIErrorCode(err), r.URL)
return
}
bucketSet := set.NewStringSet()
for _, dnsRecord := range dnsBuckets {
if bucketSet.Contains(dnsRecord.Key) {
continue
}
bucketsInfo = append(bucketsInfo, BucketInfo{
Name: dnsRecord.Key,
Name: strings.Trim(dnsRecord.Key, slashSeparator),
Created: dnsRecord.CreationDate,
})
bucketSet.Add(dnsRecord.Key)
}
} else {
// Invoke the list buckets.
@@ -421,7 +426,7 @@ func (api objectAPIHandlers) PutBucketHandler(w http.ResponseWriter, r *http.Req
if globalDNSConfig != nil {
if _, err := globalDNSConfig.Get(bucket); err != nil {
if client.IsKeyNotFound(err) || err == dns.ErrNoEntriesFound {
if etcd.IsKeyNotFound(err) || err == dns.ErrNoEntriesFound {
// Proceed to creating a bucket.
if err = objectAPI.MakeBucketWithLocation(ctx, bucket, location); err != nil {
writeErrorResponse(w, toAPIErrorCode(err), r.URL)

View File

@@ -20,7 +20,6 @@ import (
"context"
"errors"
"net"
"net/http"
"os"
"path/filepath"
"strconv"
@@ -151,26 +150,19 @@ func handleCommonEnvVars() {
logger.FatalIf(err, "error opening file %s", traceFile)
}
globalDomainName, globalIsEnvDomainName = os.LookupEnv("MINIO_DOMAIN")
etcdEndpointsEnv, ok := os.LookupEnv("MINIO_ETCD_ENDPOINTS")
if ok {
etcdEndpoints := strings.Split(etcdEndpointsEnv, ",")
var err error
globalEtcdClient, err = etcd.New(etcd.Config{
Endpoints: etcdEndpoints,
Transport: &http.Transport{
Proxy: http.ProxyFromEnvironment,
Dial: (&net.Dialer{
Timeout: 30 * time.Second,
KeepAlive: 30 * time.Second,
}).Dial,
TLSHandshakeTimeout: 10 * time.Second,
},
Transport: NewCustomHTTPTransport(),
})
logger.FatalIf(err, "Unable to initialize etcd with %s", etcdEndpoints)
}
globalDomainName, globalIsEnvDomainName = os.LookupEnv("MINIO_DOMAIN")
minioEndpointsEnv, ok := os.LookupEnv("MINIO_PUBLIC_IPS")
if ok {
minioEndpoints := strings.Split(minioEndpointsEnv, ",")

View File

@@ -27,6 +27,7 @@ import (
"github.com/minio/minio/pkg/event"
"github.com/minio/minio/pkg/event/target"
xnet "github.com/minio/minio/pkg/net"
"github.com/minio/minio/pkg/quick"
)
// DO NOT EDIT following message template, please open a github issue to discuss instead.
@@ -1966,7 +1967,7 @@ func migrateV23ToV24() error {
configFile := getConfigFile()
cv23 := &serverConfigV23{}
_, err := quick.Load(configFile, cv23)
_, err := quick.LoadConfig(configFile, globalEtcdClient, cv23)
if os.IsNotExist(err) {
return nil
} else if err != nil {
@@ -2067,7 +2068,7 @@ func migrateV23ToV24() error {
srvConfig.Cache.Exclude = cv23.Cache.Exclude
srvConfig.Cache.Expiry = cv23.Cache.Expiry
if err = quick.Save(configFile, srvConfig); err != nil {
if err = quick.SaveConfig(srvConfig, configFile, globalEtcdClient); err != nil {
return fmt.Errorf("Failed to migrate config from %s to %s. %v", cv23.Version, srvConfig.Version, err)
}
@@ -2079,7 +2080,7 @@ func migrateV24ToV25() error {
configFile := getConfigFile()
cv24 := &serverConfigV24{}
_, err := quick.Load(configFile, cv24)
_, err := quick.LoadConfig(configFile, globalEtcdClient, cv24)
if os.IsNotExist(err) {
return nil
} else if err != nil {
@@ -2185,7 +2186,7 @@ func migrateV24ToV25() error {
srvConfig.Cache.Exclude = cv24.Cache.Exclude
srvConfig.Cache.Expiry = cv24.Cache.Expiry
if err = quick.Save(configFile, srvConfig); err != nil {
if err = quick.SaveConfig(srvConfig, configFile, globalEtcdClient); err != nil {
return fmt.Errorf("Failed to migrate config from %s to %s. %v", cv24.Version, srvConfig.Version, err)
}

View File

@@ -586,8 +586,6 @@ type serverConfigV22 struct {
// IMPORTANT NOTE: When updating this struct make sure that
// serverConfig.ConfigDiff() is updated as necessary.
type serverConfigV23 struct {
quick.Config `json:"-"` // ignore interfaces
Version string `json:"version"`
// S3 API configuration.
@@ -636,6 +634,8 @@ type serverConfigV24 struct {
// IMPORTANT NOTE: When updating this struct make sure that
// serverConfig.ConfigDiff() is updated as necessary.
type serverConfigV25 struct {
quick.Config `json:"-"` // ignore interfaces
Version string `json:"version"`
// S3 API configuration.

View File

@@ -20,6 +20,7 @@ import (
"fmt"
"net"
"net/url"
"os"
"path"
"path/filepath"
"runtime"
@@ -442,6 +443,8 @@ func CreateEndpoints(serverAddr string, args ...[]string) (string, EndpointList,
return serverAddr, endpoints, setupType, err
}
updateDomainIPs(uniqueArgs)
setupType = DistXLSetupType
return serverAddr, endpoints, setupType, nil
}
@@ -493,3 +496,22 @@ func GetRemotePeers(endpoints EndpointList) []string {
return peerSet.ToSlice()
}
// In federated and distributed setup, update IP addresses of the hosts passed in command line
// if MINIO_PUBLIC_IPS are not set manually
func updateDomainIPs(endPoints set.StringSet) {
_, dok := os.LookupEnv("MINIO_DOMAIN")
_, eok := os.LookupEnv("MINIO_ETCD_ENDPOINTS")
_, iok := os.LookupEnv("MINIO_PUBLIC_IPS")
if dok && eok && !iok {
globalDomainIPs = set.NewStringSet()
for e := range endPoints {
host, _, _ := net.SplitHostPort(e)
ipList, _ := getHostIP4(host)
remoteIPList := ipList.FuncMatch(func(ip string, matchString string) bool {
return !strings.HasPrefix(ip, "127.")
}, "")
globalDomainIPs.Add(remoteIPList.ToSlice()[0])
}
}
}

View File

@@ -22,13 +22,12 @@ import (
"fmt"
"net"
"net/http"
"net/url"
"strings"
"time"
"github.com/minio/minio-go/pkg/set"
"github.com/coreos/etcd/client"
etcd "github.com/coreos/etcd/client"
humanize "github.com/dustin/go-humanize"
"github.com/minio/minio/cmd/logger"
"github.com/minio/minio/pkg/dns"
@@ -632,17 +631,19 @@ func (f bucketForwardingHandler) ServeHTTP(w http.ResponseWriter, r *http.Reques
return
}
bucket, object := urlPath2BucketObjectName(r.URL.Path)
// MakeBucket requests should be handled at current endpoint
if r.Method == http.MethodPut && bucket != "" && object == "" {
f.handler.ServeHTTP(w, r)
return
}
// ListBucket requests should be handled at current endpoint as
// all buckets data can be fetched from here.
if r.Method == http.MethodGet && bucket == "" && object == "" {
f.handler.ServeHTTP(w, r)
return
}
// MakeBucket requests should be handled at current endpoint
if r.Method == http.MethodPut && bucket != "" && object == "" {
f.handler.ServeHTTP(w, r)
return
}
// CopyObject requests should be handled at current endpoint as path style
// requests have target bucket and object in URI and source details are in
// header fields
@@ -652,7 +653,7 @@ func (f bucketForwardingHandler) ServeHTTP(w http.ResponseWriter, r *http.Reques
}
sr, err := globalDNSConfig.Get(bucket)
if err != nil {
if client.IsKeyNotFound(err) || err == dns.ErrNoEntriesFound {
if etcd.IsKeyNotFound(err) || err == dns.ErrNoEntriesFound {
writeErrorResponse(w, ErrNoSuchBucket, r.URL)
} else {
writeErrorResponse(w, toAPIErrorCode(err), r.URL)
@@ -660,15 +661,12 @@ func (f bucketForwardingHandler) ServeHTTP(w http.ResponseWriter, r *http.Reques
return
}
if globalDomainIPs.Intersection(set.CreateStringSet(getHostsSlice(sr)...)).IsEmpty() {
backendURL := fmt.Sprintf("http://%s:%d", sr[0].Host, sr[0].Port)
host, port := getRandomHostPort(sr)
r.URL.Scheme = "http"
if globalIsSSL {
backendURL = fmt.Sprintf("https://%s:%d", sr[0].Host, sr[0].Port)
}
r.URL, err = url.Parse(backendURL)
if err != nil {
writeErrorResponse(w, toAPIErrorCode(err), r.URL)
return
r.URL.Scheme = "https"
}
r.URL.Host = fmt.Sprintf("%s:%d", host, port)
f.fwd.ServeHTTP(w, r)
return
}

View File

@@ -20,14 +20,13 @@ import (
"context"
"encoding/hex"
"fmt"
"net"
"math/rand"
"path"
"runtime"
"strconv"
"strings"
"time"
"unicode/utf8"
miniogo "github.com/minio/minio-go"
"github.com/minio/minio/cmd/logger"
"github.com/minio/minio/pkg/dns"
"github.com/skyrings/skyring-common/tools/uuid"
@@ -281,31 +280,6 @@ func isMinioReservedBucket(bucketName string) bool {
return bucketName == minioReservedBucket
}
// Returns a minio-go Client configured to access remote host described by destDNSRecord
// Applicable only in a federated deployment
func getRemoteInstanceClient(destDNSRecord dns.SrvRecord) (*miniogo.Core, error) {
// In a federated deployment, all the instances share config files and hence expected to have same
// credentials. So, access current instances creds and use it to create client for remote instance
client, err := miniogo.NewCore(net.JoinHostPort(destDNSRecord.Host, strconv.Itoa(destDNSRecord.Port)), globalServerConfig.Credential.AccessKey, globalServerConfig.Credential.SecretKey, globalIsSSL)
if err != nil {
return nil, err
}
return client, nil
}
// Checks if a remote putobject call is needed for CopyObject operation
// 1. If source and destination bucket names are same, it means no call needed to etcd to get destination info
// 2. If destination bucket doesn't exist locally, only then a etcd call is needed
func isRemoteCallRequired(ctx context.Context, src, dst string, objAPI ObjectLayer) bool {
if src == dst {
return false
}
if _, err := objAPI.GetBucketInfo(ctx, dst); err == toObjectErr(errVolumeNotFound, dst) {
return true
}
return false
}
// returns a slice of hosts by reading a slice of DNS records
func getHostsSlice(records []dns.SrvRecord) []string {
var hosts []string
@@ -315,6 +289,13 @@ func getHostsSlice(records []dns.SrvRecord) []string {
return hosts
}
// returns a random host (and corresponding port) from a slice of DNS records
func getRandomHostPort(records []dns.SrvRecord) (string, int) {
rand.Seed(time.Now().Unix())
srvRecord := records[rand.Intn(len(records))]
return srvRecord.Host, srvRecord.Port
}
// byBucketName is a collection satisfying sort.Interface.
type byBucketName []BucketInfo

View File

@@ -32,7 +32,9 @@ import (
"strconv"
"github.com/gorilla/mux"
miniogo "github.com/minio/minio-go"
"github.com/minio/minio/cmd/logger"
"github.com/minio/minio/pkg/dns"
"github.com/minio/minio/pkg/event"
"github.com/minio/minio/pkg/hash"
"github.com/minio/minio/pkg/ioutil"
@@ -538,40 +540,62 @@ func (api objectAPIHandlers) CopyObjectHandler(w http.ResponseWriter, r *http.Re
var objInfo ObjectInfo
// _, err = objectAPI.GetBucketInfo(ctx, dstBucket)
// if err == toObjectErr(errVolumeNotFound, dstBucket) && !cpSrcDstSame
// Checks if a remote putobject call is needed for CopyObject operation
// 1. If source and destination bucket names are same, it means no call needed to etcd to get destination info
// 2. If destination bucket doesn't exist locally, only then a etcd call is needed
var isRemoteCallRequired = func(ctx context.Context, src, dst string, objAPI ObjectLayer) bool {
if src == dst {
return false
}
_, berr := objAPI.GetBucketInfo(ctx, dst)
return berr == toObjectErr(errVolumeNotFound, dst)
}
// Returns a minio-go Client configured to access remote host described by destDNSRecord
// Applicable only in a federated deployment
var getRemoteInstanceClient = func(host string, port int) (*miniogo.Core, error) {
// In a federated deployment, all the instances share config files and hence expected to have same
// credentials. So, access current instances creds and use it to create client for remote instance
endpoint := net.JoinHostPort(host, strconv.Itoa(port))
accessKey := globalServerConfig.Credential.AccessKey
secretKey := globalServerConfig.Credential.SecretKey
return miniogo.NewCore(endpoint, accessKey, secretKey, globalIsSSL)
}
if isRemoteCallRequired(ctx, srcBucket, dstBucket, objectAPI) {
if globalDNSConfig != nil {
if dstRecord, errEtcd := globalDNSConfig.Get(dstBucket); errEtcd == nil {
go func() {
if gerr := objectAPI.GetObject(ctx, srcBucket, srcObject, 0, srcInfo.Size, srcInfo.Writer, srcInfo.ETag); gerr != nil {
pipeWriter.CloseWithError(gerr)
writeErrorResponse(w, ErrInternalError, r.URL)
return
}
// Close writer explicitly to indicate data has been written
defer srcInfo.Writer.Close()
}()
// Send PutObject request to appropriate instance (in federated deployment)
client, rerr := getRemoteInstanceClient(dstRecord[0])
if rerr != nil {
pipeWriter.CloseWithError(rerr)
writeErrorResponse(w, ErrInternalError, r.URL)
return
}
remoteObjInfo, rerr := client.PutObject(dstBucket, dstObject, srcInfo.Reader, srcInfo.Size, "", "", srcInfo.UserDefined)
if rerr != nil {
pipeWriter.CloseWithError(rerr)
writeErrorResponse(w, ErrInternalError, r.URL)
return
}
objInfo.ETag = remoteObjInfo.ETag
objInfo.ModTime = remoteObjInfo.LastModified
}
} else {
if globalDNSConfig == nil {
writeErrorResponse(w, ErrNoSuchBucket, r.URL)
return
}
var dstRecords []dns.SrvRecord
if dstRecords, err = globalDNSConfig.Get(dstBucket); err == nil {
go func() {
if gerr := objectAPI.GetObject(ctx, srcBucket, srcObject, 0, srcInfo.Size, srcInfo.Writer, srcInfo.ETag); gerr != nil {
pipeWriter.CloseWithError(gerr)
writeErrorResponse(w, ErrInternalError, r.URL)
return
}
// Close writer explicitly to indicate data has been written
srcInfo.Writer.Close()
}()
// Send PutObject request to appropriate instance (in federated deployment)
host, port := getRandomHostPort(dstRecords)
client, rerr := getRemoteInstanceClient(host, port)
if rerr != nil {
pipeWriter.CloseWithError(rerr)
writeErrorResponse(w, ErrInternalError, r.URL)
return
}
remoteObjInfo, rerr := client.PutObject(dstBucket, dstObject, srcInfo.Reader, srcInfo.Size, "", "", srcInfo.UserDefined)
if rerr != nil {
pipeWriter.CloseWithError(rerr)
writeErrorResponse(w, ErrInternalError, r.URL)
return
}
objInfo.ETag = remoteObjInfo.ETag
objInfo.ModTime = remoteObjInfo.LastModified
}
} else {
// Copy source object to destination, if source and destination
// object is same then only metadata is updated.

View File

@@ -178,7 +178,7 @@ func (receiver *peerRPCReceiver) SetCredentials(args *SetCredentialsArgs, reply
prevCred := globalServerConfig.SetCredential(args.Credentials)
// Save credentials to config file
if err := globalServerConfig.Save(); err != nil {
if err := globalServerConfig.Save(getConfigFile()); err != nil {
// As saving configurstion failed, restore previous credential in memory.
globalServerConfig.SetCredential(prevCred)

View File

@@ -258,7 +258,7 @@ func ToS3ETag(etag string) string {
// used while communicating with the cloud backends.
// This sets the value for MaxIdleConnsPerHost from 2 (go default)
// to 100.
func NewCustomHTTPTransport() http.RoundTripper {
func NewCustomHTTPTransport() *http.Transport {
return &http.Transport{
Proxy: http.ProxyFromEnvironment,
DialContext: (&net.Dialer{