Add support for adding new site(s) to site replication (#13696)

Currently, the new site is expected to be empty (i.e. to have no buckets).
This commit is contained in:
Poorna K
2021-11-30 13:16:37 -08:00
committed by GitHub
parent d21466f595
commit 9ec197f2e8
4 changed files with 261 additions and 103 deletions

View File

@@ -252,73 +252,113 @@ const (
siteReplicatorSvcAcc = "site-replicator-0"
)
// AddPeerClusters - add cluster sites for replication configuration.
func (c *SiteReplicationSys) AddPeerClusters(ctx context.Context, sites []madmin.PeerSite) (madmin.ReplicateAddStatus, SRError) {
// If current cluster is already SR enabled, we fail.
if c.enabled {
return madmin.ReplicateAddStatus{}, errSRInvalidRequest(errSRCannotJoin)
}
// PeerSiteInfo is a wrapper struct around madmin.PeerSite with extra info on site status.
// Values are populated by getSiteStatuses for each site being added.
type PeerSiteInfo struct {
madmin.PeerSite
// self is true when the site's deployment ID equals globalDeploymentID,
// i.e. this entry describes the local cluster.
self bool
// DeploymentID is the deployment ID reported by the site's ServerInfo admin call.
DeploymentID string
Replicated bool // true if already participating in site replication
Empty bool // true if cluster has no buckets
}
// Only one of the clusters being added, can have any buckets (i.e. self
// here) - others must be empty.
selfIdx := -1
localHasBuckets := false
nonLocalPeerWithBuckets := ""
deploymentIDs := make([]string, 0, len(sites))
deploymentIDsSet := set.NewStringSet()
for i, v := range sites {
// getSiteStatuses gathers more info on the sites being added
func (c *SiteReplicationSys) getSiteStatuses(ctx context.Context, sites []madmin.PeerSite) (psi []PeerSiteInfo, err SRError) {
for _, v := range sites {
admClient, err := getAdminClient(v.Endpoint, v.AccessKey, v.SecretKey)
if err != nil {
return madmin.ReplicateAddStatus{}, errSRPeerResp(fmt.Errorf("unable to create admin client for %s: %w", v.Name, err))
return psi, errSRPeerResp(fmt.Errorf("unable to create admin client for %s: %w", v.Name, err))
}
info, err := admClient.ServerInfo(ctx)
if err != nil {
return madmin.ReplicateAddStatus{}, errSRPeerResp(fmt.Errorf("unable to fetch server info for %s: %w", v.Name, err))
return psi, errSRPeerResp(fmt.Errorf("unable to fetch server info for %s: %w", v.Name, err))
}
deploymentID := info.DeploymentID
if deploymentID == "" {
return madmin.ReplicateAddStatus{}, errSRPeerResp(fmt.Errorf("unable to fetch deploymentID for %s: value was empty!", v.Name))
pi := PeerSiteInfo{
PeerSite: v,
DeploymentID: deploymentID,
Empty: true,
}
deploymentIDs = append(deploymentIDs, deploymentID)
// deploymentIDs must be unique
if deploymentIDsSet.Contains(deploymentID) {
return madmin.ReplicateAddStatus{}, errSRInvalidRequest(errSRDuplicateSites)
}
deploymentIDsSet.Add(deploymentID)
if deploymentID == globalDeploymentID {
selfIdx = i
objAPI := newObjectLayerFn()
if objAPI == nil {
return madmin.ReplicateAddStatus{}, errSRObjectLayerNotReady
return psi, errSRObjectLayerNotReady
}
res, err := objAPI.ListBuckets(ctx)
if err != nil {
return madmin.ReplicateAddStatus{}, errSRBackendIssue(err)
return psi, errSRBackendIssue(err)
}
if len(res) > 0 {
localHasBuckets = true
pi.Empty = false
}
pi.self = true
} else {
s3Client, err := getS3Client(v)
if err != nil {
return psi, errSRPeerResp(fmt.Errorf("unable to create s3 client for %s: %w", v.Name, err))
}
buckets, err := s3Client.ListBuckets(ctx)
if err != nil {
return psi, errSRPeerResp(fmt.Errorf("unable to list buckets for %s: %v", v.Name, err))
}
pi.Empty = len(buckets) == 0
}
psi = append(psi, pi)
}
return
}
// AddPeerClusters - add cluster sites for replication configuration.
func (c *SiteReplicationSys) AddPeerClusters(ctx context.Context, psites []madmin.PeerSite) (madmin.ReplicateAddStatus, SRError) {
sites, serr := c.getSiteStatuses(ctx, psites)
if serr.Cause != nil {
return madmin.ReplicateAddStatus{}, serr
}
var (
currSites madmin.SiteReplicationInfo
currDeploymentIDsSet = set.NewStringSet()
err error
)
if c.enabled {
currSites, err = c.GetClusterInfo(ctx)
if err != nil {
return madmin.ReplicateAddStatus{}, errSRBackendIssue(err)
}
for _, v := range currSites.Sites {
currDeploymentIDsSet.Add(v.DeploymentID)
}
}
deploymentIDsSet := set.NewStringSet()
localHasBuckets := false
nonLocalPeerWithBuckets := ""
var selfIdx = -1
for i, v := range sites {
// deploymentIDs must be unique
if deploymentIDsSet.Contains(v.DeploymentID) {
return madmin.ReplicateAddStatus{}, errSRInvalidRequest(errSRDuplicateSites)
}
deploymentIDsSet.Add(v.DeploymentID)
if v.self {
selfIdx = i
localHasBuckets = !v.Empty
continue
}
s3Client, err := getS3Client(v)
if err != nil {
return madmin.ReplicateAddStatus{}, errSRPeerResp(fmt.Errorf("unable to create s3 client for %s: %w", v.Name, err))
}
buckets, err := s3Client.ListBuckets(ctx)
if err != nil {
return madmin.ReplicateAddStatus{}, errSRPeerResp(fmt.Errorf("unable to list buckets for %s: %v", v.Name, err))
}
if len(buckets) > 0 {
if !v.Empty && !currDeploymentIDsSet.Contains(v.DeploymentID) {
nonLocalPeerWithBuckets = v.Name
}
}
if c.enabled {
// If the current cluster is already SR enabled and no new site is being added, fail.
if currDeploymentIDsSet.Equals(deploymentIDsSet) {
return madmin.ReplicateAddStatus{}, errSRInvalidRequest(errSRCannotJoin)
}
if len(currDeploymentIDsSet.Intersection(deploymentIDsSet)) != len(currDeploymentIDsSet) {
diffSlc := getMissingSiteNames(currDeploymentIDsSet, deploymentIDsSet, currSites.Sites)
return madmin.ReplicateAddStatus{}, errSRInvalidRequest(fmt.Errorf("All existing replicated sites must be specified - missing %s", strings.Join(diffSlc, " ")))
}
}
// For this `add` API, either all clusters must be empty or the local
// cluster must be the only one having some buckets.
@@ -332,7 +372,7 @@ func (c *SiteReplicationSys) AddPeerClusters(ctx context.Context, sites []madmin
// validate that all clusters are using the same (LDAP based)
// external IDP.
pass, verr := c.validateIDPSettings(ctx, sites, selfIdx)
pass, verr := c.validateIDPSettings(ctx, sites)
if verr.Cause != nil {
return madmin.ReplicateAddStatus{}, verr
}
@@ -352,44 +392,58 @@ func (c *SiteReplicationSys) AddPeerClusters(ctx context.Context, sites []madmin
// Create a local service account.
// Generate a secret key for the service account.
// Generate a secret key for the service account if not created already.
var secretKey string
_, secretKey, err := auth.GenerateCredentials()
if err != nil {
return madmin.ReplicateAddStatus{}, SRError{
Cause: err,
Code: ErrInternalError,
svcCred, _, err := globalIAMSys.getServiceAccount(ctx, siteReplicatorSvcAcc)
switch {
case err == errNoSuchServiceAccount:
_, secretKey, err = auth.GenerateCredentials()
if err != nil {
return madmin.ReplicateAddStatus{}, errSRServiceAccount(fmt.Errorf("unable to create local service account: %w", err))
}
}
svcCred, err := globalIAMSys.NewServiceAccount(ctx, sites[selfIdx].AccessKey, nil, newServiceAccountOpts{
accessKey: siteReplicatorSvcAcc,
secretKey: secretKey,
})
if err != nil {
return madmin.ReplicateAddStatus{}, errSRServiceAccount(fmt.Errorf("unable to create local service account: %w", err))
svcCred, err = globalIAMSys.NewServiceAccount(ctx, sites[selfIdx].AccessKey, nil, newServiceAccountOpts{
accessKey: siteReplicatorSvcAcc,
secretKey: secretKey,
})
if err != nil {
return madmin.ReplicateAddStatus{}, errSRServiceAccount(fmt.Errorf("unable to create local service account: %w", err))
}
case err == nil:
secretKey = svcCred.SecretKey
default:
return madmin.ReplicateAddStatus{}, errSRBackendIssue(err)
}
joinReq := madmin.SRInternalJoinReq{
SvcAcctAccessKey: svcCred.AccessKey,
SvcAcctSecretKey: svcCred.SecretKey,
SvcAcctSecretKey: secretKey,
Peers: make(map[string]madmin.PeerInfo),
}
for i, v := range sites {
joinReq.Peers[deploymentIDs[i]] = madmin.PeerInfo{
for _, v := range sites {
joinReq.Peers[v.DeploymentID] = madmin.PeerInfo{
Endpoint: v.Endpoint,
Name: v.Name,
DeploymentID: deploymentIDs[i],
DeploymentID: v.DeploymentID,
}
}
addedCount := 0
var peerAddErr SRError
for i, v := range sites {
if i == selfIdx {
var (
peerAddErr SRError
admClient *madmin.AdminClient
)
for _, v := range sites {
if v.self {
continue
}
admClient, err := getAdminClient(v.Endpoint, v.AccessKey, v.SecretKey)
switch {
case currDeploymentIDsSet.Contains(v.DeploymentID):
admClient, err = c.getAdminClient(ctx, v.DeploymentID)
default:
admClient, err = getAdminClient(v.Endpoint, v.AccessKey, v.SecretKey)
}
if err != nil {
peerAddErr = errSRPeerResp(fmt.Errorf("unable to create admin client for %s: %w", v.Name, err))
break
@@ -415,9 +469,9 @@ func (c *SiteReplicationSys) AddPeerClusters(ctx context.Context, sites []madmin
Status: madmin.ReplicateAddStatusPartial,
ErrDetail: peerAddErr.Error(),
}
return partial, SRError{}
}
// Other than handling existing buckets, we can now save the cluster
// replication configuration state.
state := srState{
@@ -448,10 +502,6 @@ func (c *SiteReplicationSys) AddPeerClusters(ctx context.Context, sites []madmin
// InternalJoinReq - internal API handler to respond to a peer cluster's request
// to join.
func (c *SiteReplicationSys) InternalJoinReq(ctx context.Context, arg madmin.SRInternalJoinReq) SRError {
if c.enabled {
return errSRInvalidRequest(errSRCannotJoin)
}
var ourName string
for d, p := range arg.Peers {
if d == globalDeploymentID {
@@ -463,10 +513,13 @@ func (c *SiteReplicationSys) InternalJoinReq(ctx context.Context, arg madmin.SRI
return errSRInvalidRequest(errSRSelfNotFound)
}
_, err := globalIAMSys.NewServiceAccount(ctx, arg.SvcAcctParent, nil, newServiceAccountOpts{
accessKey: arg.SvcAcctAccessKey,
secretKey: arg.SvcAcctSecretKey,
})
_, _, err := globalIAMSys.GetServiceAccount(ctx, arg.SvcAcctAccessKey)
if err == errNoSuchServiceAccount {
_, err = globalIAMSys.NewServiceAccount(ctx, arg.SvcAcctParent, nil, newServiceAccountOpts{
accessKey: arg.SvcAcctAccessKey,
secretKey: arg.SvcAcctSecretKey,
})
}
if err != nil {
return errSRServiceAccount(fmt.Errorf("unable to create service account on %s: %v", ourName, err))
}
@@ -495,10 +548,10 @@ func (c *SiteReplicationSys) GetIDPSettings(ctx context.Context) madmin.IDPSetti
}
}
func (c *SiteReplicationSys) validateIDPSettings(ctx context.Context, peers []madmin.PeerSite, selfIdx int) (bool, SRError) {
func (c *SiteReplicationSys) validateIDPSettings(ctx context.Context, peers []PeerSiteInfo) (bool, SRError) {
s := make([]madmin.IDPSettings, 0, len(peers))
for i, v := range peers {
if i == selfIdx {
for _, v := range peers {
if v.self {
s = append(s, c.GetIDPSettings(ctx))
continue
}
@@ -791,25 +844,41 @@ func (c *SiteReplicationSys) PeerBucketConfigureReplHandler(ctx context.Context,
return err
}
}
err = replicationConfig.AddRule(replication.Options{
// Set the ID so we can identify the rule as being
// created for site-replication and include the
// destination cluster's deployment ID.
ID: fmt.Sprintf("site-repl-%s", d),
var (
ruleID = fmt.Sprintf("site-repl-%s", d)
hasRule bool
opts = replication.Options{
// Set the ID so we can identify the rule as being
// created for site-replication and include the
// destination cluster's deployment ID.
ID: ruleID,
// Use a helper to generate unique priority numbers.
Priority: fmt.Sprintf("%d", getPriorityHelper(replicationConfig)),
// Use a helper to generate unique priority numbers.
Priority: fmt.Sprintf("%d", getPriorityHelper(replicationConfig)),
Op: replication.AddOption,
RuleStatus: "enable",
DestBucket: targetARN,
Op: replication.AddOption,
RuleStatus: "enable",
DestBucket: targetARN,
// Replicate everything!
ReplicateDeletes: "enable",
ReplicateDeleteMarkers: "enable",
ReplicaSync: "enable",
ExistingObjectReplicate: "enable",
}
)
for _, r := range replicationConfig.Rules {
if r.ID == ruleID {
hasRule = true
}
}
switch {
case hasRule:
err = replicationConfig.EditRule(opts)
default:
err = replicationConfig.AddRule(opts)
}
// Replicate everything!
ReplicateDeletes: "enable",
ReplicateDeleteMarkers: "enable",
ReplicaSync: "enable",
ExistingObjectReplicate: "enable",
})
if err != nil {
logger.LogIf(ctx, c.annotatePeerErr(peer.Name, "Error adding bucket replication rule", err))
return err
@@ -1404,8 +1473,10 @@ func (c *SiteReplicationSys) syncLocalToPeers(ctx context.Context) SRError {
if err != nil {
return errSRBackendIssue(err)
}
if _, isLDAPAccount := claims[ldapUserN]; !isLDAPAccount {
continue
if claims != nil {
if _, isLDAPAccount := claims[ldapUserN]; !isLDAPAccount {
continue
}
}
_, policy, err := globalIAMSys.GetServiceAccount(ctx, acc.AccessKey)
if err != nil {
@@ -1572,3 +1643,16 @@ func getPriorityHelper(replicationConfig replication.Config) int {
// leave some gaps in priority numbers for flexibility
return maxPrio + 10
}
// getMissingSiteNames returns the names of the sites in currSites whose
// deployment IDs are present in oldDeps but absent from newDeps, i.e. sites
// already participating in site replication that were left unspecified while
// adding a new site. Names follow the order of currSites; the result is nil
// when no site is missing.
func getMissingSiteNames(oldDeps, newDeps set.StringSet, currSites []madmin.PeerInfo) []string {
	var missing []string
	unspecified := oldDeps.Difference(newDeps)
	for i := range currSites {
		site := currSites[i]
		if !unspecified.Contains(site.DeploymentID) {
			continue
		}
		missing = append(missing, site.Name)
	}
	return missing
}