mirror of
https://github.com/minio/minio.git
synced 2025-01-11 15:03:22 -05:00
fix: Avoid re-reading bucket names from etcd (#8924)
This helps improve performance when there are 1000+ bucket entries on etcd, improves the startup time significantly.
This commit is contained in:
parent
680e493065
commit
9bbf5cb74f
@ -1,5 +1,5 @@
|
|||||||
/*
|
/*
|
||||||
* MinIO Cloud Storage, (C) 2015, 2016, 2017, 2018 MinIO, Inc.
|
* MinIO Cloud Storage, (C) 2015-2020 MinIO, Inc.
|
||||||
*
|
*
|
||||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
* you may not use this file except in compliance with the License.
|
* you may not use this file except in compliance with the License.
|
||||||
@ -22,7 +22,6 @@ import (
|
|||||||
"encoding/xml"
|
"encoding/xml"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
"net"
|
|
||||||
"net/http"
|
"net/http"
|
||||||
"net/url"
|
"net/url"
|
||||||
"path"
|
"path"
|
||||||
@ -75,40 +74,45 @@ func initFederatorBackend(buckets []BucketInfo, objLayer ObjectLayer) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
bucketSet := set.NewStringSet()
|
bucketsSet := set.NewStringSet()
|
||||||
|
bucketsToBeUpdated := set.NewStringSet()
|
||||||
|
bucketsInConflict := set.NewStringSet()
|
||||||
|
for _, bucket := range buckets {
|
||||||
|
bucketsSet.Add(bucket.Name)
|
||||||
|
r, ok := dnsBuckets[bucket.Name]
|
||||||
|
if !ok {
|
||||||
|
bucketsToBeUpdated.Add(bucket.Name)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if !globalDomainIPs.Intersection(set.CreateStringSet(getHostsSlice(r)...)).IsEmpty() {
|
||||||
|
if globalDomainIPs.Difference(set.CreateStringSet(getHostsSlice(r)...)).IsEmpty() {
|
||||||
|
// No difference in terms of domainIPs and nothing
|
||||||
|
// has changed so we don't change anything on the etcd.
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
// if domain IPs intersect then it won't be an empty set.
|
||||||
|
// such an intersection means that bucket exists on etcd.
|
||||||
|
// but if we do see a difference with local domain IPs with
|
||||||
|
// hostSlice from etcd then we should update with newer
|
||||||
|
// domainIPs, we proceed to do that here.
|
||||||
|
bucketsToBeUpdated.Add(bucket.Name)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
// No IPs seem to intersect, this means that bucket exists but has
|
||||||
|
// different IP addresses perhaps from a different deployment.
|
||||||
|
// bucket names are globally unique in federation at a given
|
||||||
|
// path prefix, name collision is not allowed. We simply log
|
||||||
|
// an error and continue.
|
||||||
|
bucketsInConflict.Add(bucket.Name)
|
||||||
|
}
|
||||||
|
|
||||||
// Add buckets that are not registered with the DNS
|
// Add/update buckets that are not registered with the DNS
|
||||||
g := errgroup.WithNErrs(len(buckets))
|
g := errgroup.WithNErrs(len(buckets))
|
||||||
for index := range buckets {
|
bucketsToBeUpdatedSlice := bucketsToBeUpdated.ToSlice()
|
||||||
bucketSet.Add(buckets[index].Name)
|
for index := range bucketsToBeUpdatedSlice {
|
||||||
index := index
|
index := index
|
||||||
g.Go(func() error {
|
g.Go(func() error {
|
||||||
r, gerr := globalDNSConfig.Get(buckets[index].Name)
|
return globalDNSConfig.Put(bucketsToBeUpdatedSlice[index])
|
||||||
if gerr != nil {
|
|
||||||
if gerr == dns.ErrNoEntriesFound {
|
|
||||||
return globalDNSConfig.Put(buckets[index].Name)
|
|
||||||
}
|
|
||||||
return gerr
|
|
||||||
}
|
|
||||||
if !globalDomainIPs.Intersection(set.CreateStringSet(getHostsSlice(r)...)).IsEmpty() {
|
|
||||||
if globalDomainIPs.Difference(set.CreateStringSet(getHostsSlice(r)...)).IsEmpty() {
|
|
||||||
// No difference in terms of domainIPs and nothing
|
|
||||||
// has changed so we don't change anything on the etcd.
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
// if domain IPs intersect then it won't be an empty set.
|
|
||||||
// such an intersection means that bucket exists on etcd.
|
|
||||||
// but if we do see a difference with local domain IPs with
|
|
||||||
// hostSlice from etcd then we should update with newer
|
|
||||||
// domainIPs, we proceed to do that here.
|
|
||||||
return globalDNSConfig.Put(buckets[index].Name)
|
|
||||||
}
|
|
||||||
// No IPs seem to intersect, this means that bucket exists but has
|
|
||||||
// different IP addresses perhaps from a different deployment.
|
|
||||||
// bucket names are globally unique in federation at a given
|
|
||||||
// path prefix, name collision is not allowed. We simply log
|
|
||||||
// an error and continue.
|
|
||||||
return fmt.Errorf("Unable to add bucket DNS entry for bucket %s, an entry exists for the same bucket. Use one of these IP addresses %v to access the bucket", buckets[index].Name, globalDomainIPs.ToSlice())
|
|
||||||
}, index)
|
}, index)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -118,36 +122,26 @@ func initFederatorBackend(buckets []BucketInfo, objLayer ObjectLayer) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
g = errgroup.WithNErrs(len(dnsBuckets))
|
for _, bucket := range bucketsInConflict.ToSlice() {
|
||||||
// Remove buckets that are in DNS for this server, but aren't local
|
logger.LogIf(context.Background(), fmt.Errorf("Unable to add bucket DNS entry for bucket %s, an entry exists for the same bucket. Use one of these IP addresses %v to access the bucket", bucket, globalDomainIPs.ToSlice()))
|
||||||
for index := range dnsBuckets {
|
|
||||||
index := index
|
|
||||||
g.Go(func() error {
|
|
||||||
// This is a local bucket that exists, so we can continue
|
|
||||||
if bucketSet.Contains(dnsBuckets[index].Key) {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// This is not for our server, so we can continue
|
|
||||||
hostPort := net.JoinHostPort(dnsBuckets[index].Host, string(dnsBuckets[index].Port))
|
|
||||||
if globalDomainIPs.Intersection(set.CreateStringSet(hostPort)).IsEmpty() {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// We go to here, so we know the bucket no longer exists,
|
|
||||||
// but is registered in DNS to this server
|
|
||||||
if err := globalDNSConfig.DeleteRecord(dnsBuckets[index]); err != nil {
|
|
||||||
return fmt.Errorf("Failed to remove DNS entry for %s due to %w",
|
|
||||||
dnsBuckets[index].Key, err)
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}, index)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, err := range g.Wait() {
|
// Remove buckets that are in DNS for this server, but aren't local
|
||||||
if err != nil {
|
for bucket, records := range dnsBuckets {
|
||||||
logger.LogIf(context.Background(), err)
|
if bucketsSet.Contains(bucket) {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
if globalDomainIPs.Intersection(set.CreateStringSet(getHostsSlice(records)...)).IsEmpty() {
|
||||||
|
// This is not for our server, so we can continue
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// We go to here, so we know the bucket no longer exists,
|
||||||
|
// but is registered in DNS to this server
|
||||||
|
if err = globalDNSConfig.Delete(bucket); err != nil {
|
||||||
|
logger.LogIf(context.Background(), fmt.Errorf("Failed to remove DNS entry for %s due to %w",
|
||||||
|
bucket, err))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -285,16 +279,11 @@ func (api objectAPIHandlers) ListBucketsHandler(w http.ResponseWriter, r *http.R
|
|||||||
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r))
|
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r))
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
bucketSet := set.NewStringSet()
|
for _, dnsRecords := range dnsBuckets {
|
||||||
for _, dnsRecord := range dnsBuckets {
|
|
||||||
if bucketSet.Contains(dnsRecord.Key) {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
bucketsInfo = append(bucketsInfo, BucketInfo{
|
bucketsInfo = append(bucketsInfo, BucketInfo{
|
||||||
Name: dnsRecord.Key,
|
Name: dnsRecords[0].Key,
|
||||||
Created: dnsRecord.CreationDate,
|
Created: dnsRecords[0].CreationDate,
|
||||||
})
|
})
|
||||||
bucketSet.Add(dnsRecord.Key)
|
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
// Invoke the list buckets.
|
// Invoke the list buckets.
|
||||||
|
@ -38,18 +38,18 @@ var ErrNoEntriesFound = errors.New("No entries found for this key")
|
|||||||
const etcdPathSeparator = "/"
|
const etcdPathSeparator = "/"
|
||||||
|
|
||||||
// create a new coredns service record for the bucket.
|
// create a new coredns service record for the bucket.
|
||||||
func newCoreDNSMsg(ip string, port string, ttl uint32) ([]byte, error) {
|
func newCoreDNSMsg(ip string, port string, ttl uint32, t time.Time) ([]byte, error) {
|
||||||
return json.Marshal(&SrvRecord{
|
return json.Marshal(&SrvRecord{
|
||||||
Host: ip,
|
Host: ip,
|
||||||
Port: json.Number(port),
|
Port: json.Number(port),
|
||||||
TTL: ttl,
|
TTL: ttl,
|
||||||
CreationDate: time.Now().UTC(),
|
CreationDate: t,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
// List - Retrieves list of DNS entries for the domain.
|
// List - Retrieves list of DNS entries for the domain.
|
||||||
func (c *CoreDNS) List() ([]SrvRecord, error) {
|
func (c *CoreDNS) List() (map[string][]SrvRecord, error) {
|
||||||
var srvRecords []SrvRecord
|
var srvRecords = map[string][]SrvRecord{}
|
||||||
for _, domainName := range c.domainNames {
|
for _, domainName := range c.domainNames {
|
||||||
key := msg.Path(fmt.Sprintf("%s.", domainName), c.prefixPath)
|
key := msg.Path(fmt.Sprintf("%s.", domainName), c.prefixPath)
|
||||||
records, err := c.list(key)
|
records, err := c.list(key)
|
||||||
@ -60,7 +60,11 @@ func (c *CoreDNS) List() ([]SrvRecord, error) {
|
|||||||
if record.Key == "" {
|
if record.Key == "" {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
srvRecords = append(srvRecords, record)
|
if _, ok := srvRecords[record.Key]; ok {
|
||||||
|
srvRecords[record.Key] = append(srvRecords[record.Key], record)
|
||||||
|
} else {
|
||||||
|
srvRecords[record.Key] = []SrvRecord{record}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return srvRecords, nil
|
return srvRecords, nil
|
||||||
@ -150,8 +154,9 @@ func (c *CoreDNS) list(key string) ([]SrvRecord, error) {
|
|||||||
func (c *CoreDNS) Put(bucket string) error {
|
func (c *CoreDNS) Put(bucket string) error {
|
||||||
c.Delete(bucket) // delete any existing entries.
|
c.Delete(bucket) // delete any existing entries.
|
||||||
|
|
||||||
|
t := time.Now().UTC()
|
||||||
for ip := range c.domainIPs {
|
for ip := range c.domainIPs {
|
||||||
bucketMsg, err := newCoreDNSMsg(ip, c.domainPort, defaultTTL)
|
bucketMsg, err := newCoreDNSMsg(ip, c.domainPort, defaultTTL, t)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -37,7 +37,6 @@ import (
|
|||||||
"github.com/klauspost/compress/zip"
|
"github.com/klauspost/compress/zip"
|
||||||
miniogopolicy "github.com/minio/minio-go/v6/pkg/policy"
|
miniogopolicy "github.com/minio/minio-go/v6/pkg/policy"
|
||||||
"github.com/minio/minio-go/v6/pkg/s3utils"
|
"github.com/minio/minio-go/v6/pkg/s3utils"
|
||||||
"github.com/minio/minio-go/v6/pkg/set"
|
|
||||||
"github.com/minio/minio/browser"
|
"github.com/minio/minio/browser"
|
||||||
"github.com/minio/minio/cmd/config/etcd/dns"
|
"github.com/minio/minio/cmd/config/etcd/dns"
|
||||||
"github.com/minio/minio/cmd/config/identity/openid"
|
"github.com/minio/minio/cmd/config/identity/openid"
|
||||||
@ -312,27 +311,20 @@ func (web *webAPIHandlers) ListBuckets(r *http.Request, args *WebGenericArgs, re
|
|||||||
if err != nil && err != dns.ErrNoEntriesFound {
|
if err != nil && err != dns.ErrNoEntriesFound {
|
||||||
return toJSONError(ctx, err)
|
return toJSONError(ctx, err)
|
||||||
}
|
}
|
||||||
bucketSet := set.NewStringSet()
|
for _, dnsRecords := range dnsBuckets {
|
||||||
for _, dnsRecord := range dnsBuckets {
|
|
||||||
if bucketSet.Contains(dnsRecord.Key) {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
if globalIAMSys.IsAllowed(iampolicy.Args{
|
if globalIAMSys.IsAllowed(iampolicy.Args{
|
||||||
AccountName: claims.AccessKey,
|
AccountName: claims.AccessKey,
|
||||||
Action: iampolicy.ListBucketAction,
|
Action: iampolicy.ListBucketAction,
|
||||||
BucketName: dnsRecord.Key,
|
BucketName: dnsRecords[0].Key,
|
||||||
ConditionValues: getConditionValues(r, "", claims.AccessKey, claims.Map()),
|
ConditionValues: getConditionValues(r, "", claims.AccessKey, claims.Map()),
|
||||||
IsOwner: owner,
|
IsOwner: owner,
|
||||||
ObjectName: "",
|
ObjectName: "",
|
||||||
Claims: claims.Map(),
|
Claims: claims.Map(),
|
||||||
}) {
|
}) {
|
||||||
reply.Buckets = append(reply.Buckets, WebBucketInfo{
|
reply.Buckets = append(reply.Buckets, WebBucketInfo{
|
||||||
Name: dnsRecord.Key,
|
Name: dnsRecords[0].Key,
|
||||||
CreationDate: dnsRecord.CreationDate,
|
CreationDate: dnsRecords[0].CreationDate,
|
||||||
})
|
})
|
||||||
|
|
||||||
bucketSet.Add(dnsRecord.Key)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
Loading…
Reference in New Issue
Block a user