minio/cmd/site-replication.go
Aditya Manthramurthy 9aadd725d2
Avoid calling .Reset() on active timer (#14941)
.Reset() documentation states:

    For a Timer created with NewTimer, Reset should be invoked only on stopped
    or expired timers with drained channels.

This change simply complies with this requirement, since there may be
runtime-dependent situations where violating it leads to unexpected behavior.
2022-05-18 15:37:58 -07:00
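
A minimal sketch of the required pattern (illustrative only; the function and names below are hypothetical and not the code changed in this file): reset the timer only after it has fired and its channel has been drained.

func periodically(ctx context.Context, interval time.Duration, work func()) {
	timer := time.NewTimer(interval)
	defer timer.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-timer.C:
			work()
			// The timer has expired and its channel is drained at this
			// point, so Reset is safe per the documentation quoted above.
			timer.Reset(interval)
		}
	}
}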

// Copyright (c) 2015-2021 MinIO, Inc.
//
// This file is part of MinIO Object Storage stack
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package cmd
import (
"bytes"
"context"
"encoding/base64"
"encoding/json"
"encoding/xml"
"errors"
"fmt"
"net/url"
"reflect"
"sort"
"strings"
"sync"
"time"
"github.com/minio/madmin-go"
minioClient "github.com/minio/minio-go/v7"
"github.com/minio/minio-go/v7/pkg/credentials"
"github.com/minio/minio-go/v7/pkg/replication"
"github.com/minio/minio-go/v7/pkg/set"
"github.com/minio/minio/internal/auth"
sreplication "github.com/minio/minio/internal/bucket/replication"
"github.com/minio/minio/internal/logger"
"github.com/minio/pkg/bucket/policy"
bktpolicy "github.com/minio/pkg/bucket/policy"
iampolicy "github.com/minio/pkg/iam/policy"
)
const (
srStatePrefix = minioConfigPrefix + "/site-replication"
srStateFile = "state.json"
)
const (
srStateFormatVersion1 = 1
)
var (
errSRCannotJoin = SRError{
Cause: errors.New("this site is already configured for site-replication"),
Code: ErrSiteReplicationInvalidRequest,
}
errSRDuplicateSites = SRError{
Cause: errors.New("duplicate sites provided for site-replication"),
Code: ErrSiteReplicationInvalidRequest,
}
errSRSelfNotFound = SRError{
Cause: errors.New("none of the given sites correspond to the current one"),
Code: ErrSiteReplicationInvalidRequest,
}
errSRPeerNotFound = SRError{
Cause: errors.New("peer not found"),
Code: ErrSiteReplicationInvalidRequest,
}
errSRNotEnabled = SRError{
Cause: errors.New("site replication is not enabled"),
Code: ErrSiteReplicationInvalidRequest,
}
)
func errSRInvalidRequest(err error) SRError {
return SRError{
Cause: err,
Code: ErrSiteReplicationInvalidRequest,
}
}
func errSRPeerResp(err error) SRError {
return SRError{
Cause: err,
Code: ErrSiteReplicationPeerResp,
}
}
func errSRBackendIssue(err error) SRError {
return SRError{
Cause: err,
Code: ErrSiteReplicationBackendIssue,
}
}
func errSRServiceAccount(err error) SRError {
return SRError{
Cause: err,
Code: ErrSiteReplicationServiceAccountError,
}
}
func errSRBucketConfigError(err error) SRError {
return SRError{
Cause: err,
Code: ErrSiteReplicationBucketConfigError,
}
}
func errSRBucketMetaError(err error) SRError {
return SRError{
Cause: err,
Code: ErrSiteReplicationBucketMetaError,
}
}
func errSRIAMError(err error) SRError {
return SRError{
Cause: err,
Code: ErrSiteReplicationIAMError,
}
}
func errSRConfigMissingError(err error) SRError {
return SRError{
Cause: err,
Code: ErrSiteReplicationConfigMissing,
}
}
var errSRObjectLayerNotReady = SRError{
Cause: fmt.Errorf("object layer not ready"),
Code: ErrServerNotInitialized,
}
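// getSRStateFilePath returns the object-store path under which the
// site-replication state (srStateFile) is persisted.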
func getSRStateFilePath() string {
return srStatePrefix + SlashSeparator + srStateFile
}
// SRError - wrapped error for site replication.
type SRError struct {
Cause error
Code APIErrorCode
}
func (c SRError) Error() string {
return c.Cause.Error()
}
func wrapSRErr(err error) SRError {
return SRError{Cause: err, Code: ErrInternalError}
}
// SiteReplicationSys - manages cluster-level replication.
type SiteReplicationSys struct {
sync.RWMutex
enabled bool
// In-memory and persisted multi-site replication state.
state srState
}
type srState srStateV1
// srStateV1 represents version 1 of the site replication state persistence
// format.
type srStateV1 struct {
Name string `json:"name"`
// Peers maps peers by their deploymentID
Peers map[string]madmin.PeerInfo `json:"peers"`
ServiceAccountAccessKey string `json:"serviceAccountAccessKey"`
}
// srStateData represents the format of the current `srStateFile`.
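// For illustration only (hypothetical values; peer entries elided), a persisted
// state file under format version 1 could look like:
//
//	{
//	  "version": 1,
//	  "srState": {
//	    "name": "site-a",
//	    "peers": { "<deployment-id>": { ... } },
//	    "serviceAccountAccessKey": "site-replicator-0"
//	  }
//	}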
type srStateData struct {
Version int `json:"version"`
SRState srStateV1 `json:"srState"`
}
// Init - initialize the site replication manager.
func (c *SiteReplicationSys) Init(ctx context.Context, objAPI ObjectLayer) error {
go c.startHealRoutine(ctx, objAPI)
err := c.loadFromDisk(ctx, objAPI)
if err == errConfigNotFound {
return nil
}
c.RLock()
defer c.RUnlock()
if c.enabled {
logger.Info("Cluster replication initialized")
}
return err
}
func (c *SiteReplicationSys) loadFromDisk(ctx context.Context, objAPI ObjectLayer) error {
buf, err := readConfig(ctx, objAPI, getSRStateFilePath())
if err != nil {
return err
}
// attempt to read just the version key in the state file to ensure we
// are reading a compatible version.
var ver struct {
Version int `json:"version"`
}
err = json.Unmarshal(buf, &ver)
if err != nil {
return err
}
if ver.Version != srStateFormatVersion1 {
return fmt.Errorf("Unexpected ClusterRepl state version: %d", ver.Version)
}
var sdata srStateData
err = json.Unmarshal(buf, &sdata)
if err != nil {
return err
}
c.Lock()
defer c.Unlock()
c.state = srState(sdata.SRState)
c.enabled = true
return nil
}
func (c *SiteReplicationSys) saveToDisk(ctx context.Context, state srState) error {
sdata := srStateData{
Version: srStateFormatVersion1,
SRState: srStateV1(state),
}
buf, err := json.Marshal(sdata)
if err != nil {
return err
}
objAPI := newObjectLayerFn()
if objAPI == nil {
return errServerNotInitialized
}
err = saveConfig(ctx, objAPI, getSRStateFilePath(), buf)
if err != nil {
return err
}
for _, e := range globalNotificationSys.ReloadSiteReplicationConfig(ctx) {
logger.LogIf(ctx, e)
}
c.Lock()
defer c.Unlock()
c.state = state
c.enabled = len(c.state.Peers) != 0
return nil
}
const (
// Access key of the service account used to perform cluster-replication
// operations.
siteReplicatorSvcAcc = "site-replicator-0"
)
// PeerSiteInfo is a wrapper struct around madmin.PeerSite with extra info on site status
type PeerSiteInfo struct {
madmin.PeerSite
self bool
DeploymentID string
Replicated bool // true if already participating in site replication
Empty bool // true if cluster has no buckets
}
// getSiteStatuses gathers more info on the sites being added
func (c *SiteReplicationSys) getSiteStatuses(ctx context.Context, sites ...madmin.PeerSite) (psi []PeerSiteInfo, err error) {
psi = make([]PeerSiteInfo, 0, len(sites))
for _, v := range sites {
admClient, err := getAdminClient(v.Endpoint, v.AccessKey, v.SecretKey)
if err != nil {
return psi, errSRPeerResp(fmt.Errorf("unable to create admin client for %s: %w", v.Name, err))
}
info, err := admClient.ServerInfo(ctx)
if err != nil {
return psi, errSRPeerResp(fmt.Errorf("unable to fetch server info for %s: %w", v.Name, err))
}
s3Client, err := getS3Client(v)
if err != nil {
return psi, errSRPeerResp(fmt.Errorf("unable to create s3 client for %s: %w", v.Name, err))
}
buckets, err := s3Client.ListBuckets(ctx)
if err != nil {
return psi, errSRPeerResp(fmt.Errorf("unable to list buckets for %s: %v", v.Name, err))
}
psi = append(psi, PeerSiteInfo{
PeerSite: v,
DeploymentID: info.DeploymentID,
Empty: len(buckets) == 0,
self: info.DeploymentID == globalDeploymentID,
})
}
return
}
// AddPeerClusters - add cluster sites for replication configuration.
func (c *SiteReplicationSys) AddPeerClusters(ctx context.Context, psites []madmin.PeerSite) (madmin.ReplicateAddStatus, error) {
sites, serr := c.getSiteStatuses(ctx, psites...)
if serr != nil {
return madmin.ReplicateAddStatus{}, serr
}
var (
currSites madmin.SiteReplicationInfo
currDeploymentIDsSet = set.NewStringSet()
err error
)
currSites, err = c.GetClusterInfo(ctx)
if err != nil {
return madmin.ReplicateAddStatus{}, errSRBackendIssue(err)
}
for _, v := range currSites.Sites {
currDeploymentIDsSet.Add(v.DeploymentID)
}
deploymentIDsSet := set.NewStringSet()
localHasBuckets := false
nonLocalPeerWithBuckets := ""
selfIdx := -1
for i, v := range sites {
// deploymentIDs must be unique
if deploymentIDsSet.Contains(v.DeploymentID) {
return madmin.ReplicateAddStatus{}, errSRDuplicateSites
}
deploymentIDsSet.Add(v.DeploymentID)
if v.self {
selfIdx = i
localHasBuckets = !v.Empty
continue
}
if !v.Empty && !currDeploymentIDsSet.Contains(v.DeploymentID) {
nonLocalPeerWithBuckets = v.Name
}
}
if !currDeploymentIDsSet.IsEmpty() {
// If the current cluster is already SR enabled and no new site is being added, fail.
if currDeploymentIDsSet.Equals(deploymentIDsSet) {
return madmin.ReplicateAddStatus{}, errSRCannotJoin
}
if len(currDeploymentIDsSet.Intersection(deploymentIDsSet)) != len(currDeploymentIDsSet) {
diffSlc := getMissingSiteNames(currDeploymentIDsSet, deploymentIDsSet, currSites.Sites)
return madmin.ReplicateAddStatus{}, errSRInvalidRequest(fmt.Errorf("all existing replicated sites must be specified - missing %s", strings.Join(diffSlc, " ")))
}
}
// validate that all clusters are using the same IDP settings.
pass, err := c.validateIDPSettings(ctx, sites)
if err != nil {
return madmin.ReplicateAddStatus{}, err
}
if !pass {
return madmin.ReplicateAddStatus{}, errSRInvalidRequest(errors.New("all cluster sites must have the same IAM/IDP settings"))
}
// For this `add` API, either all clusters must be empty or the local
// cluster must be the only one that has buckets.
if localHasBuckets && nonLocalPeerWithBuckets != "" {
return madmin.ReplicateAddStatus{}, errSRInvalidRequest(errors.New("only one cluster may have data when configuring site replication"))
}
if !localHasBuckets && nonLocalPeerWithBuckets != "" {
return madmin.ReplicateAddStatus{}, errSRInvalidRequest(fmt.Errorf("please send your request to the cluster containing data/buckets: %s", nonLocalPeerWithBuckets))
}
// FIXME: Ideally, we also need to check if there are any global IAM
// policies and any (LDAP user created) service accounts on the other
// peer clusters, and if so, reject the cluster replicate add request.
// This is not yet implemented.
// VALIDATIONS COMPLETE.
// Create a common service account for all clusters, with root
// permissions.
// Create a local service account.
// Generate a secret key for the service account if not created already.
var secretKey string
svcCred, _, err := globalIAMSys.getServiceAccount(ctx, siteReplicatorSvcAcc)
switch {
case err == errNoSuchServiceAccount:
_, secretKey, err = auth.GenerateCredentials()
if err != nil {
return madmin.ReplicateAddStatus{}, errSRServiceAccount(fmt.Errorf("unable to create local service account: %w", err))
}
svcCred, err = globalIAMSys.NewServiceAccount(ctx, sites[selfIdx].AccessKey, nil, newServiceAccountOpts{
accessKey: siteReplicatorSvcAcc,
secretKey: secretKey,
})
if err != nil {
return madmin.ReplicateAddStatus{}, errSRServiceAccount(fmt.Errorf("unable to create local service account: %w", err))
}
case err == nil:
secretKey = svcCred.SecretKey
default:
return madmin.ReplicateAddStatus{}, errSRBackendIssue(err)
}
joinReq := madmin.SRPeerJoinReq{
SvcAcctAccessKey: svcCred.AccessKey,
SvcAcctSecretKey: secretKey,
Peers: make(map[string]madmin.PeerInfo),
}
for _, v := range sites {
joinReq.Peers[v.DeploymentID] = madmin.PeerInfo{
Endpoint: v.Endpoint,
Name: v.Name,
DeploymentID: v.DeploymentID,
}
}
addedCount := 0
var (
peerAddErr error
admClient *madmin.AdminClient
)
for _, v := range sites {
if v.self {
continue
}
switch {
case currDeploymentIDsSet.Contains(v.DeploymentID):
admClient, err = c.getAdminClient(ctx, v.DeploymentID)
default:
admClient, err = getAdminClient(v.Endpoint, v.AccessKey, v.SecretKey)
}
if err != nil {
peerAddErr = errSRPeerResp(fmt.Errorf("unable to create admin client for %s: %w", v.Name, err))
break
}
joinReq.SvcAcctParent = v.AccessKey
err = admClient.SRPeerJoin(ctx, joinReq)
if err != nil {
peerAddErr = errSRPeerResp(fmt.Errorf("unable to link with peer %s: %w", v.Name, err))
break
}
addedCount++
}
if peerAddErr != nil {
if addedCount == 0 {
return madmin.ReplicateAddStatus{}, peerAddErr
}
// In this case, at least one cluster was added successfully, so we
// need to send a response to the client with some details - FIXME: the
// disks on this cluster would need to be cleaned to recover.
partial := madmin.ReplicateAddStatus{
Status: madmin.ReplicateAddStatusPartial,
ErrDetail: peerAddErr.Error(),
}
return partial, nil
}
// Other than handling existing buckets, we can now save the cluster
// replication configuration state.
state := srState{
Name: sites[selfIdx].Name,
Peers: joinReq.Peers,
ServiceAccountAccessKey: svcCred.AccessKey,
}
if err = c.saveToDisk(ctx, state); err != nil {
return madmin.ReplicateAddStatus{
Status: madmin.ReplicateAddStatusPartial,
ErrDetail: fmt.Sprintf("unable to save cluster-replication state on local: %v", err),
}, nil
}
result := madmin.ReplicateAddStatus{
Success: true,
Status: madmin.ReplicateAddStatusSuccess,
}
if err := c.syncToAllPeers(ctx); err != nil {
result.InitialSyncErrorMessage = err.Error()
}
return result, nil
}
// PeerJoinReq - internal API handler to respond to a peer cluster's request
// to join.
func (c *SiteReplicationSys) PeerJoinReq(ctx context.Context, arg madmin.SRPeerJoinReq) error {
var ourName string
for d, p := range arg.Peers {
if d == globalDeploymentID {
ourName = p.Name
break
}
}
if ourName == "" {
return errSRSelfNotFound
}
_, _, err := globalIAMSys.GetServiceAccount(ctx, arg.SvcAcctAccessKey)
if err == errNoSuchServiceAccount {
_, err = globalIAMSys.NewServiceAccount(ctx, arg.SvcAcctParent, nil, newServiceAccountOpts{
accessKey: arg.SvcAcctAccessKey,
secretKey: arg.SvcAcctSecretKey,
})
}
if err != nil {
return errSRServiceAccount(fmt.Errorf("unable to create service account on %s: %v", ourName, err))
}
state := srState{
Name: ourName,
Peers: arg.Peers,
ServiceAccountAccessKey: arg.SvcAcctAccessKey,
}
if err = c.saveToDisk(ctx, state); err != nil {
return errSRBackendIssue(fmt.Errorf("unable to save cluster-replication state to disk on %s: %v", ourName, err))
}
return nil
}
// GetIDPSettings returns info about the configured identity provider. It is
// used to validate that all peers have the same IDP.
func (c *SiteReplicationSys) GetIDPSettings(ctx context.Context) madmin.IDPSettings {
s := madmin.IDPSettings{}
s.LDAP = madmin.LDAPSettings{
IsLDAPEnabled: globalLDAPConfig.Enabled,
LDAPUserDNSearchBase: globalLDAPConfig.UserDNSearchBaseDistName,
LDAPUserDNSearchFilter: globalLDAPConfig.UserDNSearchFilter,
LDAPGroupSearchBase: globalLDAPConfig.GroupSearchBaseDistName,
LDAPGroupSearchFilter: globalLDAPConfig.GroupSearchFilter,
}
s.OpenID = globalOpenIDConfig.GetSettings()
if s.OpenID.Enabled {
s.OpenID.Region = globalSite.Region
}
return s
}
func (c *SiteReplicationSys) validateIDPSettings(ctx context.Context, peers []PeerSiteInfo) (bool, error) {
s := make([]madmin.IDPSettings, 0, len(peers))
for _, v := range peers {
if v.self {
s = append(s, c.GetIDPSettings(ctx))
continue
}
admClient, err := getAdminClient(v.Endpoint, v.AccessKey, v.SecretKey)
if err != nil {
return false, errSRPeerResp(fmt.Errorf("unable to create admin client for %s: %w", v.Name, err))
}
is, err := admClient.SRPeerGetIDPSettings(ctx)
if err != nil {
return false, errSRPeerResp(fmt.Errorf("unable to fetch IDP settings from %s: %v", v.Name, err))
}
s = append(s, is)
}
for i := 1; i < len(s); i++ {
if !reflect.DeepEqual(s[i], s[0]) {
return false, nil
}
}
return true, nil
}
// GetClusterInfo - returns site replication information.
func (c *SiteReplicationSys) GetClusterInfo(ctx context.Context) (info madmin.SiteReplicationInfo, err error) {
c.RLock()
defer c.RUnlock()
if !c.enabled {
return info, nil
}
info.Enabled = true
info.Name = c.state.Name
info.Sites = make([]madmin.PeerInfo, 0, len(c.state.Peers))
for _, peer := range c.state.Peers {
info.Sites = append(info.Sites, peer)
}
sort.SliceStable(info.Sites, func(i, j int) bool {
return info.Sites[i].Name < info.Sites[j].Name
})
info.ServiceAccountAccessKey = c.state.ServiceAccountAccessKey
return info, nil
}
// MakeBucketHook - called during a regular make bucket call when cluster
// replication is enabled. It is responsible for creating the same bucket on
// remote clusters, and for creating replication rules on the local and peer
// clusters.
func (c *SiteReplicationSys) MakeBucketHook(ctx context.Context, bucket string, opts BucketOptions) error {
// At this point, the local bucket is created.
c.RLock()
defer c.RUnlock()
if !c.enabled {
return nil
}
optsMap := make(map[string]string)
if opts.Location != "" {
optsMap["location"] = opts.Location
}
if opts.LockEnabled {
optsMap["lockEnabled"] = "true"
optsMap["versioningEnabled"] = "true"
}
if opts.VersioningEnabled {
optsMap["versioningEnabled"] = "true"
}
if opts.ForceCreate {
optsMap["forceCreate"] = "true"
}
// Create bucket and enable versioning on all peers.
makeBucketConcErr := c.concDo(
func() error {
err := c.PeerBucketMakeWithVersioningHandler(ctx, bucket, opts)
logger.LogIf(ctx, c.annotateErr("MakeWithVersioning", err))
return err
},
func(deploymentID string, p madmin.PeerInfo) error {
admClient, err := c.getAdminClient(ctx, deploymentID)
if err != nil {
return err
}
err = admClient.SRPeerBucketOps(ctx, bucket, madmin.MakeWithVersioningBktOp, optsMap)
logger.LogIf(ctx, c.annotatePeerErr(p.Name, "MakeWithVersioning", err))
return err
},
)
// If all make-bucket-and-enable-versioning operations failed, nothing
// more to do.
if makeBucketConcErr.allFailed() {
return makeBucketConcErr
}
// Log any errors in make-bucket operations.
logger.LogIf(ctx, makeBucketConcErr.summaryErr)
// Create bucket remotes and add replication rules for the bucket on
// self and peers.
makeRemotesConcErr := c.concDo(
func() error {
err := c.PeerBucketConfigureReplHandler(ctx, bucket)
logger.LogIf(ctx, c.annotateErr("ConfigureRepl", err))
return err
},
func(deploymentID string, p madmin.PeerInfo) error {
admClient, err := c.getAdminClient(ctx, deploymentID)
if err != nil {
return err
}
err = admClient.SRPeerBucketOps(ctx, bucket, madmin.ConfigureReplBktOp, nil)
logger.LogIf(ctx, c.annotatePeerErr(p.Name, "ConfigureRepl", err))
return err
},
)
err := makeRemotesConcErr.summaryErr
if err != nil {
return err
}
return nil
}
// DeleteBucketHook - called during a regular delete bucket call when cluster
// replication is enabled. It is responsible for the deletion of the same bucket
// on remote clusters.
func (c *SiteReplicationSys) DeleteBucketHook(ctx context.Context, bucket string, forceDelete bool) error {
// At this point, the local bucket is deleted.
c.RLock()
defer c.RUnlock()
if !c.enabled {
return nil
}
op := madmin.DeleteBucketBktOp
if forceDelete {
op = madmin.ForceDeleteBucketBktOp
}
// Send bucket delete to other clusters.
cErr := c.concDo(nil, func(deploymentID string, p madmin.PeerInfo) error {
admClient, err := c.getAdminClient(ctx, deploymentID)
if err != nil {
return wrapSRErr(err)
}
err = admClient.SRPeerBucketOps(ctx, bucket, op, nil)
logger.LogIf(ctx, c.annotatePeerErr(p.Name, "DeleteBucket", err))
return err
})
return cErr.summaryErr
}
// PeerBucketMakeWithVersioningHandler - creates bucket and enables versioning.
func (c *SiteReplicationSys) PeerBucketMakeWithVersioningHandler(ctx context.Context, bucket string, opts BucketOptions) error {
objAPI := newObjectLayerFn()
if objAPI == nil {
return errServerNotInitialized
}
err := objAPI.MakeBucketWithLocation(ctx, bucket, opts)
if err != nil {
// Check if this is a bucket exists error.
_, ok1 := err.(BucketExists)
_, ok2 := err.(BucketAlreadyExists)
if !ok1 && !ok2 {
logger.LogIf(ctx, c.annotateErr("MakeBucketErr on peer call", err))
return wrapSRErr(err)
}
} else {
// Load updated bucket metadata into memory as a new
// bucket was created.
globalNotificationSys.LoadBucketMetadata(GlobalContext, bucket)
}
meta, err := globalBucketMetadataSys.Get(bucket)
if err != nil {
logger.LogIf(ctx, c.annotateErr("MakeBucketErr on peer call", err))
return wrapSRErr(err)
}
meta.VersioningConfigXML = enabledBucketVersioningConfig
if opts.LockEnabled {
meta.ObjectLockConfigXML = enabledBucketObjectLockConfig
}
if err := meta.Save(context.Background(), objAPI); err != nil {
return wrapSRErr(err)
}
globalBucketMetadataSys.Set(bucket, meta)
// Load updated bucket metadata into memory as the metadata was updated.
globalNotificationSys.LoadBucketMetadata(GlobalContext, bucket)
return nil
}
// PeerBucketConfigureReplHandler - configures a replication remote and
// replication rules to all other peers for the local bucket.
func (c *SiteReplicationSys) PeerBucketConfigureReplHandler(ctx context.Context, bucket string) error {
creds, err := c.getPeerCreds()
if err != nil {
return wrapSRErr(err)
}
// The following function creates a bucket remote and sets up a bucket
// replication rule for the given peer.
configurePeerFn := func(d string, peer madmin.PeerInfo) error {
ep, _ := url.Parse(peer.Endpoint)
targets := globalBucketTargetSys.ListTargets(ctx, bucket, string(madmin.ReplicationService))
targetARN := ""
for _, target := range targets {
if target.SourceBucket == bucket &&
target.TargetBucket == bucket &&
target.Endpoint == ep.Host &&
target.Secure == (ep.Scheme == "https") &&
target.Type == madmin.ReplicationService {
targetARN = target.Arn
break
}
}
if targetARN == "" {
bucketTarget := madmin.BucketTarget{
SourceBucket: bucket,
Endpoint: ep.Host,
Credentials: &madmin.Credentials{
AccessKey: creds.AccessKey,
SecretKey: creds.SecretKey,
},
TargetBucket: bucket,
Secure: ep.Scheme == "https",
API: "s3v4",
Type: madmin.ReplicationService,
Region: "",
ReplicationSync: false,
}
bucketTarget.Arn = globalBucketTargetSys.getRemoteARN(bucket, &bucketTarget)
err := globalBucketTargetSys.SetTarget(ctx, bucket, &bucketTarget, false)
if err != nil {
logger.LogIf(ctx, c.annotatePeerErr(peer.Name, "Bucket target creation error", err))
return err
}
targets, err := globalBucketTargetSys.ListBucketTargets(ctx, bucket)
if err != nil {
return err
}
tgtBytes, err := json.Marshal(&targets)
if err != nil {
return err
}
if err = globalBucketMetadataSys.Update(ctx, bucket, bucketTargetsFile, tgtBytes); err != nil {
return err
}
targetARN = bucketTarget.Arn
}
// Create bucket replication rule to this peer.
// To add the bucket replication rule, we fetch the current
// server configuration, and convert it to minio-go's
// replication configuration type (by converting to xml and
// parsing it back), use minio-go's add rule function, and
// finally convert it back to the server type (again via xml).
// This is needed as there is no add-rule function in the server
// yet.
// Though we do not check if the rule already exists, this is
// not a problem as we are always using the same replication
// rule ID - if the rule already exists, it is just replaced.
replicationConfigS, _, err := globalBucketMetadataSys.GetReplicationConfig(ctx, bucket)
if err != nil {
_, ok := err.(BucketReplicationConfigNotFound)
if !ok {
return err
}
}
var replicationConfig replication.Config
if replicationConfigS != nil {
replCfgSBytes, err := xml.Marshal(replicationConfigS)
if err != nil {
return err
}
err = xml.Unmarshal(replCfgSBytes, &replicationConfig)
if err != nil {
return err
}
}
var (
ruleID = fmt.Sprintf("site-repl-%s", d)
hasRule bool
opts = replication.Options{
// Set the ID so we can identify the rule as being
// created for site-replication and include the
// destination cluster's deployment ID.
ID: ruleID,
// Use a helper to generate unique priority numbers.
Priority: fmt.Sprintf("%d", getPriorityHelper(replicationConfig)),
Op: replication.AddOption,
RuleStatus: "enable",
DestBucket: targetARN,
// Replicate everything!
ReplicateDeletes: "enable",
ReplicateDeleteMarkers: "enable",
ReplicaSync: "enable",
ExistingObjectReplicate: "enable",
}
)
for _, r := range replicationConfig.Rules {
if r.ID == ruleID {
hasRule = true
}
}
switch {
case hasRule:
err = replicationConfig.EditRule(opts)
default:
err = replicationConfig.AddRule(opts)
}
if err != nil {
logger.LogIf(ctx, c.annotatePeerErr(peer.Name, "Error adding bucket replication rule", err))
return err
}
// Now convert the configuration back to server's type so we can
// do some validation.
newReplCfgBytes, err := xml.Marshal(replicationConfig)
if err != nil {
return err
}
newReplicationConfig, err := sreplication.ParseConfig(bytes.NewReader(newReplCfgBytes))
if err != nil {
return err
}
sameTarget, apiErr := validateReplicationDestination(ctx, bucket, newReplicationConfig)
if apiErr != noError {
return fmt.Errorf("bucket replication config validation error: %#v", apiErr)
}
err = newReplicationConfig.Validate(bucket, sameTarget)
if err != nil {
return err
}
// Config looks good, so we save it.
replCfgData, err := xml.Marshal(newReplicationConfig)
if err != nil {
return err
}
err = globalBucketMetadataSys.Update(ctx, bucket, bucketReplicationConfig, replCfgData)
logger.LogIf(ctx, c.annotatePeerErr(peer.Name, "Error updating replication configuration", err))
return err
}
c.RLock()
defer c.RUnlock()
errMap := make(map[string]error, len(c.state.Peers))
for d, peer := range c.state.Peers {
if d == globalDeploymentID {
continue
}
if err := configurePeerFn(d, peer); err != nil {
errMap[d] = err
}
}
return c.toErrorFromErrMap(errMap)
}
// PeerBucketDeleteHandler - deletes bucket on local in response to a delete
// bucket request from a peer.
func (c *SiteReplicationSys) PeerBucketDeleteHandler(ctx context.Context, bucket string, forceDelete bool) error {
c.RLock()
defer c.RUnlock()
if !c.enabled {
return errSRNotEnabled
}
objAPI := newObjectLayerFn()
if objAPI == nil {
return errServerNotInitialized
}
if globalDNSConfig != nil {
if err := globalDNSConfig.Delete(bucket); err != nil {
return err
}
}
err := objAPI.DeleteBucket(ctx, bucket, DeleteBucketOptions{Force: forceDelete})
if err != nil {
if globalDNSConfig != nil {
if err2 := globalDNSConfig.Put(bucket); err2 != nil {
logger.LogIf(ctx, fmt.Errorf("Unable to restore bucket DNS entry %w, please fix it manually", err2))
}
}
return err
}
globalNotificationSys.DeleteBucketMetadata(ctx, bucket)
return nil
}
// IAMChangeHook - called when IAM items need to be replicated to peer clusters.
// This includes named policy creation, policy mapping changes and service
// account changes.
//
// All policies are replicated.
//
// Policy mappings are only replicated when they are for LDAP users or groups
// (as an external IDP is always assumed when SR is used). In the case of
// OpenID, such mappings are provided from the IDP directly and so are not
// applicable here.
//
// Service accounts are replicated as long as they are not meant for the root
// user.
//
// STS accounts are replicated, but only if the session token is verifiable
// using the local cluster's root credential.
func (c *SiteReplicationSys) IAMChangeHook(ctx context.Context, item madmin.SRIAMItem) error {
// The IAM item has already been applied to the local cluster at this
// point, and only needs to be updated on all remote peer clusters.
c.RLock()
defer c.RUnlock()
if !c.enabled {
return nil
}
cErr := c.concDo(nil, func(d string, p madmin.PeerInfo) error {
admClient, err := c.getAdminClient(ctx, d)
if err != nil {
return wrapSRErr(err)
}
err = admClient.SRPeerReplicateIAMItem(ctx, item)
logger.LogIf(ctx, c.annotatePeerErr(p.Name, "SRPeerReplicateIAMItem", err))
return err
})
return cErr.summaryErr
}
// PeerAddPolicyHandler - copies IAM policy to local. A nil policy argument
// causes the named policy to be deleted.
func (c *SiteReplicationSys) PeerAddPolicyHandler(ctx context.Context, policyName string, p *iampolicy.Policy) error {
var err error
if p == nil {
err = globalIAMSys.DeletePolicy(ctx, policyName, true)
} else {
err = globalIAMSys.SetPolicy(ctx, policyName, *p)
}
if err != nil {
return wrapSRErr(err)
}
return nil
}
// PeerIAMUserChangeHandler - copies IAM user to local.
func (c *SiteReplicationSys) PeerIAMUserChangeHandler(ctx context.Context, change *madmin.SRIAMUser) error {
if change == nil {
return errSRInvalidRequest(errInvalidArgument)
}
var err error
if change.IsDeleteReq {
err = globalIAMSys.DeleteUser(ctx, change.AccessKey, true)
} else {
if change.UserReq == nil {
return errSRInvalidRequest(errInvalidArgument)
}
userReq := *change.UserReq
if userReq.Status != "" && userReq.SecretKey == "" {
// Status set without a secretKey update means we are
// only changing the account status.
err = globalIAMSys.SetUserStatus(ctx, change.AccessKey, userReq.Status)
} else {
err = globalIAMSys.CreateUser(ctx, change.AccessKey, userReq)
}
}
if err != nil {
return wrapSRErr(err)
}
return nil
}
// PeerGroupInfoChangeHandler - copies group changes to local.
func (c *SiteReplicationSys) PeerGroupInfoChangeHandler(ctx context.Context, change *madmin.SRGroupInfo) error {
if change == nil {
return errSRInvalidRequest(errInvalidArgument)
}
updReq := change.UpdateReq
var err error
if updReq.IsRemove {
err = globalIAMSys.RemoveUsersFromGroup(ctx, updReq.Group, updReq.Members)
} else {
if updReq.Status != "" && len(updReq.Members) == 0 {
err = globalIAMSys.SetGroupStatus(ctx, updReq.Group, updReq.Status == madmin.GroupEnabled)
} else {
err = globalIAMSys.AddUsersToGroup(ctx, updReq.Group, updReq.Members)
}
}
if err != nil {
return wrapSRErr(err)
}
return nil
}
// PeerSvcAccChangeHandler - copies service-account change to local.
func (c *SiteReplicationSys) PeerSvcAccChangeHandler(ctx context.Context, change *madmin.SRSvcAccChange) error {
if change == nil {
return errSRInvalidRequest(errInvalidArgument)
}
switch {
case change.Create != nil:
var sp *iampolicy.Policy
var err error
if len(change.Create.SessionPolicy) > 0 {
sp, err = iampolicy.ParseConfig(bytes.NewReader(change.Create.SessionPolicy))
if err != nil {
return wrapSRErr(err)
}
}
opts := newServiceAccountOpts{
accessKey: change.Create.AccessKey,
secretKey: change.Create.SecretKey,
sessionPolicy: sp,
claims: change.Create.Claims,
}
_, err = globalIAMSys.NewServiceAccount(ctx, change.Create.Parent, change.Create.Groups, opts)
if err != nil {
return wrapSRErr(err)
}
case change.Update != nil:
var sp *iampolicy.Policy
var err error
if len(change.Update.SessionPolicy) > 0 {
sp, err = iampolicy.ParseConfig(bytes.NewReader(change.Update.SessionPolicy))
if err != nil {
return wrapSRErr(err)
}
}
opts := updateServiceAccountOpts{
secretKey: change.Update.SecretKey,
status: change.Update.Status,
sessionPolicy: sp,
}
err = globalIAMSys.UpdateServiceAccount(ctx, change.Update.AccessKey, opts)
if err != nil {
return wrapSRErr(err)
}
case change.Delete != nil:
err := globalIAMSys.DeleteServiceAccount(ctx, change.Delete.AccessKey, true)
if err != nil {
return wrapSRErr(err)
}
}
return nil
}
// PeerPolicyMappingHandler - copies policy mapping to local.
func (c *SiteReplicationSys) PeerPolicyMappingHandler(ctx context.Context, mapping *madmin.SRPolicyMapping) error {
if mapping == nil {
return errSRInvalidRequest(errInvalidArgument)
}
err := globalIAMSys.PolicyDBSet(ctx, mapping.UserOrGroup, mapping.Policy, mapping.IsGroup)
if err != nil {
return wrapSRErr(err)
}
return nil
}
// PeerSTSAccHandler - replicates STS credential locally.
func (c *SiteReplicationSys) PeerSTSAccHandler(ctx context.Context, stsCred *madmin.SRSTSCredential) error {
if stsCred == nil {
return errSRInvalidRequest(errInvalidArgument)
}
// Verify the session token of the stsCred
claims, err := auth.ExtractClaims(stsCred.SessionToken, globalActiveCred.SecretKey)
if err != nil {
logger.LogIf(ctx, err)
return fmt.Errorf("STS credential could not be verified")
}
mapClaims := claims.Map()
expiry, err := auth.ExpToInt64(mapClaims["exp"])
if err != nil {
return fmt.Errorf("Expiry claim was not found: %v", mapClaims)
}
cred := auth.Credentials{
AccessKey: stsCred.AccessKey,
SecretKey: stsCred.SecretKey,
Expiration: time.Unix(expiry, 0).UTC(),
SessionToken: stsCred.SessionToken,
ParentUser: stsCred.ParentUser,
Status: auth.AccountOn,
}
// Extract the username and look up the DN and groups in LDAP.
ldapUser, isLDAPSTS := claims.Lookup(ldapUserN)
switch {
case isLDAPSTS:
// Need to look up the groups from LDAP.
_, ldapGroups, err := globalLDAPConfig.LookupUserDN(ldapUser)
if err != nil {
return fmt.Errorf("unable to query LDAP server for %s: %v", ldapUser, err)
}
cred.Groups = ldapGroups
}
// Set these credentials to IAM.
if err := globalIAMSys.SetTempUser(ctx, cred.AccessKey, cred, stsCred.ParentPolicyMapping); err != nil {
return fmt.Errorf("unable to save STS credential and/or parent policy mapping: %v", err)
}
return nil
}
// BucketMetaHook - called when bucket meta changes happen and need to be
// replicated to peer clusters.
func (c *SiteReplicationSys) BucketMetaHook(ctx context.Context, item madmin.SRBucketMeta) error {
// The change has already been applied to the local cluster at this
// point, and only needs to be updated on all remote peer clusters.
c.RLock()
defer c.RUnlock()
if !c.enabled {
return nil
}
cErr := c.concDo(nil, func(d string, p madmin.PeerInfo) error {
admClient, err := c.getAdminClient(ctx, d)
if err != nil {
return wrapSRErr(err)
}
err = admClient.SRPeerReplicateBucketMeta(ctx, item)
logger.LogIf(ctx, c.annotatePeerErr(p.Name, "SRPeerReplicateBucketMeta", err))
return err
})
return cErr.summaryErr
}
// PeerBucketVersioningHandler - updates versioning config to local cluster.
func (c *SiteReplicationSys) PeerBucketVersioningHandler(ctx context.Context, bucket string, versioning *string) error {
if versioning != nil {
configData, err := base64.StdEncoding.DecodeString(*versioning)
if err != nil {
return wrapSRErr(err)
}
err = globalBucketMetadataSys.Update(ctx, bucket, bucketVersioningConfig, configData)
if err != nil {
return wrapSRErr(err)
}
return nil
}
return nil
}
// PeerBucketPolicyHandler - copies/deletes policy to local cluster.
func (c *SiteReplicationSys) PeerBucketPolicyHandler(ctx context.Context, bucket string, policy *policy.Policy) error {
if policy != nil {
configData, err := json.Marshal(policy)
if err != nil {
return wrapSRErr(err)
}
err = globalBucketMetadataSys.Update(ctx, bucket, bucketPolicyConfig, configData)
if err != nil {
return wrapSRErr(err)
}
return nil
}
// Delete the bucket policy
err := globalBucketMetadataSys.Update(ctx, bucket, bucketPolicyConfig, nil)
if err != nil {
return wrapSRErr(err)
}
return nil
}
// PeerBucketTaggingHandler - copies/deletes tags to local cluster.
func (c *SiteReplicationSys) PeerBucketTaggingHandler(ctx context.Context, bucket string, tags *string) error {
if tags != nil {
configData, err := base64.StdEncoding.DecodeString(*tags)
if err != nil {
return wrapSRErr(err)
}
err = globalBucketMetadataSys.Update(ctx, bucket, bucketTaggingConfig, configData)
if err != nil {
return wrapSRErr(err)
}
return nil
}
// Delete the tags
err := globalBucketMetadataSys.Update(ctx, bucket, bucketTaggingConfig, nil)
if err != nil {
return wrapSRErr(err)
}
return nil
}
// PeerBucketObjectLockConfigHandler - sets object lock on local bucket.
func (c *SiteReplicationSys) PeerBucketObjectLockConfigHandler(ctx context.Context, bucket string, objectLockData *string) error {
if objectLockData != nil {
configData, err := base64.StdEncoding.DecodeString(*objectLockData)
if err != nil {
return wrapSRErr(err)
}
err = globalBucketMetadataSys.Update(ctx, bucket, objectLockConfig, configData)
if err != nil {
return wrapSRErr(err)
}
return nil
}
return nil
}
// PeerBucketSSEConfigHandler - copies/deletes SSE config to local cluster.
func (c *SiteReplicationSys) PeerBucketSSEConfigHandler(ctx context.Context, bucket string, sseConfig *string) error {
if sseConfig != nil {
configData, err := base64.StdEncoding.DecodeString(*sseConfig)
if err != nil {
return wrapSRErr(err)
}
err = globalBucketMetadataSys.Update(ctx, bucket, bucketSSEConfig, configData)
if err != nil {
return wrapSRErr(err)
}
return nil
}
// Delete sse config
err := globalBucketMetadataSys.Update(ctx, bucket, bucketSSEConfig, nil)
if err != nil {
return wrapSRErr(err)
}
return nil
}
// PeerBucketQuotaConfigHandler - copies/deletes quota config to local cluster.
func (c *SiteReplicationSys) PeerBucketQuotaConfigHandler(ctx context.Context, bucket string, quota *madmin.BucketQuota) error {
if quota != nil {
quotaData, err := json.Marshal(quota)
if err != nil {
return wrapSRErr(err)
}
if err = globalBucketMetadataSys.Update(ctx, bucket, bucketQuotaConfigFile, quotaData); err != nil {
return wrapSRErr(err)
}
return nil
}
// Delete the bucket quota config
err := globalBucketMetadataSys.Update(ctx, bucket, bucketQuotaConfigFile, nil)
if err != nil {
return wrapSRErr(err)
}
return nil
}
// getAdminClient - NOTE: ensure that at least a read lock is taken on
// SiteReplicationSys before calling this.
func (c *SiteReplicationSys) getAdminClient(ctx context.Context, deploymentID string) (*madmin.AdminClient, error) {
creds, err := c.getPeerCreds()
if err != nil {
return nil, err
}
peer, ok := c.state.Peers[deploymentID]
if !ok {
return nil, errSRPeerNotFound
}
return getAdminClient(peer.Endpoint, creds.AccessKey, creds.SecretKey)
}
// getAdminClientWithEndpoint - NOTE: ensure that at least a read lock is taken
// on SiteReplicationSys before calling this.
func (c *SiteReplicationSys) getAdminClientWithEndpoint(ctx context.Context, deploymentID, endpoint string) (*madmin.AdminClient, error) {
creds, err := c.getPeerCreds()
if err != nil {
return nil, err
}
if _, ok := c.state.Peers[deploymentID]; !ok {
return nil, errSRPeerNotFound
}
return getAdminClient(endpoint, creds.AccessKey, creds.SecretKey)
}
func (c *SiteReplicationSys) getPeerCreds() (*auth.Credentials, error) {
creds, ok := globalIAMSys.store.GetUser(c.state.ServiceAccountAccessKey)
if !ok {
return nil, errors.New("site replication service account not found")
}
return &creds, nil
}
// listBuckets returns a consistent common view of latest unique buckets across
// sites, this is used for replication.
func (c *SiteReplicationSys) listBuckets(ctx context.Context) ([]BucketInfo, error) {
// If local has buckets, enable versioning on them, create them on peers
// and set up replication rules.
objAPI := newObjectLayerFn()
if objAPI == nil {
return nil, errSRObjectLayerNotReady
}
return objAPI.ListBuckets(ctx)
}
// syncToAllPeers is used for syncing local data to all remote peers, it is
// called once during initial "AddPeerClusters" request.
func (c *SiteReplicationSys) syncToAllPeers(ctx context.Context) error {
buckets, err := c.listBuckets(ctx)
if err != nil {
return err
}
for _, bucketInfo := range buckets {
bucket := bucketInfo.Name
// MinIO does not store bucket location - so we just check if
// object locking is enabled.
lockConfig, _, err := globalBucketMetadataSys.GetObjectLockConfig(bucket)
if err != nil {
if _, ok := err.(BucketObjectLockConfigNotFound); !ok {
return errSRBackendIssue(err)
}
}
var opts BucketOptions
if lockConfig != nil {
opts.LockEnabled = lockConfig.ObjectLockEnabled == "Enabled"
}
// Now call the MakeBucketHook on existing bucket - this will
// create buckets and replication rules on peer clusters.
err = c.MakeBucketHook(ctx, bucket, opts)
if err != nil {
return errSRBucketConfigError(err)
}
// Replicate bucket policy if present.
policy, err := globalPolicySys.Get(bucket)
found := true
if _, ok := err.(BucketPolicyNotFound); ok {
found = false
} else if err != nil {
return errSRBackendIssue(err)
}
if found {
policyJSON, err := json.Marshal(policy)
if err != nil {
return wrapSRErr(err)
}
err = c.BucketMetaHook(ctx, madmin.SRBucketMeta{
Type: madmin.SRBucketMetaTypePolicy,
Bucket: bucket,
Policy: policyJSON,
})
if err != nil {
return errSRBucketMetaError(err)
}
}
// Replicate bucket tags if present.
tags, _, err := globalBucketMetadataSys.GetTaggingConfig(bucket)
found = true
if _, ok := err.(BucketTaggingNotFound); ok {
found = false
} else if err != nil {
return errSRBackendIssue(err)
}
if found {
tagCfg, err := xml.Marshal(tags)
if err != nil {
return wrapSRErr(err)
}
tagCfgStr := base64.StdEncoding.EncodeToString(tagCfg)
err = c.BucketMetaHook(ctx, madmin.SRBucketMeta{
Type: madmin.SRBucketMetaTypeTags,
Bucket: bucket,
Tags: &tagCfgStr,
})
if err != nil {
return errSRBucketMetaError(err)
}
}
// Replicate object-lock config if present.
objLockCfg, _, err := globalBucketMetadataSys.GetObjectLockConfig(bucket)
found = true
if _, ok := err.(BucketObjectLockConfigNotFound); ok {
found = false
} else if err != nil {
return errSRBackendIssue(err)
}
if found {
objLockCfgData, err := xml.Marshal(objLockCfg)
if err != nil {
return wrapSRErr(err)
}
objLockStr := base64.StdEncoding.EncodeToString(objLockCfgData)
err = c.BucketMetaHook(ctx, madmin.SRBucketMeta{
Type: madmin.SRBucketMetaTypeObjectLockConfig,
Bucket: bucket,
Tags: &objLockStr,
})
if err != nil {
return errSRBucketMetaError(err)
}
}
// Replicate existing bucket encryption settings if present.
sseConfig, _, err := globalBucketMetadataSys.GetSSEConfig(bucket)
found = true
if _, ok := err.(BucketSSEConfigNotFound); ok {
found = false
} else if err != nil {
return errSRBackendIssue(err)
}
if found {
sseConfigData, err := xml.Marshal(sseConfig)
if err != nil {
return wrapSRErr(err)
}
sseConfigStr := base64.StdEncoding.EncodeToString(sseConfigData)
err = c.BucketMetaHook(ctx, madmin.SRBucketMeta{
Type: madmin.SRBucketMetaTypeSSEConfig,
Bucket: bucket,
SSEConfig: &sseConfigStr,
})
if err != nil {
return errSRBucketMetaError(err)
}
}
quotaConfig, _, err := globalBucketMetadataSys.GetQuotaConfig(ctx, bucket)
found = true
if _, ok := err.(BucketQuotaConfigNotFound); ok {
found = false
} else if err != nil {
return errSRBackendIssue(err)
}
if found {
quotaConfigJSON, err := json.Marshal(quotaConfig)
if err != nil {
return wrapSRErr(err)
}
err = c.BucketMetaHook(ctx, madmin.SRBucketMeta{
Type: madmin.SRBucketMetaTypeQuotaConfig,
Bucket: bucket,
Quota: quotaConfigJSON,
})
if err != nil {
return errSRBucketMetaError(err)
}
}
}
// From this point on, the order in which the information is
// synced to remote sites matters.
// Policies should be synced first.
{
// Replicate IAM policies on local to all peers.
allPolicies, err := globalIAMSys.ListPolicies(ctx, "")
if err != nil {
return errSRBackendIssue(err)
}
for pname, policy := range allPolicies {
policyJSON, err := json.Marshal(policy)
if err != nil {
return wrapSRErr(err)
}
err = c.IAMChangeHook(ctx, madmin.SRIAMItem{
Type: madmin.SRIAMItemPolicy,
Name: pname,
Policy: policyJSON,
})
if err != nil {
return errSRIAMError(err)
}
}
}
// Next should be user accounts - these are local users; OIDC and LDAP setups
// may not have any local users.
{
userAccounts := make(map[string]auth.Credentials)
globalIAMSys.store.rlock()
err := globalIAMSys.store.loadUsers(ctx, regUser, userAccounts)
globalIAMSys.store.runlock()
if err != nil {
return errSRBackendIssue(err)
}
for _, acc := range userAccounts {
if err := c.IAMChangeHook(ctx, madmin.SRIAMItem{
Type: madmin.SRIAMItemIAMUser,
IAMUser: &madmin.SRIAMUser{
AccessKey: acc.AccessKey,
IsDeleteReq: false,
UserReq: &madmin.AddOrUpdateUserReq{
SecretKey: acc.SecretKey,
Status: madmin.AccountStatus(acc.Status),
},
},
}); err != nil {
return errSRIAMError(err)
}
}
}
// Next should be groups for some of these users - LDAP might have some group
// DNs here.
{
groups := make(map[string]GroupInfo)
globalIAMSys.store.rlock()
err := globalIAMSys.store.loadGroups(ctx, groups)
globalIAMSys.store.runlock()
if err != nil {
return errSRBackendIssue(err)
}
for gname, group := range groups {
if err := c.IAMChangeHook(ctx, madmin.SRIAMItem{
Type: madmin.SRIAMItemGroupInfo,
GroupInfo: &madmin.SRGroupInfo{
UpdateReq: madmin.GroupAddRemove{
Group: gname,
Members: group.Members,
Status: madmin.GroupStatus(group.Status),
IsRemove: false,
},
},
}); err != nil {
return errSRIAMError(err)
}
}
}
// Service accounts are the static accounts that should be synced with
// valid claims.
{
serviceAccounts := make(map[string]auth.Credentials)
globalIAMSys.store.rlock()
err := globalIAMSys.store.loadUsers(ctx, svcUser, serviceAccounts)
globalIAMSys.store.runlock()
if err != nil {
return errSRBackendIssue(err)
}
for user, acc := range serviceAccounts {
if user == siteReplicatorSvcAcc {
// skip the site-replicator service account as it is
// already replicated.
continue
}
claims, err := globalIAMSys.GetClaimsForSvcAcc(ctx, acc.AccessKey)
if err != nil {
return errSRBackendIssue(err)
}
_, policy, err := globalIAMSys.GetServiceAccount(ctx, acc.AccessKey)
if err != nil {
return errSRBackendIssue(err)
}
var policyJSON []byte
if policy != nil {
policyJSON, err = json.Marshal(policy)
if err != nil {
return wrapSRErr(err)
}
}
err = c.IAMChangeHook(ctx, madmin.SRIAMItem{
Type: madmin.SRIAMItemSvcAcc,
SvcAccChange: &madmin.SRSvcAccChange{
Create: &madmin.SRSvcAccCreate{
Parent: acc.ParentUser,
AccessKey: user,
SecretKey: acc.SecretKey,
Groups: acc.Groups,
Claims: claims,
SessionPolicy: json.RawMessage(policyJSON),
Status: acc.Status,
},
},
})
if err != nil {
return errSRIAMError(err)
}
}
}
// Followed by policy mappings for the user accounts we previously synced.
{
// Replicate policy mappings on local to all peers.
userPolicyMap := make(map[string]MappedPolicy)
groupPolicyMap := make(map[string]MappedPolicy)
globalIAMSys.store.rlock()
errU := globalIAMSys.store.loadMappedPolicies(ctx, regUser, false, userPolicyMap)
errG := globalIAMSys.store.loadMappedPolicies(ctx, regUser, true, groupPolicyMap)
globalIAMSys.store.runlock()
if errU != nil {
return errSRBackendIssue(errU)
}
for user, mp := range userPolicyMap {
err := c.IAMChangeHook(ctx, madmin.SRIAMItem{
Type: madmin.SRIAMItemPolicyMapping,
PolicyMapping: &madmin.SRPolicyMapping{
UserOrGroup: user,
IsGroup: false,
Policy: mp.Policies,
},
})
if err != nil {
return errSRIAMError(err)
}
}
if errG != nil {
return errSRBackendIssue(errG)
}
for group, mp := range groupPolicyMap {
err := c.IAMChangeHook(ctx, madmin.SRIAMItem{
Type: madmin.SRIAMItemPolicyMapping,
PolicyMapping: &madmin.SRPolicyMapping{
UserOrGroup: group,
IsGroup: true,
Policy: mp.Policies,
},
})
if err != nil {
return errSRIAMError(err)
}
}
}
// and finally, policy mappings for STS users.
{
// Replicate policy mappings on local to all peers.
userPolicyMap := make(map[string]MappedPolicy)
groupPolicyMap := make(map[string]MappedPolicy)
globalIAMSys.store.rlock()
errU := globalIAMSys.store.loadMappedPolicies(ctx, stsUser, false, userPolicyMap)
errG := globalIAMSys.store.loadMappedPolicies(ctx, stsUser, true, groupPolicyMap)
globalIAMSys.store.runlock()
if errU != nil {
return errSRBackendIssue(errU)
}
for user, mp := range userPolicyMap {
err := c.IAMChangeHook(ctx, madmin.SRIAMItem{
Type: madmin.SRIAMItemPolicyMapping,
PolicyMapping: &madmin.SRPolicyMapping{
UserOrGroup: user,
IsGroup: false,
Policy: mp.Policies,
},
})
if err != nil {
return errSRIAMError(err)
}
}
if errG != nil {
return errSRBackendIssue(errG)
}
for group, mp := range groupPolicyMap {
err := c.IAMChangeHook(ctx, madmin.SRIAMItem{
Type: madmin.SRIAMItemPolicyMapping,
PolicyMapping: &madmin.SRPolicyMapping{
UserOrGroup: group,
IsGroup: true,
Policy: mp.Policies,
},
})
if err != nil {
return errSRIAMError(err)
}
}
}
return nil
}
// Concurrency helpers
type concErr struct {
numActions int
errMap map[string]error
summaryErr error
}
func (c concErr) Error() string {
return c.summaryErr.Error()
}
func (c concErr) allFailed() bool {
return len(c.errMap) == c.numActions
}
func (c *SiteReplicationSys) toErrorFromErrMap(errMap map[string]error) error {
if len(errMap) == 0 {
return nil
}
msgs := []string{}
for d, err := range errMap {
name := c.state.Peers[d].Name
msgs = append(msgs, fmt.Sprintf("Site %s (%s): %v", name, d, err))
}
return fmt.Errorf("Site replication error(s): %s", strings.Join(msgs, "; "))
}
func (c *SiteReplicationSys) newConcErr(numActions int, errMap map[string]error) concErr {
return concErr{
numActions: numActions,
errMap: errMap,
summaryErr: c.toErrorFromErrMap(errMap),
}
}
// concDo calls actions concurrently. selfActionFn is run for the current
// cluster and peerActionFn is run for each peer replication cluster.
func (c *SiteReplicationSys) concDo(selfActionFn func() error, peerActionFn func(deploymentID string, p madmin.PeerInfo) error) concErr {
depIDs := make([]string, 0, len(c.state.Peers))
for d := range c.state.Peers {
depIDs = append(depIDs, d)
}
errs := make([]error, len(c.state.Peers))
var wg sync.WaitGroup
for i := range depIDs {
wg.Add(1)
go func(i int) {
if depIDs[i] == globalDeploymentID {
if selfActionFn != nil {
errs[i] = selfActionFn()
}
} else {
errs[i] = peerActionFn(depIDs[i], c.state.Peers[depIDs[i]])
}
wg.Done()
}(i)
}
wg.Wait()
errMap := make(map[string]error, len(c.state.Peers))
for i, depID := range depIDs {
if errs[i] != nil {
errMap[depID] = errs[i]
}
}
numActions := len(c.state.Peers) - 1
if selfActionFn != nil {
numActions++
}
return c.newConcErr(numActions, errMap)
}
func (c *SiteReplicationSys) annotateErr(annotation string, err error) error {
if err == nil {
return nil
}
return fmt.Errorf("%s: %s: %v", c.state.Name, annotation, err)
}
func (c *SiteReplicationSys) annotatePeerErr(dstPeer string, annotation string, err error) error {
if err == nil {
return nil
}
return fmt.Errorf("%s->%s: %s: %v", c.state.Name, dstPeer, annotation, err)
}
// isEnabled returns true if site replication is enabled
func (c *SiteReplicationSys) isEnabled() bool {
c.RLock()
defer c.RUnlock()
return c.enabled
}
var errMissingSRConfig = fmt.Errorf("Site not found in site replication configuration")
// RemovePeerCluster - removes one or more clusters from site replication configuration.
func (c *SiteReplicationSys) RemovePeerCluster(ctx context.Context, objectAPI ObjectLayer, rreq madmin.SRRemoveReq) (st madmin.ReplicateRemoveStatus, err error) {
if !c.isEnabled() {
return st, errSRNotEnabled
}
info, err := c.GetClusterInfo(ctx)
if err != nil {
return st, errSRBackendIssue(err)
}
peerMap := make(map[string]madmin.PeerInfo)
var rmvEndpoints []string
siteNames := rreq.SiteNames
updatedPeers := make(map[string]madmin.PeerInfo)
for _, pi := range info.Sites {
updatedPeers[pi.DeploymentID] = pi
peerMap[pi.Name] = pi
if rreq.RemoveAll {
siteNames = append(siteNames, pi.Name)
}
}
for _, s := range siteNames {
info, ok := peerMap[s]
if !ok {
return st, errSRConfigMissingError(errMissingSRConfig)
}
rmvEndpoints = append(rmvEndpoints, info.Endpoint)
delete(updatedPeers, info.DeploymentID)
}
var wg sync.WaitGroup
errs := make(map[string]error, len(c.state.Peers))
for _, v := range info.Sites {
wg.Add(1)
if v.DeploymentID == globalDeploymentID {
go func() {
defer wg.Done()
err := c.RemoveRemoteTargetsForEndpoint(ctx, objectAPI, rmvEndpoints, false)
errs[globalDeploymentID] = err
}()
continue
}
go func(pi madmin.PeerInfo) {
defer wg.Done()
admClient, err := c.getAdminClient(ctx, pi.DeploymentID)
if err != nil {
errs[pi.DeploymentID] = errSRPeerResp(fmt.Errorf("unable to create admin client for %s: %w", pi.Name, err))
return
}
if _, err = admClient.SRPeerRemove(ctx, rreq); err != nil {
if errors.As(err, &errMissingSRConfig) {
return
}
errs[pi.DeploymentID] = errSRPeerResp(fmt.Errorf("unable to update peer %s: %w", pi.Name, err))
return
}
}(v)
}
wg.Wait()
for dID, err := range errs {
if err != nil {
return madmin.ReplicateRemoveStatus{
ErrDetail: err.Error(),
Status: madmin.ReplicateRemoveStatusPartial,
}, errSRPeerResp(fmt.Errorf("unable to update peer %s: %w", c.state.Peers[dID].Name, err))
}
}
// Update cluster state
var state srState
if len(updatedPeers) > 1 {
state = srState{
Name: info.Name,
Peers: updatedPeers,
ServiceAccountAccessKey: info.ServiceAccountAccessKey,
}
}
if err = c.saveToDisk(ctx, state); err != nil {
return madmin.ReplicateRemoveStatus{
Status: madmin.ReplicateRemoveStatusPartial,
ErrDetail: fmt.Sprintf("unable to save cluster-replication state on local: %v", err),
}, nil
}
return madmin.ReplicateRemoveStatus{
Status: madmin.ReplicateRemoveStatusSuccess,
}, nil
}
// InternalRemoveReq - sends an unlink request to peer cluster to remove one or more sites
// from the site replication configuration.
func (c *SiteReplicationSys) InternalRemoveReq(ctx context.Context, objectAPI ObjectLayer, rreq madmin.SRRemoveReq) error {
if !c.isEnabled() {
return errSRNotEnabled
}
ourName := ""
peerMap := make(map[string]madmin.PeerInfo)
updatedPeers := make(map[string]madmin.PeerInfo)
siteNames := rreq.SiteNames
for _, p := range c.state.Peers {
peerMap[p.Name] = p
if p.DeploymentID == globalDeploymentID {
ourName = p.Name
}
updatedPeers[p.DeploymentID] = p
if rreq.RemoveAll {
siteNames = append(siteNames, p.Name)
}
}
var rmvEndpoints []string
var unlinkSelf bool
for _, s := range siteNames {
info, ok := peerMap[s]
if !ok {
return errMissingSRConfig
}
if info.DeploymentID == globalDeploymentID {
unlinkSelf = true
continue
}
delete(updatedPeers, info.DeploymentID)
rmvEndpoints = append(rmvEndpoints, info.Endpoint)
}
if err := c.RemoveRemoteTargetsForEndpoint(ctx, objectAPI, rmvEndpoints, unlinkSelf); err != nil {
return err
}
var state srState
if !unlinkSelf {
state = srState{
Name: c.state.Name,
Peers: updatedPeers,
ServiceAccountAccessKey: c.state.ServiceAccountAccessKey,
}
}
if err := c.saveToDisk(ctx, state); err != nil {
return errSRBackendIssue(fmt.Errorf("unable to save cluster-replication state to disk on %s: %v", ourName, err))
}
return nil
}
// RemoveRemoteTargetsForEndpoint removes replication targets corresponding to the given endpoints.
func (c *SiteReplicationSys) RemoveRemoteTargetsForEndpoint(ctx context.Context, objectAPI ObjectLayer, endpoints []string, unlinkSelf bool) (err error) {
targets := globalBucketTargetSys.ListTargets(ctx, "", string(madmin.ReplicationService))
m := make(map[string]madmin.BucketTarget)
for _, t := range targets {
for _, endpoint := range endpoints {
ep, _ := url.Parse(endpoint)
if t.Endpoint == ep.Host &&
t.Secure == (ep.Scheme == "https") &&
t.Type == madmin.ReplicationService {
m[t.Arn] = t
}
}
// all remote targets from self are to be delinked
if unlinkSelf {
m[t.Arn] = t
}
}
buckets, err := objectAPI.ListBuckets(ctx)
if err != nil {
return err
}
for _, b := range buckets {
config, _, err := globalBucketMetadataSys.GetReplicationConfig(ctx, b.Name)
if err != nil {
return err
}
var nRules []sreplication.Rule
for _, r := range config.Rules {
if _, ok := m[r.Destination.Bucket]; !ok {
nRules = append(nRules, r)
}
}
if len(nRules) > 0 {
config.Rules = nRules
configData, err := xml.Marshal(config)
if err != nil {
return err
}
if err = globalBucketMetadataSys.Update(ctx, b.Name, bucketReplicationConfig, configData); err != nil {
return err
}
} else {
if err := globalBucketMetadataSys.Update(ctx, b.Name, bucketReplicationConfig, nil); err != nil {
return err
}
}
}
for arn, t := range m {
if err := globalBucketTargetSys.RemoveTarget(ctx, t.SourceBucket, arn); err != nil {
return err
}
}
return
}
// Other helpers
func getAdminClient(endpoint, accessKey, secretKey string) (*madmin.AdminClient, error) {
epURL, _ := url.Parse(endpoint)
client, err := madmin.New(epURL.Host, accessKey, secretKey, epURL.Scheme == "https")
if err != nil {
return nil, err
}
client.SetCustomTransport(NewRemoteTargetHTTPTransport())
return client, nil
}
func getS3Client(pc madmin.PeerSite) (*minioClient.Client, error) {
ep, err := url.Parse(pc.Endpoint)
if err != nil {
return nil, err
}
return minioClient.New(ep.Host, &minioClient.Options{
Creds: credentials.NewStaticV4(pc.AccessKey, pc.SecretKey, ""),
Secure: ep.Scheme == "https",
Transport: NewRemoteTargetHTTPTransport(),
})
}
func getPriorityHelper(replicationConfig replication.Config) int {
maxPrio := 0
for _, rule := range replicationConfig.Rules {
if rule.Priority > maxPrio {
maxPrio = rule.Priority
}
}
// leave some gaps in priority numbers for flexibility
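// e.g. with existing rule priorities 10 and 20, the next rule gets priority 30.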
return maxPrio + 10
}
// getMissingSiteNames returns a slice with the names of sites participating in site
// replication that were not specified while adding a new site.
func getMissingSiteNames(oldDeps, newDeps set.StringSet, currSites []madmin.PeerInfo) []string {
diff := oldDeps.Difference(newDeps)
var diffSlc []string
for _, v := range currSites {
if diff.Contains(v.DeploymentID) {
diffSlc = append(diffSlc, v.Name)
}
}
return diffSlc
}
type srBucketMetaInfo struct {
madmin.SRBucketInfo
DeploymentID string
}
type srPolicy struct {
madmin.SRIAMPolicy
DeploymentID string
}
type srPolicyMapping struct {
madmin.SRPolicyMapping
DeploymentID string
}
type srUserInfo struct {
madmin.UserInfo
DeploymentID string
}
type srGroupDesc struct {
madmin.GroupDesc
DeploymentID string
}
// SiteReplicationStatus returns the site replication status across clusters participating in site replication.
func (c *SiteReplicationSys) SiteReplicationStatus(ctx context.Context, objAPI ObjectLayer, opts madmin.SRStatusOptions) (info madmin.SRStatusInfo, err error) {
sinfo, err := c.siteReplicationStatus(ctx, objAPI, opts)
if err != nil {
return info, err
}
info = madmin.SRStatusInfo{
Enabled: sinfo.Enabled,
MaxBuckets: sinfo.MaxBuckets,
MaxUsers: sinfo.MaxUsers,
MaxGroups: sinfo.MaxGroups,
MaxPolicies: sinfo.MaxPolicies,
Sites: sinfo.Sites,
StatsSummary: sinfo.StatsSummary,
}
info.BucketStats = make(map[string]map[string]madmin.SRBucketStatsSummary, len(sinfo.Sites))
info.PolicyStats = make(map[string]map[string]madmin.SRPolicyStatsSummary)
info.UserStats = make(map[string]map[string]madmin.SRUserStatsSummary)
info.GroupStats = make(map[string]map[string]madmin.SRGroupStatsSummary)
numSites := len(info.Sites)
for b, stat := range sinfo.BucketStats {
for dID, st := range stat {
if st.TagMismatch ||
st.VersioningConfigMismatch ||
st.OLockConfigMismatch ||
st.SSEConfigMismatch ||
st.PolicyMismatch ||
st.ReplicationCfgMismatch ||
st.QuotaCfgMismatch ||
opts.Entity == madmin.SRBucketEntity {
if _, ok := info.BucketStats[b]; !ok {
info.BucketStats[b] = make(map[string]madmin.SRBucketStatsSummary, numSites)
}
info.BucketStats[b][dID] = st.SRBucketStatsSummary
}
}
}
for u, stat := range sinfo.UserStats {
for dID, st := range stat {
if st.PolicyMismatch || st.UserInfoMismatch || opts.Entity == madmin.SRUserEntity {
if _, ok := info.UserStats[u]; !ok {
info.UserStats[u] = make(map[string]madmin.SRUserStatsSummary, numSites)
}
info.UserStats[u][dID] = st.SRUserStatsSummary
}
}
}
for g, stat := range sinfo.GroupStats {
for dID, st := range stat {
if st.PolicyMismatch || st.GroupDescMismatch || opts.Entity == madmin.SRGroupEntity {
if _, ok := info.GroupStats[g]; !ok {
info.GroupStats[g] = make(map[string]madmin.SRGroupStatsSummary, numSites)
}
info.GroupStats[g][dID] = st.SRGroupStatsSummary
}
}
}
for p, stat := range sinfo.PolicyStats {
for dID, st := range stat {
if st.PolicyMismatch || opts.Entity == madmin.SRPolicyEntity {
if _, ok := info.PolicyStats[p]; !ok {
info.PolicyStats[p] = make(map[string]madmin.SRPolicyStatsSummary, numSites)
}
info.PolicyStats[p][dID] = st.SRPolicyStatsSummary
}
}
}
return
}
// siteReplicationStatus returns the site replication status across clusters participating in site replication.
func (c *SiteReplicationSys) siteReplicationStatus(ctx context.Context, objAPI ObjectLayer, opts madmin.SRStatusOptions) (info srStatusInfo, err error) {
c.RLock()
defer c.RUnlock()
if !c.enabled {
return info, err
}
sris := make([]madmin.SRInfo, len(c.state.Peers))
depIdx := make(map[string]int, len(c.state.Peers))
depIDs := make([]string, 0, len(c.state.Peers))
i := 0
for d := range c.state.Peers {
depIDs = append(depIDs, d)
depIdx[d] = i
i++
}
metaInfoConcErr := c.concDo(
func() error {
srInfo, err := c.SiteReplicationMetaInfo(ctx, objAPI, opts)
if err != nil {
return err
}
sris[depIdx[globalDeploymentID]] = srInfo
return nil
},
func(deploymentID string, p madmin.PeerInfo) error {
admClient, err := c.getAdminClient(ctx, deploymentID)
if err != nil {
return err
}
srInfo, err := admClient.SRMetaInfo(ctx, opts)
if err != nil {
return err
}
sris[depIdx[deploymentID]] = srInfo
return nil
},
)
if metaInfoConcErr.summaryErr != nil {
return info, errSRBackendIssue(metaInfoConcErr.summaryErr)
}
info.Enabled = true
info.Sites = make(map[string]madmin.PeerInfo, len(c.state.Peers))
for d, peer := range c.state.Peers {
info.Sites[d] = peer
}
var maxBuckets int
for _, sri := range sris {
if len(sri.Buckets) > maxBuckets {
maxBuckets = len(sri.Buckets)
}
}
// mapping between entity and entity config across sites
bucketStats := make(map[string][]srBucketMetaInfo)
policyStats := make(map[string][]srPolicy)
userPolicyStats := make(map[string][]srPolicyMapping)
groupPolicyStats := make(map[string][]srPolicyMapping)
userInfoStats := make(map[string][]srUserInfo)
groupDescStats := make(map[string][]srGroupDesc)
numSites := len(sris)
allBuckets := set.NewStringSet() // across sites
allUsers := set.NewStringSet()
allUserWPolicies := set.NewStringSet()
allGroups := set.NewStringSet()
allGroupWPolicies := set.NewStringSet()
allPolicies := set.NewStringSet()
for _, sri := range sris {
for b := range sri.Buckets {
allBuckets.Add(b)
}
for u := range sri.UserInfoMap {
allUsers.Add(u)
}
for g := range sri.GroupDescMap {
allGroups.Add(g)
}
for p := range sri.Policies {
allPolicies.Add(p)
}
for u := range sri.UserPolicies {
allUserWPolicies.Add(u)
}
for g := range sri.GroupPolicies {
allGroupWPolicies.Add(g)
}
}
for i, sri := range sris {
for b := range allBuckets {
if _, ok := bucketStats[b]; !ok {
bucketStats[b] = make([]srBucketMetaInfo, numSites)
}
si, ok := sri.Buckets[b]
if !ok {
si = madmin.SRBucketInfo{Bucket: b}
}
bucketStats[b][i] = srBucketMetaInfo{SRBucketInfo: si, DeploymentID: sri.DeploymentID}
}
for pname := range allPolicies {
if _, ok := policyStats[pname]; !ok {
policyStats[pname] = make([]srPolicy, numSites)
}
// if pname is not present in the map, the zero value
// will be returned.
pi := sri.Policies[pname]
policyStats[pname][i] = srPolicy{SRIAMPolicy: pi, DeploymentID: sri.DeploymentID}
}
for user := range allUserWPolicies {
if _, ok := userPolicyStats[user]; !ok {
userPolicyStats[user] = make([]srPolicyMapping, numSites)
}
up := sri.UserPolicies[user]
userPolicyStats[user][i] = srPolicyMapping{SRPolicyMapping: up, DeploymentID: sri.DeploymentID}
}
for group := range allGroupWPolicies {
if _, ok := groupPolicyStats[group]; !ok {
groupPolicyStats[group] = make([]srPolicyMapping, numSites)
}
up := sri.GroupPolicies[group]
groupPolicyStats[group][i] = srPolicyMapping{SRPolicyMapping: up, DeploymentID: sri.DeploymentID}
}
for u := range allUsers {
if _, ok := userInfoStats[u]; !ok {
userInfoStats[u] = make([]srUserInfo, numSites)
}
ui := sri.UserInfoMap[u]
userInfoStats[u][i] = srUserInfo{UserInfo: ui, DeploymentID: sri.DeploymentID}
}
for g := range allGroups {
if _, ok := groupDescStats[g]; !ok {
groupDescStats[g] = make([]srGroupDesc, numSites)
}
gd := sri.GroupDescMap[g]
groupDescStats[g][i] = srGroupDesc{GroupDesc: gd, DeploymentID: sri.DeploymentID}
}
}
info.StatsSummary = make(map[string]madmin.SRSiteSummary, len(c.state.Peers))
info.BucketStats = make(map[string]map[string]srBucketStatsSummary)
info.PolicyStats = make(map[string]map[string]srPolicyStatsSummary)
info.UserStats = make(map[string]map[string]srUserStatsSummary)
info.GroupStats = make(map[string]map[string]srGroupStatsSummary)
// collect user policy mapping replication status across sites
if opts.Users || opts.Entity == madmin.SRUserEntity {
for u, pslc := range userPolicyStats {
if len(info.UserStats[u]) == 0 {
info.UserStats[u] = make(map[string]srUserStatsSummary)
}
var policyMappings []madmin.SRPolicyMapping
uPolicyCount := 0
for _, ps := range pslc {
policyMappings = append(policyMappings, ps.SRPolicyMapping)
uPolicyCount++
sum := info.StatsSummary[ps.DeploymentID]
sum.TotalUserPolicyMappingCount++
info.StatsSummary[ps.DeploymentID] = sum
}
userPolicyMismatch := !isPolicyMappingReplicated(uPolicyCount, numSites, policyMappings)
for _, ps := range pslc {
dID := depIdx[ps.DeploymentID]
_, hasUser := sris[dID].UserPolicies[u]
info.UserStats[u][ps.DeploymentID] = srUserStatsSummary{
SRUserStatsSummary: madmin.SRUserStatsSummary{
PolicyMismatch: userPolicyMismatch,
HasUser: hasUser,
HasPolicyMapping: ps.Policy != "",
},
userPolicy: ps,
}
if !userPolicyMismatch || opts.Entity != madmin.SRUserEntity {
sum := info.StatsSummary[ps.DeploymentID]
if !ps.IsGroup {
sum.ReplicatedUserPolicyMappings++
}
info.StatsSummary[ps.DeploymentID] = sum
}
}
}
// collect user info replication status across sites
for u, pslc := range userInfoStats {
var uiSlc []madmin.UserInfo
userCount := 0
for _, ps := range pslc {
uiSlc = append(uiSlc, ps.UserInfo)
userCount++
sum := info.StatsSummary[ps.DeploymentID]
sum.TotalUsersCount++
info.StatsSummary[ps.DeploymentID] = sum
}
userInfoMismatch := !isUserInfoReplicated(userCount, numSites, uiSlc)
for _, ps := range pslc {
dID := depIdx[ps.DeploymentID]
_, hasUser := sris[dID].UserInfoMap[u]
if len(info.UserStats[u]) == 0 {
info.UserStats[u] = make(map[string]srUserStatsSummary)
}
umis, ok := info.UserStats[u][ps.DeploymentID]
if !ok {
umis = srUserStatsSummary{
SRUserStatsSummary: madmin.SRUserStatsSummary{
HasUser: hasUser,
},
}
}
umis.UserInfoMismatch = userInfoMismatch
umis.userInfo = ps
info.UserStats[u][ps.DeploymentID] = umis
if !userInfoMismatch || opts.Entity != madmin.SRUserEntity {
sum := info.StatsSummary[ps.DeploymentID]
sum.ReplicatedUsers++
info.StatsSummary[ps.DeploymentID] = sum
}
}
}
}
if opts.Groups || opts.Entity == madmin.SRGroupEntity {
// collect group policy mapping replication status across sites
for g, pslc := range groupPolicyStats {
var policyMappings []madmin.SRPolicyMapping
gPolicyCount := 0
for _, ps := range pslc {
policyMappings = append(policyMappings, ps.SRPolicyMapping)
gPolicyCount++
sum := info.StatsSummary[ps.DeploymentID]
sum.TotalGroupPolicyMappingCount++
info.StatsSummary[ps.DeploymentID] = sum
}
groupPolicyMismatch := !isPolicyMappingReplicated(gPolicyCount, numSites, policyMappings)
if len(info.GroupStats[g]) == 0 {
info.GroupStats[g] = make(map[string]srGroupStatsSummary)
}
for _, ps := range pslc {
dID := depIdx[ps.DeploymentID]
_, hasGroup := sris[dID].GroupPolicies[g]
info.GroupStats[g][ps.DeploymentID] = srGroupStatsSummary{
SRGroupStatsSummary: madmin.SRGroupStatsSummary{
PolicyMismatch: groupPolicyMismatch,
HasGroup: hasGroup,
HasPolicyMapping: ps.Policy != "",
DeploymentID: ps.DeploymentID,
},
groupPolicy: ps,
}
if !groupPolicyMismatch && opts.Entity != madmin.SRGroupEntity {
sum := info.StatsSummary[ps.DeploymentID]
sum.ReplicatedGroupPolicyMappings++
info.StatsSummary[ps.DeploymentID] = sum
}
}
}
// collect group desc replication status across sites
for g, pslc := range groupDescStats {
var gds []madmin.GroupDesc
groupCount := 0
for _, ps := range pslc {
groupCount++
sum := info.StatsSummary[ps.DeploymentID]
sum.TotalGroupsCount++
info.StatsSummary[ps.DeploymentID] = sum
gds = append(gds, ps.GroupDesc)
}
gdMismatch := !isGroupDescReplicated(groupCount, numSites, gds)
for _, ps := range pslc {
dID := depIdx[ps.DeploymentID]
_, hasGroup := sris[dID].GroupDescMap[g]
if len(info.GroupStats[g]) == 0 {
info.GroupStats[g] = make(map[string]srGroupStatsSummary)
}
gmis, ok := info.GroupStats[g][ps.DeploymentID]
if !ok {
gmis = srGroupStatsSummary{
SRGroupStatsSummary: madmin.SRGroupStatsSummary{
HasGroup: hasGroup,
},
}
}
gmis.GroupDescMismatch = gdMismatch
gmis.groupDesc = ps
info.GroupStats[g][ps.DeploymentID] = gmis
if !gdMismatch && opts.Entity != madmin.SRGroupEntity {
sum := info.StatsSummary[ps.DeploymentID]
sum.ReplicatedGroups++
info.StatsSummary[ps.DeploymentID] = sum
}
}
}
}
if opts.Policies || opts.Entity == madmin.SRPolicyEntity {
// collect IAM policy replication status across sites
for p, pslc := range policyStats {
var policies []*iampolicy.Policy
uPolicyCount := 0
for _, ps := range pslc {
plcy, err := iampolicy.ParseConfig(bytes.NewReader([]byte(ps.SRIAMPolicy.Policy)))
if err != nil {
continue
}
policies = append(policies, plcy)
uPolicyCount++
sum := info.StatsSummary[ps.DeploymentID]
sum.TotalIAMPoliciesCount++
info.StatsSummary[ps.DeploymentID] = sum
}
if len(info.PolicyStats[p]) == 0 {
info.PolicyStats[p] = make(map[string]srPolicyStatsSummary)
}
policyMismatch := !isIAMPolicyReplicated(uPolicyCount, numSites, policies)
for _, ps := range pslc {
dID := depIdx[ps.DeploymentID]
_, hasPolicy := sris[dID].Policies[p]
info.PolicyStats[p][ps.DeploymentID] = srPolicyStatsSummary{
SRPolicyStatsSummary: madmin.SRPolicyStatsSummary{
PolicyMismatch: policyMismatch,
HasPolicy: hasPolicy,
},
policy: ps,
}
switch {
case policyMismatch, opts.Entity == madmin.SRPolicyEntity:
default:
sum := info.StatsSummary[ps.DeploymentID]
if !policyMismatch {
sum.ReplicatedIAMPolicies++
}
info.StatsSummary[ps.DeploymentID] = sum
}
}
}
}
if opts.Buckets || opts.Entity == madmin.SRBucketEntity {
// collect bucket metadata replication stats across sites
for b, slc := range bucketStats {
tagSet := set.NewStringSet()
olockConfigSet := set.NewStringSet()
policies := make([]*bktpolicy.Policy, numSites)
replCfgs := make([]*sreplication.Config, numSites)
quotaCfgs := make([]*madmin.BucketQuota, numSites)
sseCfgSet := set.NewStringSet()
versionCfgSet := set.NewStringSet()
var tagCount, olockCfgCount, sseCfgCount, versionCfgCount int
for i, s := range slc {
if s.ReplicationConfig != nil {
cfgBytes, err := base64.StdEncoding.DecodeString(*s.ReplicationConfig)
if err != nil {
continue
}
cfg, err := sreplication.ParseConfig(bytes.NewReader(cfgBytes))
if err != nil {
continue
}
replCfgs[i] = cfg
}
if s.Versioning != nil {
configData, err := base64.StdEncoding.DecodeString(*s.Versioning)
if err != nil {
continue
}
versionCfgCount++
if !versionCfgSet.Contains(string(configData)) {
versionCfgSet.Add(string(configData))
}
}
if s.QuotaConfig != nil {
cfgBytes, err := base64.StdEncoding.DecodeString(*s.QuotaConfig)
if err != nil {
continue
}
cfg, err := parseBucketQuota(b, cfgBytes)
if err != nil {
continue
}
quotaCfgs[i] = cfg
}
if s.Tags != nil {
tagBytes, err := base64.StdEncoding.DecodeString(*s.Tags)
if err != nil {
continue
}
tagCount++
if !tagSet.Contains(string(tagBytes)) {
tagSet.Add(string(tagBytes))
}
}
if len(s.Policy) > 0 {
plcy, err := bktpolicy.ParseConfig(bytes.NewReader(s.Policy), b)
if err != nil {
continue
}
policies[i] = plcy
}
if s.ObjectLockConfig != nil {
configData, err := base64.StdEncoding.DecodeString(*s.ObjectLockConfig)
if err != nil {
continue
}
olockCfgCount++
if !olockConfigSet.Contains(string(configData)) {
olockConfigSet.Add(string(configData))
}
}
if s.SSEConfig != nil {
configData, err := base64.StdEncoding.DecodeString(*s.SSEConfig)
if err != nil {
continue
}
sseCfgCount++
if !sseCfgSet.Contains(string(configData)) {
sseCfgSet.Add(string(configData))
}
}
ss, ok := info.StatsSummary[s.DeploymentID]
if !ok {
ss = madmin.SRSiteSummary{}
}
// increment total number of replicated buckets
if len(slc) == numSites {
ss.ReplicatedBuckets++
}
ss.TotalBucketsCount++
if tagCount > 0 {
ss.TotalTagsCount++
}
if olockCfgCount > 0 {
ss.TotalLockConfigCount++
}
if sseCfgCount > 0 {
ss.TotalSSEConfigCount++
}
if versionCfgCount > 0 {
ss.TotalVersioningConfigCount++
}
if len(policies) > 0 {
ss.TotalBucketPoliciesCount++
}
info.StatsSummary[s.DeploymentID] = ss
}
tagMismatch := !isReplicated(tagCount, numSites, tagSet)
olockCfgMismatch := !isReplicated(olockCfgCount, numSites, olockConfigSet)
sseCfgMismatch := !isReplicated(sseCfgCount, numSites, sseCfgSet)
versionCfgMismatch := !isReplicated(versionCfgCount, numSites, versionCfgSet)
policyMismatch := !isBktPolicyReplicated(numSites, policies)
replCfgMismatch := !isBktReplCfgReplicated(numSites, replCfgs)
quotaCfgMismatch := !isBktQuotaCfgReplicated(numSites, quotaCfgs)
info.BucketStats[b] = make(map[string]srBucketStatsSummary, numSites)
for i, s := range slc {
dIdx := depIdx[s.DeploymentID]
var hasBucket bool
if bi, ok := sris[dIdx].Buckets[s.Bucket]; ok {
hasBucket = !bi.CreatedAt.Equal(timeSentinel)
}
quotaCfgSet := hasBucket && quotaCfgs[i] != nil && *quotaCfgs[i] != madmin.BucketQuota{}
ss := madmin.SRBucketStatsSummary{
DeploymentID: s.DeploymentID,
HasBucket: hasBucket,
TagMismatch: tagMismatch,
OLockConfigMismatch: olockCfgMismatch,
SSEConfigMismatch: sseCfgMismatch,
VersioningConfigMismatch: versionCfgMismatch,
PolicyMismatch: policyMismatch,
ReplicationCfgMismatch: replCfgMismatch,
QuotaCfgMismatch: quotaCfgMismatch,
HasReplicationCfg: s.ReplicationConfig != nil,
HasTagsSet: s.Tags != nil,
HasOLockConfigSet: s.ObjectLockConfig != nil,
HasPolicySet: s.Policy != nil,
HasQuotaCfgSet: quotaCfgSet,
HasSSECfgSet: s.SSEConfig != nil,
}
var m srBucketMetaInfo
if len(bucketStats[s.Bucket]) > dIdx {
m = bucketStats[s.Bucket][dIdx]
}
info.BucketStats[b][s.DeploymentID] = srBucketStatsSummary{
SRBucketStatsSummary: ss,
meta: m,
}
}
// no mismatch
for _, s := range slc {
sum := info.StatsSummary[s.DeploymentID]
if !olockCfgMismatch && olockCfgCount == numSites {
sum.ReplicatedLockConfig++
}
if !versionCfgMismatch && versionCfgCount == numSites {
sum.ReplicatedVersioningConfig++
}
if !sseCfgMismatch && sseCfgCount == numSites {
sum.ReplicatedSSEConfig++
}
if !policyMismatch && len(policies) == numSites {
sum.ReplicatedBucketPolicies++
}
if !tagMismatch && tagCount == numSites {
sum.ReplicatedTags++
}
info.StatsSummary[s.DeploymentID] = sum
}
}
}
// maximum number of buckets, users etc. seen across sites
info.MaxBuckets = len(bucketStats)
info.MaxUsers = len(userInfoStats)
info.MaxGroups = len(groupDescStats)
info.MaxPolicies = len(policyStats)
return
}
// isReplicated returns true if the replicated count matches the number of
// sites and there is at most one unique entry in the set.
func isReplicated(cntReplicated, total int, valSet set.StringSet) bool {
if cntReplicated > 0 && cntReplicated < total {
return false
}
if len(valSet) > 1 {
// mismatch - one or more sites has differing tags/policy
return false
}
return true
}
// isIAMPolicyReplicated returns true if count of replicated IAM policies matches total
// number of sites and IAM policies are identical.
func isIAMPolicyReplicated(cntReplicated, total int, policies []*iampolicy.Policy) bool {
if cntReplicated > 0 && cntReplicated != total {
return false
}
// check if policies match between sites
var prev *iampolicy.Policy
for i, p := range policies {
if i == 0 {
prev = p
continue
}
if !prev.Equals(*p) {
return false
}
}
return true
}
// isPolicyMappingReplicated returns true if count of replicated IAM policy mappings matches total
// number of sites and IAM policy mappings are identical.
func isPolicyMappingReplicated(cntReplicated, total int, policies []madmin.SRPolicyMapping) bool {
if cntReplicated > 0 && cntReplicated != total {
return false
}
// check if policies match between sites
var prev madmin.SRPolicyMapping
for i, p := range policies {
if i == 0 {
prev = p
continue
}
if prev.IsGroup != p.IsGroup ||
prev.Policy != p.Policy ||
prev.UserOrGroup != p.UserOrGroup {
return false
}
}
return true
}
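// isUserInfoReplicated returns true if the replicated user count matches the total number of sites and user info is identical across sites.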
func isUserInfoReplicated(cntReplicated, total int, uis []madmin.UserInfo) bool {
if cntReplicated > 0 && cntReplicated != total {
return false
}
// check if user infos match between sites
var prev madmin.UserInfo
for i, ui := range uis {
if i == 0 {
prev = ui
continue
}
if !isUserInfoEqual(prev, ui) {
return false
}
}
return true
}
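// isGroupDescReplicated returns true if the replicated group count matches the total number of sites and group descriptions are identical across sites.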
func isGroupDescReplicated(cntReplicated, total int, gds []madmin.GroupDesc) bool {
if cntReplicated > 0 && cntReplicated != total {
return false
}
// check if group descriptions match between sites
var prev madmin.GroupDesc
for i, gd := range gds {
if i == 0 {
prev = gd
continue
}
if !isGroupDescEqual(prev, gd) {
return false
}
}
return true
}
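// isBktQuotaCfgReplicated returns true if bucket quota configs are present and identical across all sites.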
func isBktQuotaCfgReplicated(total int, quotaCfgs []*madmin.BucketQuota) bool {
numquotaCfgs := 0
for _, q := range quotaCfgs {
if q == nil {
continue
}
numquotaCfgs++
}
if numquotaCfgs > 0 && numquotaCfgs != total {
return false
}
var prev *madmin.BucketQuota
for i, q := range quotaCfgs {
if q == nil {
return false
}
if i == 0 {
prev = q
continue
}
if prev.Quota != q.Quota || prev.Type != q.Type {
return false
}
}
return true
}
// isBktPolicyReplicated returns true if count of replicated bucket policies matches total
// number of sites and bucket policies are identical.
func isBktPolicyReplicated(total int, policies []*bktpolicy.Policy) bool {
numPolicies := 0
for _, p := range policies {
if p == nil {
continue
}
numPolicies++
}
if numPolicies > 0 && numPolicies != total {
return false
}
// check if policies match between sites
var prev *bktpolicy.Policy
for i, p := range policies {
if p == nil {
continue
}
if i == 0 {
prev = p
continue
}
if !prev.Equals(*p) {
return false
}
}
return true
}
// isBktReplCfgReplicated returns true if all the sites have the same number
// of replication rules with all replication features enabled.
func isBktReplCfgReplicated(total int, cfgs []*sreplication.Config) bool {
cntReplicated := 0
for _, c := range cfgs {
if c == nil {
continue
}
cntReplicated++
}
if cntReplicated > 0 && cntReplicated != total {
return false
}
// check if replication configs match between sites
var prev *sreplication.Config
for i, c := range cfgs {
if c == nil {
continue
}
if i == 0 {
prev = c
continue
}
if len(prev.Rules) != len(c.Rules) {
return false
}
if len(c.Rules) != total-1 {
return false
}
for _, r := range c.Rules {
if !strings.HasPrefix(r.ID, "site-repl-") {
return false
}
if r.DeleteMarkerReplication.Status == sreplication.Disabled ||
r.DeleteReplication.Status == sreplication.Disabled ||
r.ExistingObjectReplication.Status == sreplication.Disabled ||
r.SourceSelectionCriteria.ReplicaModifications.Status == sreplication.Disabled {
return false
}
}
}
return true
}
// SiteReplicationMetaInfo returns the metadata info on buckets, policies etc. for the replicated site
func (c *SiteReplicationSys) SiteReplicationMetaInfo(ctx context.Context, objAPI ObjectLayer, opts madmin.SRStatusOptions) (info madmin.SRInfo, err error) {
if objAPI == nil {
return info, errSRObjectLayerNotReady
}
c.RLock()
defer c.RUnlock()
if !c.enabled {
return info, nil
}
info.DeploymentID = globalDeploymentID
if opts.Buckets || opts.Entity == madmin.SRBucketEntity {
var (
buckets []BucketInfo
err error
)
if opts.Entity == madmin.SRBucketEntity {
bi, err := objAPI.GetBucketInfo(ctx, opts.EntityValue)
if err != nil {
return info, errSRBackendIssue(err)
}
buckets = append(buckets, bi)
} else {
buckets, err = objAPI.ListBuckets(ctx)
if err != nil {
return info, errSRBackendIssue(err)
}
}
info.Buckets = make(map[string]madmin.SRBucketInfo, len(buckets))
for _, bucketInfo := range buckets {
bucket := bucketInfo.Name
createdAt, err := globalBucketMetadataSys.CreatedAt(bucket)
if err != nil {
return info, errSRBackendIssue(err)
}
bms := madmin.SRBucketInfo{
Bucket: bucket,
CreatedAt: createdAt,
Location: globalSite.Region,
}
// Get bucket policy if present.
policy, err := globalPolicySys.Get(bucket)
found := true
if _, ok := err.(BucketPolicyNotFound); ok {
found = false
} else if err != nil {
return info, errSRBackendIssue(err)
}
if found {
policyJSON, err := json.Marshal(policy)
if err != nil {
return info, wrapSRErr(err)
}
bms.Policy = policyJSON
}
// Get bucket tags if present.
tags, updatedAt, err := globalBucketMetadataSys.GetTaggingConfig(bucket)
found = true
if _, ok := err.(BucketTaggingNotFound); ok {
found = false
} else if err != nil {
return info, errSRBackendIssue(err)
}
if found {
tagBytes, err := xml.Marshal(tags)
if err != nil {
return info, wrapSRErr(err)
}
tagCfgStr := base64.StdEncoding.EncodeToString(tagBytes)
bms.Tags = &tagCfgStr
bms.TagConfigUpdatedAt = updatedAt
}
// Get object-lock config if present.
objLockCfg, updatedAt, err := globalBucketMetadataSys.GetObjectLockConfig(bucket)
found = true
if _, ok := err.(BucketObjectLockConfigNotFound); ok {
found = false
} else if err != nil {
return info, errSRBackendIssue(err)
}
if found {
objLockCfgData, err := xml.Marshal(objLockCfg)
if err != nil {
return info, wrapSRErr(err)
}
objLockStr := base64.StdEncoding.EncodeToString(objLockCfgData)
bms.ObjectLockConfig = &objLockStr
bms.ObjectLockConfigUpdatedAt = updatedAt
}
// Get quota config if present
quotaConfig, updatedAt, err := globalBucketMetadataSys.GetQuotaConfig(ctx, bucket)
found = true
if _, ok := err.(BucketQuotaConfigNotFound); ok {
found = false
} else if err != nil {
return info, errSRBackendIssue(err)
}
if found {
quotaConfigJSON, err := json.Marshal(quotaConfig)
if err != nil {
return info, wrapSRErr(err)
}
quotaConfigStr := base64.StdEncoding.EncodeToString(quotaConfigJSON)
bms.QuotaConfig = &quotaConfigStr
bms.QuotaConfigUpdatedAt = updatedAt
}
// Get existing bucket encryption settings
sseConfig, updatedAt, err := globalBucketMetadataSys.GetSSEConfig(bucket)
found = true
if _, ok := err.(BucketSSEConfigNotFound); ok {
found = false
} else if err != nil {
return info, errSRBackendIssue(err)
}
if found {
sseConfigData, err := xml.Marshal(sseConfig)
if err != nil {
return info, wrapSRErr(err)
}
sseConfigStr := base64.StdEncoding.EncodeToString(sseConfigData)
bms.SSEConfig = &sseConfigStr
bms.SSEConfigUpdatedAt = updatedAt
}
// Get replication config if present
rcfg, updatedAt, err := globalBucketMetadataSys.GetReplicationConfig(ctx, bucket)
found = true
if _, ok := err.(BucketReplicationConfigNotFound); ok {
found = false
} else if err != nil {
return info, errSRBackendIssue(err)
}
if found {
rcfgXML, err := xml.Marshal(rcfg)
if err != nil {
return info, wrapSRErr(err)
}
rcfgXMLStr := base64.StdEncoding.EncodeToString(rcfgXML)
bms.ReplicationConfig = &rcfgXMLStr
bms.ReplicationConfigUpdatedAt = updatedAt
}
info.Buckets[bucket] = bms
}
}
if opts.Policies || opts.Entity == madmin.SRPolicyEntity {
var allPolicies map[string]PolicyDoc
if opts.Entity == madmin.SRPolicyEntity {
if p, err := globalIAMSys.store.GetPolicyDoc(opts.EntityValue); err == nil {
allPolicies = map[string]PolicyDoc{opts.EntityValue: p}
}
} else {
// Replicate IAM policies on local to all peers.
allPolicies, err = globalIAMSys.ListPolicyDocs(ctx, "")
if err != nil {
return info, errSRBackendIssue(err)
}
}
info.Policies = make(map[string]madmin.SRIAMPolicy, len(allPolicies))
for pname, policyDoc := range allPolicies {
policyJSON, err := json.Marshal(policyDoc.Policy)
if err != nil {
return info, wrapSRErr(err)
}
info.Policies[pname] = madmin.SRIAMPolicy{Policy: json.RawMessage(policyJSON), UpdatedAt: policyDoc.UpdateDate}
}
}
if opts.Users || opts.Entity == madmin.SRUserEntity {
// Replicate policy mappings on local to all peers.
userPolicyMap := make(map[string]MappedPolicy)
if opts.Entity == madmin.SRUserEntity {
if mp, ok := globalIAMSys.store.GetMappedPolicy(opts.EntityValue, false); ok {
userPolicyMap[opts.EntityValue] = mp
}
} else {
globalIAMSys.store.rlock()
if err := globalIAMSys.store.loadMappedPolicies(ctx, stsUser, false, userPolicyMap); err != nil {
return info, errSRBackendIssue(err)
}
if err = globalIAMSys.store.loadMappedPolicies(ctx, regUser, false, userPolicyMap); err != nil {
return info, errSRBackendIssue(err)
}
globalIAMSys.store.runlock()
}
info.UserPolicies = make(map[string]madmin.SRPolicyMapping, len(userPolicyMap))
for user, mp := range userPolicyMap {
info.UserPolicies[user] = madmin.SRPolicyMapping{
IsGroup: false,
UserOrGroup: user,
Policy: mp.Policies,
UpdatedAt: mp.UpdatedAt,
}
}
info.UserInfoMap = make(map[string]madmin.UserInfo)
if opts.Entity == madmin.SRUserEntity {
if ui, err := globalIAMSys.GetUserInfo(ctx, opts.EntityValue); err == nil {
info.UserInfoMap[opts.EntityValue] = ui
}
} else {
// get users/group info on local.
userInfoMap, err := globalIAMSys.ListUsers(ctx)
if err != nil {
return info, errSRBackendIssue(err)
}
for user, ui := range userInfoMap {
info.UserInfoMap[user] = ui
svcAccts, err := globalIAMSys.ListServiceAccounts(ctx, user)
if err != nil {
return info, errSRBackendIssue(err)
}
for _, svcAcct := range svcAccts {
info.UserInfoMap[svcAcct.AccessKey] = madmin.UserInfo{
Status: madmin.AccountStatus(svcAcct.Status),
}
}
tempAccts, err := globalIAMSys.ListTempAccounts(ctx, user)
if err != nil {
return info, errSRBackendIssue(err)
}
for _, tempAcct := range tempAccts {
info.UserInfoMap[tempAcct.AccessKey] = madmin.UserInfo{
Status: madmin.AccountStatus(tempAcct.Status),
}
}
}
}
}
if opts.Groups || opts.Entity == madmin.SRGroupEntity {
// Replicate policy mappings on local to all peers.
groupPolicyMap := make(map[string]MappedPolicy)
if opts.Entity == madmin.SRGroupEntity {
if mp, ok := globalIAMSys.store.GetMappedPolicy(opts.EntityValue, true); ok {
groupPolicyMap[opts.EntityValue] = mp
}
} else {
globalIAMSys.store.rlock()
if err := globalIAMSys.store.loadMappedPolicies(ctx, stsUser, true, groupPolicyMap); err != nil {
return info, errSRBackendIssue(err)
}
if err := globalIAMSys.store.loadMappedPolicies(ctx, regUser, true, groupPolicyMap); err != nil {
return info, errSRBackendIssue(err)
}
globalIAMSys.store.runlock()
}
info.GroupPolicies = make(map[string]madmin.SRPolicyMapping, len(c.state.Peers))
for group, mp := range groupPolicyMap {
info.GroupPolicies[group] = madmin.SRPolicyMapping{
IsGroup: true,
UserOrGroup: group,
Policy: mp.Policies,
UpdatedAt: mp.UpdatedAt,
}
}
info.GroupDescMap = make(map[string]madmin.GroupDesc)
if opts.Entity == madmin.SRGroupEntity {
if gd, err := globalIAMSys.GetGroupDescription(opts.EntityValue); err == nil {
info.GroupDescMap[opts.EntityValue] = gd
}
} else {
// get users/group info on local.
groups, errG := globalIAMSys.ListGroups(ctx)
if errG != nil {
return info, errSRBackendIssue(errG)
}
groupDescMap := make(map[string]madmin.GroupDesc, len(groups))
for _, g := range groups {
groupDescMap[g], errG = globalIAMSys.GetGroupDescription(g)
if errG != nil {
return info, errSRBackendIssue(errG)
}
}
for group, d := range groupDescMap {
info.GroupDescMap[group] = d
}
}
}
return info, nil
}
// EditPeerCluster - edits replication configuration and updates peer endpoint.
func (c *SiteReplicationSys) EditPeerCluster(ctx context.Context, peer madmin.PeerInfo) (madmin.ReplicateEditStatus, error) {
sites, err := c.GetClusterInfo(ctx)
if err != nil {
return madmin.ReplicateEditStatus{}, errSRBackendIssue(err)
}
if !sites.Enabled {
return madmin.ReplicateEditStatus{}, errSRNotEnabled
}
var (
found bool
admClient *madmin.AdminClient
)
for _, v := range sites.Sites {
if peer.DeploymentID == v.DeploymentID {
found = true
if peer.Endpoint == v.Endpoint {
return madmin.ReplicateEditStatus{}, errSRInvalidRequest(fmt.Errorf("Endpoint %s entered for deployment id %s already configured in site replication", v.Endpoint, v.DeploymentID))
}
admClient, err = c.getAdminClientWithEndpoint(ctx, v.DeploymentID, peer.Endpoint)
if err != nil {
return madmin.ReplicateEditStatus{}, errSRPeerResp(fmt.Errorf("unable to create admin client for %s: %w", v.Name, err))
}
// check if endpoint is reachable
if _, err = admClient.ServerInfo(ctx); err != nil {
return madmin.ReplicateEditStatus{}, errSRPeerResp(fmt.Errorf("Endpoint %s not reachable: %w", peer.Endpoint, err))
}
}
}
if !found {
return madmin.ReplicateEditStatus{}, errSRInvalidRequest(fmt.Errorf("%s not found in existing replicated sites", peer.DeploymentID))
}
errs := make(map[string]error, len(c.state.Peers))
var errsMu sync.Mutex // guards errs, written from multiple goroutines below
var wg sync.WaitGroup
pi := c.state.Peers[peer.DeploymentID]
pi.Endpoint = peer.Endpoint
for i, v := range sites.Sites {
if v.DeploymentID == globalDeploymentID {
c.state.Peers[peer.DeploymentID] = pi
continue
}
wg.Add(1)
go func(pi madmin.PeerInfo, i int) {
defer wg.Done()
v := sites.Sites[i]
admClient, err := c.getAdminClient(ctx, v.DeploymentID)
if v.DeploymentID == peer.DeploymentID {
admClient, err = c.getAdminClientWithEndpoint(ctx, v.DeploymentID, peer.Endpoint)
}
if err != nil {
errsMu.Lock()
errs[v.DeploymentID] = errSRPeerResp(fmt.Errorf("unable to create admin client for %s: %w", v.Name, err))
errsMu.Unlock()
return
}
if err = admClient.SRPeerEdit(ctx, pi); err != nil {
errsMu.Lock()
errs[v.DeploymentID] = errSRPeerResp(fmt.Errorf("unable to update peer %s: %w", v.Name, err))
errsMu.Unlock()
return
}
}(pi, i)
}
wg.Wait()
for dID, err := range errs {
if err != nil {
return madmin.ReplicateEditStatus{}, errSRPeerResp(fmt.Errorf("unable to update peer %s: %w", c.state.Peers[dID].Name, err))
}
}
// we can now save the cluster replication configuration state.
if err = c.saveToDisk(ctx, c.state); err != nil {
return madmin.ReplicateEditStatus{
Status: madmin.ReplicateAddStatusPartial,
ErrDetail: fmt.Sprintf("unable to save cluster-replication state on local: %v", err),
}, nil
}
result := madmin.ReplicateEditStatus{
Success: true,
Status: fmt.Sprintf("Cluster replication configuration updated with endpoint %s for peer %s successfully", peer.Endpoint, peer.Name),
}
return result, nil
}
// PeerEditReq - internal API handler to respond to a peer cluster's request
// to edit endpoint.
func (c *SiteReplicationSys) PeerEditReq(ctx context.Context, arg madmin.PeerInfo) error {
ourName := ""
for i := range c.state.Peers {
p := c.state.Peers[i]
if p.DeploymentID == arg.DeploymentID {
p.Endpoint = arg.Endpoint
c.state.Peers[arg.DeploymentID] = p
}
if p.DeploymentID == globalDeploymentID {
ourName = p.Name
}
}
if err := c.saveToDisk(ctx, c.state); err != nil {
return errSRBackendIssue(fmt.Errorf("unable to save cluster-replication state to disk on %s: %v", ourName, err))
}
return nil
}
const siteHealTimeInterval = 10 * time.Second
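// startHealRoutine periodically heals IAM entities and bucket metadata across peer sites until the context is canceled.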
func (c *SiteReplicationSys) startHealRoutine(ctx context.Context, objAPI ObjectLayer) {
healTimer := time.NewTimer(siteHealTimeInterval)
defer healTimer.Stop()
for {
select {
case <-healTimer.C:
c.RLock()
enabled := c.enabled
c.RUnlock()
if enabled {
c.healIAMSystem(ctx, objAPI) // heal IAM system first
c.healBuckets(ctx, objAPI) // heal buckets subsequently
}
healTimer.Reset(siteHealTimeInterval)
case <-ctx.Done():
return
}
}
}
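// The stats-summary types below pair the madmin summaries with the underlying metadata used while healing.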
type srBucketStatsSummary struct {
madmin.SRBucketStatsSummary
meta srBucketMetaInfo
}
type srPolicyStatsSummary struct {
madmin.SRPolicyStatsSummary
policy srPolicy
}
type srUserStatsSummary struct {
madmin.SRUserStatsSummary
userInfo srUserInfo
userPolicy srPolicyMapping
}
type srGroupStatsSummary struct {
madmin.SRGroupStatsSummary
groupDesc srGroupDesc
groupPolicy srPolicyMapping
}
// srStatusInfo holds detailed site replication status across participating sites.
type srStatusInfo struct {
Enabled bool
MaxBuckets int // maximum buckets seen across sites
MaxUsers int // maximum users seen across sites
MaxGroups int // maximum groups seen across sites
MaxPolicies int // maximum policies across sites
Sites map[string]madmin.PeerInfo // deployment ID -> peer info
StatsSummary map[string]madmin.SRSiteSummary // map of deployment id -> site stat
// BucketStats maps bucket name to deployment ID to stats. This is populated only if there are
// mismatches or if a specific bucket's stats are requested
BucketStats map[string]map[string]srBucketStatsSummary
// PolicyStats maps policy name to deployment ID to stats. This is populated only if there are
// mismatches or if a specific policy's stats are requested
PolicyStats map[string]map[string]srPolicyStatsSummary
// UserStats maps user to deployment ID to stats. This is populated only if there are
// mismatches or if a specific user's stats are requested
UserStats map[string]map[string]srUserStatsSummary
// GroupStats maps group to deployment ID to stats. This is populated only if there are
// mismatches or if a specific group's stats are requested
GroupStats map[string]map[string]srGroupStatsSummary
}
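// healBuckets heals bucket metadata (creation, versioning, object lock, SSE, replication, policy, tags and quota) across sites.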
func (c *SiteReplicationSys) healBuckets(ctx context.Context, objAPI ObjectLayer) error {
info, err := c.siteReplicationStatus(ctx, objAPI, madmin.SRStatusOptions{
Buckets: true,
})
if err != nil {
return err
}
for bucket := range info.BucketStats {
c.healCreateMissingBucket(ctx, objAPI, bucket, info)
c.healVersioningMetadata(ctx, objAPI, bucket, info)
c.healOLockConfigMetadata(ctx, objAPI, bucket, info)
c.healSSEMetadata(ctx, objAPI, bucket, info)
c.healBucketReplicationConfig(ctx, objAPI, bucket, info)
c.healBucketPolicies(ctx, objAPI, bucket, info)
c.healTagMetadata(ctx, objAPI, bucket, info)
c.healBucketQuotaConfig(ctx, objAPI, bucket, info)
// Notification and ILM are site specific settings.
}
return nil
}
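// healTagMetadata heals bucket tagging config across sites using the most recently updated site as the source.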
func (c *SiteReplicationSys) healTagMetadata(ctx context.Context, objAPI ObjectLayer, bucket string, info srStatusInfo) error {
bs := info.BucketStats[bucket]
c.RLock()
defer c.RUnlock()
if !c.enabled {
return nil
}
var (
latestID, latestPeerName string
lastUpdate time.Time
latestTaggingConfig *string
)
for dID, ss := range bs {
if lastUpdate.IsZero() {
lastUpdate = ss.meta.TagConfigUpdatedAt
latestID = dID
latestTaggingConfig = ss.meta.Tags
}
// avoid considering just-created buckets as latest. Perhaps this site
// just joined cluster replication and is yet to be synced
if ss.meta.CreatedAt.Equal(ss.meta.TagConfigUpdatedAt) {
continue
}
if ss.meta.TagConfigUpdatedAt.After(lastUpdate) {
lastUpdate = ss.meta.TagConfigUpdatedAt
latestID = dID
latestTaggingConfig = ss.meta.Tags
}
}
latestPeerName = info.Sites[latestID].Name
var latestTaggingConfigBytes []byte
var err error
if latestTaggingConfig != nil {
latestTaggingConfigBytes, err = base64.StdEncoding.DecodeString(*latestTaggingConfig)
if err != nil {
return err
}
}
for dID, bStatus := range bs {
if !bStatus.TagMismatch {
continue
}
if isBucketMetadataEqual(latestTaggingConfig, bStatus.meta.Tags) {
continue
}
if dID == globalDeploymentID {
if err := globalBucketMetadataSys.Update(ctx, bucket, bucketTaggingConfig, latestTaggingConfigBytes); err != nil {
logger.LogIf(ctx, fmt.Errorf("Error healing tagging metadata from peer site %s : %w", latestPeerName, err))
}
continue
}
admClient, err := c.getAdminClient(ctx, dID)
if err != nil {
return wrapSRErr(err)
}
peerName := info.Sites[dID].Name
err = admClient.SRPeerReplicateBucketMeta(ctx, madmin.SRBucketMeta{
Type: madmin.SRBucketMetaTypeTags,
Bucket: bucket,
Tags: latestTaggingConfig,
})
if err != nil {
logger.LogIf(ctx, c.annotatePeerErr(peerName, "SRPeerReplicateBucketMeta", fmt.Errorf("Error healing tagging metadata for peer %s from peer %s : %w", peerName, latestPeerName, err)))
}
}
return nil
}
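// healBucketPolicies heals bucket policy across sites using the most recently updated site as the source.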
func (c *SiteReplicationSys) healBucketPolicies(ctx context.Context, objAPI ObjectLayer, bucket string, info srStatusInfo) error {
bs := info.BucketStats[bucket]
c.RLock()
defer c.RUnlock()
if !c.enabled {
return nil
}
var (
latestID, latestPeerName string
lastUpdate time.Time
latestIAMPolicy json.RawMessage
)
for dID, ss := range bs {
if lastUpdate.IsZero() {
lastUpdate = ss.meta.PolicyUpdatedAt
latestID = dID
latestIAMPolicy = ss.meta.Policy
}
// avoid considering just-created buckets as latest. Perhaps this site
// just joined cluster replication and is yet to be synced
if ss.meta.CreatedAt.Equal(ss.meta.PolicyUpdatedAt) {
continue
}
if ss.meta.PolicyUpdatedAt.After(lastUpdate) {
lastUpdate = ss.meta.PolicyUpdatedAt
latestID = dID
latestIAMPolicy = ss.meta.Policy
}
}
latestPeerName = info.Sites[latestID].Name
for dID, bStatus := range bs {
if !bStatus.PolicyMismatch {
continue
}
if strings.EqualFold(string(latestIAMPolicy), string(bStatus.meta.Policy)) {
continue
}
if dID == globalDeploymentID {
if err := globalBucketMetadataSys.Update(ctx, bucket, bucketPolicyConfig, latestIAMPolicy); err != nil {
logger.LogIf(ctx, fmt.Errorf("Error healing bucket policy metadata from peer site %s : %w", latestPeerName, err))
}
continue
}
admClient, err := c.getAdminClient(ctx, dID)
if err != nil {
return wrapSRErr(err)
}
peerName := info.Sites[dID].Name
if err = admClient.SRPeerReplicateBucketMeta(ctx, madmin.SRBucketMeta{
Type: madmin.SRBucketMetaTypePolicy,
Bucket: bucket,
Policy: latestIAMPolicy,
}); err != nil {
logger.LogIf(ctx, c.annotatePeerErr(peerName, "SRPeerReplicateBucketMeta", fmt.Errorf("Error healing bucket policy metadata for peer %s from peer %s : %w",
peerName, latestPeerName, err)))
}
}
return nil
}
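// healBucketQuotaConfig heals bucket quota config across sites using the most recently updated site as the source.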
func (c *SiteReplicationSys) healBucketQuotaConfig(ctx context.Context, objAPI ObjectLayer, bucket string, info srStatusInfo) error {
bs := info.BucketStats[bucket]
c.RLock()
defer c.RUnlock()
if !c.enabled {
return nil
}
var (
latestID, latestPeerName string
lastUpdate time.Time
latestQuotaConfig *string
latestQuotaConfigBytes []byte
)
for dID, ss := range bs {
if lastUpdate.IsZero() {
lastUpdate = ss.meta.QuotaConfigUpdatedAt
latestID = dID
latestQuotaConfig = ss.meta.QuotaConfig
}
// avoid considering just-created buckets as latest. Perhaps this site
// just joined cluster replication and is yet to be synced
if ss.meta.CreatedAt.Equal(ss.meta.QuotaConfigUpdatedAt) {
continue
}
if ss.meta.QuotaConfigUpdatedAt.After(lastUpdate) {
lastUpdate = ss.meta.QuotaConfigUpdatedAt
latestID = dID
latestQuotaConfig = ss.meta.QuotaConfig
}
}
var err error
if latestQuotaConfig != nil {
latestQuotaConfigBytes, err = base64.StdEncoding.DecodeString(*latestQuotaConfig)
if err != nil {
return err
}
}
latestPeerName = info.Sites[latestID].Name
for dID, bStatus := range bs {
if !bStatus.QuotaCfgMismatch {
continue
}
if isBucketMetadataEqual(latestQuotaConfig, bStatus.meta.QuotaConfig) {
continue
}
if dID == globalDeploymentID {
if err := globalBucketMetadataSys.Update(ctx, bucket, bucketQuotaConfigFile, latestQuotaConfigBytes); err != nil {
logger.LogIf(ctx, fmt.Errorf("Error healing quota metadata from peer site %s : %s", latestPeerName, err.Error()))
}
continue
}
admClient, err := c.getAdminClient(ctx, dID)
if err != nil {
return wrapSRErr(err)
}
peerName := info.Sites[dID].Name
if err = admClient.SRPeerReplicateBucketMeta(ctx, madmin.SRBucketMeta{
Type: madmin.SRBucketMetaTypeQuotaConfig,
Bucket: bucket,
Quota: latestQuotaConfigBytes,
}); err != nil {
logger.LogIf(ctx, c.annotatePeerErr(peerName, "SRPeerReplicateBucketMeta", fmt.Errorf("Error healing quota config metadata for peer %s from peer %s : %s",
peerName, latestPeerName, err.Error())))
}
}
return nil
}
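// healVersioningMetadata heals bucket versioning config across sites using the most recently updated site as the source.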
func (c *SiteReplicationSys) healVersioningMetadata(ctx context.Context, objAPI ObjectLayer, bucket string, info srStatusInfo) error {
c.RLock()
defer c.RUnlock()
if !c.enabled {
return nil
}
var (
latestID, latestPeerName string
lastUpdate time.Time
latestVersioningConfig *string
)
bs := info.BucketStats[bucket]
for dID, ss := range bs {
if lastUpdate.IsZero() {
lastUpdate = ss.meta.VersioningConfigUpdatedAt
latestID = dID
latestVersioningConfig = ss.meta.Versioning
}
// avoid considering just-created buckets as latest. Perhaps this site
// just joined cluster replication and is yet to be synced
if ss.meta.CreatedAt.Equal(ss.meta.VersioningConfigUpdatedAt) {
continue
}
if ss.meta.VersioningConfigUpdatedAt.After(lastUpdate) {
lastUpdate = ss.meta.VersioningConfigUpdatedAt
latestID = dID
latestVersioningConfig = ss.meta.Versioning
}
}
latestPeerName = info.Sites[latestID].Name
var latestVersioningConfigBytes []byte
var err error
if latestVersioningConfig != nil {
latestVersioningConfigBytes, err = base64.StdEncoding.DecodeString(*latestVersioningConfig)
if err != nil {
return err
}
}
for dID, bStatus := range bs {
if !bStatus.VersioningConfigMismatch {
continue
}
if isBucketMetadataEqual(latestVersioningConfig, bStatus.meta.Versioning) {
continue
}
if dID == globalDeploymentID {
if err := globalBucketMetadataSys.Update(ctx, bucket, bucketVersioningConfig, latestVersioningConfigBytes); err != nil {
logger.LogIf(ctx, fmt.Errorf("Error healing versioning metadata from peer site %s : %w", latestPeerName, err))
}
continue
}
admClient, err := c.getAdminClient(ctx, dID)
if err != nil {
return wrapSRErr(err)
}
peerName := info.Sites[dID].Name
err = admClient.SRPeerReplicateBucketMeta(ctx, madmin.SRBucketMeta{
Type: madmin.SRBucketMetaTypeVersionConfig,
Bucket: bucket,
Versioning: latestVersioningConfig,
})
if err != nil {
logger.LogIf(ctx, c.annotatePeerErr(peerName, "SRPeerReplicateBucketMeta", fmt.Errorf("Error healing versioning config metadata for peer %s from peer %s : %s",
peerName, latestPeerName, err.Error())))
}
}
return nil
}
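// healSSEMetadata heals bucket encryption config across sites using the most recently updated site as the source.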
func (c *SiteReplicationSys) healSSEMetadata(ctx context.Context, objAPI ObjectLayer, bucket string, info srStatusInfo) error {
c.RLock()
defer c.RUnlock()
if !c.enabled {
return nil
}
var (
latestID, latestPeerName string
lastUpdate time.Time
latestSSEConfig *string
)
bs := info.BucketStats[bucket]
for dID, ss := range bs {
if lastUpdate.IsZero() {
lastUpdate = ss.meta.SSEConfigUpdatedAt
latestID = dID
latestSSEConfig = ss.meta.SSEConfig
}
// avoid considering just-created buckets as latest. Perhaps this site
// just joined cluster replication and is yet to be synced
if ss.meta.CreatedAt.Equal(ss.meta.SSEConfigUpdatedAt) {
continue
}
if ss.meta.SSEConfigUpdatedAt.After(lastUpdate) {
lastUpdate = ss.meta.SSEConfigUpdatedAt
latestID = dID
latestSSEConfig = ss.meta.SSEConfig
}
}
latestPeerName = info.Sites[latestID].Name
var latestSSEConfigBytes []byte
var err error
if latestSSEConfig != nil {
latestSSEConfigBytes, err = base64.StdEncoding.DecodeString(*latestSSEConfig)
if err != nil {
return err
}
}
for dID, bStatus := range bs {
if !bStatus.SSEConfigMismatch {
continue
}
if isBucketMetadataEqual(latestSSEConfig, bStatus.meta.SSEConfig) {
continue
}
if dID == globalDeploymentID {
if err := globalBucketMetadataSys.Update(ctx, bucket, bucketSSEConfig, latestSSEConfigBytes); err != nil {
logger.LogIf(ctx, fmt.Errorf("Error healing sse metadata from peer site %s : %w", latestPeerName, err))
}
continue
}
admClient, err := c.getAdminClient(ctx, dID)
if err != nil {
return wrapSRErr(err)
}
peerName := info.Sites[dID].Name
err = admClient.SRPeerReplicateBucketMeta(ctx, madmin.SRBucketMeta{
Type: madmin.SRBucketMetaTypeSSEConfig,
Bucket: bucket,
SSEConfig: latestSSEConfig,
})
if err != nil {
logger.LogIf(ctx, c.annotatePeerErr(peerName, "SRPeerReplicateBucketMeta", fmt.Errorf("Error healing SSE config metadata for peer %s from peer %s : %s",
peerName, latestPeerName, err.Error())))
}
}
return nil
}
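// healOLockConfigMetadata heals bucket object lock config across sites using the most recently updated site as the source.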
func (c *SiteReplicationSys) healOLockConfigMetadata(ctx context.Context, objAPI ObjectLayer, bucket string, info srStatusInfo) error {
bs := info.BucketStats[bucket]
c.RLock()
defer c.RUnlock()
if !c.enabled {
return nil
}
var (
latestID, latestPeerName string
lastUpdate time.Time
latestObjLockConfig *string
)
for dID, ss := range bs {
if lastUpdate.IsZero() {
lastUpdate = ss.meta.ObjectLockConfigUpdatedAt
latestID = dID
latestObjLockConfig = ss.meta.ObjectLockConfig
}
// avoid considering just-created buckets as latest. Perhaps this site
// just joined cluster replication and is yet to be synced
if ss.meta.CreatedAt.Equal(ss.meta.ObjectLockConfigUpdatedAt) {
continue
}
if ss.meta.ObjectLockConfig != nil && ss.meta.ObjectLockConfigUpdatedAt.After(lastUpdate) {
lastUpdate = ss.meta.ObjectLockConfigUpdatedAt
latestID = dID
latestObjLockConfig = ss.meta.ObjectLockConfig
}
}
latestPeerName = info.Sites[latestID].Name
var latestObjLockConfigBytes []byte
var err error
if latestObjLockConfig != nil {
latestObjLockConfigBytes, err = base64.StdEncoding.DecodeString(*latestObjLockConfig)
if err != nil {
return err
}
}
for dID, bStatus := range bs {
if !bStatus.OLockConfigMismatch {
continue
}
if isBucketMetadataEqual(latestObjLockConfig, bStatus.meta.ObjectLockConfig) {
continue
}
if dID == globalDeploymentID {
if err := globalBucketMetadataSys.Update(ctx, bucket, objectLockConfig, latestObjLockConfigBytes); err != nil {
logger.LogIf(ctx, fmt.Errorf("Error healing objectlock config metadata from peer site %s : %w", latestPeerName, err))
}
continue
}
admClient, err := c.getAdminClient(ctx, dID)
if err != nil {
return wrapSRErr(err)
}
peerName := info.Sites[dID].Name
err = admClient.SRPeerReplicateBucketMeta(ctx, madmin.SRBucketMeta{
Type: madmin.SRBucketMetaTypeObjectLockConfig,
Bucket: bucket,
ObjectLockConfig: latestObjLockConfig,
})
if err != nil {
logger.LogIf(ctx, c.annotatePeerErr(peerName, "SRPeerReplicateBucketMeta", fmt.Errorf("Error healing object lock config metadata for peer %s from peer %s : %w",
peerName, latestPeerName, err)))
}
}
return nil
}
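// healCreateMissingBucket creates the bucket (with versioning, and object lock if configured) on sites where it is missing and then configures replication for it.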
func (c *SiteReplicationSys) healCreateMissingBucket(ctx context.Context, objAPI ObjectLayer, bucket string, info srStatusInfo) error {
bs := info.BucketStats[bucket]
c.RLock()
defer c.RUnlock()
if !c.enabled {
return nil
}
bucketCnt := 0
var (
latestID string
lastUpdate time.Time
)
var dIDs []string
for dID, ss := range bs {
if ss.HasBucket {
bucketCnt++
} else {
dIDs = append(dIDs, dID)
}
if lastUpdate.IsZero() {
lastUpdate = ss.meta.CreatedAt
latestID = dID
}
if ss.meta.CreatedAt.After(lastUpdate) {
lastUpdate = ss.meta.CreatedAt
latestID = dID
}
}
latestPeerName := info.Sites[latestID].Name
bStatus := info.BucketStats[bucket][latestID].meta
var opts BucketOptions
optsMap := make(map[string]string)
if bStatus.Location != "" {
optsMap["location"] = bStatus.Location
opts.Location = bStatus.Location
}
optsMap["versioningEnabled"] = "true"
opts.VersioningEnabled = true
if bStatus.ObjectLockConfig != nil {
config, err := base64.StdEncoding.DecodeString(*bStatus.ObjectLockConfig)
if err != nil {
return err
}
if bytes.Equal(config, enabledBucketObjectLockConfig) {
optsMap["lockEnabled"] = "true"
opts.LockEnabled = true
}
}
for _, dID := range dIDs {
peerName := info.Sites[dID].Name
if dID == globalDeploymentID {
err := c.PeerBucketMakeWithVersioningHandler(ctx, bucket, opts)
if err != nil {
logger.LogIf(ctx, c.annotateErr("MakeWithVersioning", fmt.Errorf("error healing bucket for site replication %w from %s -> %s",
err, latestPeerName, peerName)))
return err
}
} else {
admClient, err := c.getAdminClient(ctx, dID)
if err != nil {
logger.LogIf(ctx, c.annotateErr("ConfigureRepl", fmt.Errorf("admin client not found: %w", err)))
return err
}
err = admClient.SRPeerBucketOps(ctx, bucket, madmin.MakeWithVersioningBktOp, optsMap)
logger.LogIf(ctx, c.annotatePeerErr(peerName, "MakeWithVersioning", err))
if err != nil {
return err
}
err = admClient.SRPeerBucketOps(ctx, bucket, madmin.ConfigureReplBktOp, nil)
logger.LogIf(ctx, c.annotatePeerErr(peerName, "ConfigureRepl", err))
if err != nil {
return err
}
}
}
if len(dIDs) > 0 {
// configure replication from current cluster to other clusters
err := c.PeerBucketConfigureReplHandler(ctx, bucket)
if err != nil {
logger.LogIf(ctx, c.annotateErr("ConfigureRepl", err))
return err
}
}
return nil
}
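// healBucketReplicationConfig reconfigures bucket replication across sites when any site reports a replication config mismatch.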
func (c *SiteReplicationSys) healBucketReplicationConfig(ctx context.Context, objAPI ObjectLayer, bucket string, info srStatusInfo) error {
bs := info.BucketStats[bucket]
c.RLock()
defer c.RUnlock()
if !c.enabled {
return nil
}
var replMismatch bool
for _, ss := range bs {
if ss.ReplicationCfgMismatch {
replMismatch = true
break
}
}
if replMismatch {
err := c.PeerBucketConfigureReplHandler(ctx, bucket)
if err != nil {
logger.LogIf(ctx, c.annotateErr("ConfigureRepl", err))
}
}
return nil
}
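// isBucketMetadataEqual compares two optional encoded bucket metadata values, treating two nil values as equal.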
func isBucketMetadataEqual(one, two *string) bool {
switch {
case one == nil && two == nil:
return true
case one == nil || two == nil:
return false
default:
return strings.EqualFold(*one, *two)
}
}
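// healIAMSystem heals IAM policies, users, groups and their policy mappings across sites.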
func (c *SiteReplicationSys) healIAMSystem(ctx context.Context, objAPI ObjectLayer) error {
info, err := c.siteReplicationStatus(ctx, objAPI, madmin.SRStatusOptions{
Users: true,
Policies: true,
Groups: true,
})
if err != nil {
return err
}
for policy := range info.PolicyStats {
c.healPolicies(ctx, objAPI, policy, info)
}
for user := range info.UserStats {
c.healUsers(ctx, objAPI, user, info)
}
for group := range info.GroupStats {
c.healGroups(ctx, objAPI, group, info)
}
for user := range info.UserStats {
c.healUserPolicies(ctx, objAPI, user, info, false)
}
for group := range info.GroupStats {
c.healGroupPolicies(ctx, objAPI, group, info, false)
}
for user := range info.UserStats {
c.healUserPolicies(ctx, objAPI, user, info, true)
}
for group := range info.GroupStats {
c.healGroupPolicies(ctx, objAPI, group, info, true)
}
return nil
}
// heal IAM policies present on this site to peers, provided the current cluster has the most recent update.
func (c *SiteReplicationSys) healPolicies(ctx context.Context, objAPI ObjectLayer, policy string, info srStatusInfo) error {
// create IAM policy on peer cluster if missing
ps := info.PolicyStats[policy]
c.RLock()
defer c.RUnlock()
if !c.enabled {
return nil
}
var (
latestID, latestPeerName string
lastUpdate time.Time
latestPolicyStat srPolicyStatsSummary
)
for dID, ss := range ps {
if lastUpdate.IsZero() {
lastUpdate = ss.policy.UpdatedAt
latestID = dID
latestPolicyStat = ss
}
if !ss.policy.UpdatedAt.IsZero() && ss.policy.UpdatedAt.After(lastUpdate) {
lastUpdate = ss.policy.UpdatedAt
latestID = dID
latestPolicyStat = ss
}
}
if latestID != globalDeploymentID {
// heal only from the site with latest info.
return nil
}
latestPeerName = info.Sites[latestID].Name
// heal policy of peers if peer does not have it.
for dID, pStatus := range ps {
if dID == globalDeploymentID {
continue
}
if !pStatus.PolicyMismatch && pStatus.HasPolicy {
continue
}
peerName := info.Sites[dID].Name
err := c.IAMChangeHook(ctx, madmin.SRIAMItem{
Type: madmin.SRIAMItemPolicy,
Name: policy,
Policy: latestPolicyStat.policy.Policy,
})
if err != nil {
logger.LogIf(ctx, fmt.Errorf("Error healing IAM policy %s from peer site %s -> site %s : %w", policy, latestPeerName, peerName, err))
}
}
return nil
}
// heal user policy mappings present on this site to peers, provided current cluster has the most recent update.
func (c *SiteReplicationSys) healUserPolicies(ctx context.Context, objAPI ObjectLayer, user string, info srStatusInfo, svcAcct bool) error {
// create user policy mapping on peer cluster if missing
us := info.UserStats[user]
c.RLock()
defer c.RUnlock()
if !c.enabled {
return nil
}
var (
latestID, latestPeerName string
lastUpdate time.Time
latestUserStat srUserStatsSummary
)
for dID, ss := range us {
if lastUpdate.IsZero() {
lastUpdate = ss.userPolicy.UpdatedAt
latestID = dID
latestUserStat = ss
}
if !ss.userPolicy.UpdatedAt.IsZero() && ss.userPolicy.UpdatedAt.After(lastUpdate) {
lastUpdate = ss.userPolicy.UpdatedAt
latestID = dID
latestUserStat = ss
}
}
if latestID != globalDeploymentID {
// heal only from the site with latest info.
return nil
}
latestPeerName = info.Sites[latestID].Name
// heal policy of peers if peer does not have it.
for dID, pStatus := range us {
if dID == globalDeploymentID {
continue
}
if !pStatus.PolicyMismatch && pStatus.HasPolicyMapping {
continue
}
if isPolicyMappingEqual(pStatus.userPolicy, latestUserStat.userPolicy) {
continue
}
peerName := info.Sites[dID].Name
err := c.IAMChangeHook(ctx, madmin.SRIAMItem{
Type: madmin.SRIAMItemPolicyMapping,
PolicyMapping: &madmin.SRPolicyMapping{
UserOrGroup: user,
IsGroup: false,
Policy: latestUserStat.userPolicy.Policy,
},
})
if err != nil {
logger.LogIf(ctx, fmt.Errorf("Error healing IAM user policy mapping for %s from peer site %s -> site %s : %w", user, latestPeerName, peerName, err))
}
}
return nil
}
// heal group policy mappings present on this site to peers, provided current cluster has the most recent update.
func (c *SiteReplicationSys) healGroupPolicies(ctx context.Context, objAPI ObjectLayer, group string, info srStatusInfo, svcAcct bool) error {
// create group policy mapping on peer cluster if missing
gs := info.GroupStats[group]
c.RLock()
defer c.RUnlock()
if !c.enabled {
return nil
}
var (
latestID, latestPeerName string
lastUpdate time.Time
latestGroupStat srGroupStatsSummary
)
for dID, ss := range gs {
if lastUpdate.IsZero() {
lastUpdate = ss.groupPolicy.UpdatedAt
latestID = dID
latestGroupStat = ss
}
if !ss.groupPolicy.UpdatedAt.IsZero() && ss.groupPolicy.UpdatedAt.After(lastUpdate) {
lastUpdate = ss.groupPolicy.UpdatedAt
latestID = dID
latestGroupStat = ss
}
}
if latestID != globalDeploymentID {
// heal only from the site with latest info.
return nil
}
latestPeerName = info.Sites[latestID].Name
// heal policy of peers if peer does not have it.
for dID, pStatus := range gs {
if dID == globalDeploymentID {
continue
}
if !pStatus.PolicyMismatch && pStatus.HasPolicyMapping {
continue
}
if isPolicyMappingEqual(pStatus.groupPolicy, latestGroupStat.groupPolicy) {
continue
}
peerName := info.Sites[dID].Name
err := c.IAMChangeHook(ctx, madmin.SRIAMItem{
Type: madmin.SRIAMItemPolicyMapping,
PolicyMapping: &madmin.SRPolicyMapping{
UserOrGroup: group,
IsGroup: true,
Policy: latestGroupStat.groupPolicy.Policy,
},
})
if err != nil {
logger.LogIf(ctx, fmt.Errorf("Error healing IAM group policy mapping for %s from peer site %s -> site %s : %w", group, latestPeerName, peerName, err))
}
}
return nil
}
// heal user accounts present on this site to peers, provided the current cluster has the most recent update.
func (c *SiteReplicationSys) healUsers(ctx context.Context, objAPI ObjectLayer, user string, info srStatusInfo) error {
// create user if missing; fix user policy mapping if missing
us := info.UserStats[user]
c.RLock()
defer c.RUnlock()
if !c.enabled {
return nil
}
var (
latestID, latestPeerName string
lastUpdate time.Time
latestUserStat srUserStatsSummary
)
for dID, ss := range us {
if lastUpdate.IsZero() {
lastUpdate = ss.userInfo.UserInfo.UpdatedAt
latestID = dID
latestUserStat = ss
}
if !ss.userInfo.UserInfo.UpdatedAt.IsZero() && ss.userInfo.UserInfo.UpdatedAt.After(lastUpdate) {
lastUpdate = ss.userInfo.UserInfo.UpdatedAt
latestID = dID
latestUserStat = ss
}
}
if latestID != globalDeploymentID {
// heal only from the site with latest info.
return nil
}
latestPeerName = info.Sites[latestID].Name
for dID, uStatus := range us {
if dID == globalDeploymentID {
continue
}
if !uStatus.UserInfoMismatch {
continue
}
if isUserInfoEqual(latestUserStat.userInfo.UserInfo, uStatus.userInfo.UserInfo) {
continue
}
peerName := info.Sites[dID].Name
creds, ok := globalIAMSys.GetUser(ctx, user)
if !ok {
continue
}
// heal only the user accounts that are local users
if creds.IsServiceAccount() {
claims, err := globalIAMSys.GetClaimsForSvcAcc(ctx, creds.AccessKey)
if err != nil {
logger.LogIf(ctx, fmt.Errorf("Error healing service account %s from peer site %s -> %s : %w", user, latestPeerName, peerName, err))
continue
}
_, policy, err := globalIAMSys.GetServiceAccount(ctx, creds.AccessKey)
if err != nil {
logger.LogIf(ctx, fmt.Errorf("Error healing service account %s from peer site %s -> %s : %w", user, latestPeerName, peerName, err))
continue
}
var policyJSON []byte
if policy != nil {
policyJSON, err = json.Marshal(policy)
if err != nil {
logger.LogIf(ctx, fmt.Errorf("Error healing service account %s from peer site %s -> %s : %w", user, latestPeerName, peerName, err))
continue
}
}
if err := c.IAMChangeHook(ctx, madmin.SRIAMItem{
Type: madmin.SRIAMItemSvcAcc,
SvcAccChange: &madmin.SRSvcAccChange{
Create: &madmin.SRSvcAccCreate{
Parent: creds.ParentUser,
AccessKey: creds.AccessKey,
SecretKey: creds.SecretKey,
Groups: creds.Groups,
Claims: claims,
SessionPolicy: json.RawMessage(policyJSON),
Status: creds.Status,
},
},
}); err != nil {
logger.LogIf(ctx, fmt.Errorf("Error healing service account %s from peer site %s -> %s : %w", user, latestPeerName, peerName, err))
}
continue
}
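// heal unexpired temporary (STS) credentials by forwarding them to the peer along with the parent user's policy mapping.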
if creds.IsTemp() && !creds.IsExpired() {
u, err := globalIAMSys.GetUserInfo(ctx, creds.ParentUser)
if err != nil {
logger.LogIf(ctx, fmt.Errorf("Error healing temporary credentials %s from peer site %s -> %s : %w", user, latestPeerName, peerName, err))
continue
}
// Call hook for site replication.
if err := c.IAMChangeHook(ctx, madmin.SRIAMItem{
Type: madmin.SRIAMItemSTSAcc,
STSCredential: &madmin.SRSTSCredential{
AccessKey: creds.AccessKey,
SecretKey: creds.SecretKey,
SessionToken: creds.SessionToken,
ParentUser: creds.ParentUser,
ParentPolicyMapping: u.PolicyName,
},
}); err != nil {
logger.LogIf(ctx, fmt.Errorf("Error healing temporary credentials %s from peer site %s -> %s : %w", user, latestPeerName, peerName, err))
}
continue
}
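// heal regular local IAM users by replaying an add/update user request on the peer.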
if err := c.IAMChangeHook(ctx, madmin.SRIAMItem{
Type: madmin.SRIAMItemIAMUser,
IAMUser: &madmin.SRIAMUser{
AccessKey: user,
IsDeleteReq: false,
UserReq: &madmin.AddOrUpdateUserReq{
SecretKey: creds.SecretKey,
Status: latestUserStat.userInfo.Status,
},
},
}); err != nil {
logger.LogIf(ctx, fmt.Errorf("Error healing user %s from peer site %s -> %s : %w", user, latestPeerName, peerName, err))
}
}
return nil
}
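
// heal groups present on this site to peers, provided current cluster has the most recent update.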
func (c *SiteReplicationSys) healGroups(ctx context.Context, objAPI ObjectLayer, group string, info srStatusInfo) error {
c.RLock()
defer c.RUnlock()
if !c.enabled {
return nil
}
var (
latestID, latestPeerName string
lastUpdate time.Time
latestGroupStat srGroupStatsSummary
)
// create group if missing; fix group membership and status if mismatched
gs, ok := info.GroupStats[group]
if !ok {
return nil
}
for dID, ss := range gs {
if lastUpdate.IsZero() {
lastUpdate = ss.groupDesc.UpdatedAt
latestID = dID
latestGroupStat = ss
}
if !ss.groupDesc.UpdatedAt.IsZero() && ss.groupDesc.UpdatedAt.After(lastUpdate) {
lastUpdate = ss.groupDesc.UpdatedAt
latestID = dID
latestGroupStat = ss
}
}
if latestID != globalDeploymentID {
// heal only from the site with latest info.
return nil
}
latestPeerName = info.Sites[latestID].Name
for dID, gStatus := range gs {
if dID == globalDeploymentID {
continue
}
if !gStatus.GroupDescMismatch {
continue
}
if isGroupDescEqual(latestGroupStat.groupDesc.GroupDesc, gStatus.groupDesc.GroupDesc) {
continue
}
peerName := info.Sites[dID].Name
if err := c.IAMChangeHook(ctx, madmin.SRIAMItem{
Type: madmin.SRIAMItemGroupInfo,
GroupInfo: &madmin.SRGroupInfo{
UpdateReq: madmin.GroupAddRemove{
Group: group,
Status: madmin.GroupStatus(latestGroupStat.groupDesc.Status),
Members: latestGroupStat.groupDesc.Members,
IsRemove: false,
},
},
}); err != nil {
logger.LogIf(ctx, fmt.Errorf("Error healing group %s from peer site %s -> site %s : %w", group, latestPeerName, peerName, err))
}
}
return nil
}
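
// isGroupDescEqual returns true if both group descriptions have the same name, status, policy and members (ignoring member order).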
func isGroupDescEqual(g1, g2 madmin.GroupDesc) bool {
if g1.Name != g2.Name ||
g1.Status != g2.Status ||
g1.Policy != g2.Policy {
return false
}
if len(g1.Members) != len(g2.Members) {
return false
}
for _, v1 := range g1.Members {
var found bool
for _, v2 := range g2.Members {
if v1 == v2 {
found = true
break
}
}
if !found {
return false
}
}
return true
}
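
// isUserInfoEqual returns true if both user infos have the same policy name, status, secret key and group memberships (ignoring order).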
func isUserInfoEqual(u1, u2 madmin.UserInfo) bool {
if u1.PolicyName != u2.PolicyName ||
u1.Status != u2.Status ||
u1.SecretKey != u2.SecretKey {
return false
}
if len(u1.MemberOf) != len(u2.MemberOf) {
return false
}
for _, v1 := range u1.MemberOf {
var found bool
for _, v2 := range u2.MemberOf {
if v1 == v2 {
found = true
break
}
}
if !found {
return false
}
}
return true
}
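
// isPolicyMappingEqual returns true if both policy mappings assign the same policy to the same user or group.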
func isPolicyMappingEqual(p1, p2 srPolicyMapping) bool {
return p1.Policy == p2.Policy && p1.IsGroup == p2.IsGroup && p1.UserOrGroup == p2.UserOrGroup
}