mirror of
https://github.com/minio/minio.git
synced 2025-11-09 21:49:46 -05:00
allow versioning config changes under site replication (#14876)
PR #14828 introduced prefix-level exclusion of versioning and replication - however our site replication implementation since it defaults versioning on all buckets did not allow changing versioning configuration once the bucket was created. This PR changes this and ensures that such changes are honored and also propagated/healed across sites appropriately.
This commit is contained in:
@@ -241,6 +241,8 @@ func (a adminAPIHandlers) SRPeerReplicateBucketItem(w http.ResponseWriter, r *ht
|
||||
return
|
||||
}
|
||||
}
|
||||
case madmin.SRBucketMetaTypeVersionConfig:
|
||||
err = globalSiteReplicationSys.PeerBucketVersioningHandler(ctx, item.Bucket, item.Versioning)
|
||||
case madmin.SRBucketMetaTypeTags:
|
||||
err = globalSiteReplicationSys.PeerBucketTaggingHandler(ctx, item.Bucket, item.Tags)
|
||||
case madmin.SRBucketMetaTypeObjectLockConfig:
|
||||
|
||||
@@ -18,12 +18,14 @@
|
||||
package cmd
|
||||
|
||||
import (
|
||||
"encoding/base64"
|
||||
"encoding/xml"
|
||||
"io"
|
||||
"net/http"
|
||||
|
||||
humanize "github.com/dustin/go-humanize"
|
||||
"github.com/gorilla/mux"
|
||||
"github.com/minio/madmin-go"
|
||||
"github.com/minio/minio/internal/bucket/versioning"
|
||||
"github.com/minio/minio/internal/logger"
|
||||
"github.com/minio/pkg/bucket/policy"
|
||||
@@ -63,10 +65,10 @@ func (api objectAPIHandlers) PutBucketVersioningHandler(w http.ResponseWriter, r
|
||||
return
|
||||
}
|
||||
|
||||
if globalSiteReplicationSys.isEnabled() {
|
||||
if globalSiteReplicationSys.isEnabled() && !v.Enabled() {
|
||||
writeErrorResponse(ctx, w, APIError{
|
||||
Code: "InvalidBucketState",
|
||||
Description: "Cluster replication is enabled for this site, so the versioning state cannot be changed.",
|
||||
Description: "Cluster replication is enabled for this site, so the versioning cannot be suspended.",
|
||||
HTTPStatusCode: http.StatusConflict,
|
||||
}, r.URL)
|
||||
return
|
||||
@@ -100,6 +102,20 @@ func (api objectAPIHandlers) PutBucketVersioningHandler(w http.ResponseWriter, r
|
||||
return
|
||||
}
|
||||
|
||||
// Call site replication hook.
|
||||
//
|
||||
// We encode the xml bytes as base64 to ensure there are no encoding
|
||||
// errors.
|
||||
cfgStr := base64.StdEncoding.EncodeToString(configData)
|
||||
if err = globalSiteReplicationSys.BucketMetaHook(ctx, madmin.SRBucketMeta{
|
||||
Type: madmin.SRBucketMetaTypeVersionConfig,
|
||||
Bucket: bucket,
|
||||
Versioning: &cfgStr,
|
||||
}); err != nil {
|
||||
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
|
||||
return
|
||||
}
|
||||
|
||||
writeSuccessResponseHeadersOnly(w)
|
||||
}
|
||||
|
||||
|
||||
@@ -1216,6 +1216,23 @@ func (c *SiteReplicationSys) BucketMetaHook(ctx context.Context, item madmin.SRB
|
||||
return cErr.summaryErr
|
||||
}
|
||||
|
||||
// PeerBucketVersioningHandler - updates versioning config to local cluster.
|
||||
func (c *SiteReplicationSys) PeerBucketVersioningHandler(ctx context.Context, bucket string, versioning *string) error {
|
||||
if versioning != nil {
|
||||
configData, err := base64.StdEncoding.DecodeString(*versioning)
|
||||
if err != nil {
|
||||
return wrapSRErr(err)
|
||||
}
|
||||
err = globalBucketMetadataSys.Update(ctx, bucket, bucketVersioningConfig, configData)
|
||||
if err != nil {
|
||||
return wrapSRErr(err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// PeerBucketPolicyHandler - copies/deletes policy to local cluster.
|
||||
func (c *SiteReplicationSys) PeerBucketPolicyHandler(ctx context.Context, bucket string, policy *policy.Policy) error {
|
||||
if policy != nil {
|
||||
@@ -2151,6 +2168,7 @@ func (c *SiteReplicationSys) SiteReplicationStatus(ctx context.Context, objAPI O
|
||||
for b, stat := range sinfo.BucketStats {
|
||||
for dID, st := range stat {
|
||||
if st.TagMismatch ||
|
||||
st.VersioningConfigMismatch ||
|
||||
st.OLockConfigMismatch ||
|
||||
st.SSEConfigMismatch ||
|
||||
st.PolicyMismatch ||
|
||||
@@ -2548,7 +2566,8 @@ func (c *SiteReplicationSys) siteReplicationStatus(ctx context.Context, objAPI O
|
||||
replCfgs := make([]*sreplication.Config, numSites)
|
||||
quotaCfgs := make([]*madmin.BucketQuota, numSites)
|
||||
sseCfgSet := set.NewStringSet()
|
||||
var tagCount, olockCfgCount, sseCfgCount int
|
||||
versionCfgSet := set.NewStringSet()
|
||||
var tagCount, olockCfgCount, sseCfgCount, versionCfgCount int
|
||||
for i, s := range slc {
|
||||
if s.ReplicationConfig != nil {
|
||||
cfgBytes, err := base64.StdEncoding.DecodeString(*s.ReplicationConfig)
|
||||
@@ -2561,6 +2580,16 @@ func (c *SiteReplicationSys) siteReplicationStatus(ctx context.Context, objAPI O
|
||||
}
|
||||
replCfgs[i] = cfg
|
||||
}
|
||||
if s.Versioning != nil {
|
||||
configData, err := base64.StdEncoding.DecodeString(*s.Versioning)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
versionCfgCount++
|
||||
if !versionCfgSet.Contains(string(configData)) {
|
||||
versionCfgSet.Add(string(configData))
|
||||
}
|
||||
}
|
||||
if s.QuotaConfig != nil {
|
||||
cfgBytes, err := base64.StdEncoding.DecodeString(*s.QuotaConfig)
|
||||
if err != nil {
|
||||
@@ -2590,12 +2619,11 @@ func (c *SiteReplicationSys) siteReplicationStatus(ctx context.Context, objAPI O
|
||||
policies[i] = plcy
|
||||
}
|
||||
if s.ObjectLockConfig != nil {
|
||||
olockCfgCount++
|
||||
configData, err := base64.StdEncoding.DecodeString(*s.ObjectLockConfig)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
|
||||
olockCfgCount++
|
||||
if !olockConfigSet.Contains(string(configData)) {
|
||||
olockConfigSet.Add(string(configData))
|
||||
}
|
||||
@@ -2605,10 +2633,10 @@ func (c *SiteReplicationSys) siteReplicationStatus(ctx context.Context, objAPI O
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
sseCfgCount++
|
||||
if !sseCfgSet.Contains(string(configData)) {
|
||||
sseCfgSet.Add(string(configData))
|
||||
}
|
||||
sseCfgCount++
|
||||
}
|
||||
ss, ok := info.StatsSummary[s.DeploymentID]
|
||||
if !ok {
|
||||
@@ -2628,6 +2656,9 @@ func (c *SiteReplicationSys) siteReplicationStatus(ctx context.Context, objAPI O
|
||||
if sseCfgCount > 0 {
|
||||
ss.TotalSSEConfigCount++
|
||||
}
|
||||
if versionCfgCount > 0 {
|
||||
ss.TotalVersioningConfigCount++
|
||||
}
|
||||
if len(policies) > 0 {
|
||||
ss.TotalBucketPoliciesCount++
|
||||
}
|
||||
@@ -2636,6 +2667,7 @@ func (c *SiteReplicationSys) siteReplicationStatus(ctx context.Context, objAPI O
|
||||
tagMismatch := !isReplicated(tagCount, numSites, tagSet)
|
||||
olockCfgMismatch := !isReplicated(olockCfgCount, numSites, olockConfigSet)
|
||||
sseCfgMismatch := !isReplicated(sseCfgCount, numSites, sseCfgSet)
|
||||
versionCfgMismatch := !isReplicated(versionCfgCount, numSites, versionCfgSet)
|
||||
policyMismatch := !isBktPolicyReplicated(numSites, policies)
|
||||
replCfgMismatch := !isBktReplCfgReplicated(numSites, replCfgs)
|
||||
quotaCfgMismatch := !isBktQuotaCfgReplicated(numSites, quotaCfgs)
|
||||
@@ -2648,20 +2680,21 @@ func (c *SiteReplicationSys) siteReplicationStatus(ctx context.Context, objAPI O
|
||||
}
|
||||
quotaCfgSet := hasBucket && *quotaCfgs[i] != madmin.BucketQuota{}
|
||||
ss := madmin.SRBucketStatsSummary{
|
||||
DeploymentID: s.DeploymentID,
|
||||
HasBucket: hasBucket,
|
||||
TagMismatch: tagMismatch,
|
||||
OLockConfigMismatch: olockCfgMismatch,
|
||||
SSEConfigMismatch: sseCfgMismatch,
|
||||
PolicyMismatch: policyMismatch,
|
||||
ReplicationCfgMismatch: replCfgMismatch,
|
||||
QuotaCfgMismatch: quotaCfgMismatch,
|
||||
HasReplicationCfg: s.ReplicationConfig != nil,
|
||||
HasTagsSet: s.Tags != nil,
|
||||
HasOLockConfigSet: s.ObjectLockConfig != nil,
|
||||
HasPolicySet: s.Policy != nil,
|
||||
HasQuotaCfgSet: quotaCfgSet,
|
||||
HasSSECfgSet: s.SSEConfig != nil,
|
||||
DeploymentID: s.DeploymentID,
|
||||
HasBucket: hasBucket,
|
||||
TagMismatch: tagMismatch,
|
||||
OLockConfigMismatch: olockCfgMismatch,
|
||||
SSEConfigMismatch: sseCfgMismatch,
|
||||
VersioningConfigMismatch: versionCfgMismatch,
|
||||
PolicyMismatch: policyMismatch,
|
||||
ReplicationCfgMismatch: replCfgMismatch,
|
||||
QuotaCfgMismatch: quotaCfgMismatch,
|
||||
HasReplicationCfg: s.ReplicationConfig != nil,
|
||||
HasTagsSet: s.Tags != nil,
|
||||
HasOLockConfigSet: s.ObjectLockConfig != nil,
|
||||
HasPolicySet: s.Policy != nil,
|
||||
HasQuotaCfgSet: quotaCfgSet,
|
||||
HasSSECfgSet: s.SSEConfig != nil,
|
||||
}
|
||||
var m srBucketMetaInfo
|
||||
if len(bucketStats[s.Bucket]) > dIdx {
|
||||
@@ -2678,6 +2711,9 @@ func (c *SiteReplicationSys) siteReplicationStatus(ctx context.Context, objAPI O
|
||||
if !olockCfgMismatch && olockCfgCount == numSites {
|
||||
sum.ReplicatedLockConfig++
|
||||
}
|
||||
if !versionCfgMismatch && versionCfgCount == numSites {
|
||||
sum.ReplicatedVersioningConfig++
|
||||
}
|
||||
if !sseCfgMismatch && sseCfgCount == numSites {
|
||||
sum.ReplicatedSSEConfig++
|
||||
}
|
||||
@@ -3368,6 +3404,7 @@ func (c *SiteReplicationSys) healBuckets(ctx context.Context, objAPI ObjectLayer
|
||||
|
||||
for bucket := range info.BucketStats {
|
||||
c.healCreateMissingBucket(ctx, objAPI, bucket, info)
|
||||
c.healVersioningMetadata(ctx, objAPI, bucket, info)
|
||||
c.healOLockConfigMetadata(ctx, objAPI, bucket, info)
|
||||
c.healSSEMetadata(ctx, objAPI, bucket, info)
|
||||
c.healBucketReplicationConfig(ctx, objAPI, bucket, info)
|
||||
@@ -3588,6 +3625,79 @@ func (c *SiteReplicationSys) healBucketQuotaConfig(ctx context.Context, objAPI O
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *SiteReplicationSys) healVersioningMetadata(ctx context.Context, objAPI ObjectLayer, bucket string, info srStatusInfo) error {
|
||||
c.RLock()
|
||||
defer c.RUnlock()
|
||||
if !c.enabled {
|
||||
return nil
|
||||
}
|
||||
var (
|
||||
latestID, latestPeerName string
|
||||
lastUpdate time.Time
|
||||
latestVersioningConfig *string
|
||||
)
|
||||
|
||||
bs := info.BucketStats[bucket]
|
||||
for dID, ss := range bs {
|
||||
if lastUpdate.IsZero() {
|
||||
lastUpdate = ss.meta.VersioningConfigUpdatedAt
|
||||
latestID = dID
|
||||
latestVersioningConfig = ss.meta.Versioning
|
||||
}
|
||||
// avoid considering just created buckets as latest. Perhaps this site
|
||||
// just joined cluster replication and yet to be sync'd
|
||||
if ss.meta.CreatedAt.Equal(ss.meta.VersioningConfigUpdatedAt) {
|
||||
continue
|
||||
}
|
||||
if ss.meta.VersioningConfigUpdatedAt.After(lastUpdate) {
|
||||
lastUpdate = ss.meta.VersioningConfigUpdatedAt
|
||||
latestID = dID
|
||||
latestVersioningConfig = ss.meta.Versioning
|
||||
}
|
||||
}
|
||||
|
||||
latestPeerName = info.Sites[latestID].Name
|
||||
var latestVersioningConfigBytes []byte
|
||||
var err error
|
||||
if latestVersioningConfig != nil {
|
||||
latestVersioningConfigBytes, err = base64.StdEncoding.DecodeString(*latestVersioningConfig)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
for dID, bStatus := range bs {
|
||||
if !bStatus.VersioningConfigMismatch {
|
||||
continue
|
||||
}
|
||||
if isBucketMetadataEqual(latestVersioningConfig, bStatus.meta.Versioning) {
|
||||
continue
|
||||
}
|
||||
if dID == globalDeploymentID {
|
||||
if err := globalBucketMetadataSys.Update(ctx, bucket, bucketSSEConfig, latestVersioningConfigBytes); err != nil {
|
||||
logger.LogIf(ctx, fmt.Errorf("Error healing sse metadata from peer site %s : %w", latestPeerName, err))
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
admClient, err := c.getAdminClient(ctx, dID)
|
||||
if err != nil {
|
||||
return wrapSRErr(err)
|
||||
}
|
||||
peerName := info.Sites[dID].Name
|
||||
err = admClient.SRPeerReplicateBucketMeta(ctx, madmin.SRBucketMeta{
|
||||
Type: madmin.SRBucketMetaTypeVersionConfig,
|
||||
Bucket: bucket,
|
||||
Versioning: latestVersioningConfig,
|
||||
})
|
||||
if err != nil {
|
||||
logger.LogIf(ctx, c.annotatePeerErr(peerName, "SRPeerReplicateBucketMeta", fmt.Errorf("Error healing versioning config metadata for peer %s from peer %s : %s",
|
||||
peerName, latestPeerName, err.Error())))
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *SiteReplicationSys) healSSEMetadata(ctx context.Context, objAPI ObjectLayer, bucket string, info srStatusInfo) error {
|
||||
c.RLock()
|
||||
defer c.RUnlock()
|
||||
|
||||
Reference in New Issue
Block a user