From 8b80eca184ca43f3538d562bbe6f04cd8cbfa8f6 Mon Sep 17 00:00:00 2001 From: Harshavardhana Date: Tue, 1 Oct 2019 17:05:02 -0700 Subject: [PATCH] List buckets only once per sub-system initialization (#8333) Current master repeatedly calls ListBuckets() during initialization of multiple sub-systems Use single ListBuckets() call for each sub-system as follows - LifeCycle - Policy - Notification --- cmd/admin-handlers_test.go | 9 +++-- cmd/gateway-main.go | 8 +---- cmd/gateway-unsupported.go | 5 --- cmd/lifecycle.go | 69 +++++++++++++------------------------- cmd/notification.go | 16 +++++---- cmd/policy.go | 32 +++--------------- cmd/server-main.go | 18 ++++++---- cmd/test-utils_test.go | 18 +++++++--- 8 files changed, 71 insertions(+), 104 deletions(-) diff --git a/cmd/admin-handlers_test.go b/cmd/admin-handlers_test.go index 82eaa9d7f..be1f91130 100644 --- a/cmd/admin-handlers_test.go +++ b/cmd/admin-handlers_test.go @@ -294,11 +294,16 @@ func prepareAdminXLTestBed() (*adminXLTestBed, error) { globalIAMSys = NewIAMSys() globalIAMSys.Init(objLayer) + buckets, err := objLayer.ListBuckets(context.Background()) + if err != nil { + return nil, err + } + globalPolicySys = NewPolicySys() - globalPolicySys.Init(objLayer) + globalPolicySys.Init(buckets, objLayer) globalNotificationSys = NewNotificationSys(globalServerConfig, globalEndpoints) - globalNotificationSys.Init(objLayer) + globalNotificationSys.Init(buckets, objLayer) // Setup admin mgmt REST API handlers. adminRouter := mux.NewRouter() diff --git a/cmd/gateway-main.go b/cmd/gateway-main.go index 7fc0c8ca6..260495f61 100644 --- a/cmd/gateway-main.go +++ b/cmd/gateway-main.go @@ -274,17 +274,11 @@ func StartGateway(ctx *cli.Context, gw Gateway) { // Create new policy system. globalPolicySys = NewPolicySys() - // Initialize policy system. - go globalPolicySys.Init(newObject) - - // Create new lifecycle system + // Create new lifecycle system. globalLifecycleSys = NewLifecycleSys() // Create new notification system. globalNotificationSys = NewNotificationSys(globalServerConfig, globalEndpoints) - if enableConfigOps && newObject.IsNotificationSupported() { - logger.LogIf(context.Background(), globalNotificationSys.Init(newObject)) - } // Verify if object layer supports // - encryption diff --git a/cmd/gateway-unsupported.go b/cmd/gateway-unsupported.go index 246d7199f..6a3ffce77 100644 --- a/cmd/gateway-unsupported.go +++ b/cmd/gateway-unsupported.go @@ -144,11 +144,6 @@ func (a GatewayUnsupported) CopyObject(ctx context.Context, srcBucket string, sr return objInfo, NotImplemented{} } -// RefreshBucketPolicy refreshes cache policy with what's on disk. -func (a GatewayUnsupported) RefreshBucketPolicy(ctx context.Context, bucket string) error { - return NotImplemented{} -} - // IsNotificationSupported returns whether bucket notification is applicable for this layer. func (a GatewayUnsupported) IsNotificationSupported() bool { return false diff --git a/cmd/lifecycle.go b/cmd/lifecycle.go index 579c33f25..938520325 100644 --- a/cmd/lifecycle.go +++ b/cmd/lifecycle.go @@ -24,9 +24,7 @@ import ( "path" "strings" "sync" - "time" - "github.com/minio/minio-go/pkg/set" "github.com/minio/minio/cmd/logger" "github.com/minio/minio/pkg/lifecycle" ) @@ -45,6 +43,11 @@ type LifecycleSys struct { // Set - sets lifecycle config to given bucket name. func (sys *LifecycleSys) Set(bucketName string, lifecycle lifecycle.Lifecycle) { + if globalIsGateway { + // no-op + return + } + sys.Lock() defer sys.Unlock() @@ -53,6 +56,16 @@ func (sys *LifecycleSys) Set(bucketName string, lifecycle lifecycle.Lifecycle) { // Get - gets lifecycle config associated to a given bucket name. func (sys *LifecycleSys) Get(bucketName string) (lifecycle lifecycle.Lifecycle, ok bool) { + if globalIsGateway { + // When gateway is enabled, no cached value + // is used to validate life cycle policies. + objAPI := newObjectLayerFn() + if objAPI == nil { + return + } + l, err := objAPI.GetBucketLifecycle(context.Background(), bucketName) + return *l, err == nil + } sys.Lock() defer sys.Unlock() @@ -107,26 +120,16 @@ func NewLifecycleSys() *LifecycleSys { } // Init - initializes lifecycle system from lifecycle.xml of all buckets. -func (sys *LifecycleSys) Init(objAPI ObjectLayer) error { +func (sys *LifecycleSys) Init(buckets []BucketInfo, objAPI ObjectLayer) error { if objAPI == nil { return errServerNotInitialized } - defer func() { - // Refresh LifecycleSys in background. - go func() { - ticker := time.NewTicker(globalRefreshBucketLifecycleInterval) - defer ticker.Stop() - for { - select { - case <-GlobalServiceDoneCh: - return - case <-ticker.C: - sys.refresh(objAPI) - } - } - }() - }() + // In gateway mode, we always fetch the bucket lifecycle configuration from the gateway backend. + // So, this is a no-op for gateway servers. + if globalIsGateway { + return nil + } doneCh := make(chan struct{}) defer close(doneCh) @@ -140,7 +143,7 @@ func (sys *LifecycleSys) Init(objAPI ObjectLayer) error { select { case <-retryTimerCh: // Load LifecycleSys once during boot. - if err := sys.refresh(objAPI); err != nil { + if err := sys.load(buckets, objAPI); err != nil { if err == errDiskNotFound || strings.Contains(err.Error(), InsufficientReadQuorum{}.Error()) || strings.Contains(err.Error(), InsufficientWriteQuorum{}.Error()) { @@ -156,14 +159,8 @@ func (sys *LifecycleSys) Init(objAPI ObjectLayer) error { } } -// Refresh LifecycleSys. -func (sys *LifecycleSys) refresh(objAPI ObjectLayer) error { - buckets, err := objAPI.ListBuckets(context.Background()) - if err != nil { - logger.LogIf(context.Background(), err) - return err - } - sys.removeDeletedBuckets(buckets) +// Loads lifecycle policies for all buckets into LifecycleSys. +func (sys *LifecycleSys) load(buckets []BucketInfo, objAPI ObjectLayer) error { for _, bucket := range buckets { config, err := objAPI.GetBucketLifecycle(context.Background(), bucket.Name) if err != nil { @@ -179,24 +176,6 @@ func (sys *LifecycleSys) refresh(objAPI ObjectLayer) error { return nil } -// removeDeletedBuckets - to handle a corner case where we have cached the lifecycle for a deleted -// bucket. i.e if we miss a delete-bucket notification we should delete the corresponding -// bucket policy during sys.refresh() -func (sys *LifecycleSys) removeDeletedBuckets(bucketInfos []BucketInfo) { - buckets := set.NewStringSet() - for _, info := range bucketInfos { - buckets.Add(info.Name) - } - sys.Lock() - defer sys.Unlock() - - for bucket := range sys.bucketLifecycleMap { - if !buckets.Contains(bucket) { - delete(sys.bucketLifecycleMap, bucket) - } - } -} - // Remove - removes policy for given bucket name. func (sys *LifecycleSys) Remove(bucketName string) { sys.Lock() diff --git a/cmd/notification.go b/cmd/notification.go index 5eecbe3a4..0255bde24 100644 --- a/cmd/notification.go +++ b/cmd/notification.go @@ -770,11 +770,8 @@ func (sys *NotificationSys) initListeners(ctx context.Context, objAPI ObjectLaye return nil } -func (sys *NotificationSys) refresh(objAPI ObjectLayer) error { - buckets, err := objAPI.ListBuckets(context.Background()) - if err != nil { - return err - } +// Loads notification policies for all buckets into NotificationSys. +func (sys *NotificationSys) load(buckets []BucketInfo, objAPI ObjectLayer) error { for _, bucket := range buckets { ctx := logger.SetReqInfo(context.Background(), &logger.ReqInfo{BucketName: bucket.Name}) config, err := readNotificationConfig(ctx, objAPI, bucket.Name) @@ -796,11 +793,16 @@ func (sys *NotificationSys) refresh(objAPI ObjectLayer) error { } // Init - initializes notification system from notification.xml and listener.json of all buckets. -func (sys *NotificationSys) Init(objAPI ObjectLayer) error { +func (sys *NotificationSys) Init(buckets []BucketInfo, objAPI ObjectLayer) error { if objAPI == nil { return errInvalidArgument } + // In gateway mode, notifications are not supported. + if globalIsGateway { + return nil + } + doneCh := make(chan struct{}) defer close(doneCh) @@ -812,7 +814,7 @@ func (sys *NotificationSys) Init(objAPI ObjectLayer) error { for { select { case <-retryTimerCh: - if err := sys.refresh(objAPI); err != nil { + if err := sys.load(buckets, objAPI); err != nil { if err == errDiskNotFound || strings.Contains(err.Error(), InsufficientReadQuorum{}.Error()) || strings.Contains(err.Error(), InsufficientWriteQuorum{}.Error()) { diff --git a/cmd/policy.go b/cmd/policy.go index 35221b532..be30955b8 100644 --- a/cmd/policy.go +++ b/cmd/policy.go @@ -27,7 +27,6 @@ import ( "sync" miniogopolicy "github.com/minio/minio-go/v6/pkg/policy" - "github.com/minio/minio-go/v6/pkg/set" "github.com/minio/minio/cmd/logger" "github.com/minio/minio/pkg/event" "github.com/minio/minio/pkg/handlers" @@ -40,24 +39,6 @@ type PolicySys struct { bucketPolicyMap map[string]policy.Policy } -// removeDeletedBuckets - to handle a corner case where we have cached the policy for a deleted -// bucket. i.e if we miss a delete-bucket notification we should delete the corresponding -// bucket policy during sys.refresh() -func (sys *PolicySys) removeDeletedBuckets(bucketInfos []BucketInfo) { - buckets := set.NewStringSet() - for _, info := range bucketInfos { - buckets.Add(info.Name) - } - sys.Lock() - defer sys.Unlock() - - for bucket := range sys.bucketPolicyMap { - if !buckets.Contains(bucket) { - delete(sys.bucketPolicyMap, bucket) - } - } -} - // Set - sets policy to given bucket name. If policy is empty, existing policy is removed. func (sys *PolicySys) Set(bucketName string, policy policy.Policy) { if globalIsGateway { @@ -110,13 +91,8 @@ func (sys *PolicySys) IsAllowed(args policy.Args) bool { return args.IsOwner } -// Refresh PolicySys. -func (sys *PolicySys) refresh(objAPI ObjectLayer) error { - buckets, err := objAPI.ListBuckets(context.Background()) - if err != nil { - return err - } - sys.removeDeletedBuckets(buckets) +// Loads policies for all buckets into PolicySys. +func (sys *PolicySys) load(buckets []BucketInfo, objAPI ObjectLayer) error { for _, bucket := range buckets { config, err := objAPI.GetBucketPolicy(context.Background(), bucket.Name) if err != nil { @@ -145,7 +121,7 @@ func (sys *PolicySys) refresh(objAPI ObjectLayer) error { } // Init - initializes policy system from policy.json of all buckets. -func (sys *PolicySys) Init(objAPI ObjectLayer) error { +func (sys *PolicySys) Init(buckets []BucketInfo, objAPI ObjectLayer) error { if objAPI == nil { return errInvalidArgument } @@ -168,7 +144,7 @@ func (sys *PolicySys) Init(objAPI ObjectLayer) error { select { case <-retryTimerCh: // Load PolicySys once during boot. - if err := sys.refresh(objAPI); err != nil { + if err := sys.load(buckets, objAPI); err != nil { if err == errDiskNotFound || strings.Contains(err.Error(), InsufficientReadQuorum{}.Error()) || strings.Contains(err.Error(), InsufficientWriteQuorum{}.Error()) { diff --git a/cmd/server-main.go b/cmd/server-main.go index 78fa639ad..b6facdf78 100644 --- a/cmd/server-main.go +++ b/cmd/server-main.go @@ -234,6 +234,10 @@ func serverMain(ctx *cli.Context) { checkUpdate(getMinioMode()) } + if globalIsDiskCacheEnabled { + logger.StartupMessage(colorRed(colorBold("Disk caching is allowed only for gateway deployments"))) + } + // FIXME: This code should be removed in future releases and we should have mandatory // check for ENVs credentials under distributed setup. Until all users migrate we // are intentionally providing backward compatibility. @@ -339,22 +343,24 @@ func serverMain(ctx *cli.Context) { logger.Fatal(err, "Unable to initialize IAM system") } + buckets, err := newObject.ListBuckets(context.Background()) + if err != nil { + logger.Fatal(err, "Unable to list buckets on your backend") + } + // Create new policy system. globalPolicySys = NewPolicySys() // Initialize policy system. - if err = globalPolicySys.Init(newObject); err != nil { + if err = globalPolicySys.Init(buckets, newObject); err != nil { logger.Fatal(err, "Unable to initialize policy system") } - if globalIsDiskCacheEnabled { - logger.StartupMessage(colorRed(colorBold("Disk caching is allowed only for gateway deployments"))) - } // Create new lifecycle system. globalLifecycleSys = NewLifecycleSys() // Initialize lifecycle system. - if err = globalLifecycleSys.Init(newObject); err != nil { + if err = globalLifecycleSys.Init(buckets, newObject); err != nil { logger.Fatal(err, "Unable to initialize lifecycle system") } @@ -362,7 +368,7 @@ func serverMain(ctx *cli.Context) { globalNotificationSys = NewNotificationSys(globalServerConfig, globalEndpoints) // Initialize notification system. - if err = globalNotificationSys.Init(newObject); err != nil { + if err = globalNotificationSys.Init(buckets, newObject); err != nil { logger.Fatal(err, "Unable to initialize notification system") } diff --git a/cmd/test-utils_test.go b/cmd/test-utils_test.go index 0d32528aa..1e9bf3735 100644 --- a/cmd/test-utils_test.go +++ b/cmd/test-utils_test.go @@ -359,14 +359,19 @@ func UnstartedTestServer(t TestErrHandler, instanceType string) TestServer { globalIAMSys = NewIAMSys() globalIAMSys.Init(objLayer) + buckets, err := objLayer.ListBuckets(context.Background()) + if err != nil { + t.Fatalf("Unable to list buckets on backend %s", err) + } + globalPolicySys = NewPolicySys() - globalPolicySys.Init(objLayer) + globalPolicySys.Init(buckets, objLayer) globalNotificationSys = NewNotificationSys(globalServerConfig, testServer.Disks) - globalNotificationSys.Init(objLayer) + globalNotificationSys.Init(buckets, objLayer) globalLifecycleSys = NewLifecycleSys() - globalLifecycleSys.Init(objLayer) + globalLifecycleSys.Init(buckets, objLayer) return testServer } @@ -1943,8 +1948,13 @@ func ExecObjectLayerAPITest(t *testing.T, objAPITest objAPITestType, endpoints [ globalIAMSys = NewIAMSys() globalIAMSys.Init(objLayer) + buckets, err := objLayer.ListBuckets(context.Background()) + if err != nil { + t.Fatalf("Unable to list buckets on backend %s", err) + } + globalPolicySys = NewPolicySys() - globalPolicySys.Init(objLayer) + globalPolicySys.Init(buckets, objLayer) credentials := globalServerConfig.GetCredential()