2021-10-06 19:36:31 -04:00
// Copyright (c) 2015-2021 MinIO, Inc.
//
// This file is part of MinIO Object Storage stack
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package cmd
import (
"bytes"
"context"
"crypto/tls"
"encoding/base64"
"encoding/json"
"encoding/xml"
"errors"
"fmt"
"net/http"
"net/url"
2022-01-06 18:52:43 -05:00
"reflect"
2021-10-06 19:36:31 -04:00
"sort"
"strings"
"sync"
"time"
"github.com/minio/madmin-go"
minioClient "github.com/minio/minio-go/v7"
"github.com/minio/minio-go/v7/pkg/credentials"
"github.com/minio/minio-go/v7/pkg/replication"
"github.com/minio/minio-go/v7/pkg/set"
"github.com/minio/minio/internal/auth"
sreplication "github.com/minio/minio/internal/bucket/replication"
"github.com/minio/minio/internal/logger"
2022-01-05 05:44:08 -05:00
"github.com/minio/minio/internal/sync/errgroup"
2021-10-06 19:36:31 -04:00
"github.com/minio/pkg/bucket/policy"
2022-01-05 05:44:08 -05:00
bktpolicy "github.com/minio/pkg/bucket/policy"
2021-10-06 19:36:31 -04:00
iampolicy "github.com/minio/pkg/iam/policy"
)
const (
srStatePrefix = minioConfigPrefix + "/site-replication"
2021-12-08 14:50:15 -05:00
srStateFile = "state.json"
2021-10-06 19:36:31 -04:00
)
const (
srStateFormatVersion1 = 1
)
var (
2021-12-14 17:09:57 -05:00
errSRCannotJoin = SRError {
Cause : errors . New ( "this site is already configured for site-replication" ) ,
Code : ErrSiteReplicationInvalidRequest ,
}
errSRDuplicateSites = SRError {
Cause : errors . New ( "duplicate sites provided for site-replication" ) ,
Code : ErrSiteReplicationInvalidRequest ,
}
errSRSelfNotFound = SRError {
Cause : errors . New ( "none of the given sites correspond to the current one" ) ,
Code : ErrSiteReplicationInvalidRequest ,
}
errSRPeerNotFound = SRError {
Cause : errors . New ( "peer not found" ) ,
Code : ErrSiteReplicationInvalidRequest ,
}
errSRNotEnabled = SRError {
Cause : errors . New ( "site replication is not enabled" ) ,
Code : ErrSiteReplicationInvalidRequest ,
}
2021-10-06 19:36:31 -04:00
)
func errSRInvalidRequest ( err error ) SRError {
return SRError {
Cause : err ,
Code : ErrSiteReplicationInvalidRequest ,
}
}
func errSRPeerResp ( err error ) SRError {
return SRError {
Cause : err ,
Code : ErrSiteReplicationPeerResp ,
}
}
func errSRBackendIssue ( err error ) SRError {
return SRError {
Cause : err ,
Code : ErrSiteReplicationBackendIssue ,
}
}
func errSRServiceAccount ( err error ) SRError {
return SRError {
Cause : err ,
Code : ErrSiteReplicationServiceAccountError ,
}
}
func errSRBucketConfigError ( err error ) SRError {
return SRError {
Cause : err ,
Code : ErrSiteReplicationBucketConfigError ,
}
}
func errSRBucketMetaError ( err error ) SRError {
return SRError {
Cause : err ,
Code : ErrSiteReplicationBucketMetaError ,
}
}
func errSRIAMError ( err error ) SRError {
return SRError {
Cause : err ,
Code : ErrSiteReplicationIAMError ,
}
}
2022-01-02 12:15:06 -05:00
var errSRObjectLayerNotReady = SRError {
Cause : fmt . Errorf ( "object layer not ready" ) ,
Code : ErrServerNotInitialized ,
}
2021-10-06 19:36:31 -04:00
func getSRStateFilePath ( ) string {
return srStatePrefix + SlashSeparator + srStateFile
}
// SRError - wrapped error for site replication. It pairs the underlying
// Cause with the APIErrorCode that should be reported to API clients.
type SRError struct {
	Cause error
	Code  APIErrorCode
}

// Error returns the message of the underlying Cause. A zero-value SRError
// (nil Cause, e.g. from wrapSRErr(nil)) returns a generic message instead of
// panicking with a nil pointer dereference.
func (c SRError) Error() string {
	if c.Cause == nil {
		return "site replication error"
	}
	return c.Cause.Error()
}

// wrapSRErr wraps err as an SRError with the ErrInternalError API code.
func wrapSRErr(err error) SRError {
	return SRError{Cause: err, Code: ErrInternalError}
}
type (
	// SiteReplicationSys - manages cluster-level replication. The embedded
	// RWMutex is what this file's methods hold while reading or writing
	// enabled and state.
	SiteReplicationSys struct {
		sync.RWMutex

		enabled bool

		// In-memory and persisted multi-site replication state.
		state srState
	}

	// srState is the in-memory replication state; it shares the layout of
	// srStateV1 so the two convert directly.
	srState srStateV1

	// srStateV1 represents version 1 of the site replication state
	// persistence format.
	srStateV1 struct {
		Name string `json:"name"`
		// Peers maps peers by their deploymentID
		Peers                   map[string]madmin.PeerInfo `json:"peers"`
		ServiceAccountAccessKey string                     `json:"serviceAccountAccessKey"`
	}

	// srStateData represents the format of the current `srStateFile`: a
	// format version next to the state itself.
	srStateData struct {
		Version int       `json:"version"`
		SRState srStateV1 `json:"srState"`
	}
)
// Init - initialize the site replication manager by loading any persisted
// state through objAPI. A missing state file is not an error; it only means
// site replication has not been configured on this deployment.
func (c *SiteReplicationSys) Init(ctx context.Context, objAPI ObjectLayer) error {
	err := c.loadFromDisk(ctx, objAPI)
	// errors.Is also recognizes the sentinel if it is ever returned wrapped.
	if errors.Is(err, errConfigNotFound) {
		return nil
	}

	c.RLock()
	defer c.RUnlock()
	if c.enabled {
		logger.Info("Cluster Replication initialized.")
	}
	return err
}
// loadFromDisk reads the persisted state file, rejects unsupported format
// versions, and on success installs the state in memory and marks site
// replication as enabled (under the write lock).
func (c *SiteReplicationSys) loadFromDisk(ctx context.Context, objAPI ObjectLayer) error {
	data, err := readConfig(ctx, objAPI, getSRStateFilePath())
	if err != nil {
		return err
	}

	// Decode just the version key first so an incompatible file is rejected
	// before the full state is parsed.
	var header struct {
		Version int `json:"version"`
	}
	if err := json.Unmarshal(data, &header); err != nil {
		return err
	}
	if header.Version != srStateFormatVersion1 {
		return fmt.Errorf("Unexpected ClusterRepl state version: %d", header.Version)
	}

	var decoded srStateData
	if err := json.Unmarshal(data, &decoded); err != nil {
		return err
	}

	c.Lock()
	defer c.Unlock()
	c.state, c.enabled = srState(decoded.SRState), true
	return nil
}
// saveToDisk persists state (wrapped with the current format version) via
// the object layer. It then asks globalNotificationSys to have peers reload
// the site replication config, logging rather than returning their errors.
// Finally it installs state in memory and marks site replication enabled.
func (c *SiteReplicationSys) saveToDisk(ctx context.Context, state srState) error {
	payload, err := json.Marshal(srStateData{
		Version: srStateFormatVersion1,
		SRState: srStateV1(state),
	})
	if err != nil {
		return err
	}

	objAPI := newObjectLayerFn()
	if objAPI == nil {
		return errServerNotInitialized
	}
	if err = saveConfig(ctx, objAPI, getSRStateFilePath(), payload); err != nil {
		return err
	}

	for _, reloadErr := range globalNotificationSys.ReloadSiteReplicationConfig(ctx) {
		logger.LogIf(ctx, reloadErr)
	}

	c.Lock()
	defer c.Unlock()
	c.state = state
	c.enabled = true
	return nil
}
// siteReplicatorSvcAcc is the access key of the service account used to
// perform cluster-replication operations.
const siteReplicatorSvcAcc = "site-replicator-0"
2021-11-30 16:16:37 -05:00
// PeerSiteInfo is a wrapper struct around madmin.PeerSite with extra info on site status
type PeerSiteInfo struct {
madmin . PeerSite
self bool
DeploymentID string
Replicated bool // true if already participating in site replication
Empty bool // true if cluster has no buckets
}
2021-10-06 19:36:31 -04:00
2021-11-30 16:16:37 -05:00
// getSiteStatuses gathers more info on the sites being added
2022-01-19 23:02:24 -05:00
func ( c * SiteReplicationSys ) getSiteStatuses ( ctx context . Context , sites ... madmin . PeerSite ) ( psi [ ] PeerSiteInfo , err error ) {
psi = make ( [ ] PeerSiteInfo , 0 , len ( sites ) )
2021-11-30 16:16:37 -05:00
for _ , v := range sites {
2021-10-06 19:36:31 -04:00
admClient , err := getAdminClient ( v . Endpoint , v . AccessKey , v . SecretKey )
if err != nil {
2021-11-30 16:16:37 -05:00
return psi , errSRPeerResp ( fmt . Errorf ( "unable to create admin client for %s: %w" , v . Name , err ) )
2021-10-06 19:36:31 -04:00
}
2022-01-19 23:02:24 -05:00
2021-10-06 19:36:31 -04:00
info , err := admClient . ServerInfo ( ctx )
if err != nil {
2021-11-30 16:16:37 -05:00
return psi , errSRPeerResp ( fmt . Errorf ( "unable to fetch server info for %s: %w" , v . Name , err ) )
2021-10-06 19:36:31 -04:00
}
2022-01-19 23:02:24 -05:00
s3Client , err := getS3Client ( v )
if err != nil {
return psi , errSRPeerResp ( fmt . Errorf ( "unable to create s3 client for %s: %w" , v . Name , err ) )
2021-10-06 19:36:31 -04:00
}
2022-01-19 23:02:24 -05:00
buckets , err := s3Client . ListBuckets ( ctx )
if err != nil {
return psi , errSRPeerResp ( fmt . Errorf ( "unable to list buckets for %s: %v" , v . Name , err ) )
2021-10-06 19:36:31 -04:00
}
2022-01-19 23:02:24 -05:00
psi = append ( psi , PeerSiteInfo {
PeerSite : v ,
DeploymentID : info . DeploymentID ,
Empty : len ( buckets ) == 0 ,
self : info . DeploymentID == globalDeploymentID ,
} )
2021-11-30 16:16:37 -05:00
}
return
}
2021-10-06 19:36:31 -04:00
2021-11-30 16:16:37 -05:00
// AddPeerClusters - add cluster sites for replication configuration.
2021-12-14 17:09:57 -05:00
func ( c * SiteReplicationSys ) AddPeerClusters ( ctx context . Context , psites [ ] madmin . PeerSite ) ( madmin . ReplicateAddStatus , error ) {
2022-01-19 23:02:24 -05:00
sites , serr := c . getSiteStatuses ( ctx , psites ... )
if serr != nil {
2021-11-30 16:16:37 -05:00
return madmin . ReplicateAddStatus { } , serr
}
var (
currSites madmin . SiteReplicationInfo
currDeploymentIDsSet = set . NewStringSet ( )
err error
)
if c . enabled {
currSites , err = c . GetClusterInfo ( ctx )
2021-10-06 19:36:31 -04:00
if err != nil {
2021-11-30 16:16:37 -05:00
return madmin . ReplicateAddStatus { } , errSRBackendIssue ( err )
2021-10-06 19:36:31 -04:00
}
2021-11-30 16:16:37 -05:00
for _ , v := range currSites . Sites {
currDeploymentIDsSet . Add ( v . DeploymentID )
2021-10-06 19:36:31 -04:00
}
2021-11-30 16:16:37 -05:00
}
deploymentIDsSet := set . NewStringSet ( )
localHasBuckets := false
nonLocalPeerWithBuckets := ""
2022-01-02 12:15:06 -05:00
selfIdx := - 1
2021-11-30 16:16:37 -05:00
for i , v := range sites {
// deploymentIDs must be unique
if deploymentIDsSet . Contains ( v . DeploymentID ) {
2021-12-14 17:09:57 -05:00
return madmin . ReplicateAddStatus { } , errSRDuplicateSites
2021-11-30 16:16:37 -05:00
}
deploymentIDsSet . Add ( v . DeploymentID )
2021-10-06 19:36:31 -04:00
2021-11-30 16:16:37 -05:00
if v . self {
selfIdx = i
localHasBuckets = ! v . Empty
continue
}
if ! v . Empty && ! currDeploymentIDsSet . Contains ( v . DeploymentID ) {
2021-10-06 19:36:31 -04:00
nonLocalPeerWithBuckets = v . Name
}
}
2021-11-30 16:16:37 -05:00
if c . enabled {
// If current cluster is already SR enabled and no new site being added ,fail.
if currDeploymentIDsSet . Equals ( deploymentIDsSet ) {
2021-12-14 17:09:57 -05:00
return madmin . ReplicateAddStatus { } , errSRCannotJoin
2021-11-30 16:16:37 -05:00
}
if len ( currDeploymentIDsSet . Intersection ( deploymentIDsSet ) ) != len ( currDeploymentIDsSet ) {
diffSlc := getMissingSiteNames ( currDeploymentIDsSet , deploymentIDsSet , currSites . Sites )
2021-12-14 17:09:57 -05:00
return madmin . ReplicateAddStatus { } , errSRInvalidRequest ( fmt . Errorf ( "all existing replicated sites must be specified - missing %s" , strings . Join ( diffSlc , " " ) ) )
2021-11-30 16:16:37 -05:00
}
}
2022-01-19 23:02:24 -05:00
// validate that all clusters are using the same IDP settings.
pass , err := c . validateIDPSettings ( ctx , sites )
if err != nil {
return madmin . ReplicateAddStatus { } , err
}
if ! pass {
return madmin . ReplicateAddStatus { } , errSRInvalidRequest ( errors . New ( "all cluster sites must have the same IAM/IDP settings" ) )
}
2021-10-06 19:36:31 -04:00
// For this `add` API, either all clusters must be empty or the local
// cluster must be the only one having some buckets.
if localHasBuckets && nonLocalPeerWithBuckets != "" {
2021-12-14 17:09:57 -05:00
return madmin . ReplicateAddStatus { } , errSRInvalidRequest ( errors . New ( "only one cluster may have data when configuring site replication" ) )
2021-10-06 19:36:31 -04:00
}
if ! localHasBuckets && nonLocalPeerWithBuckets != "" {
2021-12-14 17:09:57 -05:00
return madmin . ReplicateAddStatus { } , errSRInvalidRequest ( fmt . Errorf ( "please send your request to the cluster containing data/buckets: %s" , nonLocalPeerWithBuckets ) )
2021-10-06 19:36:31 -04:00
}
// FIXME: Ideally, we also need to check if there are any global IAM
// policies and any (LDAP user created) service accounts on the other
// peer clusters, and if so, reject the cluster replicate add request.
// This is not yet implemented.
// VALIDATIONS COMPLETE.
// Create a common service account for all clusters, with root
// permissions.
// Create a local service account.
2021-11-30 16:16:37 -05:00
// Generate a secret key for the service account if not created already.
2021-10-06 19:36:31 -04:00
var secretKey string
2021-11-30 16:16:37 -05:00
svcCred , _ , err := globalIAMSys . getServiceAccount ( ctx , siteReplicatorSvcAcc )
switch {
case err == errNoSuchServiceAccount :
_ , secretKey , err = auth . GenerateCredentials ( )
if err != nil {
return madmin . ReplicateAddStatus { } , errSRServiceAccount ( fmt . Errorf ( "unable to create local service account: %w" , err ) )
2021-10-06 19:36:31 -04:00
}
2021-11-30 16:16:37 -05:00
svcCred , err = globalIAMSys . NewServiceAccount ( ctx , sites [ selfIdx ] . AccessKey , nil , newServiceAccountOpts {
accessKey : siteReplicatorSvcAcc ,
secretKey : secretKey ,
} )
if err != nil {
return madmin . ReplicateAddStatus { } , errSRServiceAccount ( fmt . Errorf ( "unable to create local service account: %w" , err ) )
}
case err == nil :
secretKey = svcCred . SecretKey
default :
return madmin . ReplicateAddStatus { } , errSRBackendIssue ( err )
2021-10-06 19:36:31 -04:00
}
2021-12-27 02:10:34 -05:00
joinReq := madmin . SRPeerJoinReq {
2021-10-06 19:36:31 -04:00
SvcAcctAccessKey : svcCred . AccessKey ,
2021-11-30 16:16:37 -05:00
SvcAcctSecretKey : secretKey ,
2021-10-06 19:36:31 -04:00
Peers : make ( map [ string ] madmin . PeerInfo ) ,
}
2021-11-30 16:16:37 -05:00
for _ , v := range sites {
joinReq . Peers [ v . DeploymentID ] = madmin . PeerInfo {
2021-10-06 19:36:31 -04:00
Endpoint : v . Endpoint ,
Name : v . Name ,
2021-11-30 16:16:37 -05:00
DeploymentID : v . DeploymentID ,
2021-10-06 19:36:31 -04:00
}
}
addedCount := 0
2021-11-30 16:16:37 -05:00
var (
2021-12-14 17:09:57 -05:00
peerAddErr error
2021-11-30 16:16:37 -05:00
admClient * madmin . AdminClient
)
for _ , v := range sites {
if v . self {
2021-10-06 19:36:31 -04:00
continue
}
2021-11-30 16:16:37 -05:00
switch {
case currDeploymentIDsSet . Contains ( v . DeploymentID ) :
admClient , err = c . getAdminClient ( ctx , v . DeploymentID )
default :
admClient , err = getAdminClient ( v . Endpoint , v . AccessKey , v . SecretKey )
}
2021-10-06 19:36:31 -04:00
if err != nil {
peerAddErr = errSRPeerResp ( fmt . Errorf ( "unable to create admin client for %s: %w" , v . Name , err ) )
break
}
joinReq . SvcAcctParent = v . AccessKey
2021-12-27 02:10:34 -05:00
err = admClient . SRPeerJoin ( ctx , joinReq )
2021-10-06 19:36:31 -04:00
if err != nil {
peerAddErr = errSRPeerResp ( fmt . Errorf ( "unable to link with peer %s: %w" , v . Name , err ) )
break
}
addedCount ++
}
2021-12-14 17:09:57 -05:00
if peerAddErr != nil {
2021-10-06 19:36:31 -04:00
if addedCount == 0 {
return madmin . ReplicateAddStatus { } , peerAddErr
}
// In this case, it means at least one cluster was added
// successfully, we need to send a response to the client with
// some details - FIXME: the disks on this cluster would need to
// be cleaned to recover.
partial := madmin . ReplicateAddStatus {
Status : madmin . ReplicateAddStatusPartial ,
ErrDetail : peerAddErr . Error ( ) ,
}
2021-11-30 16:16:37 -05:00
2021-12-14 17:09:57 -05:00
return partial , nil
2021-10-06 19:36:31 -04:00
}
2021-12-14 17:09:57 -05:00
2021-10-06 19:36:31 -04:00
// Other than handling existing buckets, we can now save the cluster
// replication configuration state.
state := srState {
Name : sites [ selfIdx ] . Name ,
Peers : joinReq . Peers ,
ServiceAccountAccessKey : svcCred . AccessKey ,
}
2021-12-14 17:09:57 -05:00
if err = c . saveToDisk ( ctx , state ) ; err != nil {
2021-10-06 19:36:31 -04:00
return madmin . ReplicateAddStatus {
Status : madmin . ReplicateAddStatusPartial ,
ErrDetail : fmt . Sprintf ( "unable to save cluster-replication state on local: %v" , err ) ,
2021-12-14 17:09:57 -05:00
} , nil
2021-10-06 19:36:31 -04:00
}
result := madmin . ReplicateAddStatus {
Success : true ,
Status : madmin . ReplicateAddStatusSuccess ,
}
2021-12-14 17:09:57 -05:00
2022-01-19 23:02:24 -05:00
if err := c . syncToAllPeers ( ctx ) ; err != nil {
2021-12-14 17:09:57 -05:00
result . InitialSyncErrorMessage = err . Error ( )
2021-10-06 19:36:31 -04:00
}
2021-12-14 17:09:57 -05:00
return result , nil
2021-10-06 19:36:31 -04:00
}
2022-01-05 05:44:08 -05:00
// PeerJoinReq - internal API handler to respond to a peer cluster's request
2021-10-06 19:36:31 -04:00
// to join.
2022-01-05 05:44:08 -05:00
func ( c * SiteReplicationSys ) PeerJoinReq ( ctx context . Context , arg madmin . SRPeerJoinReq ) error {
2021-10-06 19:36:31 -04:00
var ourName string
for d , p := range arg . Peers {
if d == globalDeploymentID {
ourName = p . Name
break
}
}
if ourName == "" {
2021-12-14 17:09:57 -05:00
return errSRSelfNotFound
2021-10-06 19:36:31 -04:00
}
2021-11-30 16:16:37 -05:00
_ , _ , err := globalIAMSys . GetServiceAccount ( ctx , arg . SvcAcctAccessKey )
if err == errNoSuchServiceAccount {
_ , err = globalIAMSys . NewServiceAccount ( ctx , arg . SvcAcctParent , nil , newServiceAccountOpts {
accessKey : arg . SvcAcctAccessKey ,
secretKey : arg . SvcAcctSecretKey ,
} )
}
2021-10-06 19:36:31 -04:00
if err != nil {
return errSRServiceAccount ( fmt . Errorf ( "unable to create service account on %s: %v" , ourName , err ) )
}
state := srState {
Name : ourName ,
Peers : arg . Peers ,
ServiceAccountAccessKey : arg . SvcAcctAccessKey ,
}
2021-12-14 17:09:57 -05:00
if err = c . saveToDisk ( ctx , state ) ; err != nil {
2021-10-06 19:36:31 -04:00
return errSRBackendIssue ( fmt . Errorf ( "unable to save cluster-replication state to disk on %s: %v" , ourName , err ) )
}
2021-12-14 17:09:57 -05:00
return nil
2021-10-06 19:36:31 -04:00
}
// GetIDPSettings returns info about the configured identity provider. It is
// used to validate that all peers have the same IDP.
func ( c * SiteReplicationSys ) GetIDPSettings ( ctx context . Context ) madmin . IDPSettings {
2022-01-06 18:52:43 -05:00
s := madmin . IDPSettings { }
s . LDAP = madmin . LDAPSettings {
2021-10-06 19:36:31 -04:00
IsLDAPEnabled : globalLDAPConfig . Enabled ,
LDAPUserDNSearchBase : globalLDAPConfig . UserDNSearchBaseDN ,
LDAPUserDNSearchFilter : globalLDAPConfig . UserDNSearchFilter ,
LDAPGroupSearchBase : globalLDAPConfig . GroupSearchBaseDistName ,
LDAPGroupSearchFilter : globalLDAPConfig . GroupSearchFilter ,
}
2022-01-06 18:52:43 -05:00
s . OpenID = globalOpenIDConfig . GetSettings ( )
if s . OpenID . Enabled {
s . OpenID . Region = globalSite . Region
}
return s
2021-10-06 19:36:31 -04:00
}
2021-12-14 17:09:57 -05:00
func ( c * SiteReplicationSys ) validateIDPSettings ( ctx context . Context , peers [ ] PeerSiteInfo ) ( bool , error ) {
2021-10-06 19:36:31 -04:00
s := make ( [ ] madmin . IDPSettings , 0 , len ( peers ) )
2021-11-30 16:16:37 -05:00
for _ , v := range peers {
if v . self {
2021-10-06 19:36:31 -04:00
s = append ( s , c . GetIDPSettings ( ctx ) )
continue
}
admClient , err := getAdminClient ( v . Endpoint , v . AccessKey , v . SecretKey )
if err != nil {
return false , errSRPeerResp ( fmt . Errorf ( "unable to create admin client for %s: %w" , v . Name , err ) )
}
2021-12-27 02:10:34 -05:00
is , err := admClient . SRPeerGetIDPSettings ( ctx )
2021-10-06 19:36:31 -04:00
if err != nil {
return false , errSRPeerResp ( fmt . Errorf ( "unable to fetch IDP settings from %s: %v" , v . Name , err ) )
}
s = append ( s , is )
}
for i := 1 ; i < len ( s ) ; i ++ {
2022-01-06 18:52:43 -05:00
if ! reflect . DeepEqual ( s [ i ] , s [ 0 ] ) {
2021-12-14 17:09:57 -05:00
return false , nil
2021-10-06 19:36:31 -04:00
}
}
2021-12-14 17:09:57 -05:00
return true , nil
2021-10-06 19:36:31 -04:00
}
// GetClusterInfo - returns site replication information. When site
// replication is disabled, the zero SiteReplicationInfo is returned.
// Otherwise Sites lists all peers (including this deployment) sorted by name.
func (c *SiteReplicationSys) GetClusterInfo(ctx context.Context) (info madmin.SiteReplicationInfo, err error) {
	c.RLock()
	defer c.RUnlock()
	if !c.enabled {
		return info, nil
	}

	sites := make([]madmin.PeerInfo, 0, len(c.state.Peers))
	for _, peer := range c.state.Peers {
		sites = append(sites, peer)
	}
	// Map iteration order is random; sort by name for a stable result.
	sort.SliceStable(sites, func(a, b int) bool {
		return sites[a].Name < sites[b].Name
	})

	info.Enabled = true
	info.Name = c.state.Name
	info.Sites = sites
	info.ServiceAccountAccessKey = c.state.ServiceAccountAccessKey
	return info, nil
}
// MakeBucketHook - called during a regular make bucket call when cluster
// replication is enabled. It is responsible for the creation of the same bucket
// on remote clusters, and creating replication rules on local and peer
// clusters.
func ( c * SiteReplicationSys ) MakeBucketHook ( ctx context . Context , bucket string , opts BucketOptions ) error {
// At this point, the local bucket is created.
c . RLock ( )
defer c . RUnlock ( )
if ! c . enabled {
return nil
}
optsMap := make ( map [ string ] string )
if opts . Location != "" {
optsMap [ "location" ] = opts . Location
}
if opts . LockEnabled {
2021-12-14 17:09:57 -05:00
optsMap [ "lockEnabled" ] = "true"
optsMap [ "versioningEnabled" ] = "true"
}
if opts . VersioningEnabled {
optsMap [ "versioningEnabled" ] = "true"
2021-10-06 19:36:31 -04:00
}
// Create bucket and enable versioning on all peers.
makeBucketConcErr := c . concDo (
func ( ) error {
err := c . PeerBucketMakeWithVersioningHandler ( ctx , bucket , opts )
logger . LogIf ( ctx , c . annotateErr ( "MakeWithVersioning" , err ) )
return err
} ,
func ( deploymentID string , p madmin . PeerInfo ) error {
admClient , err := c . getAdminClient ( ctx , deploymentID )
if err != nil {
return err
}
2021-12-27 02:10:34 -05:00
err = admClient . SRPeerBucketOps ( ctx , bucket , madmin . MakeWithVersioningBktOp , optsMap )
2021-10-06 19:36:31 -04:00
logger . LogIf ( ctx , c . annotatePeerErr ( p . Name , "MakeWithVersioning" , err ) )
return err
} ,
)
// If all make-bucket-and-enable-versioning operations failed, nothing
// more to do.
if makeBucketConcErr . allFailed ( ) {
return makeBucketConcErr
}
// Log any errors in make-bucket operations.
logger . LogIf ( ctx , makeBucketConcErr . summaryErr )
// Create bucket remotes and add replication rules for the bucket on
// self and peers.
makeRemotesConcErr := c . concDo (
func ( ) error {
err := c . PeerBucketConfigureReplHandler ( ctx , bucket )
logger . LogIf ( ctx , c . annotateErr ( "ConfigureRepl" , err ) )
return err
} ,
func ( deploymentID string , p madmin . PeerInfo ) error {
admClient , err := c . getAdminClient ( ctx , deploymentID )
if err != nil {
return err
}
2021-12-27 02:10:34 -05:00
err = admClient . SRPeerBucketOps ( ctx , bucket , madmin . ConfigureReplBktOp , nil )
2021-10-06 19:36:31 -04:00
logger . LogIf ( ctx , c . annotatePeerErr ( p . Name , "ConfigureRepl" , err ) )
return err
} ,
)
err := makeRemotesConcErr . summaryErr
if err != nil {
return err
}
return nil
}
// DeleteBucketHook - called during a regular delete bucket call when cluster
// replication is enabled. It is responsible for the deletion of the same bucket
// on remote clusters.
func ( c * SiteReplicationSys ) DeleteBucketHook ( ctx context . Context , bucket string , forceDelete bool ) error {
// At this point, the local bucket is deleted.
c . RLock ( )
defer c . RUnlock ( )
if ! c . enabled {
return nil
}
op := madmin . DeleteBucketBktOp
if forceDelete {
op = madmin . ForceDeleteBucketBktOp
}
// Send bucket delete to other clusters.
cErr := c . concDo ( nil , func ( deploymentID string , p madmin . PeerInfo ) error {
admClient , err := c . getAdminClient ( ctx , deploymentID )
if err != nil {
return wrapSRErr ( err )
}
2021-12-27 02:10:34 -05:00
err = admClient . SRPeerBucketOps ( ctx , bucket , op , nil )
2021-10-06 19:36:31 -04:00
logger . LogIf ( ctx , c . annotatePeerErr ( p . Name , "DeleteBucket" , err ) )
return err
} )
return cErr . summaryErr
}
// PeerBucketMakeWithVersioningHandler - creates bucket and enables versioning.
func ( c * SiteReplicationSys ) PeerBucketMakeWithVersioningHandler ( ctx context . Context , bucket string , opts BucketOptions ) error {
objAPI := newObjectLayerFn ( )
if objAPI == nil {
return errServerNotInitialized
}
2021-12-14 17:09:57 -05:00
2021-10-06 19:36:31 -04:00
err := objAPI . MakeBucketWithLocation ( ctx , bucket , opts )
if err != nil {
// Check if this is a bucket exists error.
_ , ok1 := err . ( BucketExists )
_ , ok2 := err . ( BucketAlreadyExists )
if ! ok1 && ! ok2 {
logger . LogIf ( ctx , c . annotateErr ( "MakeBucketErr on peer call" , err ) )
return wrapSRErr ( err )
}
} else {
// Load updated bucket metadata into memory as new
// bucket was created.
globalNotificationSys . LoadBucketMetadata ( GlobalContext , bucket )
}
2021-12-14 17:09:57 -05:00
meta , err := globalBucketMetadataSys . Get ( bucket )
2021-10-06 19:36:31 -04:00
if err != nil {
2021-12-14 17:09:57 -05:00
logger . LogIf ( ctx , c . annotateErr ( "MakeBucketErr on peer call" , err ) )
2021-10-06 19:36:31 -04:00
return wrapSRErr ( err )
}
2021-12-14 17:09:57 -05:00
meta . VersioningConfigXML = enabledBucketVersioningConfig
if opts . LockEnabled {
meta . ObjectLockConfigXML = enabledBucketObjectLockConfig
2021-10-06 19:36:31 -04:00
}
2021-12-14 17:09:57 -05:00
if err := meta . Save ( context . Background ( ) , objAPI ) ; err != nil {
return wrapSRErr ( err )
}
globalBucketMetadataSys . Set ( bucket , meta )
// Load updated bucket metadata into memory as new metadata updated.
globalNotificationSys . LoadBucketMetadata ( GlobalContext , bucket )
2021-10-06 19:36:31 -04:00
return nil
}
// PeerBucketConfigureReplHandler - configures replication remote and
// replication rules to all other peers for the local bucket.
func ( c * SiteReplicationSys ) PeerBucketConfigureReplHandler ( ctx context . Context , bucket string ) error {
creds , err := c . getPeerCreds ( )
if err != nil {
return wrapSRErr ( err )
}
// The following function, creates a bucket remote and sets up a bucket
// replication rule for the given peer.
configurePeerFn := func ( d string , peer madmin . PeerInfo ) error {
ep , _ := url . Parse ( peer . Endpoint )
targets := globalBucketTargetSys . ListTargets ( ctx , bucket , string ( madmin . ReplicationService ) )
targetARN := ""
for _ , target := range targets {
if target . SourceBucket == bucket &&
target . TargetBucket == bucket &&
target . Endpoint == ep . Host &&
target . Secure == ( ep . Scheme == "https" ) &&
target . Type == madmin . ReplicationService {
targetARN = target . Arn
break
}
}
if targetARN == "" {
bucketTarget := madmin . BucketTarget {
SourceBucket : bucket ,
Endpoint : ep . Host ,
Credentials : & madmin . Credentials {
AccessKey : creds . AccessKey ,
SecretKey : creds . SecretKey ,
} ,
TargetBucket : bucket ,
Secure : ep . Scheme == "https" ,
API : "s3v4" ,
Type : madmin . ReplicationService ,
Region : "" ,
ReplicationSync : false ,
}
bucketTarget . Arn = globalBucketTargetSys . getRemoteARN ( bucket , & bucketTarget )
err := globalBucketTargetSys . SetTarget ( ctx , bucket , & bucketTarget , false )
if err != nil {
logger . LogIf ( ctx , c . annotatePeerErr ( peer . Name , "Bucket target creation error" , err ) )
return err
}
targets , err := globalBucketTargetSys . ListBucketTargets ( ctx , bucket )
if err != nil {
return err
}
tgtBytes , err := json . Marshal ( & targets )
if err != nil {
return err
}
if err = globalBucketMetadataSys . Update ( bucket , bucketTargetsFile , tgtBytes ) ; err != nil {
return err
}
targetARN = bucketTarget . Arn
}
// Create bucket replication rule to this peer.
// To add the bucket replication rule, we fetch the current
// server configuration, and convert it to minio-go's
// replication configuration type (by converting to xml and
// parsing it back), use minio-go's add rule function, and
// finally convert it back to the server type (again via xml).
// This is needed as there is no add-rule function in the server
// yet.
// Though we do not check if the rule already exists, this is
// not a problem as we are always using the same replication
// rule ID - if the rule already exists, it is just replaced.
replicationConfigS , err := globalBucketMetadataSys . GetReplicationConfig ( ctx , bucket )
if err != nil {
_ , ok := err . ( BucketReplicationConfigNotFound )
if ! ok {
return err
}
}
var replicationConfig replication . Config
if replicationConfigS != nil {
replCfgSBytes , err := xml . Marshal ( replicationConfigS )
if err != nil {
return err
}
err = xml . Unmarshal ( replCfgSBytes , & replicationConfig )
if err != nil {
return err
}
}
2021-11-30 16:16:37 -05:00
var (
ruleID = fmt . Sprintf ( "site-repl-%s" , d )
hasRule bool
opts = replication . Options {
// Set the ID so we can identify the rule as being
// created for site-replication and include the
// destination cluster's deployment ID.
ID : ruleID ,
// Use a helper to generate unique priority numbers.
Priority : fmt . Sprintf ( "%d" , getPriorityHelper ( replicationConfig ) ) ,
Op : replication . AddOption ,
RuleStatus : "enable" ,
DestBucket : targetARN ,
// Replicate everything!
ReplicateDeletes : "enable" ,
ReplicateDeleteMarkers : "enable" ,
ReplicaSync : "enable" ,
ExistingObjectReplicate : "enable" ,
}
)
for _ , r := range replicationConfig . Rules {
if r . ID == ruleID {
hasRule = true
}
}
switch {
case hasRule :
err = replicationConfig . EditRule ( opts )
default :
err = replicationConfig . AddRule ( opts )
}
2021-10-06 19:36:31 -04:00
if err != nil {
logger . LogIf ( ctx , c . annotatePeerErr ( peer . Name , "Error adding bucket replication rule" , err ) )
return err
}
// Now convert the configuration back to server's type so we can
// do some validation.
newReplCfgBytes , err := xml . Marshal ( replicationConfig )
if err != nil {
return err
}
newReplicationConfig , err := sreplication . ParseConfig ( bytes . NewReader ( newReplCfgBytes ) )
if err != nil {
return err
}
sameTarget , apiErr := validateReplicationDestination ( ctx , bucket , newReplicationConfig )
if apiErr != noError {
return fmt . Errorf ( "bucket replication config validation error: %#v" , apiErr )
}
err = newReplicationConfig . Validate ( bucket , sameTarget )
if err != nil {
return err
}
// Config looks good, so we save it.
replCfgData , err := xml . Marshal ( newReplicationConfig )
if err != nil {
return err
}
err = globalBucketMetadataSys . Update ( bucket , bucketReplicationConfig , replCfgData )
logger . LogIf ( ctx , c . annotatePeerErr ( peer . Name , "Error updating replication configuration" , err ) )
return err
}
2022-01-19 23:02:24 -05:00
c . RLock ( )
defer c . RUnlock ( )
2021-10-06 19:36:31 -04:00
errMap := make ( map [ string ] error , len ( c . state . Peers ) )
for d , peer := range c . state . Peers {
if d == globalDeploymentID {
continue
}
if err := configurePeerFn ( d , peer ) ; err != nil {
errMap [ d ] = err
}
}
return c . toErrorFromErrMap ( errMap )
}
// PeerBucketDeleteHandler - deletes bucket on local in response to a delete
// bucket request from a peer.
func ( c * SiteReplicationSys ) PeerBucketDeleteHandler ( ctx context . Context , bucket string , forceDelete bool ) error {
c . RLock ( )
defer c . RUnlock ( )
if ! c . enabled {
return errSRNotEnabled
}
objAPI := newObjectLayerFn ( )
if objAPI == nil {
return errServerNotInitialized
}
2021-12-14 17:09:57 -05:00
if globalDNSConfig != nil {
if err := globalDNSConfig . Delete ( bucket ) ; err != nil {
return err
}
}
2021-10-06 19:36:31 -04:00
err := objAPI . DeleteBucket ( ctx , bucket , DeleteBucketOptions { Force : forceDelete } )
if err != nil {
2021-12-14 17:09:57 -05:00
if globalDNSConfig != nil {
if err2 := globalDNSConfig . Put ( bucket ) ; err2 != nil {
logger . LogIf ( ctx , fmt . Errorf ( "Unable to restore bucket DNS entry %w, please fix it manually" , err2 ) )
}
}
2021-10-06 19:36:31 -04:00
return err
}
globalNotificationSys . DeleteBucketMetadata ( ctx , bucket )
return nil
}
// IAMChangeHook - called when IAM items need to be replicated to peer clusters.
// This includes named policy creation, policy mapping changes and service
// account changes.
//
// All policies are replicated.
//
// Policy mappings are only replicated when they are for LDAP users or groups
// (as an external IDP is always assumed when SR is used). In the case of
// OpenID, such mappings are provided from the IDP directly and so are not
// applicable here.
//
2022-01-06 18:52:43 -05:00
// Service accounts are replicated as long as they are not meant for the root
// user.
2021-10-06 19:36:31 -04:00
//
// STS accounts are replicated, but only if the session token is verifiable
// using the local cluster's root credential.
func ( c * SiteReplicationSys ) IAMChangeHook ( ctx context . Context , item madmin . SRIAMItem ) error {
// The IAM item has already been applied to the local cluster at this
// point, and only needs to be updated on all remote peer clusters.
c . RLock ( )
defer c . RUnlock ( )
if ! c . enabled {
return nil
}
cErr := c . concDo ( nil , func ( d string , p madmin . PeerInfo ) error {
admClient , err := c . getAdminClient ( ctx , d )
if err != nil {
return wrapSRErr ( err )
}
2021-12-27 02:10:34 -05:00
err = admClient . SRPeerReplicateIAMItem ( ctx , item )
logger . LogIf ( ctx , c . annotatePeerErr ( p . Name , "SRPeerReplicateIAMItem" , err ) )
2021-10-06 19:36:31 -04:00
return err
} )
return cErr . summaryErr
}
// PeerAddPolicyHandler - copies IAM policy to local. A nil policy argument,
// causes the named policy to be deleted.
func ( c * SiteReplicationSys ) PeerAddPolicyHandler ( ctx context . Context , policyName string , p * iampolicy . Policy ) error {
var err error
if p == nil {
2021-11-29 17:38:57 -05:00
err = globalIAMSys . DeletePolicy ( ctx , policyName , true )
2021-10-06 19:36:31 -04:00
} else {
2021-11-15 17:14:22 -05:00
err = globalIAMSys . SetPolicy ( ctx , policyName , * p )
2021-10-06 19:36:31 -04:00
}
if err != nil {
return wrapSRErr ( err )
}
return nil
}
2022-01-06 18:52:43 -05:00
// PeerIAMUserChangeHandler - copies IAM user to local.
func ( c * SiteReplicationSys ) PeerIAMUserChangeHandler ( ctx context . Context , change * madmin . SRIAMUser ) error {
if change == nil {
return errSRInvalidRequest ( errInvalidArgument )
}
var err error
if change . IsDeleteReq {
err = globalIAMSys . DeleteUser ( ctx , change . AccessKey , true )
} else {
if change . UserReq == nil {
return errSRInvalidRequest ( errInvalidArgument )
}
2022-01-19 23:02:24 -05:00
userReq := * change . UserReq
if userReq . Status != "" && userReq . SecretKey == "" {
// Status is set without secretKey updates means we are
// only changing the account status.
err = globalIAMSys . SetUserStatus ( ctx , change . AccessKey , userReq . Status )
} else {
err = globalIAMSys . CreateUser ( ctx , change . AccessKey , userReq )
}
2022-01-06 18:52:43 -05:00
}
if err != nil {
return wrapSRErr ( err )
}
return nil
}
// PeerGroupInfoChangeHandler - copies group changes to local.
func ( c * SiteReplicationSys ) PeerGroupInfoChangeHandler ( ctx context . Context , change * madmin . SRGroupInfo ) error {
if change == nil {
return errSRInvalidRequest ( errInvalidArgument )
}
updReq := change . UpdateReq
var err error
if updReq . IsRemove {
err = globalIAMSys . RemoveUsersFromGroup ( ctx , updReq . Group , updReq . Members )
} else {
2022-01-19 23:02:24 -05:00
if updReq . Status != "" && len ( updReq . Members ) == 0 {
err = globalIAMSys . SetGroupStatus ( ctx , updReq . Group , updReq . Status == madmin . GroupEnabled )
} else {
err = globalIAMSys . AddUsersToGroup ( ctx , updReq . Group , updReq . Members )
}
2022-01-06 18:52:43 -05:00
}
if err != nil {
return wrapSRErr ( err )
}
return nil
}
2021-10-06 19:36:31 -04:00
// PeerSvcAccChangeHandler - copies service-account change to local.
2021-12-08 14:50:15 -05:00
func ( c * SiteReplicationSys ) PeerSvcAccChangeHandler ( ctx context . Context , change * madmin . SRSvcAccChange ) error {
if change == nil {
2021-12-14 17:09:57 -05:00
return errSRInvalidRequest ( errInvalidArgument )
2021-12-08 14:50:15 -05:00
}
2021-10-06 19:36:31 -04:00
switch {
case change . Create != nil :
var sp * iampolicy . Policy
var err error
if len ( change . Create . SessionPolicy ) > 0 {
sp , err = iampolicy . ParseConfig ( bytes . NewReader ( change . Create . SessionPolicy ) )
if err != nil {
return wrapSRErr ( err )
}
}
opts := newServiceAccountOpts {
accessKey : change . Create . AccessKey ,
secretKey : change . Create . SecretKey ,
sessionPolicy : sp ,
claims : change . Create . Claims ,
}
2021-11-29 17:38:57 -05:00
_ , err = globalIAMSys . NewServiceAccount ( ctx , change . Create . Parent , change . Create . Groups , opts )
2021-10-06 19:36:31 -04:00
if err != nil {
return wrapSRErr ( err )
}
case change . Update != nil :
var sp * iampolicy . Policy
var err error
if len ( change . Update . SessionPolicy ) > 0 {
sp , err = iampolicy . ParseConfig ( bytes . NewReader ( change . Update . SessionPolicy ) )
if err != nil {
return wrapSRErr ( err )
}
}
opts := updateServiceAccountOpts {
secretKey : change . Update . SecretKey ,
status : change . Update . Status ,
sessionPolicy : sp ,
}
err = globalIAMSys . UpdateServiceAccount ( ctx , change . Update . AccessKey , opts )
if err != nil {
return wrapSRErr ( err )
}
case change . Delete != nil :
2021-12-21 05:16:50 -05:00
err := globalIAMSys . DeleteServiceAccount ( ctx , change . Delete . AccessKey , true )
2021-10-06 19:36:31 -04:00
if err != nil {
return wrapSRErr ( err )
}
}
return nil
}
// PeerPolicyMappingHandler - copies policy mapping to local.
2021-12-08 14:50:15 -05:00
func ( c * SiteReplicationSys ) PeerPolicyMappingHandler ( ctx context . Context , mapping * madmin . SRPolicyMapping ) error {
if mapping == nil {
2021-12-14 17:09:57 -05:00
return errSRInvalidRequest ( errInvalidArgument )
2021-12-08 14:50:15 -05:00
}
2021-11-15 17:14:22 -05:00
err := globalIAMSys . PolicyDBSet ( ctx , mapping . UserOrGroup , mapping . Policy , mapping . IsGroup )
2021-10-06 19:36:31 -04:00
if err != nil {
return wrapSRErr ( err )
}
return nil
}
// PeerSTSAccHandler - replicates STS credential locally.
2021-12-08 14:50:15 -05:00
func ( c * SiteReplicationSys ) PeerSTSAccHandler ( ctx context . Context , stsCred * madmin . SRSTSCredential ) error {
if stsCred == nil {
2021-12-14 17:09:57 -05:00
return errSRInvalidRequest ( errInvalidArgument )
2021-12-08 14:50:15 -05:00
}
2021-10-06 19:36:31 -04:00
// Verify the session token of the stsCred
claims , err := auth . ExtractClaims ( stsCred . SessionToken , globalActiveCred . SecretKey )
if err != nil {
logger . LogIf ( ctx , err )
return fmt . Errorf ( "STS credential could not be verified" )
}
mapClaims := claims . Map ( )
expiry , err := auth . ExpToInt64 ( mapClaims [ "exp" ] )
if err != nil {
2021-12-08 14:50:15 -05:00
return fmt . Errorf ( "Expiry claim was not found: %v" , mapClaims )
2021-10-06 19:36:31 -04:00
}
cred := auth . Credentials {
AccessKey : stsCred . AccessKey ,
SecretKey : stsCred . SecretKey ,
Expiration : time . Unix ( expiry , 0 ) . UTC ( ) ,
SessionToken : stsCred . SessionToken ,
2022-01-06 18:52:43 -05:00
ParentUser : stsCred . ParentUser ,
2021-10-06 19:36:31 -04:00
Status : auth . AccountOn ,
2022-01-06 18:52:43 -05:00
}
// Extract the username and lookup DN and groups in LDAP.
ldapUser , isLDAPSTS := claims . Lookup ( ldapUserN )
switch {
case isLDAPSTS :
// Need to lookup the groups from LDAP.
_ , ldapGroups , err := globalLDAPConfig . LookupUserDN ( ldapUser )
if err != nil {
return fmt . Errorf ( "unable to query LDAP server for %s: %v" , ldapUser , err )
}
cred . Groups = ldapGroups
2021-10-06 19:36:31 -04:00
}
// Set these credentials to IAM.
2022-01-06 18:52:43 -05:00
if err := globalIAMSys . SetTempUser ( ctx , cred . AccessKey , cred , stsCred . ParentPolicyMapping ) ; err != nil {
return fmt . Errorf ( "unable to save STS credential and/or parent policy mapping: %v" , err )
2021-10-06 19:36:31 -04:00
}
return nil
}
// BucketMetaHook - called when bucket meta changes happen and need to be
// replicated to peer clusters.
func ( c * SiteReplicationSys ) BucketMetaHook ( ctx context . Context , item madmin . SRBucketMeta ) error {
// The change has already been applied to the local cluster at this
// point, and only needs to be updated on all remote peer clusters.
c . RLock ( )
defer c . RUnlock ( )
if ! c . enabled {
return nil
}
cErr := c . concDo ( nil , func ( d string , p madmin . PeerInfo ) error {
admClient , err := c . getAdminClient ( ctx , d )
if err != nil {
return wrapSRErr ( err )
}
2021-12-27 02:10:34 -05:00
err = admClient . SRPeerReplicateBucketMeta ( ctx , item )
logger . LogIf ( ctx , c . annotatePeerErr ( p . Name , "SRPeerReplicateBucketMeta" , err ) )
2021-10-06 19:36:31 -04:00
return err
} )
return cErr . summaryErr
}
// PeerBucketPolicyHandler writes a bucket policy received from a peer into
// the local bucket metadata. A nil policy is stored as a nil payload, which
// deletes the bucket policy.
func (c *SiteReplicationSys) PeerBucketPolicyHandler(ctx context.Context, bucket string, policy *policy.Policy) error {
	var configData []byte
	if policy != nil {
		encoded, err := json.Marshal(policy)
		if err != nil {
			return wrapSRErr(err)
		}
		configData = encoded
	}

	if err := globalBucketMetadataSys.Update(bucket, bucketPolicyConfig, configData); err != nil {
		return wrapSRErr(err)
	}
	return nil
}
// PeerBucketTaggingHandler writes a bucket tagging config received from a peer
// into the local bucket metadata. tags holds the base64-encoded config; when
// it is nil, a nil payload is written, deleting the tags.
func (c *SiteReplicationSys) PeerBucketTaggingHandler(ctx context.Context, bucket string, tags *string) error {
	var configData []byte
	if tags != nil {
		decoded, err := base64.StdEncoding.DecodeString(*tags)
		if err != nil {
			return wrapSRErr(err)
		}
		configData = decoded
	}

	if err := globalBucketMetadataSys.Update(bucket, bucketTaggingConfig, configData); err != nil {
		return wrapSRErr(err)
	}
	return nil
}
// PeerBucketObjectLockConfigHandler writes an object-lock config received from
// a peer into the local bucket metadata. objectLockData holds the
// base64-encoded config. A nil payload is ignored: this handler never removes
// an existing object-lock configuration.
func (c *SiteReplicationSys) PeerBucketObjectLockConfigHandler(ctx context.Context, bucket string, objectLockData *string) error {
	if objectLockData == nil {
		return nil
	}

	configData, err := base64.StdEncoding.DecodeString(*objectLockData)
	if err != nil {
		return wrapSRErr(err)
	}
	if err = globalBucketMetadataSys.Update(bucket, objectLockConfig, configData); err != nil {
		return wrapSRErr(err)
	}
	return nil
}
// PeerBucketSSEConfigHandler writes a bucket encryption (SSE) config received
// from a peer into the local bucket metadata. sseConfig holds the
// base64-encoded config; when it is nil, a nil payload is written, deleting
// the SSE config.
func (c *SiteReplicationSys) PeerBucketSSEConfigHandler(ctx context.Context, bucket string, sseConfig *string) error {
	var configData []byte
	if sseConfig != nil {
		decoded, err := base64.StdEncoding.DecodeString(*sseConfig)
		if err != nil {
			return wrapSRErr(err)
		}
		configData = decoded
	}

	if err := globalBucketMetadataSys.Update(bucket, bucketSSEConfig, configData); err != nil {
		return wrapSRErr(err)
	}
	return nil
}
2022-01-19 23:02:24 -05:00
// PeerBucketQuotaConfigHandler - copies/deletes a bucket quota config to the
// local cluster.
//
// A non-nil quota is JSON-encoded and stored as the bucket's quota config.
// A nil quota writes a nil payload, which clears the bucket's quota config.
// Errors from encoding or from the metadata update are wrapped with wrapSRErr.
func (c *SiteReplicationSys) PeerBucketQuotaConfigHandler(ctx context.Context, bucket string, quota *madmin.BucketQuota) error {
	if quota != nil {
		quotaData, err := json.Marshal(quota)
		if err != nil {
			return wrapSRErr(err)
		}
		if err = globalBucketMetadataSys.Update(bucket, bucketQuotaConfigFile, quotaData); err != nil {
			return wrapSRErr(err)
		}
		return nil
	}

	// Delete the bucket quota config.
	if err := globalBucketMetadataSys.Update(bucket, bucketQuotaConfigFile, nil); err != nil {
		return wrapSRErr(err)
	}
	return nil
}
2021-10-06 19:36:31 -04:00
// getAdminClient returns an admin client for the peer with the given
// deployment ID. The client connects to the endpoint recorded for that peer
// and signs requests with the site replication service account's credentials.
//
// NOTE: callers must hold at least a read lock on SiteReplicationSys.
func (c *SiteReplicationSys) getAdminClient(ctx context.Context, deploymentID string) (*madmin.AdminClient, error) {
	creds, err := c.getPeerCreds()
	if err != nil {
		return nil, err
	}
	if peer, ok := c.state.Peers[deploymentID]; ok {
		return getAdminClient(peer.Endpoint, creds.AccessKey, creds.SecretKey)
	}
	return nil, errSRPeerNotFound
}
2022-01-21 11:48:21 -05:00
// getAdminClientWithEndpoint is like getAdminClient, but it connects to the
// explicitly given endpoint instead of the one stored for the peer. The
// deployment ID must still belong to a known peer.
//
// NOTE: callers must hold at least a read lock on SiteReplicationSys.
func (c *SiteReplicationSys) getAdminClientWithEndpoint(ctx context.Context, deploymentID, endpoint string) (*madmin.AdminClient, error) {
	creds, err := c.getPeerCreds()
	if err != nil {
		return nil, err
	}
	_, known := c.state.Peers[deploymentID]
	if !known {
		return nil, errSRPeerNotFound
	}
	return getAdminClient(endpoint, creds.AccessKey, creds.SecretKey)
}
2021-10-06 19:36:31 -04:00
func ( c * SiteReplicationSys ) getPeerCreds ( ) ( * auth . Credentials , error ) {
2021-11-03 22:47:49 -04:00
creds , ok := globalIAMSys . store . GetUser ( c . state . ServiceAccountAccessKey )
2021-10-06 19:36:31 -04:00
if ! ok {
2021-12-14 17:09:57 -05:00
return nil , errors . New ( "site replication service account not found" )
2021-10-06 19:36:31 -04:00
}
return & creds , nil
}
2022-01-19 23:02:24 -05:00
// listBuckets returns a consistent common view of latest unique buckets across
// sites, this is used for replication.
func ( c * SiteReplicationSys ) listBuckets ( ctx context . Context ) ( [ ] BucketInfo , error ) {
2021-10-06 19:36:31 -04:00
// If local has buckets, enable versioning on them, create them on peers
// and setup replication rules.
objAPI := newObjectLayerFn ( )
if objAPI == nil {
2022-01-19 23:02:24 -05:00
return nil , errSRObjectLayerNotReady
2021-10-06 19:36:31 -04:00
}
2022-01-19 23:02:24 -05:00
return objAPI . ListBuckets ( ctx )
}
// syncToAllPeers is used for syncing local data to all remote peers, it is
// called once during initial "AddPeerClusters" request.
func ( c * SiteReplicationSys ) syncToAllPeers ( ctx context . Context ) error {
buckets , err := c . listBuckets ( ctx )
2021-10-06 19:36:31 -04:00
if err != nil {
2022-01-19 23:02:24 -05:00
return err
2021-10-06 19:36:31 -04:00
}
2022-01-19 23:02:24 -05:00
2021-10-06 19:36:31 -04:00
for _ , bucketInfo := range buckets {
bucket := bucketInfo . Name
// MinIO does not store bucket location - so we just check if
// object locking is enabled.
lockConfig , err := globalBucketMetadataSys . GetObjectLockConfig ( bucket )
if err != nil {
if _ , ok := err . ( BucketObjectLockConfigNotFound ) ; ! ok {
return errSRBackendIssue ( err )
}
}
var opts BucketOptions
if lockConfig != nil {
opts . LockEnabled = lockConfig . ObjectLockEnabled == "Enabled"
}
// Now call the MakeBucketHook on existing bucket - this will
// create buckets and replication rules on peer clusters.
err = c . MakeBucketHook ( ctx , bucket , opts )
if err != nil {
return errSRBucketConfigError ( err )
}
// Replicate bucket policy if present.
policy , err := globalPolicySys . Get ( bucket )
found := true
if _ , ok := err . ( BucketPolicyNotFound ) ; ok {
found = false
} else if err != nil {
return errSRBackendIssue ( err )
}
if found {
policyJSON , err := json . Marshal ( policy )
if err != nil {
return wrapSRErr ( err )
}
err = c . BucketMetaHook ( ctx , madmin . SRBucketMeta {
Type : madmin . SRBucketMetaTypePolicy ,
Bucket : bucket ,
Policy : policyJSON ,
} )
if err != nil {
return errSRBucketMetaError ( err )
}
}
// Replicate bucket tags if present.
tags , err := globalBucketMetadataSys . GetTaggingConfig ( bucket )
found = true
if _ , ok := err . ( BucketTaggingNotFound ) ; ok {
found = false
} else if err != nil {
return errSRBackendIssue ( err )
}
if found {
tagCfg , err := xml . Marshal ( tags )
if err != nil {
return wrapSRErr ( err )
}
tagCfgStr := base64 . StdEncoding . EncodeToString ( tagCfg )
err = c . BucketMetaHook ( ctx , madmin . SRBucketMeta {
Type : madmin . SRBucketMetaTypeTags ,
Bucket : bucket ,
Tags : & tagCfgStr ,
} )
if err != nil {
return errSRBucketMetaError ( err )
}
}
// Replicate object-lock config if present.
objLockCfg , err := globalBucketMetadataSys . GetObjectLockConfig ( bucket )
found = true
if _ , ok := err . ( BucketObjectLockConfigNotFound ) ; ok {
found = false
} else if err != nil {
return errSRBackendIssue ( err )
}
if found {
objLockCfgData , err := xml . Marshal ( objLockCfg )
if err != nil {
return wrapSRErr ( err )
}
objLockStr := base64 . StdEncoding . EncodeToString ( objLockCfgData )
err = c . BucketMetaHook ( ctx , madmin . SRBucketMeta {
Type : madmin . SRBucketMetaTypeObjectLockConfig ,
Bucket : bucket ,
Tags : & objLockStr ,
} )
if err != nil {
return errSRBucketMetaError ( err )
}
}
// Replicate existing bucket bucket encryption settings
sseConfig , err := globalBucketMetadataSys . GetSSEConfig ( bucket )
found = true
if _ , ok := err . ( BucketSSEConfigNotFound ) ; ok {
found = false
} else if err != nil {
return errSRBackendIssue ( err )
}
if found {
sseConfigData , err := xml . Marshal ( sseConfig )
if err != nil {
return wrapSRErr ( err )
}
sseConfigStr := base64 . StdEncoding . EncodeToString ( sseConfigData )
err = c . BucketMetaHook ( ctx , madmin . SRBucketMeta {
Type : madmin . SRBucketMetaTypeSSEConfig ,
Bucket : bucket ,
SSEConfig : & sseConfigStr ,
} )
if err != nil {
return errSRBucketMetaError ( err )
}
}
2022-01-19 23:02:24 -05:00
quotaConfig , err := globalBucketMetadataSys . GetQuotaConfig ( bucket )
found = true
if _ , ok := err . ( BucketQuotaConfigNotFound ) ; ok {
found = false
} else if err != nil {
return errSRBackendIssue ( err )
}
if found {
quotaConfigJSON , err := json . Marshal ( quotaConfig )
if err != nil {
return wrapSRErr ( err )
}
err = c . BucketMetaHook ( ctx , madmin . SRBucketMeta {
Type : madmin . SRBucketMetaTypeQuotaConfig ,
Bucket : bucket ,
Quota : quotaConfigJSON ,
} )
if err != nil {
return errSRBucketMetaError ( err )
}
}
2021-10-06 19:36:31 -04:00
}
2022-01-19 23:02:24 -05:00
// Order matters from now on how the information is
// synced to remote sites.
// Policies should be synced first.
2021-10-06 19:36:31 -04:00
{
// Replicate IAM policies on local to all peers.
2021-11-15 17:14:22 -05:00
allPolicies , err := globalIAMSys . ListPolicies ( ctx , "" )
2021-10-06 19:36:31 -04:00
if err != nil {
return errSRBackendIssue ( err )
}
for pname , policy := range allPolicies {
policyJSON , err := json . Marshal ( policy )
if err != nil {
return wrapSRErr ( err )
}
err = c . IAMChangeHook ( ctx , madmin . SRIAMItem {
Type : madmin . SRIAMItemPolicy ,
Name : pname ,
Policy : policyJSON ,
} )
if err != nil {
return errSRIAMError ( err )
}
}
}
2022-01-19 23:02:24 -05:00
// Next should be userAccounts those are local users, OIDC and LDAP will not
// may not have any local users.
2021-10-06 19:36:31 -04:00
{
2022-01-19 23:02:24 -05:00
userAccounts := make ( map [ string ] auth . Credentials )
2021-10-06 19:36:31 -04:00
globalIAMSys . store . rlock ( )
2022-01-19 23:02:24 -05:00
err := globalIAMSys . store . loadUsers ( ctx , regUser , userAccounts )
2021-10-06 19:36:31 -04:00
globalIAMSys . store . runlock ( )
2022-01-19 23:02:24 -05:00
if err != nil {
return errSRBackendIssue ( err )
2021-10-06 19:36:31 -04:00
}
2022-01-19 23:02:24 -05:00
for _ , acc := range userAccounts {
if err := c . IAMChangeHook ( ctx , madmin . SRIAMItem {
Type : madmin . SRIAMItemIAMUser ,
IAMUser : & madmin . SRIAMUser {
AccessKey : acc . AccessKey ,
IsDeleteReq : false ,
UserReq : & madmin . AddOrUpdateUserReq {
SecretKey : acc . SecretKey ,
Status : madmin . AccountStatus ( acc . Status ) ,
} ,
2021-10-06 19:36:31 -04:00
} ,
2022-01-19 23:02:24 -05:00
} ) ; err != nil {
2021-10-06 19:36:31 -04:00
return errSRIAMError ( err )
}
}
2022-01-19 23:02:24 -05:00
}
2021-10-06 19:36:31 -04:00
2022-01-19 23:02:24 -05:00
// Next should be Groups for some of these users, LDAP might have some Group
// DNs here
{
groups := make ( map [ string ] GroupInfo )
globalIAMSys . store . rlock ( )
err := globalIAMSys . store . loadGroups ( ctx , groups )
globalIAMSys . store . runlock ( )
if err != nil {
return errSRBackendIssue ( err )
}
for gname , group := range groups {
if err := c . IAMChangeHook ( ctx , madmin . SRIAMItem {
Type : madmin . SRIAMItemGroupInfo ,
GroupInfo : & madmin . SRGroupInfo {
UpdateReq : madmin . GroupAddRemove {
Group : gname ,
Members : group . Members ,
Status : madmin . GroupStatus ( group . Status ) ,
IsRemove : false ,
} ,
2021-10-06 19:36:31 -04:00
} ,
2022-01-19 23:02:24 -05:00
} ) ; err != nil {
2021-10-06 19:36:31 -04:00
return errSRIAMError ( err )
}
}
}
2022-01-19 23:02:24 -05:00
// Service accounts are the static accounts that should be synced with
// valid claims.
2021-10-06 19:36:31 -04:00
{
serviceAccounts := make ( map [ string ] auth . Credentials )
globalIAMSys . store . rlock ( )
err := globalIAMSys . store . loadUsers ( ctx , svcUser , serviceAccounts )
globalIAMSys . store . runlock ( )
if err != nil {
return errSRBackendIssue ( err )
}
2022-01-19 23:02:24 -05:00
2021-10-06 19:36:31 -04:00
for user , acc := range serviceAccounts {
2022-01-06 18:52:43 -05:00
if user == siteReplicatorSvcAcc {
// skip the site replicate svc account as it is
// already replicated.
continue
}
2022-01-19 23:02:24 -05:00
2021-10-06 19:36:31 -04:00
claims , err := globalIAMSys . GetClaimsForSvcAcc ( ctx , acc . AccessKey )
if err != nil {
return errSRBackendIssue ( err )
}
2022-01-19 23:02:24 -05:00
2021-10-06 19:36:31 -04:00
_ , policy , err := globalIAMSys . GetServiceAccount ( ctx , acc . AccessKey )
if err != nil {
return errSRBackendIssue ( err )
}
2022-01-19 23:02:24 -05:00
2021-10-06 19:36:31 -04:00
var policyJSON [ ] byte
if policy != nil {
policyJSON , err = json . Marshal ( policy )
if err != nil {
return wrapSRErr ( err )
}
}
2022-01-19 23:02:24 -05:00
2021-10-06 19:36:31 -04:00
err = c . IAMChangeHook ( ctx , madmin . SRIAMItem {
Type : madmin . SRIAMItemSvcAcc ,
SvcAccChange : & madmin . SRSvcAccChange {
Create : & madmin . SRSvcAccCreate {
Parent : acc . ParentUser ,
AccessKey : user ,
SecretKey : acc . SecretKey ,
Groups : acc . Groups ,
Claims : claims ,
SessionPolicy : json . RawMessage ( policyJSON ) ,
Status : acc . Status ,
} ,
} ,
} )
if err != nil {
return errSRIAMError ( err )
}
}
}
2022-01-19 23:02:24 -05:00
// Followed by policy mapping for the userAccounts we previously synced.
{
// Replicate policy mappings on local to all peers.
userPolicyMap := make ( map [ string ] MappedPolicy )
groupPolicyMap := make ( map [ string ] MappedPolicy )
globalIAMSys . store . rlock ( )
errU := globalIAMSys . store . loadMappedPolicies ( ctx , regUser , false , userPolicyMap )
errG := globalIAMSys . store . loadMappedPolicies ( ctx , regUser , true , groupPolicyMap )
globalIAMSys . store . runlock ( )
if errU != nil {
return errSRBackendIssue ( errU )
}
for user , mp := range userPolicyMap {
err := c . IAMChangeHook ( ctx , madmin . SRIAMItem {
Type : madmin . SRIAMItemPolicyMapping ,
PolicyMapping : & madmin . SRPolicyMapping {
UserOrGroup : user ,
IsGroup : false ,
Policy : mp . Policies ,
} ,
} )
if err != nil {
return errSRIAMError ( err )
}
}
if errG != nil {
return errSRBackendIssue ( errG )
}
for group , mp := range groupPolicyMap {
err := c . IAMChangeHook ( ctx , madmin . SRIAMItem {
Type : madmin . SRIAMItemPolicyMapping ,
PolicyMapping : & madmin . SRPolicyMapping {
UserOrGroup : group ,
IsGroup : true ,
Policy : mp . Policies ,
} ,
} )
if err != nil {
return errSRIAMError ( err )
}
}
}
// and finally followed by policy mappings for for STS users.
{
// Replicate policy mappings on local to all peers.
userPolicyMap := make ( map [ string ] MappedPolicy )
groupPolicyMap := make ( map [ string ] MappedPolicy )
globalIAMSys . store . rlock ( )
errU := globalIAMSys . store . loadMappedPolicies ( ctx , stsUser , false , userPolicyMap )
errG := globalIAMSys . store . loadMappedPolicies ( ctx , stsUser , true , groupPolicyMap )
globalIAMSys . store . runlock ( )
if errU != nil {
return errSRBackendIssue ( errU )
}
for user , mp := range userPolicyMap {
err := c . IAMChangeHook ( ctx , madmin . SRIAMItem {
Type : madmin . SRIAMItemPolicyMapping ,
PolicyMapping : & madmin . SRPolicyMapping {
UserOrGroup : user ,
IsGroup : false ,
Policy : mp . Policies ,
} ,
} )
if err != nil {
return errSRIAMError ( err )
}
}
if errG != nil {
return errSRBackendIssue ( errG )
}
for group , mp := range groupPolicyMap {
err := c . IAMChangeHook ( ctx , madmin . SRIAMItem {
Type : madmin . SRIAMItemPolicyMapping ,
PolicyMapping : & madmin . SRPolicyMapping {
UserOrGroup : group ,
IsGroup : true ,
Policy : mp . Policies ,
} ,
} )
if err != nil {
return errSRIAMError ( err )
}
}
}
2021-12-14 17:09:57 -05:00
return nil
2021-10-06 19:36:31 -04:00
}
// Concurrency helpers

// concErr summarizes the outcome of running one action per site with concDo.
type concErr struct {
	numActions int              // number of actions that were run
	errMap     map[string]error // failing actions only, keyed by deployment ID
	summaryErr error            // combined error; nil when no action failed
}

// Error returns the text of the combined error. It returns "" when no action
// failed (summaryErr is nil); previously this case dereferenced a nil error
// and panicked.
func (c concErr) Error() string {
	if c.summaryErr == nil {
		return ""
	}
	return c.summaryErr.Error()
}

// allFailed reports whether every action that was run returned an error.
func (c concErr) allFailed() bool {
	return len(c.errMap) == c.numActions
}
// toErrorFromErrMap combines per-deployment errors into one error that names
// each failing site as "Site <name> (<deploymentID>): <err>". It returns nil
// when errMap is empty. Sites are listed in sorted deployment-ID order so the
// message is deterministic.
func (c *SiteReplicationSys) toErrorFromErrMap(errMap map[string]error) error {
	if len(errMap) == 0 {
		return nil
	}

	depIDs := make([]string, 0, len(errMap))
	for d := range errMap {
		depIDs = append(depIDs, d)
	}
	// Map iteration order is random; sort for a stable, reproducible message.
	sort.Strings(depIDs)

	msgs := make([]string, 0, len(depIDs))
	for _, d := range depIDs {
		name := c.state.Peers[d].Name
		msgs = append(msgs, fmt.Sprintf("Site %s (%s): %v", name, d, errMap[d]))
	}
	return fmt.Errorf("Site replication error(s): %s", strings.Join(msgs, "; "))
}
// newConcErr bundles the per-site errors of a concurrent run with the
// action count and their combined summary error.
func (c *SiteReplicationSys) newConcErr(numActions int, errMap map[string]error) concErr {
	var result concErr
	result.numActions = numActions
	result.errMap = errMap
	result.summaryErr = c.toErrorFromErrMap(errMap)
	return result
}
// concDo runs one action per entry of c.state.Peers concurrently and waits for
// all of them to finish. selfActionFn, if non-nil, runs for the local
// deployment (globalDeploymentID); peerActionFn runs for every other
// deployment. The returned concErr records each failing deployment and the
// number of actions run (the local deployment counts only when selfActionFn is
// set).
//
// concDo takes no lock itself; the visible callers hold c's read lock while
// calling it.
func (c *SiteReplicationSys) concDo(selfActionFn func() error, peerActionFn func(deploymentID string, p madmin.PeerInfo) error) concErr {
	peers := c.state.Peers
	ids := make([]string, 0, len(peers))
	for id := range peers {
		ids = append(ids, id)
	}

	// Each goroutine writes only its own slot, so no locking is needed.
	results := make([]error, len(peers))
	var wg sync.WaitGroup
	wg.Add(len(ids))
	for idx, id := range ids {
		go func(idx int, id string) {
			defer wg.Done()
			if id != globalDeploymentID {
				results[idx] = peerActionFn(id, peers[id])
				return
			}
			if selfActionFn != nil {
				results[idx] = selfActionFn()
			}
		}(idx, id)
	}
	wg.Wait()

	errMap := make(map[string]error, len(peers))
	for idx, id := range ids {
		if results[idx] != nil {
			errMap[id] = results[idx]
		}
	}

	numActions := len(peers) - 1
	if selfActionFn != nil {
		numActions++
	}
	return c.newConcErr(numActions, errMap)
}
// annotateErr prefixes err with this site's name and a short description of
// the failed operation, as "<site>: <annotation>: <err>". It returns nil when
// err is nil. The original error is wrapped with %w, so errors.Is and
// errors.As still see it; the message text is the same as with %v.
func (c *SiteReplicationSys) annotateErr(annotation string, err error) error {
	if err == nil {
		return nil
	}
	return fmt.Errorf("%s: %s: %w", c.state.Name, annotation, err)
}
// annotatePeerErr prefixes err with the source and destination site names and
// a short description of the failed operation, as
// "<site>-><dstPeer>: <annotation>: <err>". It returns nil when err is nil.
// The original error is wrapped with %w, so errors.Is and errors.As still see
// it; the message text is the same as with %v.
func (c *SiteReplicationSys) annotatePeerErr(dstPeer string, annotation string, err error) error {
	if err == nil {
		return nil
	}
	return fmt.Errorf("%s->%s: %s: %w", c.state.Name, dstPeer, annotation, err)
}
2021-12-15 13:37:08 -05:00
// isEnabled reports whether site replication is enabled. The flag is read
// while holding c's read lock.
func (c *SiteReplicationSys) isEnabled() bool {
	c.RLock()
	enabled := c.enabled
	c.RUnlock()
	return enabled
}
2021-10-06 19:36:31 -04:00
// Other helpers
// newRemoteClusterHTTPTransport returns a new http configuration
// used while communicating with the remote cluster.
func newRemoteClusterHTTPTransport ( ) * http . Transport {
tr := & http . Transport {
Proxy : http . ProxyFromEnvironment ,
TLSClientConfig : & tls . Config {
2022-01-06 14:34:02 -05:00
RootCAs : globalRootCAs ,
ClientSessionCache : tls . NewLRUClientSessionCache ( tlsClientSessionCacheSize ) ,
2021-10-06 19:36:31 -04:00
} ,
}
return tr
}
// getAdminClient returns a madmin client for the cluster at endpoint. The
// client signs requests with the given credentials and uses the
// remote-cluster HTTP transport. TLS is used when the endpoint's scheme is
// "https". An unparsable endpoint is reported as an error; previously the
// parse error was ignored and the nil URL caused a panic.
func getAdminClient(endpoint, accessKey, secretKey string) (*madmin.AdminClient, error) {
	epURL, err := url.Parse(endpoint)
	if err != nil {
		return nil, err
	}
	client, err := madmin.New(epURL.Host, accessKey, secretKey, epURL.Scheme == "https")
	if err != nil {
		return nil, err
	}
	client.SetCustomTransport(newRemoteClusterHTTPTransport())
	return client, nil
}
func getS3Client ( pc madmin . PeerSite ) ( * minioClient . Client , error ) {
2022-01-19 23:02:24 -05:00
ep , err := url . Parse ( pc . Endpoint )
if err != nil {
return nil , err
}
2021-10-06 19:36:31 -04:00
return minioClient . New ( ep . Host , & minioClient . Options {
Creds : credentials . NewStaticV4 ( pc . AccessKey , pc . SecretKey , "" ) ,
Secure : ep . Scheme == "https" ,
Transport : newRemoteClusterHTTPTransport ( ) ,
} )
}
// getPriorityHelper returns the priority to use for a new replication rule:
// 10 more than the larger of 0 and the highest priority among the existing
// rules. An empty config therefore yields 10.
func getPriorityHelper(replicationConfig replication.Config) int {
	highest := 0
	for i := range replicationConfig.Rules {
		if p := replicationConfig.Rules[i].Priority; p > highest {
			highest = p
		}
	}
	// Leave gaps between priority numbers for flexibility.
	return highest + 10
}
2021-11-30 16:16:37 -05:00
// getMissingSiteNames returns the names of sites that already participate in
// site replication (oldDeps) but were not specified when adding a new site
// (newDeps). Names are returned in the order the sites appear in currSites;
// nil is returned when nothing is missing.
func getMissingSiteNames(oldDeps, newDeps set.StringSet, currSites []madmin.PeerInfo) []string {
	missing := oldDeps.Difference(newDeps)
	var names []string
	for i := range currSites {
		if missing.Contains(currSites[i].DeploymentID) {
			names = append(names, currSites[i].Name)
		}
	}
	return names
}
2022-01-05 05:44:08 -05:00
// Per-site records used by SiteReplicationStatus to compare an entity's
// configuration across deployments. Each one pairs the data reported by a
// single site with that site's deployment ID.
type (
	// srBucketMetaInfo is one site's view of a bucket.
	srBucketMetaInfo struct {
		madmin.SRBucketInfo
		DeploymentID string
	}

	// srPolicy is one site's copy of a named IAM policy.
	srPolicy struct {
		policy       json.RawMessage
		DeploymentID string
	}

	// srUserPolicyMapping is one site's policy mapping for a user.
	srUserPolicyMapping struct {
		madmin.SRPolicyMapping
		DeploymentID string
	}

	// srGroupPolicyMapping is one site's policy mapping for a group.
	srGroupPolicyMapping struct {
		madmin.SRPolicyMapping
		DeploymentID string
	}
)
// SiteReplicationStatus returns the site replication status across clusters participating in site replication.
//
// The replication metadata (buckets, IAM policies, user and group policy
// mappings) of every site, the local one included, is collected and compared.
// The result carries per-site summary counters in StatsSummary and, for every
// entity that is not consistently replicated, an entry in the matching
// *Mismatches map keyed by deployment ID. Every site is listed in such an
// entry, so sites where the entity is missing are reported as well. When site
// replication is not enabled, an empty info and a nil error are returned. A
// failure to reach a peer or to collect any site's metadata is returned as
// errSRBackendIssue.
func (c *SiteReplicationSys) SiteReplicationStatus(ctx context.Context, objAPI ObjectLayer) (info madmin.SRStatusInfo, err error) {
	// Snapshot the peers and build their admin clients under the read lock,
	// then release it before fanning out. SiteReplicationMetaInfo acquires the
	// same read lock, and re-acquiring a sync.RWMutex read lock while still
	// holding it deadlocks as soon as a writer is queued. Releasing it early
	// also keeps writers from waiting on the network calls below.
	c.RLock()
	if !c.enabled {
		c.RUnlock()
		return info, nil
	}
	peers := make(map[string]madmin.PeerInfo, len(c.state.Peers))
	depIDs := make([]string, 0, len(c.state.Peers))
	for d, peer := range c.state.Peers {
		peers[d] = peer
		depIDs = append(depIDs, d)
	}
	clients := make([]*madmin.AdminClient, len(depIDs))
	for i, d := range depIDs {
		if d == globalDeploymentID {
			continue
		}
		admClient, cerr := c.getAdminClient(ctx, d)
		if cerr != nil {
			c.RUnlock()
			return info, errSRBackendIssue(cerr)
		}
		clients[i] = admClient
	}
	c.RUnlock()

	sris := make([]madmin.SRInfo, len(depIDs))
	sriErrs := make([]error, len(depIDs))
	g := errgroup.WithNErrs(len(depIDs))
	for index := range depIDs {
		index := index
		g.Go(func() error {
			if depIDs[index] == globalDeploymentID {
				sris[index], sriErrs[index] = c.SiteReplicationMetaInfo(ctx, objAPI)
			} else {
				sris[index], sriErrs[index] = clients[index].SRMetaInfo(ctx)
			}
			return nil
		}, index)
	}
	// Wait for the go routines; their failures are recorded in sriErrs.
	g.Wait()
	for _, serr := range sriErrs {
		if serr != nil {
			return info, errSRBackendIssue(serr)
		}
	}

	info.Enabled = true
	info.Sites = peers
	numSites := len(sris)

	// Group each entity's per-site configuration by entity name.
	bucketStats := make(map[string][]srBucketMetaInfo)
	policyStats := make(map[string][]srPolicy)
	userPolicyStats := make(map[string][]srUserPolicyMapping)
	groupPolicyStats := make(map[string][]srGroupPolicyMapping)
	for _, sri := range sris {
		for b, si := range sri.Buckets {
			bucketStats[b] = append(bucketStats[b], srBucketMetaInfo{SRBucketInfo: si, DeploymentID: sri.DeploymentID})
		}
		for pname, plcy := range sri.Policies {
			policyStats[pname] = append(policyStats[pname], srPolicy{policy: plcy, DeploymentID: sri.DeploymentID})
		}
		for user, mapping := range sri.UserPolicies {
			userPolicyStats[user] = append(userPolicyStats[user], srUserPolicyMapping{SRPolicyMapping: mapping, DeploymentID: sri.DeploymentID})
		}
		for group, mapping := range sri.GroupPolicies {
			groupPolicyStats[group] = append(groupPolicyStats[group], srGroupPolicyMapping{SRPolicyMapping: mapping, DeploymentID: sri.DeploymentID})
		}
	}

	info.StatsSummary = make(map[string]madmin.SRSiteSummary, numSites)
	info.BucketMismatches = make(map[string]map[string]madmin.SRBucketStatsSummary)
	info.PolicyMismatches = make(map[string]map[string]madmin.SRPolicyStatsSummary)
	info.UserMismatches = make(map[string]map[string]madmin.SRUserStatsSummary)
	info.GroupMismatches = make(map[string]map[string]madmin.SRGroupStatsSummary)

	// Collect user policy mapping replication status across sites.
	for u, pslc := range userPolicyStats {
		policySet := set.NewStringSet()
		uPolicyCount := 0
		for _, ps := range pslc {
			// Compare only the mapping itself: the wrapper's DeploymentID is
			// different on every site and would make every entry look unique.
			policyBytes, err := json.Marshal(ps.SRPolicyMapping)
			if err != nil {
				continue
			}
			uPolicyCount++
			policySet.Add(string(policyBytes))
		}
		if isReplicated(uPolicyCount, numSites, policySet) {
			continue
		}
		mismatches := make(map[string]madmin.SRUserStatsSummary, numSites)
		for _, sri := range sris {
			_, hasUser := sri.UserPolicies[u]
			mismatches[sri.DeploymentID] = madmin.SRUserStatsSummary{
				PolicyMismatch: true,
				UserMissing:    !hasUser,
			}
		}
		info.UserMismatches[u] = mismatches
	}

	// Collect group policy mapping replication status across sites.
	for grp, pslc := range groupPolicyStats {
		policySet := set.NewStringSet()
		gPolicyCount := 0
		for _, ps := range pslc {
			// Same as for users: leave the per-site DeploymentID out.
			policyBytes, err := json.Marshal(ps.SRPolicyMapping)
			if err != nil {
				continue
			}
			gPolicyCount++
			policySet.Add(string(policyBytes))
		}
		if isReplicated(gPolicyCount, numSites, policySet) {
			continue
		}
		mismatches := make(map[string]madmin.SRGroupStatsSummary, numSites)
		for _, sri := range sris {
			_, hasGroup := sri.GroupPolicies[grp]
			mismatches[sri.DeploymentID] = madmin.SRGroupStatsSummary{
				PolicyMismatch: true,
				GroupMissing:   !hasGroup,
			}
		}
		info.GroupMismatches[grp] = mismatches
	}

	// Collect IAM policy replication status across sites.
	for p, pslc := range policyStats {
		var policies []*iampolicy.Policy
		for _, ps := range pslc {
			plcy, err := iampolicy.ParseConfig(bytes.NewReader(ps.policy))
			if err != nil {
				continue
			}
			policies = append(policies, plcy)
			sum := info.StatsSummary[ps.DeploymentID]
			sum.TotalIAMPoliciesCount++
			info.StatsSummary[ps.DeploymentID] = sum
		}
		if !isIAMPolicyReplicated(len(policies), numSites, policies) {
			mismatches := make(map[string]madmin.SRPolicyStatsSummary, numSites)
			for _, sri := range sris {
				_, hasPolicy := sri.Policies[p]
				mismatches[sri.DeploymentID] = madmin.SRPolicyStatsSummary{
					PolicyMismatch: true,
					PolicyMissing:  !hasPolicy,
				}
			}
			info.PolicyMismatches[p] = mismatches
			continue
		}
		for _, ps := range pslc {
			sum := info.StatsSummary[ps.DeploymentID]
			sum.ReplicatedIAMPolicies++
			info.StatsSummary[ps.DeploymentID] = sum
		}
	}

	// Collect bucket metadata replication stats across sites.
	for b, slc := range bucketStats {
		tagSet := set.NewStringSet()
		olockConfigSet := set.NewStringSet()
		sseCfgSet := set.NewStringSet()
		var policies []*bktpolicy.Policy
		var replCfgs []*sreplication.Config
		var tagCount, olockCfgCount, sseCfgCount int
		for _, s := range slc {
			// A config that fails to decode is skipped on its own; it does not
			// drop the rest of this site's bucket stats.
			if s.ReplicationConfig != nil {
				cfgBytes, err := base64.StdEncoding.DecodeString(*s.ReplicationConfig)
				if err == nil {
					cfg, err := sreplication.ParseConfig(bytes.NewReader(cfgBytes))
					if err == nil {
						replCfgs = append(replCfgs, cfg)
					}
				}
			}
			hasTags := false
			if s.Tags != nil {
				if tagBytes, err := base64.StdEncoding.DecodeString(*s.Tags); err == nil {
					hasTags = true
					tagCount++
					tagSet.Add(string(tagBytes))
				}
			}
			hasPolicy := false
			if len(s.Policy) > 0 {
				if plcy, err := bktpolicy.ParseConfig(bytes.NewReader(s.Policy), b); err == nil {
					hasPolicy = true
					policies = append(policies, plcy)
				}
			}
			if s.ObjectLockConfig != nil {
				olockCfgCount++
				olockConfigSet.Add(*s.ObjectLockConfig)
			}
			if s.SSEConfig != nil {
				sseCfgCount++
				sseCfgSet.Add(*s.SSEConfig)
			}

			// Per-site totals reflect what this site has, not the running
			// counts accumulated over the sites visited so far.
			ss := info.StatsSummary[s.DeploymentID]
			ss.TotalBucketsCount++
			if len(slc) == numSites {
				ss.ReplicatedBuckets++
			}
			if hasTags {
				ss.TotalTagsCount++
			}
			if s.ObjectLockConfig != nil {
				ss.TotalLockConfigCount++
			}
			if s.SSEConfig != nil {
				ss.TotalSSEConfigCount++
			}
			if hasPolicy {
				ss.TotalBucketPoliciesCount++
			}
			info.StatsSummary[s.DeploymentID] = ss
		}

		bucketMissing := len(slc) < numSites
		tagMismatch := !isReplicated(tagCount, numSites, tagSet)
		olockCfgMismatch := !isReplicated(olockCfgCount, numSites, olockConfigSet)
		sseCfgMismatch := !isReplicated(sseCfgCount, numSites, sseCfgSet)
		policyMismatch := !isBktPolicyReplicated(numSites, policies)
		replCfgMismatch := !isBktReplCfgReplicated(numSites, replCfgs)
		if bucketMissing || tagMismatch || olockCfgMismatch || sseCfgMismatch || policyMismatch || replCfgMismatch {
			mismatches := make(map[string]madmin.SRBucketStatsSummary, numSites)
			for _, sri := range sris {
				bi, hasBucket := sri.Buckets[b]
				mismatches[sri.DeploymentID] = madmin.SRBucketStatsSummary{
					DeploymentID:           sri.DeploymentID,
					HasBucket:              hasBucket,
					TagMismatch:            tagMismatch,
					OLockConfigMismatch:    olockCfgMismatch,
					SSEConfigMismatch:      sseCfgMismatch,
					PolicyMismatch:         policyMismatch,
					ReplicationCfgMismatch: replCfgMismatch,
					HasReplicationCfg:      bi.ReplicationConfig != nil,
				}
			}
			info.BucketMismatches[b] = mismatches
		}

		// Count the individual configurations that are fully replicated, even
		// when some other aspect of the bucket mismatches.
		for _, s := range slc {
			sum := info.StatsSummary[s.DeploymentID]
			if !olockCfgMismatch && olockCfgCount == numSites {
				sum.ReplicatedLockConfig++
			}
			if !sseCfgMismatch && sseCfgCount == numSites {
				sum.ReplicatedSSEConfig++
			}
			if !policyMismatch && len(policies) == numSites {
				sum.ReplicatedBucketPolicies++
			}
			if !tagMismatch && tagCount == numSites {
				sum.ReplicatedTags++
			}
			info.StatsSummary[s.DeploymentID] = sum
		}
	}

	// Number of distinct buckets, users, groups and policies seen across sites.
	info.MaxBuckets = len(bucketStats)
	info.MaxUsers = len(userPolicyStats)
	info.MaxGroups = len(groupPolicyStats)
	info.MaxPolicies = len(policyStats)
	return info, nil
}
// isReplicated reports whether a setting is consistent across sites. It
// returns false when the setting is present on some sites but not on all of
// them (0 < cntReplicated < total), or when the sites disagree on its value
// (valSet holds more than one distinct entry). A setting absent everywhere
// (cntReplicated == 0) counts as replicated.
func isReplicated(cntReplicated, total int, valSet set.StringSet) bool {
	partial := cntReplicated > 0 && cntReplicated < total
	// More than one distinct value: one or more sites has differing tags/policy.
	divergent := len(valSet) > 1
	return !partial && !divergent
}
// isIAMPolicyReplicated reports whether an IAM policy is replicated: when it
// is present at all (cntReplicated > 0), it must be present on every site
// (cntReplicated == total), and every parsed policy must equal the first one.
func isIAMPolicyReplicated(cntReplicated, total int, policies []*iampolicy.Policy) bool {
	if cntReplicated > 0 && cntReplicated != total {
		return false
	}
	if len(policies) < 2 {
		// Nothing to compare against.
		return true
	}
	reference := policies[0]
	for _, other := range policies[1:] {
		if !reference.Equals(*other) {
			return false
		}
	}
	return true
}
// isBktPolicyReplicated reports whether a bucket policy is replicated. No
// policies at all counts as replicated. Otherwise there must be exactly one
// policy per site (len(policies) == total), and all of them must equal the
// first one.
func isBktPolicyReplicated(total int, policies []*bktpolicy.Policy) bool {
	n := len(policies)
	if n == 0 {
		return true
	}
	if n != total {
		return false
	}
	for _, other := range policies[1:] {
		if !policies[0].Equals(*other) {
			return false
		}
	}
	return true
}
// isBktReplCfgReplicated returns true if all the sites have same number
// of replication rules with all replication features enabled.
func isBktReplCfgReplicated ( total int , cfgs [ ] * sreplication . Config ) bool {
cntReplicated := len ( cfgs )
if cntReplicated > 0 && cntReplicated != len ( cfgs ) {
return false
}
// check if policies match between sites
var prev * sreplication . Config
for i , c := range cfgs {
if i == 0 {
prev = c
continue
}
if len ( prev . Rules ) != len ( c . Rules ) {
return false
}
if len ( c . Rules ) != total - 1 {
return false
}
for _ , r := range c . Rules {
if ! strings . HasPrefix ( r . ID , "site-repl-" ) {
return false
}
if r . DeleteMarkerReplication . Status == sreplication . Disabled ||
r . DeleteReplication . Status == sreplication . Disabled ||
r . ExistingObjectReplication . Status == sreplication . Disabled ||
r . SourceSelectionCriteria . ReplicaModifications . Status == sreplication . Disabled {
return false
}
}
}
return true
}
// SiteReplicationMetaInfo returns the metadata info on buckets, policies etc for the replicated site
//
// The info describes the local deployment. It contains:
//   - for every bucket, its policy (JSON) and its tagging, object-lock, SSE
//     and replication configurations (each XML-marshalled and
//     base64-encoded), whenever present;
//   - all IAM policies (JSON);
//   - the user and group policy mappings.
//
// It returns errSRObjectLayerNotReady when objAPI is nil, and an empty info
// with a nil error when site replication is not enabled. Backend read
// failures are wrapped with errSRBackendIssue and marshalling failures with
// wrapSRErr.
func (c *SiteReplicationSys) SiteReplicationMetaInfo(ctx context.Context, objAPI ObjectLayer) (info madmin.SRInfo, err error) {
	if objAPI == nil {
		return info, errSRObjectLayerNotReady
	}

	c.RLock()
	defer c.RUnlock()
	if !c.enabled {
		return info, nil
	}

	buckets, err := objAPI.ListBuckets(ctx)
	if err != nil {
		return info, errSRBackendIssue(err)
	}

	info.DeploymentID = globalDeploymentID

	info.Buckets = make(map[string]madmin.SRBucketInfo, len(buckets))
	for _, bucketInfo := range buckets {
		bucket := bucketInfo.Name
		bms := madmin.SRBucketInfo{Bucket: bucket}

		// Get bucket policy if present.
		bucketPolicy, err := globalPolicySys.Get(bucket)
		found := true
		if _, ok := err.(BucketPolicyNotFound); ok {
			found = false
		} else if err != nil {
			return info, errSRBackendIssue(err)
		}
		if found {
			policyJSON, err := json.Marshal(bucketPolicy)
			if err != nil {
				return info, wrapSRErr(err)
			}
			bms.Policy = policyJSON
		}

		// Get bucket tags if present.
		tags, err := globalBucketMetadataSys.GetTaggingConfig(bucket)
		found = true
		if _, ok := err.(BucketTaggingNotFound); ok {
			found = false
		} else if err != nil {
			return info, errSRBackendIssue(err)
		}
		if found {
			tagBytes, err := xml.Marshal(tags)
			if err != nil {
				return info, wrapSRErr(err)
			}
			tagCfgStr := base64.StdEncoding.EncodeToString(tagBytes)
			bms.Tags = &tagCfgStr
		}

		// Get object-lock config if present.
		objLockCfg, err := globalBucketMetadataSys.GetObjectLockConfig(bucket)
		found = true
		if _, ok := err.(BucketObjectLockConfigNotFound); ok {
			found = false
		} else if err != nil {
			return info, errSRBackendIssue(err)
		}
		if found {
			objLockCfgData, err := xml.Marshal(objLockCfg)
			if err != nil {
				return info, wrapSRErr(err)
			}
			objLockStr := base64.StdEncoding.EncodeToString(objLockCfgData)
			bms.ObjectLockConfig = &objLockStr
		}

		// Get bucket encryption (SSE) settings if present.
		sseConfig, err := globalBucketMetadataSys.GetSSEConfig(bucket)
		found = true
		if _, ok := err.(BucketSSEConfigNotFound); ok {
			found = false
		} else if err != nil {
			return info, errSRBackendIssue(err)
		}
		if found {
			sseConfigData, err := xml.Marshal(sseConfig)
			if err != nil {
				return info, wrapSRErr(err)
			}
			sseConfigStr := base64.StdEncoding.EncodeToString(sseConfigData)
			bms.SSEConfig = &sseConfigStr
		}

		// Get replication config if present.
		rcfg, err := globalBucketMetadataSys.GetReplicationConfig(ctx, bucket)
		found = true
		if _, ok := err.(BucketReplicationConfigNotFound); ok {
			found = false
		} else if err != nil {
			return info, errSRBackendIssue(err)
		}
		if found {
			rcfgXML, err := xml.Marshal(rcfg)
			if err != nil {
				return info, wrapSRErr(err)
			}
			rcfgXMLStr := base64.StdEncoding.EncodeToString(rcfgXML)
			bms.ReplicationConfig = &rcfgXMLStr
		}
		info.Buckets[bucket] = bms
	}

	{
		// Collect the local IAM policies.
		allPolicies, err := globalIAMSys.ListPolicies(ctx, "")
		if err != nil {
			return info, errSRBackendIssue(err)
		}
		info.Policies = make(map[string]json.RawMessage, len(allPolicies))
		for pname, plcy := range allPolicies {
			policyJSON, err := json.Marshal(plcy)
			if err != nil {
				return info, wrapSRErr(err)
			}
			info.Policies[pname] = json.RawMessage(policyJSON)
		}
	}

	{
		// Collect the local user and group policy mappings.
		// NOTE(review): user mappings are loaded for the stsUser type only;
		// confirm that regular/service-account users are meant to be excluded.
		userPolicyMap := make(map[string]MappedPolicy)
		groupPolicyMap := make(map[string]MappedPolicy)
		globalIAMSys.store.rlock()
		errU := globalIAMSys.store.loadMappedPolicies(ctx, stsUser, false, userPolicyMap)
		errG := globalIAMSys.store.loadMappedPolicies(ctx, stsUser, true, groupPolicyMap)
		globalIAMSys.store.runlock()
		if errU != nil {
			return info, errSRBackendIssue(errU)
		}
		if errG != nil {
			return info, errSRBackendIssue(errG)
		}

		info.UserPolicies = make(map[string]madmin.SRPolicyMapping, len(userPolicyMap))
		info.GroupPolicies = make(map[string]madmin.SRPolicyMapping, len(groupPolicyMap))
		for user, mp := range userPolicyMap {
			info.UserPolicies[user] = madmin.SRPolicyMapping{
				IsGroup:     false,
				UserOrGroup: user,
				Policy:      mp.Policies,
			}
		}

		for group, mp := range groupPolicyMap {
			// Group mappings belong in GroupPolicies, keyed by group name.
			info.GroupPolicies[group] = madmin.SRPolicyMapping{
				IsGroup:     true,
				UserOrGroup: group,
				Policy:      mp.Policies,
			}
		}
	}
	return info, nil
}
2022-01-21 11:48:21 -05:00
// EditPeerCluster - edits replication configuration and updates peer endpoint.
//
// Preconditions on the request:
//   - peer.DeploymentID must name an existing site-replication peer;
//   - peer.Endpoint must differ from that peer's configured endpoint;
//   - peer.Endpoint must be reachable (a ServerInfo call must succeed).
//
// The new endpoint is pushed to every remote site; the edited peer itself is
// contacted on its new endpoint. Once all remote sites accept it, the local
// state is updated and saved. If saving fails, a ReplicateAddStatusPartial
// status is returned with a nil error.
func (c *SiteReplicationSys) EditPeerCluster(ctx context.Context, peer madmin.PeerInfo) (madmin.ReplicateEditStatus, error) {
	sites, err := c.GetClusterInfo(ctx)
	if err != nil {
		return madmin.ReplicateEditStatus{}, errSRBackendIssue(err)
	}
	if !sites.Enabled {
		return madmin.ReplicateEditStatus{}, errSRNotEnabled
	}

	found := false
	for _, v := range sites.Sites {
		if peer.DeploymentID != v.DeploymentID {
			continue
		}
		found = true
		if peer.Endpoint == v.Endpoint {
			return madmin.ReplicateEditStatus{}, errSRInvalidRequest(fmt.Errorf("Endpoint %s entered for deployment id %s already configured in site replication", v.Endpoint, v.DeploymentID))
		}
		admClient, err := c.getAdminClientWithEndpoint(ctx, v.DeploymentID, peer.Endpoint)
		if err != nil {
			return madmin.ReplicateEditStatus{}, errSRPeerResp(fmt.Errorf("unable to create admin client for %s: %w", v.Name, err))
		}
		// check if endpoint is reachable
		if _, err = admClient.ServerInfo(ctx); err != nil {
			return madmin.ReplicateEditStatus{}, errSRPeerResp(fmt.Errorf("Endpoint %s not reachable: %w", peer.Endpoint, err))
		}
		break
	}
	if !found {
		return madmin.ReplicateEditStatus{}, errSRInvalidRequest(fmt.Errorf("%s not found in existing replicated sites", peer.DeploymentID))
	}

	pi := c.state.Peers[peer.DeploymentID]
	pi.Endpoint = peer.Endpoint

	// One error slot per site. Each goroutine writes only its own index, so
	// no locking is needed; a shared map would be written concurrently.
	errs := make([]error, len(sites.Sites))
	var wg sync.WaitGroup
	for i, v := range sites.Sites {
		if v.DeploymentID == globalDeploymentID {
			continue
		}
		wg.Add(1)
		go func(pi madmin.PeerInfo, i int) {
			defer wg.Done()
			v := sites.Sites[i]
			var (
				admClient *madmin.AdminClient
				err       error
			)
			if v.DeploymentID == peer.DeploymentID {
				// The edited peer is reached on its new endpoint.
				admClient, err = c.getAdminClientWithEndpoint(ctx, v.DeploymentID, peer.Endpoint)
			} else {
				admClient, err = c.getAdminClient(ctx, v.DeploymentID)
			}
			if err != nil {
				errs[i] = fmt.Errorf("unable to create admin client: %w", err)
				return
			}
			errs[i] = admClient.SRPeerEdit(ctx, pi)
		}(pi, i)
	}
	wg.Wait()

	// Report the first failure in site order, so the result is deterministic.
	for i, err := range errs {
		if err != nil {
			return madmin.ReplicateEditStatus{}, errSRPeerResp(fmt.Errorf("unable to update peer %s: %w", sites.Sites[i].Name, err))
		}
	}

	// Record the new endpoint locally only after the peer goroutines are done.
	// They may read the replication state (e.g. when resolving admin
	// clients), so the map must not be written while they run.
	c.state.Peers[peer.DeploymentID] = pi

	// we can now save the cluster replication configuration state.
	if err = c.saveToDisk(ctx, c.state); err != nil {
		return madmin.ReplicateEditStatus{
			Status:    madmin.ReplicateAddStatusPartial,
			ErrDetail: fmt.Sprintf("unable to save cluster-replication state on local: %v", err),
		}, nil
	}

	result := madmin.ReplicateEditStatus{
		Success: true,
		Status:  fmt.Sprintf("Cluster replication configuration updated with endpoint %s for peer %s successfully", peer.Endpoint, peer.Name),
	}
	return result, nil
}
// PeerEditReq - internal API handler to respond to a peer cluster's request
// to edit endpoint.
//
// The entry whose DeploymentID equals arg.DeploymentID gets arg.Endpoint in
// the in-memory site-replication state, and the state is then saved. The
// local site's name is used only to annotate a save failure, which is
// returned as errSRBackendIssue.
func (c *SiteReplicationSys) PeerEditReq(ctx context.Context, arg madmin.PeerInfo) error {
	localName := ""
	for id := range c.state.Peers {
		site := c.state.Peers[id]
		if site.DeploymentID == globalDeploymentID {
			localName = site.Name
		}
		if site.DeploymentID != arg.DeploymentID {
			continue
		}
		site.Endpoint = arg.Endpoint
		c.state.Peers[arg.DeploymentID] = site
	}

	err := c.saveToDisk(ctx, c.state)
	if err == nil {
		return nil
	}
	return errSRBackendIssue(fmt.Errorf("unable to save cluster-replication state to disk on %s: %v", localName, err))
}