site healing: Skip stale bucket metadata updates from peer (#15186)

Allow healing to apply bucket metadata change only when peer
gave the most recent update.
This commit is contained in:
Poorna
2022-06-28 18:09:20 -07:00
committed by GitHub
parent 2f25639ea0
commit 7cc9286e0f
16 changed files with 1523 additions and 135 deletions

View File

@@ -817,7 +817,7 @@ func (c *SiteReplicationSys) PeerBucketConfigureReplHandler(ctx context.Context,
if err != nil {
return err
}
if err = globalBucketMetadataSys.Update(ctx, bucket, bucketTargetsFile, tgtBytes); err != nil {
if _, err = globalBucketMetadataSys.Update(ctx, bucket, bucketTargetsFile, tgtBytes); err != nil {
return err
}
targetARN = bucketTarget.Arn
@@ -930,7 +930,7 @@ func (c *SiteReplicationSys) PeerBucketConfigureReplHandler(ctx context.Context,
return err
}
err = globalBucketMetadataSys.Update(ctx, bucket, bucketReplicationConfig, replCfgData)
_, err = globalBucketMetadataSys.Update(ctx, bucket, bucketReplicationConfig, replCfgData)
return c.annotatePeerErr(peer.Name, "Error updating replication configuration", err)
}
@@ -1228,13 +1228,19 @@ func (c *SiteReplicationSys) BucketMetaHook(ctx context.Context, item madmin.SRB
}
// PeerBucketVersioningHandler - updates versioning config to local cluster.
func (c *SiteReplicationSys) PeerBucketVersioningHandler(ctx context.Context, bucket string, versioning *string) error {
func (c *SiteReplicationSys) PeerBucketVersioningHandler(ctx context.Context, bucket string, versioning *string, updatedAt time.Time) error {
if versioning != nil {
// skip overwrite if local update is newer than peer update.
if !updatedAt.IsZero() {
if _, updateTm, err := globalBucketMetadataSys.GetVersioningConfig(bucket); err == nil && updateTm.After(updatedAt) {
return nil
}
}
configData, err := base64.StdEncoding.DecodeString(*versioning)
if err != nil {
return wrapSRErr(err)
}
err = globalBucketMetadataSys.Update(ctx, bucket, bucketVersioningConfig, configData)
_, err = globalBucketMetadataSys.Update(ctx, bucket, bucketVersioningConfig, configData)
if err != nil {
return wrapSRErr(err)
}
@@ -1245,14 +1251,21 @@ func (c *SiteReplicationSys) PeerBucketVersioningHandler(ctx context.Context, bu
}
// PeerBucketPolicyHandler - copies/deletes policy to local cluster.
func (c *SiteReplicationSys) PeerBucketPolicyHandler(ctx context.Context, bucket string, policy *policy.Policy) error {
func (c *SiteReplicationSys) PeerBucketPolicyHandler(ctx context.Context, bucket string, policy *policy.Policy, updatedAt time.Time) error {
// skip overwrite if local update is newer than peer update.
if !updatedAt.IsZero() {
if _, updateTm, err := globalBucketMetadataSys.GetPolicyConfig(bucket); err == nil && updateTm.After(updatedAt) {
return nil
}
}
if policy != nil {
configData, err := json.Marshal(policy)
if err != nil {
return wrapSRErr(err)
}
err = globalBucketMetadataSys.Update(ctx, bucket, bucketPolicyConfig, configData)
_, err = globalBucketMetadataSys.Update(ctx, bucket, bucketPolicyConfig, configData)
if err != nil {
return wrapSRErr(err)
}
@@ -1260,7 +1273,7 @@ func (c *SiteReplicationSys) PeerBucketPolicyHandler(ctx context.Context, bucket
}
// Delete the bucket policy
err := globalBucketMetadataSys.Update(ctx, bucket, bucketPolicyConfig, nil)
_, err := globalBucketMetadataSys.Update(ctx, bucket, bucketPolicyConfig, nil)
if err != nil {
return wrapSRErr(err)
}
@@ -1269,13 +1282,20 @@ func (c *SiteReplicationSys) PeerBucketPolicyHandler(ctx context.Context, bucket
}
// PeerBucketTaggingHandler - copies/deletes tags to local cluster.
func (c *SiteReplicationSys) PeerBucketTaggingHandler(ctx context.Context, bucket string, tags *string) error {
func (c *SiteReplicationSys) PeerBucketTaggingHandler(ctx context.Context, bucket string, tags *string, updatedAt time.Time) error {
// skip overwrite if local update is newer than peer update.
if !updatedAt.IsZero() {
if _, updateTm, err := globalBucketMetadataSys.GetTaggingConfig(bucket); err == nil && updateTm.After(updatedAt) {
return nil
}
}
if tags != nil {
configData, err := base64.StdEncoding.DecodeString(*tags)
if err != nil {
return wrapSRErr(err)
}
err = globalBucketMetadataSys.Update(ctx, bucket, bucketTaggingConfig, configData)
_, err = globalBucketMetadataSys.Update(ctx, bucket, bucketTaggingConfig, configData)
if err != nil {
return wrapSRErr(err)
}
@@ -1283,7 +1303,7 @@ func (c *SiteReplicationSys) PeerBucketTaggingHandler(ctx context.Context, bucke
}
// Delete the tags
err := globalBucketMetadataSys.Update(ctx, bucket, bucketTaggingConfig, nil)
_, err := globalBucketMetadataSys.Update(ctx, bucket, bucketTaggingConfig, nil)
if err != nil {
return wrapSRErr(err)
}
@@ -1292,13 +1312,20 @@ func (c *SiteReplicationSys) PeerBucketTaggingHandler(ctx context.Context, bucke
}
// PeerBucketObjectLockConfigHandler - sets object lock on local bucket.
func (c *SiteReplicationSys) PeerBucketObjectLockConfigHandler(ctx context.Context, bucket string, objectLockData *string) error {
func (c *SiteReplicationSys) PeerBucketObjectLockConfigHandler(ctx context.Context, bucket string, objectLockData *string, updatedAt time.Time) error {
if objectLockData != nil {
// skip overwrite if local update is newer than peer update.
if !updatedAt.IsZero() {
if _, updateTm, err := globalBucketMetadataSys.GetObjectLockConfig(bucket); err == nil && updateTm.After(updatedAt) {
return nil
}
}
configData, err := base64.StdEncoding.DecodeString(*objectLockData)
if err != nil {
return wrapSRErr(err)
}
err = globalBucketMetadataSys.Update(ctx, bucket, objectLockConfig, configData)
_, err = globalBucketMetadataSys.Update(ctx, bucket, objectLockConfig, configData)
if err != nil {
return wrapSRErr(err)
}
@@ -1309,13 +1336,20 @@ func (c *SiteReplicationSys) PeerBucketObjectLockConfigHandler(ctx context.Conte
}
// PeerBucketSSEConfigHandler - copies/deletes SSE config to local cluster.
func (c *SiteReplicationSys) PeerBucketSSEConfigHandler(ctx context.Context, bucket string, sseConfig *string) error {
func (c *SiteReplicationSys) PeerBucketSSEConfigHandler(ctx context.Context, bucket string, sseConfig *string, updatedAt time.Time) error {
// skip overwrite if local update is newer than peer update.
if !updatedAt.IsZero() {
if _, updateTm, err := globalBucketMetadataSys.GetSSEConfig(bucket); err == nil && updateTm.After(updatedAt) {
return nil
}
}
if sseConfig != nil {
configData, err := base64.StdEncoding.DecodeString(*sseConfig)
if err != nil {
return wrapSRErr(err)
}
err = globalBucketMetadataSys.Update(ctx, bucket, bucketSSEConfig, configData)
_, err = globalBucketMetadataSys.Update(ctx, bucket, bucketSSEConfig, configData)
if err != nil {
return wrapSRErr(err)
}
@@ -1323,7 +1357,7 @@ func (c *SiteReplicationSys) PeerBucketSSEConfigHandler(ctx context.Context, buc
}
// Delete sse config
err := globalBucketMetadataSys.Update(ctx, bucket, bucketSSEConfig, nil)
_, err := globalBucketMetadataSys.Update(ctx, bucket, bucketSSEConfig, nil)
if err != nil {
return wrapSRErr(err)
}
@@ -1331,14 +1365,21 @@ func (c *SiteReplicationSys) PeerBucketSSEConfigHandler(ctx context.Context, buc
}
// PeerBucketQuotaConfigHandler - copies/deletes policy to local cluster.
func (c *SiteReplicationSys) PeerBucketQuotaConfigHandler(ctx context.Context, bucket string, quota *madmin.BucketQuota) error {
func (c *SiteReplicationSys) PeerBucketQuotaConfigHandler(ctx context.Context, bucket string, quota *madmin.BucketQuota, updatedAt time.Time) error {
// skip overwrite if local update is newer than peer update.
if !updatedAt.IsZero() {
if _, updateTm, err := globalBucketMetadataSys.GetQuotaConfig(ctx, bucket); err == nil && updateTm.After(updatedAt) {
return nil
}
}
if quota != nil {
quotaData, err := json.Marshal(quota)
if err != nil {
return wrapSRErr(err)
}
if err = globalBucketMetadataSys.Update(ctx, bucket, bucketQuotaConfigFile, quotaData); err != nil {
if _, err = globalBucketMetadataSys.Update(ctx, bucket, bucketQuotaConfigFile, quotaData); err != nil {
return wrapSRErr(err)
}
@@ -1346,7 +1387,7 @@ func (c *SiteReplicationSys) PeerBucketQuotaConfigHandler(ctx context.Context, b
}
// Delete the bucket policy
err := globalBucketMetadataSys.Update(ctx, bucket, bucketQuotaConfigFile, nil)
_, err := globalBucketMetadataSys.Update(ctx, bucket, bucketQuotaConfigFile, nil)
if err != nil {
return wrapSRErr(err)
}
@@ -1437,7 +1478,7 @@ func (c *SiteReplicationSys) syncToAllPeers(ctx context.Context) error {
}
// Replicate bucket policy if present.
policy, err := globalPolicySys.Get(bucket)
policy, tm, err := globalBucketMetadataSys.GetPolicyConfig(bucket)
found := true
if _, ok := err.(BucketPolicyNotFound); ok {
found = false
@@ -1450,9 +1491,10 @@ func (c *SiteReplicationSys) syncToAllPeers(ctx context.Context) error {
return wrapSRErr(err)
}
err = c.BucketMetaHook(ctx, madmin.SRBucketMeta{
Type: madmin.SRBucketMetaTypePolicy,
Bucket: bucket,
Policy: policyJSON,
Type: madmin.SRBucketMetaTypePolicy,
Bucket: bucket,
Policy: policyJSON,
UpdatedAt: tm,
})
if err != nil {
return errSRBucketMetaError(err)
@@ -1460,7 +1502,7 @@ func (c *SiteReplicationSys) syncToAllPeers(ctx context.Context) error {
}
// Replicate bucket tags if present.
tags, _, err := globalBucketMetadataSys.GetTaggingConfig(bucket)
tags, tm, err := globalBucketMetadataSys.GetTaggingConfig(bucket)
found = true
if _, ok := err.(BucketTaggingNotFound); ok {
found = false
@@ -1474,9 +1516,10 @@ func (c *SiteReplicationSys) syncToAllPeers(ctx context.Context) error {
}
tagCfgStr := base64.StdEncoding.EncodeToString(tagCfg)
err = c.BucketMetaHook(ctx, madmin.SRBucketMeta{
Type: madmin.SRBucketMetaTypeTags,
Bucket: bucket,
Tags: &tagCfgStr,
Type: madmin.SRBucketMetaTypeTags,
Bucket: bucket,
Tags: &tagCfgStr,
UpdatedAt: tm,
})
if err != nil {
return errSRBucketMetaError(err)
@@ -1484,7 +1527,7 @@ func (c *SiteReplicationSys) syncToAllPeers(ctx context.Context) error {
}
// Replicate object-lock config if present.
objLockCfg, _, err := globalBucketMetadataSys.GetObjectLockConfig(bucket)
objLockCfg, tm, err := globalBucketMetadataSys.GetObjectLockConfig(bucket)
found = true
if _, ok := err.(BucketObjectLockConfigNotFound); ok {
found = false
@@ -1498,9 +1541,10 @@ func (c *SiteReplicationSys) syncToAllPeers(ctx context.Context) error {
}
objLockStr := base64.StdEncoding.EncodeToString(objLockCfgData)
err = c.BucketMetaHook(ctx, madmin.SRBucketMeta{
Type: madmin.SRBucketMetaTypeObjectLockConfig,
Bucket: bucket,
Tags: &objLockStr,
Type: madmin.SRBucketMetaTypeObjectLockConfig,
Bucket: bucket,
Tags: &objLockStr,
UpdatedAt: tm,
})
if err != nil {
return errSRBucketMetaError(err)
@@ -1508,7 +1552,7 @@ func (c *SiteReplicationSys) syncToAllPeers(ctx context.Context) error {
}
// Replicate existing bucket bucket encryption settings
sseConfig, _, err := globalBucketMetadataSys.GetSSEConfig(bucket)
sseConfig, tm, err := globalBucketMetadataSys.GetSSEConfig(bucket)
found = true
if _, ok := err.(BucketSSEConfigNotFound); ok {
found = false
@@ -1525,13 +1569,14 @@ func (c *SiteReplicationSys) syncToAllPeers(ctx context.Context) error {
Type: madmin.SRBucketMetaTypeSSEConfig,
Bucket: bucket,
SSEConfig: &sseConfigStr,
UpdatedAt: tm,
})
if err != nil {
return errSRBucketMetaError(err)
}
}
quotaConfig, _, err := globalBucketMetadataSys.GetQuotaConfig(ctx, bucket)
quotaConfig, tm, err := globalBucketMetadataSys.GetQuotaConfig(ctx, bucket)
found = true
if _, ok := err.(BucketQuotaConfigNotFound); ok {
found = false
@@ -1544,9 +1589,10 @@ func (c *SiteReplicationSys) syncToAllPeers(ctx context.Context) error {
return wrapSRErr(err)
}
err = c.BucketMetaHook(ctx, madmin.SRBucketMeta{
Type: madmin.SRBucketMetaTypeQuotaConfig,
Bucket: bucket,
Quota: quotaConfigJSON,
Type: madmin.SRBucketMetaTypeQuotaConfig,
Bucket: bucket,
Quota: quotaConfigJSON,
UpdatedAt: tm,
})
if err != nil {
return errSRBucketMetaError(err)
@@ -2069,11 +2115,11 @@ func (c *SiteReplicationSys) RemoveRemoteTargetsForEndpoint(ctx context.Context,
if err != nil {
return err
}
if err = globalBucketMetadataSys.Update(ctx, b.Name, bucketReplicationConfig, configData); err != nil {
if _, err = globalBucketMetadataSys.Update(ctx, b.Name, bucketReplicationConfig, configData); err != nil {
return err
}
} else {
if err := globalBucketMetadataSys.Update(ctx, b.Name, bucketReplicationConfig, nil); err != nil {
if _, err := globalBucketMetadataSys.Update(ctx, b.Name, bucketReplicationConfig, nil); err != nil {
return err
}
}
@@ -3440,14 +3486,20 @@ type srStatusInfo struct {
}
func (c *SiteReplicationSys) healBuckets(ctx context.Context, objAPI ObjectLayer) error {
info, err := c.siteReplicationStatus(ctx, objAPI, madmin.SRStatusOptions{
Buckets: true,
})
buckets, err := c.listBuckets(ctx)
if err != nil {
return err
}
for bucket := range info.BucketStats {
for _, bi := range buckets {
bucket := bi.Name
info, err := c.siteReplicationStatus(ctx, objAPI, madmin.SRStatusOptions{
Entity: madmin.SRBucketEntity,
EntityValue: bucket,
})
if err != nil {
logger.LogIf(ctx, err)
continue
}
c.healCreateMissingBucket(ctx, objAPI, bucket, info)
c.healVersioningMetadata(ctx, objAPI, bucket, info)
c.healOLockConfigMetadata(ctx, objAPI, bucket, info)
@@ -3509,7 +3561,7 @@ func (c *SiteReplicationSys) healTagMetadata(ctx context.Context, objAPI ObjectL
continue
}
if dID == globalDeploymentID {
if err := globalBucketMetadataSys.Update(ctx, bucket, bucketTaggingConfig, latestTaggingConfigBytes); err != nil {
if _, err := globalBucketMetadataSys.Update(ctx, bucket, bucketTaggingConfig, latestTaggingConfigBytes); err != nil {
logger.LogIf(ctx, fmt.Errorf("Error healing tagging metadata from peer site %s : %w", latestPeerName, err))
}
continue
@@ -3573,7 +3625,7 @@ func (c *SiteReplicationSys) healBucketPolicies(ctx context.Context, objAPI Obje
continue
}
if dID == globalDeploymentID {
if err := globalBucketMetadataSys.Update(ctx, bucket, bucketPolicyConfig, latestIAMPolicy); err != nil {
if _, err := globalBucketMetadataSys.Update(ctx, bucket, bucketPolicyConfig, latestIAMPolicy); err != nil {
logger.LogIf(ctx, fmt.Errorf("Error healing bucket policy metadata from peer site %s : %w", latestPeerName, err))
}
continue
@@ -3585,9 +3637,10 @@ func (c *SiteReplicationSys) healBucketPolicies(ctx context.Context, objAPI Obje
}
peerName := info.Sites[dID].Name
if err = admClient.SRPeerReplicateBucketMeta(ctx, madmin.SRBucketMeta{
Type: madmin.SRBucketMetaTypePolicy,
Bucket: bucket,
Policy: latestIAMPolicy,
Type: madmin.SRBucketMetaTypePolicy,
Bucket: bucket,
Policy: latestIAMPolicy,
UpdatedAt: lastUpdate,
}); err != nil {
logger.LogIf(ctx, c.annotatePeerErr(peerName, replicateBucketMetadata,
fmt.Errorf("Error healing bucket policy metadata for peer %s from peer %s : %w",
@@ -3647,7 +3700,7 @@ func (c *SiteReplicationSys) healBucketQuotaConfig(ctx context.Context, objAPI O
continue
}
if dID == globalDeploymentID {
if err := globalBucketMetadataSys.Update(ctx, bucket, bucketQuotaConfigFile, latestQuotaConfigBytes); err != nil {
if _, err := globalBucketMetadataSys.Update(ctx, bucket, bucketQuotaConfigFile, latestQuotaConfigBytes); err != nil {
logger.LogIf(ctx, fmt.Errorf("Error healing quota metadata from peer site %s : %w", latestPeerName, err))
}
continue
@@ -3660,9 +3713,10 @@ func (c *SiteReplicationSys) healBucketQuotaConfig(ctx context.Context, objAPI O
peerName := info.Sites[dID].Name
if err = admClient.SRPeerReplicateBucketMeta(ctx, madmin.SRBucketMeta{
Type: madmin.SRBucketMetaTypeQuotaConfig,
Bucket: bucket,
Quota: latestQuotaConfigBytes,
Type: madmin.SRBucketMetaTypeQuotaConfig,
Bucket: bucket,
Quota: latestQuotaConfigBytes,
UpdatedAt: lastUpdate,
}); err != nil {
logger.LogIf(ctx, c.annotatePeerErr(peerName, replicateBucketMetadata,
fmt.Errorf("Error healing quota config metadata for peer %s from peer %s : %w",
@@ -3721,7 +3775,7 @@ func (c *SiteReplicationSys) healVersioningMetadata(ctx context.Context, objAPI
continue
}
if dID == globalDeploymentID {
if err := globalBucketMetadataSys.Update(ctx, bucket, bucketVersioningConfig, latestVersioningConfigBytes); err != nil {
if _, err := globalBucketMetadataSys.Update(ctx, bucket, bucketVersioningConfig, latestVersioningConfigBytes); err != nil {
logger.LogIf(ctx, fmt.Errorf("Error healing versioning metadata from peer site %s : %w", latestPeerName, err))
}
continue
@@ -3736,6 +3790,7 @@ func (c *SiteReplicationSys) healVersioningMetadata(ctx context.Context, objAPI
Type: madmin.SRBucketMetaTypeVersionConfig,
Bucket: bucket,
Versioning: latestVersioningConfig,
UpdatedAt: lastUpdate,
})
if err != nil {
logger.LogIf(ctx, c.annotatePeerErr(peerName, replicateBucketMetadata,
@@ -3795,7 +3850,7 @@ func (c *SiteReplicationSys) healSSEMetadata(ctx context.Context, objAPI ObjectL
continue
}
if dID == globalDeploymentID {
if err := globalBucketMetadataSys.Update(ctx, bucket, bucketSSEConfig, latestSSEConfigBytes); err != nil {
if _, err := globalBucketMetadataSys.Update(ctx, bucket, bucketSSEConfig, latestSSEConfigBytes); err != nil {
logger.LogIf(ctx, fmt.Errorf("Error healing sse metadata from peer site %s : %w", latestPeerName, err))
}
continue
@@ -3810,6 +3865,7 @@ func (c *SiteReplicationSys) healSSEMetadata(ctx context.Context, objAPI ObjectL
Type: madmin.SRBucketMetaTypeSSEConfig,
Bucket: bucket,
SSEConfig: latestSSEConfig,
UpdatedAt: lastUpdate,
})
if err != nil {
logger.LogIf(ctx, c.annotatePeerErr(peerName, replicateBucketMetadata,
@@ -3869,7 +3925,7 @@ func (c *SiteReplicationSys) healOLockConfigMetadata(ctx context.Context, objAPI
continue
}
if dID == globalDeploymentID {
if err := globalBucketMetadataSys.Update(ctx, bucket, objectLockConfig, latestObjLockConfigBytes); err != nil {
if _, err := globalBucketMetadataSys.Update(ctx, bucket, objectLockConfig, latestObjLockConfigBytes); err != nil {
logger.LogIf(ctx, fmt.Errorf("Error healing objectlock config metadata from peer site %s : %w", latestPeerName, err))
}
continue
@@ -3881,9 +3937,10 @@ func (c *SiteReplicationSys) healOLockConfigMetadata(ctx context.Context, objAPI
}
peerName := info.Sites[dID].Name
err = admClient.SRPeerReplicateBucketMeta(ctx, madmin.SRBucketMeta{
Type: madmin.SRBucketMetaTypeObjectLockConfig,
Bucket: bucket,
Tags: latestObjLockConfig,
Type: madmin.SRBucketMetaTypeObjectLockConfig,
Bucket: bucket,
Tags: latestObjLockConfig,
UpdatedAt: lastUpdate,
})
if err != nil {
logger.LogIf(ctx, c.annotatePeerErr(peerName, replicateBucketMetadata,