make site replication healing safer (#16560)

This commit is contained in:
Harshavardhana 2023-02-07 21:44:42 -08:00 committed by GitHub
parent 84fe4fd156
commit d8daabae9b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 68 additions and 177 deletions

View File

@ -337,6 +337,20 @@ func (sys *BucketMetadataSys) GetBucketTargetsConfig(bucket string) (*madmin.Buc
return meta.bucketTargetConfig, nil
}
// GetConfigFromDisk read bucket metadata config from disk.
func (sys *BucketMetadataSys) GetConfigFromDisk(ctx context.Context, bucket string) (BucketMetadata, error) {
objAPI := newObjectLayerFn()
if objAPI == nil {
return newBucketMetadata(bucket), errServerNotInitialized
}
if isMinioMetaBucketName(bucket) {
return newBucketMetadata(bucket), errInvalidArgument
}
return loadBucketMetadata(ctx, objAPI, bucket)
}
// GetConfig returns a specific configuration from the bucket metadata.
// The returned object may not be modified.
func (sys *BucketMetadataSys) GetConfig(ctx context.Context, bucket string) (BucketMetadata, error) {

View File

@ -1556,7 +1556,12 @@ func (c *SiteReplicationSys) listBuckets(ctx context.Context) ([]BucketInfo, err
// syncToAllPeers is used for syncing local data to all remote peers, it is
// called once during initial "AddPeerClusters" request.
func (c *SiteReplicationSys) syncToAllPeers(ctx context.Context) error {
buckets, err := c.listBuckets(ctx)
objAPI := newObjectLayerFn()
if objAPI == nil {
return errSRObjectLayerNotReady
}
buckets, err := objAPI.ListBuckets(ctx, BucketOptions{})
if err != nil {
return err
}
@ -1564,41 +1569,27 @@ func (c *SiteReplicationSys) syncToAllPeers(ctx context.Context) error {
for _, bucketInfo := range buckets {
bucket := bucketInfo.Name
// MinIO does not store bucket location - so we just check if
// object locking is enabled.
lockConfig, _, err := globalBucketMetadataSys.GetObjectLockConfig(bucket)
if err != nil {
if _, ok := err.(BucketObjectLockConfigNotFound); !ok {
return errSRBackendIssue(err)
}
meta, err := globalBucketMetadataSys.GetConfigFromDisk(ctx, bucket)
if err != nil && !errors.Is(err, errConfigNotFound) {
return errSRBackendIssue(err)
}
var opts MakeBucketOptions
if lockConfig != nil {
opts.LockEnabled = lockConfig.ObjectLockEnabled == "Enabled"
if meta.objectLockConfig != nil {
opts.LockEnabled = meta.objectLockConfig.ObjectLockEnabled == "Enabled"
}
opts.CreatedAt, _ = globalBucketMetadataSys.CreatedAt(bucket)
opts.CreatedAt = bucketInfo.Created.UTC()
// Now call the MakeBucketHook on existing bucket - this will
// create buckets and replication rules on peer clusters.
err = c.MakeBucketHook(ctx, bucket, opts)
if err != nil {
if err = c.MakeBucketHook(ctx, bucket, opts); err != nil {
return errSRBucketConfigError(err)
}
// Replicate bucket policy if present.
policy, tm, err := globalBucketMetadataSys.GetPolicyConfig(bucket)
found := true
if _, ok := err.(BucketPolicyNotFound); ok {
found = false
} else if err != nil {
return errSRBackendIssue(err)
}
if found {
policyJSON, err := json.Marshal(policy)
if err != nil {
return wrapSRErr(err)
}
policyJSON, tm := meta.PolicyConfigJSON, meta.PolicyConfigUpdatedAt
if len(policyJSON) > 0 {
err = c.BucketMetaHook(ctx, madmin.SRBucketMeta{
Type: madmin.SRBucketMetaTypePolicy,
Bucket: bucket,
@ -1611,18 +1602,8 @@ func (c *SiteReplicationSys) syncToAllPeers(ctx context.Context) error {
}
// Replicate bucket tags if present.
tags, tm, err := globalBucketMetadataSys.GetTaggingConfig(bucket)
found = true
if _, ok := err.(BucketTaggingNotFound); ok {
found = false
} else if err != nil {
return errSRBackendIssue(err)
}
if found {
tagCfg, err := xml.Marshal(tags)
if err != nil {
return wrapSRErr(err)
}
tagCfg, tm := meta.TaggingConfigXML, meta.TaggingConfigUpdatedAt
if len(tagCfg) > 0 {
tagCfgStr := base64.StdEncoding.EncodeToString(tagCfg)
err = c.BucketMetaHook(ctx, madmin.SRBucketMeta{
Type: madmin.SRBucketMetaTypeTags,
@ -1636,18 +1617,8 @@ func (c *SiteReplicationSys) syncToAllPeers(ctx context.Context) error {
}
// Replicate object-lock config if present.
objLockCfg, tm, err := globalBucketMetadataSys.GetObjectLockConfig(bucket)
found = true
if _, ok := err.(BucketObjectLockConfigNotFound); ok {
found = false
} else if err != nil {
return errSRBackendIssue(err)
}
if found {
objLockCfgData, err := xml.Marshal(objLockCfg)
if err != nil {
return wrapSRErr(err)
}
objLockCfgData, tm := meta.ObjectLockConfigXML, meta.ObjectLockConfigUpdatedAt
if len(objLockCfgData) > 0 {
objLockStr := base64.StdEncoding.EncodeToString(objLockCfgData)
err = c.BucketMetaHook(ctx, madmin.SRBucketMeta{
Type: madmin.SRBucketMetaTypeObjectLockConfig,
@ -1661,18 +1632,8 @@ func (c *SiteReplicationSys) syncToAllPeers(ctx context.Context) error {
}
// Replicate existing bucket bucket encryption settings
sseConfig, tm, err := globalBucketMetadataSys.GetSSEConfig(bucket)
found = true
if _, ok := err.(BucketSSEConfigNotFound); ok {
found = false
} else if err != nil {
return errSRBackendIssue(err)
}
if found {
sseConfigData, err := xml.Marshal(sseConfig)
if err != nil {
return wrapSRErr(err)
}
sseConfigData, tm := meta.EncryptionConfigXML, meta.EncryptionConfigUpdatedAt
if len(sseConfigData) > 0 {
sseConfigStr := base64.StdEncoding.EncodeToString(sseConfigData)
err = c.BucketMetaHook(ctx, madmin.SRBucketMeta{
Type: madmin.SRBucketMetaTypeSSEConfig,
@ -1685,18 +1646,8 @@ func (c *SiteReplicationSys) syncToAllPeers(ctx context.Context) error {
}
}
quotaConfig, tm, err := globalBucketMetadataSys.GetQuotaConfig(ctx, bucket)
found = true
if _, ok := err.(BucketQuotaConfigNotFound); ok {
found = false
} else if err != nil {
return errSRBackendIssue(err)
}
if found {
quotaConfigJSON, err := json.Marshal(quotaConfig)
if err != nil {
return wrapSRErr(err)
}
quotaConfigJSON, tm := meta.QuotaConfigJSON, meta.QuotaConfigUpdatedAt
if len(quotaConfigJSON) > 0 {
err = c.BucketMetaHook(ctx, madmin.SRBucketMeta{
Type: madmin.SRBucketMetaTypeQuotaConfig,
Bucket: bucket,
@ -2253,6 +2204,10 @@ func (c *SiteReplicationSys) RemoveRemoteTargetsForEndpoint(ctx context.Context,
}
}
buckets, err := objectAPI.ListBuckets(ctx, BucketOptions{})
if err != nil {
return errSRBackendIssue(err)
}
for _, b := range buckets {
config, _, err := globalBucketMetadataSys.GetReplicationConfig(ctx, b.Name)
if err != nil {
@ -3213,129 +3168,51 @@ func (c *SiteReplicationSys) SiteReplicationMetaInfo(ctx context.Context, objAPI
info.Buckets[bucket] = bms
continue
}
// Get bucket policy if present.
policy, updatedAt, err := globalBucketMetadataSys.GetPolicyConfig(bucket)
found := true
if _, ok := err.(BucketPolicyNotFound); ok {
found = false
} else if err != nil {
meta, err := globalBucketMetadataSys.GetConfigFromDisk(ctx, bucket)
if err != nil && !errors.Is(err, errConfigNotFound) {
return info, errSRBackendIssue(err)
}
if found {
policyJSON, err := json.Marshal(policy)
if err != nil {
return info, wrapSRErr(err)
}
bms.Policy = policyJSON
bms.PolicyUpdatedAt = updatedAt
}
// Get bucket tags if present.
tags, updatedAt, err := globalBucketMetadataSys.GetTaggingConfig(bucket)
found = true
if _, ok := err.(BucketTaggingNotFound); ok {
found = false
} else if err != nil {
return info, errSRBackendIssue(err)
}
if found {
tagBytes, err := xml.Marshal(tags)
if err != nil {
return info, wrapSRErr(err)
}
tagCfgStr := base64.StdEncoding.EncodeToString(tagBytes)
bms.Policy = meta.PolicyConfigJSON
bms.PolicyUpdatedAt = meta.PolicyConfigUpdatedAt
if len(meta.TaggingConfigXML) > 0 {
tagCfgStr := base64.StdEncoding.EncodeToString(meta.TaggingConfigXML)
bms.Tags = &tagCfgStr
bms.TagConfigUpdatedAt = updatedAt
bms.TagConfigUpdatedAt = meta.TaggingConfigUpdatedAt
}
versioningCfg, updatedAt, err := globalBucketMetadataSys.GetVersioningConfig(bucket)
found = true
if versioningCfg != nil && versioningCfg.Status == "" {
found = false
} else if err != nil {
return info, errSRBackendIssue(err)
}
if found {
versionCfgData, err := xml.Marshal(versioningCfg)
if err != nil {
return info, wrapSRErr(err)
}
versioningCfgStr := base64.StdEncoding.EncodeToString(versionCfgData)
if len(meta.VersioningConfigXML) > 0 {
versioningCfgStr := base64.StdEncoding.EncodeToString(meta.VersioningConfigXML)
bms.Versioning = &versioningCfgStr
bms.VersioningConfigUpdatedAt = updatedAt
bms.VersioningConfigUpdatedAt = meta.VersioningConfigUpdatedAt
}
// Get object-lock config if present.
objLockCfg, updatedAt, err := globalBucketMetadataSys.GetObjectLockConfig(bucket)
found = true
if _, ok := err.(BucketObjectLockConfigNotFound); ok {
found = false
} else if err != nil {
return info, errSRBackendIssue(err)
}
if found {
objLockCfgData, err := xml.Marshal(objLockCfg)
if err != nil {
return info, wrapSRErr(err)
}
objLockStr := base64.StdEncoding.EncodeToString(objLockCfgData)
if len(meta.ObjectLockConfigXML) > 0 {
objLockStr := base64.StdEncoding.EncodeToString(meta.ObjectLockConfigXML)
bms.ObjectLockConfig = &objLockStr
bms.ObjectLockConfigUpdatedAt = updatedAt
bms.ObjectLockConfigUpdatedAt = meta.ObjectLockConfigUpdatedAt
}
// Get quota config if present
quotaConfig, updatedAt, err := globalBucketMetadataSys.GetQuotaConfig(ctx, bucket)
found = true
if _, ok := err.(BucketQuotaConfigNotFound); ok {
found = false
} else if err != nil {
return info, errSRBackendIssue(err)
}
if found {
quotaConfigJSON, err := json.Marshal(quotaConfig)
if err != nil {
return info, wrapSRErr(err)
}
quotaConfigStr := base64.StdEncoding.EncodeToString(quotaConfigJSON)
if len(meta.QuotaConfigJSON) > 0 {
quotaConfigStr := base64.StdEncoding.EncodeToString(meta.QuotaConfigJSON)
bms.QuotaConfig = &quotaConfigStr
bms.QuotaConfigUpdatedAt = updatedAt
bms.QuotaConfigUpdatedAt = meta.QuotaConfigUpdatedAt
}
// Get existing bucket bucket encryption settings
sseConfig, updatedAt, err := globalBucketMetadataSys.GetSSEConfig(bucket)
found = true
if _, ok := err.(BucketSSEConfigNotFound); ok {
found = false
} else if err != nil {
return info, errSRBackendIssue(err)
}
if found {
sseConfigData, err := xml.Marshal(sseConfig)
if err != nil {
return info, wrapSRErr(err)
}
sseConfigStr := base64.StdEncoding.EncodeToString(sseConfigData)
if len(meta.EncryptionConfigXML) > 0 {
sseConfigStr := base64.StdEncoding.EncodeToString(meta.EncryptionConfigXML)
bms.SSEConfig = &sseConfigStr
bms.SSEConfigUpdatedAt = updatedAt
bms.SSEConfigUpdatedAt = meta.EncryptionConfigUpdatedAt
}
// Get replication config if present
rcfg, updatedAt, err := globalBucketMetadataSys.GetReplicationConfig(ctx, bucket)
found = true
if _, ok := err.(BucketReplicationConfigNotFound); ok {
found = false
} else if err != nil {
return info, errSRBackendIssue(err)
}
if found {
rcfgXML, err := xml.Marshal(rcfg)
if err != nil {
return info, wrapSRErr(err)
}
rcfgXMLStr := base64.StdEncoding.EncodeToString(rcfgXML)
if len(meta.ReplicationConfigXML) > 0 {
rcfgXMLStr := base64.StdEncoding.EncodeToString(meta.ReplicationConfigXML)
bms.ReplicationConfig = &rcfgXMLStr
bms.ReplicationConfigUpdatedAt = updatedAt
bms.ReplicationConfigUpdatedAt = meta.ReplicationConfigUpdatedAt
}
info.Buckets[bucket] = bms
}
}
@ -3617,7 +3494,7 @@ func (c *SiteReplicationSys) PeerEditReq(ctx context.Context, arg madmin.PeerInf
return nil
}
const siteHealTimeInterval = 10 * time.Second
const siteHealTimeInterval = 1 * time.Minute
func (c *SiteReplicationSys) startHealRoutine(ctx context.Context, objAPI ObjectLayer) {
ctx, cancel := globalLeaderLock.GetLock(ctx)