Add support for CopyObject across regions and multiple Minio IPs

This PR adds CopyObject support for objects residing in buckets
in different Minio instances (where Minio instances are part of
a federated setup).

Also, added support for multiple Minio domain IPs. This is required
for distributed deployments, where one deployment may have multiple
nodes, each with a different public IP.
This commit is contained in:
Nitish Tiwari 2018-05-12 00:32:30 +05:30 committed by kannappanr
parent f30c95a301
commit 6ce7265c8c
12 changed files with 208 additions and 82 deletions

View File

@ -31,12 +31,17 @@ import (
"sync" "sync"
"github.com/coreos/etcd/client" "github.com/coreos/etcd/client"
"github.com/gorilla/mux" "github.com/gorilla/mux"
"github.com/minio/minio/cmd/logger" "github.com/minio/minio/cmd/logger"
"github.com/minio/minio/pkg/dns"
"github.com/minio/minio/pkg/event" "github.com/minio/minio/pkg/event"
"github.com/minio/minio/pkg/hash" "github.com/minio/minio/pkg/hash"
"github.com/minio/minio/pkg/policy" "github.com/minio/minio/pkg/policy"
"github.com/minio/minio/pkg/sync/errgroup" "github.com/minio/minio/pkg/sync/errgroup"
"github.com/minio/minio-go/pkg/set"
) )
// Check if there are buckets on server without corresponding entry in etcd backend and // Check if there are buckets on server without corresponding entry in etcd backend and
@ -47,7 +52,6 @@ import (
// -- If yes, check if the IP of entry matches local IP. This means entry is for this instance. // -- If yes, check if the IP of entry matches local IP. This means entry is for this instance.
// -- If IP of the entry doesn't match, this means entry is for another instance. Log an error to console. // -- If IP of the entry doesn't match, this means entry is for another instance. Log an error to console.
func initFederatorBackend(objLayer ObjectLayer) { func initFederatorBackend(objLayer ObjectLayer) {
// List all buckets
b, err := objLayer.ListBuckets(context.Background()) b, err := objLayer.ListBuckets(context.Background())
if err != nil { if err != nil {
logger.LogIf(context.Background(), err) logger.LogIf(context.Background(), err)
@ -60,15 +64,14 @@ func initFederatorBackend(objLayer ObjectLayer) {
g.Go(func() error { g.Go(func() error {
r, gerr := globalDNSConfig.Get(b[index].Name) r, gerr := globalDNSConfig.Get(b[index].Name)
if gerr != nil { if gerr != nil {
if client.IsKeyNotFound(gerr) { if client.IsKeyNotFound(gerr) || gerr == dns.ErrNoEntriesFound {
// Make a new entry
return globalDNSConfig.Put(b[index].Name) return globalDNSConfig.Put(b[index].Name)
} }
return gerr return gerr
} }
if r.Host != globalDomainIP { if globalDomainIPs.Intersection(set.CreateStringSet(getHostsSlice(r)...)).IsEmpty() {
// Log error that entry already present for different host // There is already an entry for this bucket, with all IP addresses different. This indicates a bucket name collision. Log an error and continue.
return fmt.Errorf("Unable to add bucket DNS entry for bucket %s, an entry exists for the same bucket. Use %s to access the bucket, or rename it to a unique value", b[index].Name, globalDomainIP) return fmt.Errorf("Unable to add bucket DNS entry for bucket %s, an entry exists for the same bucket. Use one of these IP addresses %v to access the bucket", b[index].Name, globalDomainIPs.ToSlice())
} }
return nil return nil
}, index) }, index)
@ -418,7 +421,7 @@ func (api objectAPIHandlers) PutBucketHandler(w http.ResponseWriter, r *http.Req
if globalDNSConfig != nil { if globalDNSConfig != nil {
if _, err := globalDNSConfig.Get(bucket); err != nil { if _, err := globalDNSConfig.Get(bucket); err != nil {
if client.IsKeyNotFound(err) { if client.IsKeyNotFound(err) || err == dns.ErrNoEntriesFound {
// Proceed to creating a bucket. // Proceed to creating a bucket.
if err = objectAPI.MakeBucketWithLocation(ctx, bucket, location); err != nil { if err = objectAPI.MakeBucketWithLocation(ctx, bucket, location); err != nil {
writeErrorResponse(w, toAPIErrorCode(err), r.URL) writeErrorResponse(w, toAPIErrorCode(err), r.URL)

View File

@ -28,10 +28,13 @@ import (
"time" "time"
etcd "github.com/coreos/etcd/client" etcd "github.com/coreos/etcd/client"
"github.com/minio/cli" "github.com/minio/cli"
"github.com/minio/minio/cmd/logger" "github.com/minio/minio/cmd/logger"
"github.com/minio/minio/pkg/auth" "github.com/minio/minio/pkg/auth"
"github.com/minio/minio/pkg/dns" "github.com/minio/minio/pkg/dns"
"github.com/minio/minio-go/pkg/set"
) )
// Check for updates and print a notification message // Check for updates and print a notification message
@ -59,7 +62,7 @@ func initConfig() {
} else { } else {
if etcd.IsKeyNotFound(err) { if etcd.IsKeyNotFound(err) {
logger.FatalIf(newConfig(), "Unable to initialize minio config for the first time.") logger.FatalIf(newConfig(), "Unable to initialize minio config for the first time.")
logger.Info("Created minio configuration file successfully at", globalEtcdClient.Endpoints()) logger.Info("Created minio configuration file successfully at %v", globalEtcdClient.Endpoints())
} else { } else {
logger.FatalIf(err, "Unable to load config version: '%s'.", serverConfigVersion) logger.FatalIf(err, "Unable to load config version: '%s'.", serverConfigVersion)
} }
@ -168,10 +171,20 @@ func handleCommonEnvVars() {
logger.FatalIf(err, "Unable to initialize etcd with %s", etcdEndpoints) logger.FatalIf(err, "Unable to initialize etcd with %s", etcdEndpoints)
} }
globalDomainIP = os.Getenv("MINIO_PUBLIC_IP") minioEndpointsEnv, ok := os.LookupEnv("MINIO_PUBLIC_IPS")
if globalDomainName != "" && globalDomainIP != "" && globalEtcdClient != nil { if ok {
minioEndpoints := strings.Split(minioEndpointsEnv, ",")
globalDomainIPs = set.NewStringSet()
for i, ip := range minioEndpoints {
if net.ParseIP(ip) == nil {
logger.FatalIf(errInvalidArgument, "Unable to initialize Minio server with invalid MINIO_PUBLIC_IPS[%d]: %s", i, ip)
}
globalDomainIPs.Add(ip)
}
}
if globalDomainName != "" && !globalDomainIPs.IsEmpty() && globalEtcdClient != nil {
var err error var err error
globalDNSConfig, err = dns.NewCoreDNS(globalDomainName, globalDomainIP, globalMinioPort, globalEtcdClient) globalDNSConfig, err = dns.NewCoreDNS(globalDomainName, globalDomainIPs, globalMinioPort, globalEtcdClient)
logger.FatalIf(err, "Unable to initialize DNS config for %s.", globalDomainName) logger.FatalIf(err, "Unable to initialize DNS config for %s.", globalDomainName)
} }

View File

@ -26,9 +26,12 @@ import (
"strings" "strings"
"time" "time"
"github.com/minio/minio-go/pkg/set"
"github.com/coreos/etcd/client" "github.com/coreos/etcd/client"
humanize "github.com/dustin/go-humanize" humanize "github.com/dustin/go-humanize"
"github.com/minio/minio/cmd/logger" "github.com/minio/minio/cmd/logger"
"github.com/minio/minio/pkg/dns"
"github.com/minio/minio/pkg/handlers" "github.com/minio/minio/pkg/handlers"
"github.com/minio/minio/pkg/sys" "github.com/minio/minio/pkg/sys"
"github.com/rs/cors" "github.com/rs/cors"
@ -629,29 +632,37 @@ func (f bucketForwardingHandler) ServeHTTP(w http.ResponseWriter, r *http.Reques
return return
} }
bucket, object := urlPath2BucketObjectName(r.URL.Path) bucket, object := urlPath2BucketObjectName(r.URL.Path)
// MakeBucket request // MakeBucket requests should be handled at current endpoint
if r.Method == http.MethodPut && bucket != "" && object == "" { if r.Method == http.MethodPut && bucket != "" && object == "" {
f.handler.ServeHTTP(w, r) f.handler.ServeHTTP(w, r)
return return
} }
// ListBucket request // ListBucket requests should be handled at current endpoint as
// all buckets data can be fetched from here.
if r.Method == http.MethodGet && bucket == "" && object == "" { if r.Method == http.MethodGet && bucket == "" && object == "" {
f.handler.ServeHTTP(w, r) f.handler.ServeHTTP(w, r)
return return
} }
// CopyObject requests should be handled at current endpoint as path style
// requests have target bucket and object in URI and source details are in
// header fields
if r.Method == http.MethodPut && r.Header.Get("X-Amz-Copy-Source") != "" {
f.handler.ServeHTTP(w, r)
return
}
sr, err := globalDNSConfig.Get(bucket) sr, err := globalDNSConfig.Get(bucket)
if err != nil { if err != nil {
if client.IsKeyNotFound(err) { if client.IsKeyNotFound(err) || err == dns.ErrNoEntriesFound {
writeErrorResponse(w, ErrNoSuchBucket, r.URL) writeErrorResponse(w, ErrNoSuchBucket, r.URL)
} else { } else {
writeErrorResponse(w, toAPIErrorCode(err), r.URL) writeErrorResponse(w, toAPIErrorCode(err), r.URL)
} }
return return
} }
if sr.Host != globalDomainIP { if globalDomainIPs.Intersection(set.CreateStringSet(getHostsSlice(sr)...)).IsEmpty() {
backendURL := fmt.Sprintf("http://%s:%d", sr.Host, sr.Port) backendURL := fmt.Sprintf("http://%s:%d", sr[0].Host, sr[0].Port)
if globalIsSSL { if globalIsSSL {
backendURL = fmt.Sprintf("https://%s:%d", sr.Host, sr.Port) backendURL = fmt.Sprintf("https://%s:%d", sr[0].Host, sr[0].Port)
} }
r.URL, err = url.Parse(backendURL) r.URL, err = url.Parse(backendURL)
if err != nil { if err != nil {

View File

@ -22,6 +22,8 @@ import (
"runtime" "runtime"
"time" "time"
"github.com/minio/minio-go/pkg/set"
etcd "github.com/coreos/etcd/client" etcd "github.com/coreos/etcd/client"
humanize "github.com/dustin/go-humanize" humanize "github.com/dustin/go-humanize"
"github.com/fatih/color" "github.com/fatih/color"
@ -161,8 +163,8 @@ var (
globalPublicCerts []*x509.Certificate globalPublicCerts []*x509.Certificate
globalIsEnvDomainName bool globalIsEnvDomainName bool
globalDomainName string // Root domain for virtual host style requests globalDomainName string // Root domain for virtual host style requests
globalDomainIP string // Root domain IP address globalDomainIPs set.StringSet // Root domain IP address(s) for a distributed Minio deployment
globalListingTimeout = newDynamicTimeout( /*30*/ 600*time.Second /*5*/, 600*time.Second) // timeout for listing related ops globalListingTimeout = newDynamicTimeout( /*30*/ 600*time.Second /*5*/, 600*time.Second) // timeout for listing related ops
globalObjectTimeout = newDynamicTimeout( /*1*/ 10*time.Minute /*10*/, 600*time.Second) // timeout for Object API related ops globalObjectTimeout = newDynamicTimeout( /*1*/ 10*time.Minute /*10*/, 600*time.Second) // timeout for Object API related ops

View File

@ -20,12 +20,16 @@ import (
"context" "context"
"encoding/hex" "encoding/hex"
"fmt" "fmt"
"net"
"path" "path"
"runtime" "runtime"
"strconv"
"strings" "strings"
"unicode/utf8" "unicode/utf8"
miniogo "github.com/minio/minio-go"
"github.com/minio/minio/cmd/logger" "github.com/minio/minio/cmd/logger"
"github.com/minio/minio/pkg/dns"
"github.com/skyrings/skyring-common/tools/uuid" "github.com/skyrings/skyring-common/tools/uuid"
) )
@ -277,6 +281,40 @@ func isMinioReservedBucket(bucketName string) bool {
return bucketName == minioReservedBucket return bucketName == minioReservedBucket
} }
// Returns a minio-go Client configured to access remote host described by destDNSRecord
// Applicable only in a federated deployment
func getRemoteInstanceClient(destDNSRecord dns.SrvRecord) (*miniogo.Core, error) {
// In a federated deployment, all the instances share config files and hence expected to have same
// credentials. So, access current instances creds and use it to create client for remote instance
client, err := miniogo.NewCore(net.JoinHostPort(destDNSRecord.Host, strconv.Itoa(destDNSRecord.Port)), globalServerConfig.Credential.AccessKey, globalServerConfig.Credential.SecretKey, globalIsSSL)
if err != nil {
return nil, err
}
return client, nil
}
// Checks if a remote putobject call is needed for CopyObject operation
// 1. If source and destination bucket names are same, it means no call needed to etcd to get destination info
// 2. If destination bucket doesn't exist locally, only then a etcd call is needed
func isRemoteCallRequired(ctx context.Context, src, dst string, objAPI ObjectLayer) bool {
if src == dst {
return false
}
if _, err := objAPI.GetBucketInfo(ctx, dst); err == toObjectErr(errVolumeNotFound, dst) {
return true
}
return false
}
// returns a slice of hosts by reading a slice of DNS records
func getHostsSlice(records []dns.SrvRecord) []string {
var hosts []string
for _, r := range records {
hosts = append(hosts, r.Host)
}
return hosts
}
// byBucketName is a collection satisfying sort.Interface. // byBucketName is a collection satisfying sort.Interface.
type byBucketName []BucketInfo type byBucketName []BucketInfo

View File

@ -536,13 +536,51 @@ func (api objectAPIHandlers) CopyObjectHandler(w http.ResponseWriter, r *http.Re
return return
} }
// Copy source object to destination, if source and destination var objInfo ObjectInfo
// object is same then only metadata is updated.
objInfo, err := objectAPI.CopyObject(ctx, srcBucket, srcObject, dstBucket, dstObject, srcInfo) // _, err = objectAPI.GetBucketInfo(ctx, dstBucket)
if err != nil { // if err == toObjectErr(errVolumeNotFound, dstBucket) && !cpSrcDstSame
pipeWriter.CloseWithError(err) if isRemoteCallRequired(ctx, srcBucket, dstBucket, objectAPI) {
writeErrorResponse(w, toAPIErrorCode(err), r.URL) if globalDNSConfig != nil {
return if dstRecord, errEtcd := globalDNSConfig.Get(dstBucket); errEtcd == nil {
go func() {
if gerr := objectAPI.GetObject(ctx, srcBucket, srcObject, 0, srcInfo.Size, srcInfo.Writer, srcInfo.ETag); gerr != nil {
pipeWriter.CloseWithError(gerr)
writeErrorResponse(w, ErrInternalError, r.URL)
return
}
// Close writer explicitly to indicate data has been written
defer srcInfo.Writer.Close()
}()
// Send PutObject request to appropriate instance (in federated deployment)
client, rerr := getRemoteInstanceClient(dstRecord[0])
if rerr != nil {
pipeWriter.CloseWithError(rerr)
writeErrorResponse(w, ErrInternalError, r.URL)
return
}
remoteObjInfo, rerr := client.PutObject(dstBucket, dstObject, srcInfo.Reader, srcInfo.Size, "", "", srcInfo.UserDefined)
if rerr != nil {
pipeWriter.CloseWithError(rerr)
writeErrorResponse(w, ErrInternalError, r.URL)
return
}
objInfo.ETag = remoteObjInfo.ETag
objInfo.ModTime = remoteObjInfo.LastModified
}
} else {
writeErrorResponse(w, ErrNoSuchBucket, r.URL)
return
}
} else {
// Copy source object to destination, if source and destination
// object is same then only metadata is updated.
objInfo, err = objectAPI.CopyObject(ctx, srcBucket, srcObject, dstBucket, dstObject, srcInfo)
if err != nil {
pipeWriter.CloseWithError(err)
writeErrorResponse(w, toAPIErrorCode(err), r.URL)
return
}
} }
pipeReader.Close() pipeReader.Close()

View File

@ -91,7 +91,7 @@ ENVIRONMENT VARIABLES:
BUCKET-DNS: BUCKET-DNS:
MINIO_DOMAIN: To enable bucket DNS requests, set this value to Minio host domain name. MINIO_DOMAIN: To enable bucket DNS requests, set this value to Minio host domain name.
MINIO_PUBLIC_IP: To enable bucket DNS requests, set this value to Minio host public IP. MINIO_PUBLIC_IPS: To enable bucket DNS requests, set this value to list of Minio host public IP(s) delimited by ",".
MINIO_ETCD_ENDPOINTS: To enable bucket DNS requests, set this value to list of etcd endpoints delimited by ",". MINIO_ETCD_ENDPOINTS: To enable bucket DNS requests, set this value to list of etcd endpoints delimited by ",".
EXAMPLES: EXAMPLES:

View File

@ -40,6 +40,7 @@ import (
"github.com/minio/minio/browser" "github.com/minio/minio/browser"
"github.com/minio/minio/cmd/logger" "github.com/minio/minio/cmd/logger"
"github.com/minio/minio/pkg/auth" "github.com/minio/minio/pkg/auth"
"github.com/minio/minio/pkg/dns"
"github.com/minio/minio/pkg/event" "github.com/minio/minio/pkg/event"
"github.com/minio/minio/pkg/hash" "github.com/minio/minio/pkg/hash"
"github.com/minio/minio/pkg/policy" "github.com/minio/minio/pkg/policy"
@ -138,7 +139,7 @@ func (web *webAPIHandlers) MakeBucket(r *http.Request, args *MakeBucketArgs, rep
if globalDNSConfig != nil { if globalDNSConfig != nil {
if _, err := globalDNSConfig.Get(args.BucketName); err != nil { if _, err := globalDNSConfig.Get(args.BucketName); err != nil {
if etcd.IsKeyNotFound(err) { if etcd.IsKeyNotFound(err) || err == dns.ErrNoEntriesFound {
// Proceed to creating a bucket. // Proceed to creating a bucket.
if err = objectAPI.MakeBucketWithLocation(context.Background(), args.BucketName, globalServerConfig.GetRegion()); err != nil { if err = objectAPI.MakeBucketWithLocation(context.Background(), args.BucketName, globalServerConfig.GetRegion()); err != nil {
return toJSONError(err) return toJSONError(err)

View File

@ -21,21 +21,21 @@ Bucket lookup federation requires two dependencies
``` ```
export MINIO_ETCD_ENDPOINTS="http://remote-etcd1:2379,http://remote-etcd2:4001" export MINIO_ETCD_ENDPOINTS="http://remote-etcd1:2379,http://remote-etcd2:4001"
export MINIO_DOMAIN=domain.com export MINIO_DOMAIN=domain.com
export MINIO_PUBLIC_IP=44.35.2.1 export MINIO_PUBLIC_IPS=44.35.2.1,44.35.2.2,44.35.2.3,44.35.2.4
minio server http://rack{1...4}.host{1...4}.domain.com/mnt/export{1...32} minio server http://rack{1...4}.host{1...4}.domain.com/mnt/export{1...32}
``` ```
> cluster2 > cluster2
``` ```
export MINIO_ETCD_ENDPOINTS="http://remote-etcd1:2379,http://remote-etcd2:4001" export MINIO_ETCD_ENDPOINTS="http://remote-etcd1:2379,http://remote-etcd2:4001"
export MINIO_DOMAIN=domain.com export MINIO_DOMAIN=domain.com
export MINIO_PUBLIC_IP=44.35.2.2 export MINIO_PUBLIC_IPS=44.35.1.1,44.35.1.2,44.35.1.3,44.35.1.4
minio server http://rack{5...8}.host{5...8}.domain.com/mnt/export{1...32} minio server http://rack{5...8}.host{5...8}.domain.com/mnt/export{1...32}
``` ```
In this configuration you can see `MINIO_ETCD_ENDPOINTS` points to the etcd backend which manages Minio's In this configuration you can see `MINIO_ETCD_ENDPOINTS` points to the etcd backend which manages Minio's
`config.json` and bucket DNS SRV records. `MINIO_DOMAIN` indicates the domain suffix for the bucket which `config.json` and bucket DNS SRV records. `MINIO_DOMAIN` indicates the domain suffix for the bucket which
will be used to resolve bucket through DNS. For example if you have a bucket such as `mybucket`, the will be used to resolve bucket through DNS. For example if you have a bucket such as `mybucket`, the
client can use now `mybucket.domain.com` to directly resolve itself to the right cluster. `MINIO_PUBLIC_IP` client can use now `mybucket.domain.com` to directly resolve itself to the right cluster. `MINIO_PUBLIC_IPS`
points to the public IP address where each cluster might be accessible, this is unique for each cluster. points to the public IP address where each cluster might be accessible, this is unique for each cluster.
NOTE: `mybucket` only exists on one cluster either `cluster1` or `cluster2` this is random and NOTE: `mybucket` only exists on one cluster either `cluster1` or `cluster2` this is random and

View File

@ -21,18 +21,22 @@ import (
"encoding/json" "encoding/json"
"errors" "errors"
"fmt" "fmt"
"net"
"sort" "sort"
"strconv" "strconv"
"strings" "strings"
"time" "time"
"github.com/minio/minio-go/pkg/set"
"github.com/coredns/coredns/plugin/etcd/msg" "github.com/coredns/coredns/plugin/etcd/msg"
etcd "github.com/coreos/etcd/client" etcd "github.com/coreos/etcd/client"
) )
// ErrNoEntriesFound - Indicates no entries were found for the given key (directory)
var ErrNoEntriesFound = errors.New("No entries found for this key")
// create a new coredns service record for the bucket. // create a new coredns service record for the bucket.
func newCoreDNSMsg(bucket string, ip string, port int, ttl uint32) ([]byte, error) { func newCoreDNSMsg(bucket, ip string, port int, ttl uint32) ([]byte, error) {
return json.Marshal(&SrvRecord{ return json.Marshal(&SrvRecord{
Host: ip, Host: ip,
Port: port, Port: port,
@ -41,12 +45,24 @@ func newCoreDNSMsg(bucket string, ip string, port int, ttl uint32) ([]byte, erro
}) })
} }
// Retrieves list of DNS entries for a bucket. // Retrieves list of DNS entries for the domain.
func (c *coreDNS) List() ([]SrvRecord, error) { func (c *coreDNS) List() ([]SrvRecord, error) {
kapi := etcd.NewKeysAPI(c.etcdClient)
key := msg.Path(fmt.Sprintf("%s.", c.domainName), defaultPrefixPath) key := msg.Path(fmt.Sprintf("%s.", c.domainName), defaultPrefixPath)
return c.list(key)
}
// Retrieves DNS records for a bucket.
func (c *coreDNS) Get(bucket string) ([]SrvRecord, error) {
key := msg.Path(fmt.Sprintf("%s.%s.", bucket, c.domainName), defaultPrefixPath)
return c.list(key)
}
// Retrieves list of entries under the key passed.
// Note that this method fetches entries upto only two levels deep.
func (c *coreDNS) list(key string) ([]SrvRecord, error) {
kapi := etcd.NewKeysAPI(c.etcdClient)
ctx, cancel := context.WithTimeout(context.Background(), defaultContextTimeout) ctx, cancel := context.WithTimeout(context.Background(), defaultContextTimeout)
r, err := kapi.Get(ctx, key, nil) r, err := kapi.Get(ctx, key, &etcd.GetOptions{Recursive: true})
cancel() cancel()
if err != nil { if err != nil {
return nil, err return nil, err
@ -54,48 +70,52 @@ func (c *coreDNS) List() ([]SrvRecord, error) {
var srvRecords []SrvRecord var srvRecords []SrvRecord
for _, n := range r.Node.Nodes { for _, n := range r.Node.Nodes {
var srvRecord SrvRecord if !n.Dir {
if err = json.Unmarshal([]byte(n.Value), &srvRecord); err != nil { var srvRecord SrvRecord
return nil, err if err = json.Unmarshal([]byte(n.Value), &srvRecord); err != nil {
return nil, err
}
srvRecord.Key = strings.TrimPrefix(n.Key, key)
srvRecords = append(srvRecords, srvRecord)
} else {
// As this is a directory, loop through all the nodes inside
for _, n1 := range n.Nodes {
var srvRecord SrvRecord
if err = json.Unmarshal([]byte(n1.Value), &srvRecord); err != nil {
return nil, err
}
srvRecord.Key = strings.TrimPrefix(n1.Key, key)
srvRecord.Key = strings.TrimSuffix(srvRecord.Key, srvRecord.Host)
srvRecords = append(srvRecords, srvRecord)
}
} }
srvRecord.Key = strings.TrimPrefix(n.Key, key)
srvRecords = append(srvRecords, srvRecord)
} }
sort.Slice(srvRecords, func(i int, j int) bool { if srvRecords != nil {
return srvRecords[i].Key < srvRecords[j].Key sort.Slice(srvRecords, func(i int, j int) bool {
}) return srvRecords[i].Key < srvRecords[j].Key
})
} else {
return nil, ErrNoEntriesFound
}
return srvRecords, nil return srvRecords, nil
} }
// Retrieves DNS record for a bucket. // Adds DNS entries into etcd endpoint in CoreDNS etcd message format.
func (c *coreDNS) Get(bucket string) (SrvRecord, error) {
kapi := etcd.NewKeysAPI(c.etcdClient)
key := msg.Path(fmt.Sprintf("%s.%s.", bucket, c.domainName), defaultPrefixPath)
ctx, cancel := context.WithTimeout(context.Background(), defaultContextTimeout)
r, err := kapi.Get(ctx, key, nil)
cancel()
if err != nil {
return SrvRecord{}, err
}
var sr SrvRecord
if err = json.Unmarshal([]byte(r.Node.Value), &sr); err != nil {
return SrvRecord{}, err
}
sr.Key = strings.TrimPrefix(r.Node.Key, key)
return sr, nil
}
// Adds DNS entries into etcd endpoint in CoreDNS etcd messae format.
func (c *coreDNS) Put(bucket string) error { func (c *coreDNS) Put(bucket string) error {
bucketMsg, err := newCoreDNSMsg(bucket, c.domainIP, c.domainPort, defaultTTL) var err error
if err != nil { for ip := range c.domainIPs {
return err var bucketMsg []byte
bucketMsg, err = newCoreDNSMsg(bucket, ip, c.domainPort, defaultTTL)
if err != nil {
return err
}
kapi := etcd.NewKeysAPI(c.etcdClient)
key := msg.Path(fmt.Sprintf("%s.%s", bucket, c.domainName), defaultPrefixPath)
key = key + "/" + ip
ctx, cancel := context.WithTimeout(context.Background(), defaultContextTimeout)
_, err = kapi.Set(ctx, key, string(bucketMsg), nil)
cancel()
} }
kapi := etcd.NewKeysAPI(c.etcdClient)
key := msg.Path(fmt.Sprintf("%s.%s.", bucket, c.domainName), defaultPrefixPath)
ctx, cancel := context.WithTimeout(context.Background(), defaultContextTimeout)
_, err = kapi.Set(ctx, key, string(bucketMsg), nil)
cancel()
return err return err
} }
@ -111,18 +131,15 @@ func (c *coreDNS) Delete(bucket string) error {
// CoreDNS - represents dns config for coredns server. // CoreDNS - represents dns config for coredns server.
type coreDNS struct { type coreDNS struct {
domainName, domainIP string domainName string
domainPort int domainIPs set.StringSet
etcdClient etcd.Client domainPort int
etcdClient etcd.Client
} }
// NewCoreDNS - initialize a new coreDNS set/unset values. // NewCoreDNS - initialize a new coreDNS set/unset values.
func NewCoreDNS(domainName, domainIP, domainPort string, etcdClient etcd.Client) (Config, error) { func NewCoreDNS(domainName string, domainIPs set.StringSet, domainPort string, etcdClient etcd.Client) (Config, error) {
if domainName == "" || domainIP == "" || etcdClient == nil { if domainName == "" || domainIPs.IsEmpty() || etcdClient == nil {
return nil, errors.New("invalid argument")
}
if net.ParseIP(domainIP) == nil {
return nil, errors.New("invalid argument") return nil, errors.New("invalid argument")
} }
@ -133,7 +150,7 @@ func NewCoreDNS(domainName, domainIP, domainPort string, etcdClient etcd.Client)
return &coreDNS{ return &coreDNS{
domainName: domainName, domainName: domainName,
domainIP: domainIP, domainIPs: domainIPs,
domainPort: port, domainPort: port,
etcdClient: etcdClient, etcdClient: etcdClient,
}, nil }, nil

View File

@ -60,6 +60,6 @@ type SrvRecord struct {
type Config interface { type Config interface {
Put(key string) error Put(key string) error
List() ([]SrvRecord, error) List() ([]SrvRecord, error)
Get(key string) (SrvRecord, error) Get(key string) ([]SrvRecord, error)
Delete(key string) error Delete(key string) error
} }

View File

@ -140,6 +140,9 @@ func saveFileConfigEtcd(filename string, clnt etcd.Client, v interface{}) error
kapi := etcd.NewKeysAPI(clnt) kapi := etcd.NewKeysAPI(clnt)
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
_, err = kapi.Update(ctx, filename, string(dataBytes)) _, err = kapi.Update(ctx, filename, string(dataBytes))
if etcd.IsKeyNotFound(err) {
_, err = kapi.Create(ctx, filename, string(dataBytes))
}
cancel() cancel()
return err return err
} }