From 9bbf5cb74f5536b6baf4e709470e08b33910746f Mon Sep 17 00:00:00 2001 From: Harshavardhana Date: Mon, 3 Feb 2020 13:54:20 +0530 Subject: [PATCH] fix: Avoid re-reading bucket names from etcd (#8924) This helps improve performance when there are 1000+ bucket entries on etcd, improves the startup time significantly. --- cmd/bucket-handlers.go | 125 +++++++++++++++----------------- cmd/config/etcd/dns/etcd_dns.go | 17 +++-- cmd/web-handlers.go | 16 +--- 3 files changed, 72 insertions(+), 86 deletions(-) diff --git a/cmd/bucket-handlers.go b/cmd/bucket-handlers.go index 5bfc7d50b..1790822ca 100644 --- a/cmd/bucket-handlers.go +++ b/cmd/bucket-handlers.go @@ -1,5 +1,5 @@ /* - * MinIO Cloud Storage, (C) 2015, 2016, 2017, 2018 MinIO, Inc. + * MinIO Cloud Storage, (C) 2015-2020 MinIO, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -22,7 +22,6 @@ import ( "encoding/xml" "fmt" "io" - "net" "net/http" "net/url" "path" @@ -75,40 +74,45 @@ func initFederatorBackend(buckets []BucketInfo, objLayer ObjectLayer) { return } - bucketSet := set.NewStringSet() + bucketsSet := set.NewStringSet() + bucketsToBeUpdated := set.NewStringSet() + bucketsInConflict := set.NewStringSet() + for _, bucket := range buckets { + bucketsSet.Add(bucket.Name) + r, ok := dnsBuckets[bucket.Name] + if !ok { + bucketsToBeUpdated.Add(bucket.Name) + continue + } + if !globalDomainIPs.Intersection(set.CreateStringSet(getHostsSlice(r)...)).IsEmpty() { + if globalDomainIPs.Difference(set.CreateStringSet(getHostsSlice(r)...)).IsEmpty() { + // No difference in terms of domainIPs and nothing + // has changed so we don't change anything on the etcd. + continue + } + // if domain IPs intersect then it won't be an empty set. + // such an intersection means that bucket exists on etcd. + // but if we do see a difference with local domain IPs with + // hostSlice from etcd then we should update with newer + // domainIPs, we proceed to do that here. + bucketsToBeUpdated.Add(bucket.Name) + continue + } + // No IPs seem to intersect, this means that bucket exists but has + // different IP addresses perhaps from a different deployment. + // bucket names are globally unique in federation at a given + // path prefix, name collision is not allowed. We simply log + // an error and continue. + bucketsInConflict.Add(bucket.Name) + } - // Add buckets that are not registered with the DNS + // Add/update buckets that are not registered with the DNS g := errgroup.WithNErrs(len(buckets)) - for index := range buckets { - bucketSet.Add(buckets[index].Name) + bucketsToBeUpdatedSlice := bucketsToBeUpdated.ToSlice() + for index := range bucketsToBeUpdatedSlice { index := index g.Go(func() error { - r, gerr := globalDNSConfig.Get(buckets[index].Name) - if gerr != nil { - if gerr == dns.ErrNoEntriesFound { - return globalDNSConfig.Put(buckets[index].Name) - } - return gerr - } - if !globalDomainIPs.Intersection(set.CreateStringSet(getHostsSlice(r)...)).IsEmpty() { - if globalDomainIPs.Difference(set.CreateStringSet(getHostsSlice(r)...)).IsEmpty() { - // No difference in terms of domainIPs and nothing - // has changed so we don't change anything on the etcd. - return nil - } - // if domain IPs intersect then it won't be an empty set. - // such an intersection means that bucket exists on etcd. - // but if we do see a difference with local domain IPs with - // hostSlice from etcd then we should update with newer - // domainIPs, we proceed to do that here. - return globalDNSConfig.Put(buckets[index].Name) - } - // No IPs seem to intersect, this means that bucket exists but has - // different IP addresses perhaps from a different deployment. - // bucket names are globally unique in federation at a given - // path prefix, name collision is not allowed. We simply log - // an error and continue. - return fmt.Errorf("Unable to add bucket DNS entry for bucket %s, an entry exists for the same bucket. Use one of these IP addresses %v to access the bucket", buckets[index].Name, globalDomainIPs.ToSlice()) + return globalDNSConfig.Put(bucketsToBeUpdatedSlice[index]) }, index) } @@ -118,36 +122,26 @@ func initFederatorBackend(buckets []BucketInfo, objLayer ObjectLayer) { } } - g = errgroup.WithNErrs(len(dnsBuckets)) - // Remove buckets that are in DNS for this server, but aren't local - for index := range dnsBuckets { - index := index - g.Go(func() error { - // This is a local bucket that exists, so we can continue - if bucketSet.Contains(dnsBuckets[index].Key) { - return nil - } - - // This is not for our server, so we can continue - hostPort := net.JoinHostPort(dnsBuckets[index].Host, string(dnsBuckets[index].Port)) - if globalDomainIPs.Intersection(set.CreateStringSet(hostPort)).IsEmpty() { - return nil - } - - // We go to here, so we know the bucket no longer exists, - // but is registered in DNS to this server - if err := globalDNSConfig.DeleteRecord(dnsBuckets[index]); err != nil { - return fmt.Errorf("Failed to remove DNS entry for %s due to %w", - dnsBuckets[index].Key, err) - } - - return nil - }, index) + for _, bucket := range bucketsInConflict.ToSlice() { + logger.LogIf(context.Background(), fmt.Errorf("Unable to add bucket DNS entry for bucket %s, an entry exists for the same bucket. Use one of these IP addresses %v to access the bucket", bucket, globalDomainIPs.ToSlice())) } - for _, err := range g.Wait() { - if err != nil { - logger.LogIf(context.Background(), err) + // Remove buckets that are in DNS for this server, but aren't local + for bucket, records := range dnsBuckets { + if bucketsSet.Contains(bucket) { + continue + } + + if globalDomainIPs.Intersection(set.CreateStringSet(getHostsSlice(records)...)).IsEmpty() { + // This is not for our server, so we can continue + continue + } + + // We go to here, so we know the bucket no longer exists, + // but is registered in DNS to this server + if err = globalDNSConfig.Delete(bucket); err != nil { + logger.LogIf(context.Background(), fmt.Errorf("Failed to remove DNS entry for %s due to %w", + bucket, err)) } } } @@ -285,16 +279,11 @@ func (api objectAPIHandlers) ListBucketsHandler(w http.ResponseWriter, r *http.R writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r)) return } - bucketSet := set.NewStringSet() - for _, dnsRecord := range dnsBuckets { - if bucketSet.Contains(dnsRecord.Key) { - continue - } + for _, dnsRecords := range dnsBuckets { bucketsInfo = append(bucketsInfo, BucketInfo{ - Name: dnsRecord.Key, - Created: dnsRecord.CreationDate, + Name: dnsRecords[0].Key, + Created: dnsRecords[0].CreationDate, }) - bucketSet.Add(dnsRecord.Key) } } else { // Invoke the list buckets. diff --git a/cmd/config/etcd/dns/etcd_dns.go b/cmd/config/etcd/dns/etcd_dns.go index 58ee48fc3..fc1195b18 100644 --- a/cmd/config/etcd/dns/etcd_dns.go +++ b/cmd/config/etcd/dns/etcd_dns.go @@ -38,18 +38,18 @@ var ErrNoEntriesFound = errors.New("No entries found for this key") const etcdPathSeparator = "/" // create a new coredns service record for the bucket. -func newCoreDNSMsg(ip string, port string, ttl uint32) ([]byte, error) { +func newCoreDNSMsg(ip string, port string, ttl uint32, t time.Time) ([]byte, error) { return json.Marshal(&SrvRecord{ Host: ip, Port: json.Number(port), TTL: ttl, - CreationDate: time.Now().UTC(), + CreationDate: t, }) } // List - Retrieves list of DNS entries for the domain. -func (c *CoreDNS) List() ([]SrvRecord, error) { - var srvRecords []SrvRecord +func (c *CoreDNS) List() (map[string][]SrvRecord, error) { + var srvRecords = map[string][]SrvRecord{} for _, domainName := range c.domainNames { key := msg.Path(fmt.Sprintf("%s.", domainName), c.prefixPath) records, err := c.list(key) @@ -60,7 +60,11 @@ func (c *CoreDNS) List() ([]SrvRecord, error) { if record.Key == "" { continue } - srvRecords = append(srvRecords, record) + if _, ok := srvRecords[record.Key]; ok { + srvRecords[record.Key] = append(srvRecords[record.Key], record) + } else { + srvRecords[record.Key] = []SrvRecord{record} + } } } return srvRecords, nil @@ -150,8 +154,9 @@ func (c *CoreDNS) list(key string) ([]SrvRecord, error) { func (c *CoreDNS) Put(bucket string) error { c.Delete(bucket) // delete any existing entries. + t := time.Now().UTC() for ip := range c.domainIPs { - bucketMsg, err := newCoreDNSMsg(ip, c.domainPort, defaultTTL) + bucketMsg, err := newCoreDNSMsg(ip, c.domainPort, defaultTTL, t) if err != nil { return err } diff --git a/cmd/web-handlers.go b/cmd/web-handlers.go index d98bd5250..9275bffc1 100644 --- a/cmd/web-handlers.go +++ b/cmd/web-handlers.go @@ -37,7 +37,6 @@ import ( "github.com/klauspost/compress/zip" miniogopolicy "github.com/minio/minio-go/v6/pkg/policy" "github.com/minio/minio-go/v6/pkg/s3utils" - "github.com/minio/minio-go/v6/pkg/set" "github.com/minio/minio/browser" "github.com/minio/minio/cmd/config/etcd/dns" "github.com/minio/minio/cmd/config/identity/openid" @@ -312,27 +311,20 @@ func (web *webAPIHandlers) ListBuckets(r *http.Request, args *WebGenericArgs, re if err != nil && err != dns.ErrNoEntriesFound { return toJSONError(ctx, err) } - bucketSet := set.NewStringSet() - for _, dnsRecord := range dnsBuckets { - if bucketSet.Contains(dnsRecord.Key) { - continue - } - + for _, dnsRecords := range dnsBuckets { if globalIAMSys.IsAllowed(iampolicy.Args{ AccountName: claims.AccessKey, Action: iampolicy.ListBucketAction, - BucketName: dnsRecord.Key, + BucketName: dnsRecords[0].Key, ConditionValues: getConditionValues(r, "", claims.AccessKey, claims.Map()), IsOwner: owner, ObjectName: "", Claims: claims.Map(), }) { reply.Buckets = append(reply.Buckets, WebBucketInfo{ - Name: dnsRecord.Key, - CreationDate: dnsRecord.CreationDate, + Name: dnsRecords[0].Key, + CreationDate: dnsRecords[0].CreationDate, }) - - bucketSet.Add(dnsRecord.Key) } } } else {