Reduce the number of calls to import bucket metadata (#17899)

For each bucket, save the bucket metadata once and call the site replication hook once.
Anis Eleuch 2023-08-25 15:59:16 +01:00 committed by GitHub
parent 6aeca54ece
commit 0cde37be50
5 changed files with 178 additions and 135 deletions
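
The refactor follows a two-phase pattern in ImportBucketMetadataHandler: merge every config file found in the uploaded zip into one in-memory metadata object per bucket, then persist each bucket's metadata once and fire the site replication hook once. Below is a minimal, hypothetical sketch of that pattern; the types and callbacks (bucketMeta, configEntry, importConfigs, save, replicate) are illustrative stand-ins, not the actual cmd package API.

package main

import "fmt"

// bucketMeta is a simplified stand-in for the real BucketMetadata type.
type bucketMeta struct {
	PolicyJSON   []byte
	TaggingXML   []byte
	LifecycleXML []byte
}

// configEntry models one bucket/<config> file extracted from the zip.
type configEntry struct {
	Bucket string
	Kind   string // e.g. "policy", "tagging", "lifecycle"
	Data   []byte
}

// importConfigs merges all config files per bucket first, then performs a
// single save and a single replication call per bucket.
func importConfigs(entries []configEntry,
	save func(bucket string, m bucketMeta) error,
	replicate func(bucket string, m bucketMeta) error,
) error {
	merged := make(map[string]*bucketMeta, len(entries))

	// Phase 1: merge every config file into one metadata value per bucket.
	for _, e := range entries {
		m, ok := merged[e.Bucket]
		if !ok {
			m = &bucketMeta{}
			merged[e.Bucket] = m
		}
		switch e.Kind {
		case "policy":
			m.PolicyJSON = e.Data
		case "tagging":
			m.TaggingXML = e.Data
		case "lifecycle":
			m.LifecycleXML = e.Data
		default:
			return fmt.Errorf("unknown config kind %q", e.Kind)
		}
	}

	// Phase 2: one save and one replication hook call per bucket.
	for bucket, m := range merged {
		if err := save(bucket, *m); err != nil {
			return err
		}
		if err := replicate(bucket, *m); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	entries := []configEntry{
		{Bucket: "photos", Kind: "policy", Data: []byte(`{}`)},
		{Bucket: "photos", Kind: "tagging", Data: []byte(`<Tagging/>`)},
	}
	save := func(b string, m bucketMeta) error {
		fmt.Println("save metadata once for", b)
		return nil
	}
	replicate := func(b string, m bucketMeta) error {
		fmt.Println("replicate metadata once for", b)
		return nil
	}
	if err := importConfigs(entries, save, replicate); err != nil {
		fmt.Println("import failed:", err)
	}
}

In the actual handler, per-config validation and error reporting (rpt.SetStatus) stay inside the first loop over the zip entries; only the metadata save and the BucketMetaHook call move to the final per-bucket loop, as the diff below shows.
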

@@ -662,12 +662,31 @@ func (a adminAPIHandlers) ImportBucketMetadataHandler(w http.ResponseWriter, r *
writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrInvalidRequest), r.URL)
return
}
bucketMap := make(map[string]struct{}, 1)
rpt := importMetaReport{
madmin.BucketMetaImportErrs{
Buckets: make(map[string]madmin.BucketStatus, len(zr.File)),
},
}
bucketMap := make(map[string]*BucketMetadata, len(zr.File))
updatedAt := UTCNow()
for _, file := range zr.File {
slc := strings.Split(file.Name, slashSeparator)
if len(slc) != 2 { // expecting bucket/configfile in the zipfile
rpt.SetStatus(file.Name, "", fmt.Errorf("malformed zip - expecting format bucket/<config.json>"))
continue
}
bucket := slc[0]
meta, err := readBucketMetadata(ctx, objectAPI, bucket)
if err == nil {
bucketMap[bucket] = &meta
} else if err != errConfigNotFound {
rpt.SetStatus(bucket, "", err)
}
}
// import object lock config if any - order of import matters here.
for _, file := range zr.File {
slc := strings.Split(file.Name, slashSeparator)
@@ -704,7 +723,8 @@ func (a adminAPIHandlers) ImportBucketMetadataHandler(w http.ResponseWriter, r *
continue
}
}
bucketMap[bucket] = struct{}{}
v := newBucketMetadata(bucket)
bucketMap[bucket] = &v
}
// Deny object locking configuration settings on existing buckets without object lock enabled.
@@ -713,27 +733,9 @@ func (a adminAPIHandlers) ImportBucketMetadataHandler(w http.ResponseWriter, r *
continue
}
updatedAt, err := globalBucketMetadataSys.Update(ctx, bucket, objectLockConfig, configData)
if err != nil {
rpt.SetStatus(bucket, fileName, err)
continue
}
bucketMap[bucket].ObjectLockConfigXML = configData
bucketMap[bucket].ObjectLockConfigUpdatedAt = updatedAt
rpt.SetStatus(bucket, fileName, nil)
// Call site replication hook.
//
// We encode the xml bytes as base64 to ensure there are no encoding
// errors.
cfgStr := base64.StdEncoding.EncodeToString(configData)
if err = globalSiteReplicationSys.BucketMetaHook(ctx, madmin.SRBucketMeta{
Type: madmin.SRBucketMetaTypeObjectLockConfig,
Bucket: bucket,
ObjectLockConfig: &cfgStr,
UpdatedAt: updatedAt,
}); err != nil {
rpt.SetStatus(bucket, fileName, err)
continue
}
}
}
@@ -763,7 +765,8 @@ func (a adminAPIHandlers) ImportBucketMetadataHandler(w http.ResponseWriter, r *
continue
}
}
bucketMap[bucket] = struct{}{}
v := newBucketMetadata(bucket)
bucketMap[bucket] = &v
}
if globalSiteReplicationSys.isEnabled() && v.Suspended() {
@@ -786,10 +789,8 @@ func (a adminAPIHandlers) ImportBucketMetadataHandler(w http.ResponseWriter, r *
continue
}
if _, err = globalBucketMetadataSys.Update(ctx, bucket, bucketVersioningConfig, configData); err != nil {
rpt.SetStatus(bucket, fileName, err)
continue
}
bucketMap[bucket].VersioningConfigXML = configData
bucketMap[bucket].VersioningConfigUpdatedAt = updatedAt
rpt.SetStatus(bucket, fileName, nil)
}
}
@@ -807,6 +808,7 @@ func (a adminAPIHandlers) ImportBucketMetadataHandler(w http.ResponseWriter, r *
continue
}
bucket, fileName := slc[0], slc[1]
// create bucket if it does not exist yet.
if _, ok := bucketMap[bucket]; !ok {
err = objectAPI.MakeBucket(ctx, bucket, MakeBucketOptions{})
@@ -816,7 +818,8 @@ func (a adminAPIHandlers) ImportBucketMetadataHandler(w http.ResponseWriter, r *
continue
}
}
bucketMap[bucket] = struct{}{}
v := newBucketMetadata(bucket)
bucketMap[bucket] = &v
}
if _, ok := bucketMap[bucket]; !ok {
continue
@@ -835,12 +838,7 @@ func (a adminAPIHandlers) ImportBucketMetadataHandler(w http.ResponseWriter, r *
continue
}
if _, err = globalBucketMetadataSys.Update(ctx, bucket, bucketNotificationConfig, configData); err != nil {
rpt.SetStatus(bucket, fileName, err)
continue
}
rulesMap := config.ToRulesMap()
globalEventNotifier.AddRulesMap(bucket, rulesMap)
bucketMap[bucket].NotificationConfigXML = configData
rpt.SetStatus(bucket, fileName, nil)
case bucketPolicyConfig:
// Error out if Content-Length is beyond allowed size.
@@ -873,22 +871,9 @@ func (a adminAPIHandlers) ImportBucketMetadataHandler(w http.ResponseWriter, r *
continue
}
updatedAt, err := globalBucketMetadataSys.Update(ctx, bucket, bucketPolicyConfig, configData)
if err != nil {
rpt.SetStatus(bucket, fileName, err)
continue
}
bucketMap[bucket].PolicyConfigJSON = configData
bucketMap[bucket].PolicyConfigUpdatedAt = updatedAt
rpt.SetStatus(bucket, fileName, nil)
// Call site replication hook.
if err = globalSiteReplicationSys.BucketMetaHook(ctx, madmin.SRBucketMeta{
Type: madmin.SRBucketMetaTypePolicy,
Bucket: bucket,
Policy: bucketPolicyBytes,
UpdatedAt: updatedAt,
}); err != nil {
rpt.SetStatus(bucket, fileName, err)
continue
}
case bucketLifecycleConfig:
bucketLifecycle, err := lifecycle.ParseLifecycleConfig(io.LimitReader(reader, sz))
if err != nil {
@@ -914,10 +899,8 @@ func (a adminAPIHandlers) ImportBucketMetadataHandler(w http.ResponseWriter, r *
continue
}
if _, err = globalBucketMetadataSys.Update(ctx, bucket, bucketLifecycleConfig, configData); err != nil {
rpt.SetStatus(bucket, fileName, err)
continue
}
bucketMap[bucket].LifecycleConfigXML = configData
bucketMap[bucket].LifecycleConfigUpdatedAt = updatedAt
rpt.SetStatus(bucket, fileName, nil)
case bucketSSEConfig:
// Parse bucket encryption xml
@@ -952,29 +935,9 @@ func (a adminAPIHandlers) ImportBucketMetadataHandler(w http.ResponseWriter, r *
continue
}
// Store the bucket encryption configuration in the object layer
updatedAt, err := globalBucketMetadataSys.Update(ctx, bucket, bucketSSEConfig, configData)
if err != nil {
rpt.SetStatus(bucket, fileName, err)
continue
}
bucketMap[bucket].EncryptionConfigXML = configData
bucketMap[bucket].EncryptionConfigUpdatedAt = updatedAt
rpt.SetStatus(bucket, fileName, nil)
// Call site replication hook.
//
// We encode the xml bytes as base64 to ensure there are no encoding
// errors.
cfgStr := base64.StdEncoding.EncodeToString(configData)
if err = globalSiteReplicationSys.BucketMetaHook(ctx, madmin.SRBucketMeta{
Type: madmin.SRBucketMetaTypeSSEConfig,
Bucket: bucket,
SSEConfig: &cfgStr,
UpdatedAt: updatedAt,
}); err != nil {
rpt.SetStatus(bucket, fileName, err)
continue
}
case bucketTaggingConfig:
tags, err := tags.ParseBucketXML(io.LimitReader(reader, sz))
if err != nil {
@@ -988,27 +951,9 @@ func (a adminAPIHandlers) ImportBucketMetadataHandler(w http.ResponseWriter, r *
continue
}
updatedAt, err := globalBucketMetadataSys.Update(ctx, bucket, bucketTaggingConfig, configData)
if err != nil {
rpt.SetStatus(bucket, fileName, err)
continue
}
bucketMap[bucket].TaggingConfigXML = configData
bucketMap[bucket].TaggingConfigUpdatedAt = updatedAt
rpt.SetStatus(bucket, fileName, nil)
// Call site replication hook.
//
// We encode the xml bytes as base64 to ensure there are no encoding
// errors.
cfgStr := base64.StdEncoding.EncodeToString(configData)
if err = globalSiteReplicationSys.BucketMetaHook(ctx, madmin.SRBucketMeta{
Type: madmin.SRBucketMetaTypeTags,
Bucket: bucket,
Tags: &cfgStr,
UpdatedAt: updatedAt,
}); err != nil {
rpt.SetStatus(bucket, fileName, err)
continue
}
case bucketQuotaConfigFile:
data, err := io.ReadAll(reader)
if err != nil {
@@ -1016,37 +961,49 @@ func (a adminAPIHandlers) ImportBucketMetadataHandler(w http.ResponseWriter, r *
continue
}
quotaConfig, err := parseBucketQuota(bucket, data)
_, err = parseBucketQuota(bucket, data)
if err != nil {
rpt.SetStatus(bucket, fileName, err)
continue
}
updatedAt, err := globalBucketMetadataSys.Update(ctx, bucket, bucketQuotaConfigFile, data)
if err != nil {
rpt.SetStatus(bucket, fileName, err)
continue
}
bucketMap[bucket].QuotaConfigJSON = data
bucketMap[bucket].QuotaConfigUpdatedAt = updatedAt
rpt.SetStatus(bucket, fileName, nil)
bucketMeta := madmin.SRBucketMeta{
Type: madmin.SRBucketMetaTypeQuotaConfig,
Bucket: bucket,
Quota: data,
UpdatedAt: updatedAt,
}
if quotaConfig.Quota == 0 {
bucketMeta.Quota = nil
}
// Call site replication hook.
if err = globalSiteReplicationSys.BucketMetaHook(ctx, bucketMeta); err != nil {
rpt.SetStatus(bucket, fileName, err)
continue
}
}
}
enc := func(b []byte) *string {
if b == nil {
return nil
}
v := base64.StdEncoding.EncodeToString(b)
return &v
}
for bucket, meta := range bucketMap {
err := globalBucketMetadataSys.save(ctx, *meta)
if err != nil {
rpt.SetStatus(bucket, "", err)
continue
}
// Call site replication hook.
if err = globalSiteReplicationSys.BucketMetaHook(ctx, madmin.SRBucketMeta{
Bucket: bucket,
Quota: meta.QuotaConfigJSON,
Policy: meta.PolicyConfigJSON,
Versioning: enc(meta.VersioningConfigXML),
Tags: enc(meta.TaggingConfigXML),
ObjectLockConfig: enc(meta.ObjectLockConfigXML),
SSEConfig: enc(meta.EncryptionConfigXML),
UpdatedAt: updatedAt,
}); err != nil {
rpt.SetStatus(bucket, "", err)
continue
}
}
rptData, err := json.Marshal(rpt.BucketMetaImportErrs)
if err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)

@@ -207,10 +207,15 @@ func (a adminAPIHandlers) SRPeerReplicateBucketItem(w http.ResponseWriter, r *ht
return
}
if item.Bucket == "" {
writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, errSRInvalidRequest(errInvalidArgument)), r.URL)
return
}
var err error
switch item.Type {
default:
err = errSRInvalidRequest(errInvalidArgument)
err = globalSiteReplicationSys.PeerBucketMetadataUpdateHandler(ctx, item)
case madmin.SRBucketMetaTypePolicy:
if item.Policy == nil {
err = globalSiteReplicationSys.PeerBucketPolicyHandler(ctx, item.Bucket, nil, item.UpdatedAt)
@@ -236,7 +241,7 @@ func (a adminAPIHandlers) SRPeerReplicateBucketItem(w http.ResponseWriter, r *ht
return
}
if err = globalSiteReplicationSys.PeerBucketQuotaConfigHandler(ctx, item.Bucket, quotaConfig, item.UpdatedAt); err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL)
return
}
}

@@ -151,14 +151,27 @@ func (sys *BucketMetadataSys) updateAndParse(ctx context.Context, bucket string,
return updatedAt, fmt.Errorf("Unknown bucket %s metadata update requested %s", bucket, configFile)
}
if err := meta.Save(ctx, objAPI); err != nil {
return updatedAt, err
err = sys.save(ctx, meta)
return updatedAt, err
}
func (sys *BucketMetadataSys) save(ctx context.Context, meta BucketMetadata) error {
objAPI := newObjectLayerFn()
if objAPI == nil {
return errServerNotInitialized
}
sys.Set(bucket, meta)
globalNotificationSys.LoadBucketMetadata(bgContext(ctx), bucket) // Do not use caller context here
if isMinioMetaBucketName(meta.Name) {
return errInvalidArgument
}
return updatedAt, nil
if err := meta.Save(ctx, objAPI); err != nil {
return err
}
sys.Set(meta.Name, meta)
globalNotificationSys.LoadBucketMetadata(bgContext(ctx), meta.Name) // Do not use caller context here
return nil
}
// Delete delete the bucket metadata for the specified bucket.

@@ -143,39 +143,38 @@ func (b *BucketMetadata) SetCreatedAt(createdAt time.Time) {
// Load - loads the metadata of bucket by name from ObjectLayer api.
// If an error is returned the returned metadata will be default initialized.
func (b *BucketMetadata) Load(ctx context.Context, api ObjectLayer, name string) error {
func readBucketMetadata(ctx context.Context, api ObjectLayer, name string) (BucketMetadata, error) {
if name == "" {
logger.LogIf(ctx, errors.New("bucket name cannot be empty"))
return errInvalidArgument
return BucketMetadata{}, errInvalidArgument
}
b := newBucketMetadata(name)
configFile := path.Join(bucketMetaPrefix, name, bucketMetadataFile)
data, err := readConfig(ctx, api, configFile)
if err != nil {
return err
return b, err
}
if len(data) <= 4 {
return fmt.Errorf("loadBucketMetadata: no data")
return b, fmt.Errorf("loadBucketMetadata: no data")
}
// Read header
switch binary.LittleEndian.Uint16(data[0:2]) {
case bucketMetadataFormat:
default:
return fmt.Errorf("loadBucketMetadata: unknown format: %d", binary.LittleEndian.Uint16(data[0:2]))
return b, fmt.Errorf("loadBucketMetadata: unknown format: %d", binary.LittleEndian.Uint16(data[0:2]))
}
switch binary.LittleEndian.Uint16(data[2:4]) {
case bucketMetadataVersion:
default:
return fmt.Errorf("loadBucketMetadata: unknown version: %d", binary.LittleEndian.Uint16(data[2:4]))
return b, fmt.Errorf("loadBucketMetadata: unknown version: %d", binary.LittleEndian.Uint16(data[2:4]))
}
// OK, parse data.
_, err = b.UnmarshalMsg(data[4:])
b.Name = name // in-case parsing failed for some reason, make sure bucket name is not empty.
return err
return b, err
}
func loadBucketMetadataParse(ctx context.Context, objectAPI ObjectLayer, bucket string, parse bool) (BucketMetadata, error) {
b := newBucketMetadata(bucket)
err := b.Load(ctx, objectAPI, b.Name)
b, err := readBucketMetadata(ctx, objectAPI, bucket)
b.Name = bucket // in-case parsing failed for some reason, make sure bucket name is not empty.
if err != nil && !errors.Is(err, errConfigNotFound) {
return b, err
}

@@ -1456,6 +1456,75 @@ func (c *SiteReplicationSys) PeerBucketVersioningHandler(ctx context.Context, bu
return nil
}
// PeerBucketMetadataUpdateHandler - merges the bucket metadata, save and ping other nodes
func (c *SiteReplicationSys) PeerBucketMetadataUpdateHandler(ctx context.Context, item madmin.SRBucketMeta) error {
objectAPI := newObjectLayerFn()
if objectAPI == nil {
return errSRObjectLayerNotReady
}
if item.Bucket == "" || item.UpdatedAt.IsZero() {
return wrapSRErr(errInvalidArgument)
}
meta, err := readBucketMetadata(ctx, objectAPI, item.Bucket)
if err != nil {
return wrapSRErr(err)
}
if meta.Created.After(item.UpdatedAt) {
return nil
}
if item.Policy != nil {
meta.PolicyConfigJSON = item.Policy
meta.PolicyConfigUpdatedAt = item.UpdatedAt
}
if item.Versioning != nil {
configData, err := base64.StdEncoding.DecodeString(*item.Versioning)
if err != nil {
return wrapSRErr(err)
}
meta.VersioningConfigXML = configData
meta.VersioningConfigUpdatedAt = item.UpdatedAt
}
if item.Tags != nil {
configData, err := base64.StdEncoding.DecodeString(*item.Tags)
if err != nil {
return wrapSRErr(err)
}
meta.TaggingConfigXML = configData
meta.TaggingConfigUpdatedAt = item.UpdatedAt
}
if item.ObjectLockConfig != nil {
configData, err := base64.StdEncoding.DecodeString(*item.ObjectLockConfig)
if err != nil {
return wrapSRErr(err)
}
meta.ObjectLockConfigXML = configData
meta.ObjectLockConfigUpdatedAt = item.UpdatedAt
}
if item.SSEConfig != nil {
configData, err := base64.StdEncoding.DecodeString(*item.SSEConfig)
if err != nil {
return wrapSRErr(err)
}
meta.EncryptionConfigXML = configData
meta.EncryptionConfigUpdatedAt = item.UpdatedAt
}
if item.Quota != nil {
meta.QuotaConfigJSON = item.Quota
meta.QuotaConfigUpdatedAt = item.UpdatedAt
}
return globalBucketMetadataSys.save(ctx, meta)
}
// PeerBucketPolicyHandler - copies/deletes policy to local cluster.
func (c *SiteReplicationSys) PeerBucketPolicyHandler(ctx context.Context, bucket string, policy *bktpolicy.Policy, updatedAt time.Time) error {
// skip overwrite if local update is newer than peer update.