From 0cde37be501b87ba1c5b3a9dc9b9fa9439450706 Mon Sep 17 00:00:00 2001 From: Anis Eleuch Date: Fri, 25 Aug 2023 15:59:16 +0100 Subject: [PATCH] Reduce the number of calls to import bucket metadata (#17899) For each bucket, save the bucket metadata once, call the site replication hook once --- cmd/admin-bucket-handlers.go | 191 ++++++++++--------------- cmd/admin-handlers-site-replication.go | 9 +- cmd/bucket-metadata-sys.go | 23 ++- cmd/bucket-metadata.go | 21 ++- cmd/site-replication.go | 69 +++++++++ 5 files changed, 178 insertions(+), 135 deletions(-) diff --git a/cmd/admin-bucket-handlers.go b/cmd/admin-bucket-handlers.go index 37a1c91ae..acee0e367 100644 --- a/cmd/admin-bucket-handlers.go +++ b/cmd/admin-bucket-handlers.go @@ -662,12 +662,31 @@ func (a adminAPIHandlers) ImportBucketMetadataHandler(w http.ResponseWriter, r * writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrInvalidRequest), r.URL) return } - bucketMap := make(map[string]struct{}, 1) rpt := importMetaReport{ madmin.BucketMetaImportErrs{ Buckets: make(map[string]madmin.BucketStatus, len(zr.File)), }, } + + bucketMap := make(map[string]*BucketMetadata, len(zr.File)) + + updatedAt := UTCNow() + + for _, file := range zr.File { + slc := strings.Split(file.Name, slashSeparator) + if len(slc) != 2 { // expecting bucket/configfile in the zipfile + rpt.SetStatus(file.Name, "", fmt.Errorf("malformed zip - expecting format bucket/")) + continue + } + bucket := slc[0] + meta, err := readBucketMetadata(ctx, objectAPI, bucket) + if err == nil { + bucketMap[bucket] = &meta + } else if err != errConfigNotFound { + rpt.SetStatus(bucket, "", err) + } + } + // import object lock config if any - order of import matters here. for _, file := range zr.File { slc := strings.Split(file.Name, slashSeparator) @@ -704,7 +723,8 @@ func (a adminAPIHandlers) ImportBucketMetadataHandler(w http.ResponseWriter, r * continue } } - bucketMap[bucket] = struct{}{} + v := newBucketMetadata(bucket) + bucketMap[bucket] = &v } // Deny object locking configuration settings on existing buckets without object lock enabled. @@ -713,27 +733,9 @@ func (a adminAPIHandlers) ImportBucketMetadataHandler(w http.ResponseWriter, r * continue } - updatedAt, err := globalBucketMetadataSys.Update(ctx, bucket, objectLockConfig, configData) - if err != nil { - rpt.SetStatus(bucket, fileName, err) - continue - } + bucketMap[bucket].ObjectLockConfigXML = configData + bucketMap[bucket].ObjectLockConfigUpdatedAt = updatedAt rpt.SetStatus(bucket, fileName, nil) - - // Call site replication hook. - // - // We encode the xml bytes as base64 to ensure there are no encoding - // errors. - cfgStr := base64.StdEncoding.EncodeToString(configData) - if err = globalSiteReplicationSys.BucketMetaHook(ctx, madmin.SRBucketMeta{ - Type: madmin.SRBucketMetaTypeObjectLockConfig, - Bucket: bucket, - ObjectLockConfig: &cfgStr, - UpdatedAt: updatedAt, - }); err != nil { - rpt.SetStatus(bucket, fileName, err) - continue - } } } @@ -763,7 +765,8 @@ func (a adminAPIHandlers) ImportBucketMetadataHandler(w http.ResponseWriter, r * continue } } - bucketMap[bucket] = struct{}{} + v := newBucketMetadata(bucket) + bucketMap[bucket] = &v } if globalSiteReplicationSys.isEnabled() && v.Suspended() { @@ -786,10 +789,8 @@ func (a adminAPIHandlers) ImportBucketMetadataHandler(w http.ResponseWriter, r * continue } - if _, err = globalBucketMetadataSys.Update(ctx, bucket, bucketVersioningConfig, configData); err != nil { - rpt.SetStatus(bucket, fileName, err) - continue - } + bucketMap[bucket].VersioningConfigXML = configData + bucketMap[bucket].VersioningConfigUpdatedAt = updatedAt rpt.SetStatus(bucket, fileName, nil) } } @@ -807,6 +808,7 @@ func (a adminAPIHandlers) ImportBucketMetadataHandler(w http.ResponseWriter, r * continue } bucket, fileName := slc[0], slc[1] + // create bucket if it does not exist yet. if _, ok := bucketMap[bucket]; !ok { err = objectAPI.MakeBucket(ctx, bucket, MakeBucketOptions{}) @@ -816,7 +818,8 @@ func (a adminAPIHandlers) ImportBucketMetadataHandler(w http.ResponseWriter, r * continue } } - bucketMap[bucket] = struct{}{} + v := newBucketMetadata(bucket) + bucketMap[bucket] = &v } if _, ok := bucketMap[bucket]; !ok { continue @@ -835,12 +838,7 @@ func (a adminAPIHandlers) ImportBucketMetadataHandler(w http.ResponseWriter, r * continue } - if _, err = globalBucketMetadataSys.Update(ctx, bucket, bucketNotificationConfig, configData); err != nil { - rpt.SetStatus(bucket, fileName, err) - continue - } - rulesMap := config.ToRulesMap() - globalEventNotifier.AddRulesMap(bucket, rulesMap) + bucketMap[bucket].NotificationConfigXML = configData rpt.SetStatus(bucket, fileName, nil) case bucketPolicyConfig: // Error out if Content-Length is beyond allowed size. @@ -873,22 +871,9 @@ func (a adminAPIHandlers) ImportBucketMetadataHandler(w http.ResponseWriter, r * continue } - updatedAt, err := globalBucketMetadataSys.Update(ctx, bucket, bucketPolicyConfig, configData) - if err != nil { - rpt.SetStatus(bucket, fileName, err) - continue - } + bucketMap[bucket].PolicyConfigJSON = configData + bucketMap[bucket].PolicyConfigUpdatedAt = updatedAt rpt.SetStatus(bucket, fileName, nil) - // Call site replication hook. - if err = globalSiteReplicationSys.BucketMetaHook(ctx, madmin.SRBucketMeta{ - Type: madmin.SRBucketMetaTypePolicy, - Bucket: bucket, - Policy: bucketPolicyBytes, - UpdatedAt: updatedAt, - }); err != nil { - rpt.SetStatus(bucket, fileName, err) - continue - } case bucketLifecycleConfig: bucketLifecycle, err := lifecycle.ParseLifecycleConfig(io.LimitReader(reader, sz)) if err != nil { @@ -914,10 +899,8 @@ func (a adminAPIHandlers) ImportBucketMetadataHandler(w http.ResponseWriter, r * continue } - if _, err = globalBucketMetadataSys.Update(ctx, bucket, bucketLifecycleConfig, configData); err != nil { - rpt.SetStatus(bucket, fileName, err) - continue - } + bucketMap[bucket].LifecycleConfigXML = configData + bucketMap[bucket].LifecycleConfigUpdatedAt = updatedAt rpt.SetStatus(bucket, fileName, nil) case bucketSSEConfig: // Parse bucket encryption xml @@ -952,29 +935,9 @@ func (a adminAPIHandlers) ImportBucketMetadataHandler(w http.ResponseWriter, r * continue } - // Store the bucket encryption configuration in the object layer - updatedAt, err := globalBucketMetadataSys.Update(ctx, bucket, bucketSSEConfig, configData) - if err != nil { - rpt.SetStatus(bucket, fileName, err) - continue - } + bucketMap[bucket].EncryptionConfigXML = configData + bucketMap[bucket].EncryptionConfigUpdatedAt = updatedAt rpt.SetStatus(bucket, fileName, nil) - - // Call site replication hook. - // - // We encode the xml bytes as base64 to ensure there are no encoding - // errors. - cfgStr := base64.StdEncoding.EncodeToString(configData) - if err = globalSiteReplicationSys.BucketMetaHook(ctx, madmin.SRBucketMeta{ - Type: madmin.SRBucketMetaTypeSSEConfig, - Bucket: bucket, - SSEConfig: &cfgStr, - UpdatedAt: updatedAt, - }); err != nil { - rpt.SetStatus(bucket, fileName, err) - continue - } - case bucketTaggingConfig: tags, err := tags.ParseBucketXML(io.LimitReader(reader, sz)) if err != nil { @@ -988,27 +951,9 @@ func (a adminAPIHandlers) ImportBucketMetadataHandler(w http.ResponseWriter, r * continue } - updatedAt, err := globalBucketMetadataSys.Update(ctx, bucket, bucketTaggingConfig, configData) - if err != nil { - rpt.SetStatus(bucket, fileName, err) - continue - } + bucketMap[bucket].TaggingConfigXML = configData + bucketMap[bucket].TaggingConfigUpdatedAt = updatedAt rpt.SetStatus(bucket, fileName, nil) - - // Call site replication hook. - // - // We encode the xml bytes as base64 to ensure there are no encoding - // errors. - cfgStr := base64.StdEncoding.EncodeToString(configData) - if err = globalSiteReplicationSys.BucketMetaHook(ctx, madmin.SRBucketMeta{ - Type: madmin.SRBucketMetaTypeTags, - Bucket: bucket, - Tags: &cfgStr, - UpdatedAt: updatedAt, - }); err != nil { - rpt.SetStatus(bucket, fileName, err) - continue - } case bucketQuotaConfigFile: data, err := io.ReadAll(reader) if err != nil { @@ -1016,37 +961,49 @@ func (a adminAPIHandlers) ImportBucketMetadataHandler(w http.ResponseWriter, r * continue } - quotaConfig, err := parseBucketQuota(bucket, data) + _, err = parseBucketQuota(bucket, data) if err != nil { rpt.SetStatus(bucket, fileName, err) continue } - updatedAt, err := globalBucketMetadataSys.Update(ctx, bucket, bucketQuotaConfigFile, data) - if err != nil { - rpt.SetStatus(bucket, fileName, err) - continue - } + bucketMap[bucket].QuotaConfigJSON = data + bucketMap[bucket].QuotaConfigUpdatedAt = updatedAt rpt.SetStatus(bucket, fileName, nil) - - bucketMeta := madmin.SRBucketMeta{ - Type: madmin.SRBucketMetaTypeQuotaConfig, - Bucket: bucket, - Quota: data, - UpdatedAt: updatedAt, - } - if quotaConfig.Quota == 0 { - bucketMeta.Quota = nil - } - - // Call site replication hook. - if err = globalSiteReplicationSys.BucketMetaHook(ctx, bucketMeta); err != nil { - rpt.SetStatus(bucket, fileName, err) - continue - } } } + enc := func(b []byte) *string { + if b == nil { + return nil + } + v := base64.StdEncoding.EncodeToString(b) + return &v + } + + for bucket, meta := range bucketMap { + err := globalBucketMetadataSys.save(ctx, *meta) + if err != nil { + rpt.SetStatus(bucket, "", err) + continue + } + // Call site replication hook. + if err = globalSiteReplicationSys.BucketMetaHook(ctx, madmin.SRBucketMeta{ + Bucket: bucket, + Quota: meta.QuotaConfigJSON, + Policy: meta.PolicyConfigJSON, + Versioning: enc(meta.VersioningConfigXML), + Tags: enc(meta.TaggingConfigXML), + ObjectLockConfig: enc(meta.ObjectLockConfigXML), + SSEConfig: enc(meta.EncryptionConfigXML), + UpdatedAt: updatedAt, + }); err != nil { + rpt.SetStatus(bucket, "", err) + continue + } + + } + rptData, err := json.Marshal(rpt.BucketMetaImportErrs) if err != nil { writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL) diff --git a/cmd/admin-handlers-site-replication.go b/cmd/admin-handlers-site-replication.go index d4c5b39b4..9d07453ba 100644 --- a/cmd/admin-handlers-site-replication.go +++ b/cmd/admin-handlers-site-replication.go @@ -207,10 +207,15 @@ func (a adminAPIHandlers) SRPeerReplicateBucketItem(w http.ResponseWriter, r *ht return } + if item.Bucket == "" { + writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, errSRInvalidRequest(errInvalidArgument)), r.URL) + return + } + var err error switch item.Type { default: - err = errSRInvalidRequest(errInvalidArgument) + err = globalSiteReplicationSys.PeerBucketMetadataUpdateHandler(ctx, item) case madmin.SRBucketMetaTypePolicy: if item.Policy == nil { err = globalSiteReplicationSys.PeerBucketPolicyHandler(ctx, item.Bucket, nil, item.UpdatedAt) @@ -236,7 +241,7 @@ func (a adminAPIHandlers) SRPeerReplicateBucketItem(w http.ResponseWriter, r *ht return } if err = globalSiteReplicationSys.PeerBucketQuotaConfigHandler(ctx, item.Bucket, quotaConfig, item.UpdatedAt); err != nil { - writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL) + writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL) return } } diff --git a/cmd/bucket-metadata-sys.go b/cmd/bucket-metadata-sys.go index 1ea417525..9e7a3a373 100644 --- a/cmd/bucket-metadata-sys.go +++ b/cmd/bucket-metadata-sys.go @@ -151,14 +151,27 @@ func (sys *BucketMetadataSys) updateAndParse(ctx context.Context, bucket string, return updatedAt, fmt.Errorf("Unknown bucket %s metadata update requested %s", bucket, configFile) } - if err := meta.Save(ctx, objAPI); err != nil { - return updatedAt, err + err = sys.save(ctx, meta) + return updatedAt, err +} + +func (sys *BucketMetadataSys) save(ctx context.Context, meta BucketMetadata) error { + objAPI := newObjectLayerFn() + if objAPI == nil { + return errServerNotInitialized } - sys.Set(bucket, meta) - globalNotificationSys.LoadBucketMetadata(bgContext(ctx), bucket) // Do not use caller context here + if isMinioMetaBucketName(meta.Name) { + return errInvalidArgument + } - return updatedAt, nil + if err := meta.Save(ctx, objAPI); err != nil { + return err + } + + sys.Set(meta.Name, meta) + globalNotificationSys.LoadBucketMetadata(bgContext(ctx), meta.Name) // Do not use caller context here + return nil } // Delete delete the bucket metadata for the specified bucket. diff --git a/cmd/bucket-metadata.go b/cmd/bucket-metadata.go index ce1d7edf9..c47d954c0 100644 --- a/cmd/bucket-metadata.go +++ b/cmd/bucket-metadata.go @@ -143,39 +143,38 @@ func (b *BucketMetadata) SetCreatedAt(createdAt time.Time) { // Load - loads the metadata of bucket by name from ObjectLayer api. // If an error is returned the returned metadata will be default initialized. -func (b *BucketMetadata) Load(ctx context.Context, api ObjectLayer, name string) error { +func readBucketMetadata(ctx context.Context, api ObjectLayer, name string) (BucketMetadata, error) { if name == "" { logger.LogIf(ctx, errors.New("bucket name cannot be empty")) - return errInvalidArgument + return BucketMetadata{}, errInvalidArgument } + b := newBucketMetadata(name) configFile := path.Join(bucketMetaPrefix, name, bucketMetadataFile) data, err := readConfig(ctx, api, configFile) if err != nil { - return err + return b, err } if len(data) <= 4 { - return fmt.Errorf("loadBucketMetadata: no data") + return b, fmt.Errorf("loadBucketMetadata: no data") } // Read header switch binary.LittleEndian.Uint16(data[0:2]) { case bucketMetadataFormat: default: - return fmt.Errorf("loadBucketMetadata: unknown format: %d", binary.LittleEndian.Uint16(data[0:2])) + return b, fmt.Errorf("loadBucketMetadata: unknown format: %d", binary.LittleEndian.Uint16(data[0:2])) } switch binary.LittleEndian.Uint16(data[2:4]) { case bucketMetadataVersion: default: - return fmt.Errorf("loadBucketMetadata: unknown version: %d", binary.LittleEndian.Uint16(data[2:4])) + return b, fmt.Errorf("loadBucketMetadata: unknown version: %d", binary.LittleEndian.Uint16(data[2:4])) } - // OK, parse data. _, err = b.UnmarshalMsg(data[4:]) - b.Name = name // in-case parsing failed for some reason, make sure bucket name is not empty. - return err + return b, err } func loadBucketMetadataParse(ctx context.Context, objectAPI ObjectLayer, bucket string, parse bool) (BucketMetadata, error) { - b := newBucketMetadata(bucket) - err := b.Load(ctx, objectAPI, b.Name) + b, err := readBucketMetadata(ctx, objectAPI, bucket) + b.Name = bucket // in-case parsing failed for some reason, make sure bucket name is not empty. if err != nil && !errors.Is(err, errConfigNotFound) { return b, err } diff --git a/cmd/site-replication.go b/cmd/site-replication.go index d102fb58e..59af6526a 100644 --- a/cmd/site-replication.go +++ b/cmd/site-replication.go @@ -1456,6 +1456,75 @@ func (c *SiteReplicationSys) PeerBucketVersioningHandler(ctx context.Context, bu return nil } +// PeerBucketMetadataUpdateHandler - merges the bucket metadata, save and ping other nodes +func (c *SiteReplicationSys) PeerBucketMetadataUpdateHandler(ctx context.Context, item madmin.SRBucketMeta) error { + objectAPI := newObjectLayerFn() + if objectAPI == nil { + return errSRObjectLayerNotReady + } + + if item.Bucket == "" || item.UpdatedAt.IsZero() { + return wrapSRErr(errInvalidArgument) + } + + meta, err := readBucketMetadata(ctx, objectAPI, item.Bucket) + if err != nil { + return wrapSRErr(err) + } + + if meta.Created.After(item.UpdatedAt) { + return nil + } + + if item.Policy != nil { + meta.PolicyConfigJSON = item.Policy + meta.PolicyConfigUpdatedAt = item.UpdatedAt + } + + if item.Versioning != nil { + configData, err := base64.StdEncoding.DecodeString(*item.Versioning) + if err != nil { + return wrapSRErr(err) + } + meta.VersioningConfigXML = configData + meta.VersioningConfigUpdatedAt = item.UpdatedAt + } + + if item.Tags != nil { + configData, err := base64.StdEncoding.DecodeString(*item.Tags) + if err != nil { + return wrapSRErr(err) + } + meta.TaggingConfigXML = configData + meta.TaggingConfigUpdatedAt = item.UpdatedAt + } + + if item.ObjectLockConfig != nil { + configData, err := base64.StdEncoding.DecodeString(*item.ObjectLockConfig) + if err != nil { + return wrapSRErr(err) + } + meta.ObjectLockConfigXML = configData + meta.ObjectLockConfigUpdatedAt = item.UpdatedAt + } + + if item.SSEConfig != nil { + configData, err := base64.StdEncoding.DecodeString(*item.SSEConfig) + if err != nil { + return wrapSRErr(err) + } + meta.EncryptionConfigXML = configData + meta.EncryptionConfigUpdatedAt = item.UpdatedAt + } + + if item.Quota != nil { + meta.QuotaConfigJSON = item.Quota + meta.QuotaConfigUpdatedAt = item.UpdatedAt + } + + return globalBucketMetadataSys.save(ctx, meta) +} + // PeerBucketPolicyHandler - copies/deletes policy to local cluster. func (c *SiteReplicationSys) PeerBucketPolicyHandler(ctx context.Context, bucket string, policy *bktpolicy.Policy, updatedAt time.Time) error { // skip overwrite if local update is newer than peer update.