mirror of
https://github.com/minio/minio.git
synced 2025-11-10 22:10:12 -05:00
Update federation target to etcd/clientv3 (#6119)
With CoreDNS now supporting etcdv3 as the DNS backend, we can update our federation target to etcdv3. Users will now be able to use etcdv3 server as the federation backbone. Minio will update bucket data to etcdv3 and CoreDNS can pick that data up and serve it as bucket style DNS path.
This commit is contained in:
committed by
kannappanr
parent
adf7340394
commit
2aa18cafc6
@@ -29,7 +29,7 @@ import (
|
||||
"github.com/minio/minio-go/pkg/set"
|
||||
|
||||
"github.com/coredns/coredns/plugin/etcd/msg"
|
||||
etcd "github.com/coreos/etcd/client"
|
||||
etcd "github.com/coreos/etcd/clientv3"
|
||||
)
|
||||
|
||||
// ErrNoEntriesFound - Indicates no entries were found for the given key (directory)
|
||||
@@ -60,35 +60,33 @@ func (c *coreDNS) Get(bucket string) ([]SrvRecord, error) {
|
||||
// Retrieves list of entries under the key passed.
|
||||
// Note that this method fetches entries upto only two levels deep.
|
||||
func (c *coreDNS) list(key string) ([]SrvRecord, error) {
|
||||
kapi := etcd.NewKeysAPI(c.etcdClient)
|
||||
ctx, cancel := context.WithTimeout(context.Background(), defaultContextTimeout)
|
||||
r, err := kapi.Get(ctx, key, &etcd.GetOptions{Recursive: true})
|
||||
cancel()
|
||||
r, err := c.etcdClient.Get(ctx, key, etcd.WithPrefix())
|
||||
defer cancel()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if r.Count == 0 {
|
||||
key = strings.TrimSuffix(key, "/")
|
||||
r, err = c.etcdClient.Get(ctx, key)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if r.Count == 0 {
|
||||
return nil, ErrNoEntriesFound
|
||||
}
|
||||
}
|
||||
|
||||
var srvRecords []SrvRecord
|
||||
for _, n := range r.Node.Nodes {
|
||||
if !n.Dir {
|
||||
var srvRecord SrvRecord
|
||||
if err = json.Unmarshal([]byte(n.Value), &srvRecord); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
srvRecord.Key = strings.TrimPrefix(n.Key, key)
|
||||
srvRecords = append(srvRecords, srvRecord)
|
||||
} else {
|
||||
// As this is a directory, loop through all the nodes inside (assuming all nodes are non-directories)
|
||||
for _, n1 := range n.Nodes {
|
||||
var srvRecord SrvRecord
|
||||
if err = json.Unmarshal([]byte(n1.Value), &srvRecord); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
srvRecord.Key = strings.TrimPrefix(n1.Key, key)
|
||||
srvRecord.Key = strings.TrimSuffix(srvRecord.Key, srvRecord.Host)
|
||||
srvRecords = append(srvRecords, srvRecord)
|
||||
}
|
||||
for _, n := range r.Kvs {
|
||||
var srvRecord SrvRecord
|
||||
if err = json.Unmarshal([]byte(n.Value), &srvRecord); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
srvRecord.Key = strings.TrimPrefix(string(n.Key), key)
|
||||
srvRecord.Key = strings.TrimSuffix(srvRecord.Key, srvRecord.Host)
|
||||
srvRecords = append(srvRecords, srvRecord)
|
||||
|
||||
}
|
||||
if srvRecords != nil {
|
||||
sort.Slice(srvRecords, func(i int, j int) bool {
|
||||
@@ -107,16 +105,15 @@ func (c *coreDNS) Put(bucket string) error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
kapi := etcd.NewKeysAPI(c.etcdClient)
|
||||
key := msg.Path(fmt.Sprintf("%s.%s", bucket, c.domainName), defaultPrefixPath)
|
||||
key = key + "/" + ip
|
||||
ctx, cancel := context.WithTimeout(context.Background(), defaultContextTimeout)
|
||||
_, err = kapi.Set(ctx, key, string(bucketMsg), nil)
|
||||
cancel()
|
||||
_, err = c.etcdClient.Put(ctx, key, string(bucketMsg))
|
||||
defer cancel()
|
||||
if err != nil {
|
||||
ctx, cancel = context.WithTimeout(context.Background(), defaultContextTimeout)
|
||||
kapi.Delete(ctx, key, &etcd.DeleteOptions{Recursive: true})
|
||||
cancel()
|
||||
c.etcdClient.Delete(ctx, key)
|
||||
defer cancel()
|
||||
return err
|
||||
}
|
||||
}
|
||||
@@ -125,11 +122,10 @@ func (c *coreDNS) Put(bucket string) error {
|
||||
|
||||
// Removes DNS entries added in Put().
|
||||
func (c *coreDNS) Delete(bucket string) error {
|
||||
kapi := etcd.NewKeysAPI(c.etcdClient)
|
||||
key := msg.Path(fmt.Sprintf("%s.%s.", bucket, c.domainName), defaultPrefixPath)
|
||||
ctx, cancel := context.WithTimeout(context.Background(), defaultContextTimeout)
|
||||
_, err := kapi.Delete(ctx, key, &etcd.DeleteOptions{Recursive: true})
|
||||
cancel()
|
||||
_, err := c.etcdClient.Delete(ctx, key)
|
||||
defer cancel()
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -138,12 +134,12 @@ type coreDNS struct {
|
||||
domainName string
|
||||
domainIPs set.StringSet
|
||||
domainPort int
|
||||
etcdClient etcd.Client
|
||||
etcdClient *etcd.Client
|
||||
}
|
||||
|
||||
// NewCoreDNS - initialize a new coreDNS set/unset values.
|
||||
func NewCoreDNS(domainName string, domainIPs set.StringSet, domainPort string, etcdClient etcd.Client) (Config, error) {
|
||||
if domainName == "" || domainIPs.IsEmpty() || etcdClient == nil {
|
||||
func NewCoreDNS(domainName string, domainIPs set.StringSet, domainPort string, etcdClient *etcd.Client) (Config, error) {
|
||||
if domainName == "" || domainIPs.IsEmpty() {
|
||||
return nil, errors.New("invalid argument")
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user