Replicate Expiry ILM configs while site replication (#18130)

Signed-off-by: Shubhendu Ram Tripathi <shubhendu@minio.io>
This commit is contained in:
Shubhendu
2023-11-21 23:18:06 +05:30
committed by GitHub
parent 41091d9472
commit 58306a9d34
10 changed files with 1123 additions and 63 deletions

View File

@@ -52,7 +52,8 @@ func (a adminAPIHandlers) SiteReplicationAdd(w http.ResponseWriter, r *http.Requ
return
}
status, err := globalSiteReplicationSys.AddPeerClusters(ctx, sites)
opts := getSRAddOptions(r)
status, err := globalSiteReplicationSys.AddPeerClusters(ctx, sites, opts)
if err != nil {
logger.LogIf(ctx, err)
writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL)
@@ -68,6 +69,12 @@ func (a adminAPIHandlers) SiteReplicationAdd(w http.ResponseWriter, r *http.Requ
writeSuccessResponseJSON(w, body)
}
func getSRAddOptions(r *http.Request) (opts madmin.SRAddOptions) {
q := r.Form
opts.ReplicateILMExpiry = q.Get("replicateILMExpiry") == "true"
return
}
// SRPeerJoin - PUT /minio/admin/v3/site-replication/join
//
// used internally to tell current cluster to enable SR with
@@ -192,7 +199,7 @@ func (a adminAPIHandlers) SRPeerReplicateIAMItem(w http.ResponseWriter, r *http.
}
}
// SRPeerReplicateBucketItem - PUT /minio/admin/v3/site-replication/bucket-meta
// SRPeerReplicateBucketItem - PUT /minio/admin/v3/site-replication/peer/bucket-meta
func (a adminAPIHandlers) SRPeerReplicateBucketItem(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
@@ -253,6 +260,8 @@ func (a adminAPIHandlers) SRPeerReplicateBucketItem(w http.ResponseWriter, r *ht
err = globalSiteReplicationSys.PeerBucketObjectLockConfigHandler(ctx, item.Bucket, item.ObjectLockConfig, item.UpdatedAt)
case madmin.SRBucketMetaTypeSSEConfig:
err = globalSiteReplicationSys.PeerBucketSSEConfigHandler(ctx, item.Bucket, item.SSEConfig, item.UpdatedAt)
case madmin.SRBucketMetaLCConfig:
err = globalSiteReplicationSys.PeerBucketLCConfigHandler(ctx, item.Bucket, item.ExpiryLCConfig, item.UpdatedAt)
}
if err != nil {
logger.LogIf(ctx, err)
@@ -334,6 +343,7 @@ func (a adminAPIHandlers) SiteReplicationStatus(w http.ResponseWriter, r *http.R
opts.Users = true
opts.Policies = true
opts.Groups = true
opts.ILMExpiryRules = true
}
info, err := globalSiteReplicationSys.SiteReplicationStatus(ctx, objectAPI, opts)
if err != nil {
@@ -383,7 +393,9 @@ func (a adminAPIHandlers) SiteReplicationEdit(w http.ResponseWriter, r *http.Req
writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL)
return
}
status, err := globalSiteReplicationSys.EditPeerCluster(ctx, site)
opts := getSREditOptions(r)
status, err := globalSiteReplicationSys.EditPeerCluster(ctx, site, opts)
if err != nil {
logger.LogIf(ctx, err)
writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL)
@@ -398,6 +410,13 @@ func (a adminAPIHandlers) SiteReplicationEdit(w http.ResponseWriter, r *http.Req
writeSuccessResponseJSON(w, body)
}
func getSREditOptions(r *http.Request) (opts madmin.SREditOptions) {
q := r.Form
opts.DisableILMExpiryReplication = q.Get("disableILMExpiryReplication") == "true"
opts.EnableILMExpiryReplication = q.Get("enableILMExpiryReplication") == "true"
return
}
// SRPeerEdit - PUT /minio/admin/v3/site-replication/peer/edit
//
// used internally to tell current cluster to update endpoint for peer
@@ -422,12 +441,37 @@ func (a adminAPIHandlers) SRPeerEdit(w http.ResponseWriter, r *http.Request) {
}
}
// SRStateEdit - PUT /minio/admin/v3/site-replication/state/edit
//
// used internally to tell current cluster to update site replication state
func (a adminAPIHandlers) SRStateEdit(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
objectAPI, _ := validateAdminReq(ctx, w, r, policy.SiteReplicationOperationAction)
if objectAPI == nil {
return
}
var state madmin.SRStateEditReq
if err := parseJSONBody(ctx, r.Body, &state, ""); err != nil {
writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL)
return
}
if err := globalSiteReplicationSys.PeerStateEditReq(ctx, state); err != nil {
logger.LogIf(ctx, err)
writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL)
return
}
}
func getSRStatusOptions(r *http.Request) (opts madmin.SRStatusOptions) {
q := r.Form
opts.Buckets = q.Get("buckets") == "true"
opts.Policies = q.Get("policies") == "true"
opts.Groups = q.Get("groups") == "true"
opts.Users = q.Get("users") == "true"
opts.ILMExpiryRules = q.Get("ilm-expiry-rules") == "true"
opts.PeerState = q.Get("peer-state") == "true"
opts.Entity = madmin.GetSREntityType(q.Get("entity"))
opts.EntityValue = q.Get("entityvalue")
opts.ShowDeleted = q.Get("showDeleted") == "true"

View File

@@ -376,6 +376,7 @@ func registerAdminRouter(router *mux.Router, enableConfigOps bool) {
adminRouter.Methods(http.MethodPut).Path(adminVersion + "/site-replication/peer/edit").HandlerFunc(adminMiddleware(adminAPI.SRPeerEdit))
adminRouter.Methods(http.MethodPut).Path(adminVersion + "/site-replication/peer/remove").HandlerFunc(adminMiddleware(adminAPI.SRPeerRemove))
adminRouter.Methods(http.MethodPut).Path(adminVersion+"/site-replication/resync/op").HandlerFunc(adminMiddleware(adminAPI.SiteReplicationResyncOp)).Queries("operation", "{operation:.*}")
adminRouter.Methods(http.MethodPut).Path(adminVersion + "/site-replication/state/edit").HandlerFunc(adminMiddleware(adminAPI.SRStateEdit))
if globalIsDistErasure {
// Top locks

View File

@@ -22,6 +22,7 @@ import (
"io"
"net/http"
"strconv"
"time"
"github.com/minio/minio/internal/bucket/lifecycle"
xhttp "github.com/minio/minio/internal/http"
@@ -86,6 +87,41 @@ func (api objectAPIHandlers) PutBucketLifecycleHandler(w http.ResponseWriter, r
return
}
// Create a map of updated set of rules in request
updatedRules := make(map[string]lifecycle.Rule, len(bucketLifecycle.Rules))
for _, rule := range bucketLifecycle.Rules {
updatedRules[rule.ID] = rule
}
// Get list of rules for the bucket from disk
meta, err := globalBucketMetadataSys.GetConfigFromDisk(ctx, bucket)
if err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
return
}
expiryRuleRemoved := false
if len(meta.LifecycleConfigXML) > 0 {
var lcCfg lifecycle.Lifecycle
if err := xml.Unmarshal(meta.LifecycleConfigXML, &lcCfg); err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
return
}
for _, rl := range lcCfg.Rules {
updRule, ok := updatedRules[rl.ID]
// original rule had expiry that is no longer in the new config,
// or rule is present but missing expiration flags
if (!rl.Expiration.IsNull() || !rl.NoncurrentVersionExpiration.IsNull()) &&
(!ok || (updRule.Expiration.IsNull() && updRule.NoncurrentVersionExpiration.IsNull())) {
expiryRuleRemoved = true
}
}
}
if bucketLifecycle.HasExpiry() || expiryRuleRemoved {
currtime := time.Now()
bucketLifecycle.ExpiryUpdatedAt = &currtime
}
configData, err := xml.Marshal(bucketLifecycle)
if err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
@@ -142,6 +178,8 @@ func (api objectAPIHandlers) GetBucketLifecycleHandler(w http.ResponseWriter, r
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
return
}
// explicitly set ExpiryUpdatedAt nil as its meant for internal consumption only
config.ExpiryUpdatedAt = nil
configData, err := xml.Marshal(config)
if err != nil {

View File

@@ -19,6 +19,7 @@ package cmd
import (
"context"
"encoding/xml"
"errors"
"fmt"
"sync"
@@ -177,6 +178,40 @@ func (sys *BucketMetadataSys) save(ctx context.Context, meta BucketMetadata) err
// Delete delete the bucket metadata for the specified bucket.
// must be used by all callers instead of using Update() with nil configData.
func (sys *BucketMetadataSys) Delete(ctx context.Context, bucket string, configFile string) (updatedAt time.Time, err error) {
if configFile == bucketLifecycleConfig {
// Get bucket config from current site
meta, e := globalBucketMetadataSys.GetConfigFromDisk(ctx, bucket)
if e != nil && !errors.Is(e, errConfigNotFound) {
return updatedAt, e
}
var expiryRuleRemoved bool
if len(meta.LifecycleConfigXML) > 0 {
var lcCfg lifecycle.Lifecycle
if err := xml.Unmarshal(meta.LifecycleConfigXML, &lcCfg); err != nil {
return updatedAt, err
}
// find a single expiry rule set the flag
for _, rl := range lcCfg.Rules {
if !rl.Expiration.IsNull() || !rl.NoncurrentVersionExpiration.IsNull() {
expiryRuleRemoved = true
break
}
}
}
// Form empty ILM details with `ExpiryUpdatedAt` field and save
var cfgData []byte
if expiryRuleRemoved {
var lcCfg lifecycle.Lifecycle
currtime := time.Now()
lcCfg.ExpiryUpdatedAt = &currtime
cfgData, err = xml.Marshal(lcCfg)
if err != nil {
return updatedAt, err
}
}
return sys.updateAndParse(ctx, bucket, configFile, cfgData, false)
}
return sys.updateAndParse(ctx, bucket, configFile, nil, false)
}
@@ -267,7 +302,10 @@ func (sys *BucketMetadataSys) GetLifecycleConfig(bucket string) (*lifecycle.Life
}
return nil, time.Time{}, err
}
if meta.lifecycleConfig == nil {
// there could be just `ExpiryUpdatedAt` field populated as part
// of last delete all. Treat this situation as not lifecycle configuration
// available
if meta.lifecycleConfig == nil || len(meta.lifecycleConfig.Rules) == 0 {
return nil, time.Time{}, BucketLifecycleNotFound{Bucket: bucket}
}
return meta.lifecycleConfig, meta.LifecycleConfigUpdatedAt, nil

File diff suppressed because it is too large Load Diff