mirror of
https://github.com/minio/minio.git
synced 2025-01-11 15:03:22 -05:00
Add new site replication feature (#13311)
This change allows a set of MinIO sites (clusters) to be configured for mutual replication of all buckets (including bucket policies, tags, object-lock configuration and bucket encryption), IAM policies, LDAP service accounts and LDAP STS accounts.
This commit is contained in:
parent
cb2c2905c5
commit
3a7c79e2c7
315
cmd/admin-handlers-site-replication.go
Normal file
315
cmd/admin-handlers-site-replication.go
Normal file
@ -0,0 +1,315 @@
|
||||
// Copyright (c) 2015-2021 MinIO, Inc.
|
||||
//
|
||||
// This file is part of MinIO Object Storage stack
|
||||
//
|
||||
// This program is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU Affero General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
//
|
||||
// This program is distributed in the hope that it will be useful
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU Affero General Public License for more details.
|
||||
//
|
||||
// You should have received a copy of the GNU Affero General Public License
|
||||
// along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
package cmd
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
|
||||
"github.com/gorilla/mux"
|
||||
"github.com/minio/madmin-go"
|
||||
|
||||
"github.com/minio/minio/internal/logger"
|
||||
"github.com/minio/pkg/bucket/policy"
|
||||
iampolicy "github.com/minio/pkg/iam/policy"
|
||||
)
|
||||
|
||||
// SiteReplicationAdd - PUT /minio/admin/v3/site-replication/add
|
||||
func (a adminAPIHandlers) SiteReplicationAdd(w http.ResponseWriter, r *http.Request) {
|
||||
ctx := newContext(r, w, "SiteReplicationAdd")
|
||||
|
||||
defer logger.AuditLog(ctx, w, r, mustGetClaimsFromToken(r))
|
||||
|
||||
objectAPI, cred := validateAdminReq(ctx, w, r, iampolicy.SiteReplicationAddAction)
|
||||
if objectAPI == nil {
|
||||
return
|
||||
}
|
||||
|
||||
var sites []madmin.PeerSite
|
||||
errCode := readJSONBody(ctx, r.Body, &sites, cred.SecretKey)
|
||||
if errCode != ErrNone {
|
||||
writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(errCode), r.URL)
|
||||
return
|
||||
}
|
||||
|
||||
status, errInfo := globalSiteReplicationSys.AddPeerClusters(ctx, sites)
|
||||
if errInfo.Code != ErrNone {
|
||||
logger.LogIf(ctx, errInfo)
|
||||
writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErrWithErr(errInfo.Code, errInfo.Cause), r.URL)
|
||||
return
|
||||
}
|
||||
|
||||
body, err := json.Marshal(status)
|
||||
if err != nil {
|
||||
writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL)
|
||||
return
|
||||
}
|
||||
|
||||
writeSuccessResponseJSON(w, body)
|
||||
}
|
||||
|
||||
// SRInternalJoin - PUT /minio/admin/v3/site-replication/join
|
||||
//
|
||||
// used internally to tell current cluster to enable SR with
|
||||
// the provided peer clusters and service account.
|
||||
func (a adminAPIHandlers) SRInternalJoin(w http.ResponseWriter, r *http.Request) {
|
||||
ctx := newContext(r, w, "SRInternalJoin")
|
||||
|
||||
defer logger.AuditLog(ctx, w, r, mustGetClaimsFromToken(r))
|
||||
|
||||
objectAPI, cred := validateAdminReq(ctx, w, r, iampolicy.SiteReplicationAddAction)
|
||||
if objectAPI == nil {
|
||||
return
|
||||
}
|
||||
|
||||
var joinArg madmin.SRInternalJoinReq
|
||||
errCode := readJSONBody(ctx, r.Body, &joinArg, cred.SecretKey)
|
||||
if errCode != ErrNone {
|
||||
writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(errCode), r.URL)
|
||||
return
|
||||
}
|
||||
|
||||
errInfo := globalSiteReplicationSys.InternalJoinReq(ctx, joinArg)
|
||||
if errInfo.Code != ErrNone {
|
||||
logger.LogIf(ctx, errInfo)
|
||||
writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErrWithErr(errInfo.Code, errInfo.Cause), r.URL)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
// SRInternalBucketOps - PUT /minio/admin/v3/site-replication/bucket-ops?bucket=x&operation=y
|
||||
func (a adminAPIHandlers) SRInternalBucketOps(w http.ResponseWriter, r *http.Request) {
|
||||
ctx := newContext(r, w, "SRInternalBucketOps")
|
||||
|
||||
defer logger.AuditLog(ctx, w, r, mustGetClaimsFromToken(r))
|
||||
|
||||
objectAPI, _ := validateAdminReq(ctx, w, r, iampolicy.SiteReplicationOperationAction)
|
||||
if objectAPI == nil {
|
||||
return
|
||||
}
|
||||
|
||||
vars := mux.Vars(r)
|
||||
bucket := vars["bucket"]
|
||||
operation := madmin.BktOp(vars["operation"])
|
||||
|
||||
var err error
|
||||
switch operation {
|
||||
case madmin.MakeWithVersioningBktOp:
|
||||
_, isLockEnabled := r.Form["lockEnabled"]
|
||||
_, isVersioningEnabled := r.Form["versioningEnabled"]
|
||||
opts := BucketOptions{
|
||||
Location: r.Form.Get("location"),
|
||||
LockEnabled: isLockEnabled,
|
||||
VersioningEnabled: isVersioningEnabled,
|
||||
}
|
||||
err = globalSiteReplicationSys.PeerBucketMakeWithVersioningHandler(ctx, bucket, opts)
|
||||
case madmin.ConfigureReplBktOp:
|
||||
err = globalSiteReplicationSys.PeerBucketConfigureReplHandler(ctx, bucket)
|
||||
case madmin.DeleteBucketBktOp:
|
||||
err = globalSiteReplicationSys.PeerBucketDeleteHandler(ctx, bucket, false)
|
||||
case madmin.ForceDeleteBucketBktOp:
|
||||
err = globalSiteReplicationSys.PeerBucketDeleteHandler(ctx, bucket, true)
|
||||
default:
|
||||
writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrAdminInvalidArgument), r.URL)
|
||||
return
|
||||
}
|
||||
if err != nil {
|
||||
logger.LogIf(ctx, err)
|
||||
writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrInternalError), r.URL)
|
||||
return
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
// SRInternalReplicateIAMItem - PUT /minio/admin/v3/site-replication/iam-item
|
||||
func (a adminAPIHandlers) SRInternalReplicateIAMItem(w http.ResponseWriter, r *http.Request) {
|
||||
ctx := newContext(r, w, "SRInternalReplicateIAMItem")
|
||||
|
||||
defer logger.AuditLog(ctx, w, r, mustGetClaimsFromToken(r))
|
||||
|
||||
objectAPI, _ := validateAdminReq(ctx, w, r, iampolicy.SiteReplicationOperationAction)
|
||||
if objectAPI == nil {
|
||||
return
|
||||
}
|
||||
|
||||
var item madmin.SRIAMItem
|
||||
errCode := readJSONBody(ctx, r.Body, &item, "")
|
||||
if errCode != ErrNone {
|
||||
writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(errCode), r.URL)
|
||||
return
|
||||
}
|
||||
|
||||
var err error
|
||||
switch item.Type {
|
||||
case madmin.SRIAMItemPolicy:
|
||||
var policy *iampolicy.Policy
|
||||
if len(item.Policy) > 0 {
|
||||
policy, err = iampolicy.ParseConfig(bytes.NewReader(item.Policy))
|
||||
if err != nil {
|
||||
writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL)
|
||||
return
|
||||
}
|
||||
}
|
||||
err = globalSiteReplicationSys.PeerAddPolicyHandler(ctx, item.Name, policy)
|
||||
case madmin.SRIAMItemSvcAcc:
|
||||
err = globalSiteReplicationSys.PeerSvcAccChangeHandler(ctx, *item.SvcAccChange)
|
||||
case madmin.SRIAMItemPolicyMapping:
|
||||
err = globalSiteReplicationSys.PeerPolicyMappingHandler(ctx, *item.PolicyMapping)
|
||||
case madmin.SRIAMItemSTSAcc:
|
||||
err = globalSiteReplicationSys.PeerSTSAccHandler(ctx, *item.STSCredential)
|
||||
|
||||
default:
|
||||
writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrAdminInvalidArgument), r.URL)
|
||||
return
|
||||
}
|
||||
if err != nil {
|
||||
logger.LogIf(ctx, err)
|
||||
writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrInternalError), r.URL)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
// SRInternalReplicateBucketItem - PUT /minio/admin/v3/site-replication/bucket-meta
|
||||
func (a adminAPIHandlers) SRInternalReplicateBucketItem(w http.ResponseWriter, r *http.Request) {
|
||||
ctx := newContext(r, w, "SRInternalReplicateIAMItem")
|
||||
|
||||
defer logger.AuditLog(ctx, w, r, mustGetClaimsFromToken(r))
|
||||
|
||||
objectAPI, _ := validateAdminReq(ctx, w, r, iampolicy.SiteReplicationOperationAction)
|
||||
if objectAPI == nil {
|
||||
return
|
||||
}
|
||||
|
||||
var item madmin.SRBucketMeta
|
||||
errCode := readJSONBody(ctx, r.Body, &item, "")
|
||||
if errCode != ErrNone {
|
||||
writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(errCode), r.URL)
|
||||
return
|
||||
}
|
||||
|
||||
var err error
|
||||
switch item.Type {
|
||||
case madmin.SRBucketMetaTypePolicy:
|
||||
var bktPolicy *policy.Policy
|
||||
if len(item.Policy) > 0 {
|
||||
bktPolicy, err = policy.ParseConfig(bytes.NewReader(item.Policy), item.Bucket)
|
||||
if err != nil {
|
||||
writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL)
|
||||
return
|
||||
}
|
||||
}
|
||||
err = globalSiteReplicationSys.PeerBucketPolicyHandler(ctx, item.Bucket, bktPolicy)
|
||||
case madmin.SRBucketMetaTypeTags:
|
||||
err = globalSiteReplicationSys.PeerBucketTaggingHandler(ctx, item.Bucket, item.Tags)
|
||||
case madmin.SRBucketMetaTypeObjectLockConfig:
|
||||
err = globalSiteReplicationSys.PeerBucketObjectLockConfigHandler(ctx, item.Bucket, item.ObjectLockConfig)
|
||||
case madmin.SRBucketMetaTypeSSEConfig:
|
||||
err = globalSiteReplicationSys.PeerBucketSSEConfigHandler(ctx, item.Bucket, item.SSEConfig)
|
||||
|
||||
default:
|
||||
writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrAdminInvalidArgument), r.URL)
|
||||
return
|
||||
}
|
||||
if err != nil {
|
||||
logger.LogIf(ctx, err)
|
||||
writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrInternalError), r.URL)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
// SiteReplicationDisable - PUT /minio/admin/v3/site-replication/disable
|
||||
func (a adminAPIHandlers) SiteReplicationDisable(w http.ResponseWriter, r *http.Request) {
|
||||
ctx := newContext(r, w, "SiteReplicationDisable")
|
||||
|
||||
defer logger.AuditLog(ctx, w, r, mustGetClaimsFromToken(r))
|
||||
|
||||
objectAPI, _ := validateAdminReq(ctx, w, r, iampolicy.SiteReplicationDisableAction)
|
||||
if objectAPI == nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
// SiteReplicationInfo - GET /minio/admin/v3/site-replication/info
|
||||
func (a adminAPIHandlers) SiteReplicationInfo(w http.ResponseWriter, r *http.Request) {
|
||||
ctx := newContext(r, w, "SiteReplicationInfo")
|
||||
|
||||
defer logger.AuditLog(ctx, w, r, mustGetClaimsFromToken(r))
|
||||
|
||||
objectAPI, _ := validateAdminReq(ctx, w, r, iampolicy.SiteReplicationInfoAction)
|
||||
if objectAPI == nil {
|
||||
return
|
||||
}
|
||||
|
||||
info, err := globalSiteReplicationSys.GetClusterInfo(ctx)
|
||||
if err != nil {
|
||||
writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL)
|
||||
return
|
||||
}
|
||||
|
||||
if err = json.NewEncoder(w).Encode(info); err != nil {
|
||||
writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL)
|
||||
return
|
||||
}
|
||||
|
||||
w.(http.Flusher).Flush()
|
||||
}
|
||||
|
||||
func (a adminAPIHandlers) SRInternalGetIDPSettings(w http.ResponseWriter, r *http.Request) {
|
||||
ctx := newContext(r, w, "SiteReplicationGetIDPSettings")
|
||||
|
||||
defer logger.AuditLog(ctx, w, r, mustGetClaimsFromToken(r))
|
||||
|
||||
objectAPI, _ := validateAdminReq(ctx, w, r, iampolicy.SiteReplicationAddAction)
|
||||
if objectAPI == nil {
|
||||
return
|
||||
}
|
||||
|
||||
idpSettings := globalSiteReplicationSys.GetIDPSettings(ctx)
|
||||
if err := json.NewEncoder(w).Encode(idpSettings); err != nil {
|
||||
writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL)
|
||||
return
|
||||
}
|
||||
|
||||
w.(http.Flusher).Flush()
|
||||
}
|
||||
|
||||
func readJSONBody(ctx context.Context, body io.Reader, v interface{}, encryptionKey string) APIErrorCode {
|
||||
data, err := ioutil.ReadAll(body)
|
||||
if err != nil {
|
||||
return ErrInvalidRequest
|
||||
}
|
||||
|
||||
if encryptionKey != "" {
|
||||
data, err = madmin.DecryptData(encryptionKey, bytes.NewReader(data))
|
||||
if err != nil {
|
||||
logger.LogIf(ctx, err)
|
||||
return ErrInvalidRequest
|
||||
}
|
||||
}
|
||||
|
||||
err = json.Unmarshal(data, v)
|
||||
if err != nil {
|
||||
return ErrAdminConfigBadJSON
|
||||
}
|
||||
|
||||
return ErrNone
|
||||
}
|
@ -28,6 +28,7 @@ import (
|
||||
|
||||
"github.com/gorilla/mux"
|
||||
"github.com/minio/madmin-go"
|
||||
"github.com/minio/minio/internal/auth"
|
||||
"github.com/minio/minio/internal/config/dns"
|
||||
"github.com/minio/minio/internal/logger"
|
||||
iampolicy "github.com/minio/pkg/iam/policy"
|
||||
@ -216,7 +217,6 @@ func (a adminAPIHandlers) UpdateGroupMembers(w http.ResponseWriter, r *http.Requ
|
||||
return
|
||||
}
|
||||
|
||||
defer r.Body.Close()
|
||||
data, err := ioutil.ReadAll(r.Body)
|
||||
if err != nil {
|
||||
writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrInvalidRequest), r.URL)
|
||||
@ -630,6 +630,34 @@ func (a adminAPIHandlers) AddServiceAccount(w http.ResponseWriter, r *http.Reque
|
||||
}
|
||||
}
|
||||
|
||||
// Call hook for cluster-replication.
|
||||
//
|
||||
// FIXME: This wont work in an OpenID situation as the parent credential
|
||||
// may not be present on peer clusters to provide inherited policies.
|
||||
// Also, we should not be replicating root user's service account - as
|
||||
// they are not authenticated by a common external IDP, so we skip when
|
||||
// opts.ldapUser == "".
|
||||
if _, isLDAPAccount := opts.claims[ldapUserN]; isLDAPAccount {
|
||||
err = globalSiteReplicationSys.IAMChangeHook(ctx, madmin.SRIAMItem{
|
||||
Type: madmin.SRIAMItemSvcAcc,
|
||||
SvcAccChange: &madmin.SRSvcAccChange{
|
||||
Create: &madmin.SRSvcAccCreate{
|
||||
Parent: newCred.ParentUser,
|
||||
AccessKey: newCred.AccessKey,
|
||||
SecretKey: newCred.SecretKey,
|
||||
Groups: newCred.Groups,
|
||||
Claims: opts.claims,
|
||||
SessionPolicy: createReq.Policy,
|
||||
Status: auth.AccountOn,
|
||||
},
|
||||
},
|
||||
})
|
||||
if err != nil {
|
||||
writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
var createResp = madmin.AddServiceAccountResp{
|
||||
Credentials: madmin.Credentials{
|
||||
AccessKey: newCred.AccessKey,
|
||||
@ -741,6 +769,30 @@ func (a adminAPIHandlers) UpdateServiceAccount(w http.ResponseWriter, r *http.Re
|
||||
}
|
||||
}
|
||||
|
||||
// Call site replication hook. Only LDAP accounts are supported for
|
||||
// replication operations.
|
||||
svcAccClaims, err := globalIAMSys.GetClaimsForSvcAcc(ctx, accessKey)
|
||||
if err != nil {
|
||||
writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL)
|
||||
return
|
||||
}
|
||||
if _, isLDAPAccount := svcAccClaims[ldapUserN]; isLDAPAccount {
|
||||
err = globalSiteReplicationSys.IAMChangeHook(ctx, madmin.SRIAMItem{
|
||||
Type: madmin.SRIAMItemSvcAcc,
|
||||
SvcAccChange: &madmin.SRSvcAccChange{
|
||||
Update: &madmin.SRSvcAccUpdate{
|
||||
AccessKey: accessKey,
|
||||
SecretKey: opts.secretKey,
|
||||
Status: opts.status,
|
||||
SessionPolicy: updateReq.NewPolicy,
|
||||
},
|
||||
},
|
||||
})
|
||||
if err != nil {
|
||||
writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL)
|
||||
return
|
||||
}
|
||||
}
|
||||
writeSuccessNoContent(w)
|
||||
}
|
||||
|
||||
@ -975,6 +1027,28 @@ func (a adminAPIHandlers) DeleteServiceAccount(w http.ResponseWriter, r *http.Re
|
||||
}
|
||||
}
|
||||
|
||||
// Call site replication hook. Only LDAP accounts are supported for
|
||||
// replication operations.
|
||||
svcAccClaims, err := globalIAMSys.GetClaimsForSvcAcc(ctx, serviceAccount)
|
||||
if err != nil {
|
||||
writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL)
|
||||
return
|
||||
}
|
||||
if _, isLDAPAccount := svcAccClaims[ldapUserN]; isLDAPAccount {
|
||||
err = globalSiteReplicationSys.IAMChangeHook(ctx, madmin.SRIAMItem{
|
||||
Type: madmin.SRIAMItemSvcAcc,
|
||||
SvcAccChange: &madmin.SRSvcAccChange{
|
||||
Delete: &madmin.SRSvcAccDelete{
|
||||
AccessKey: serviceAccount,
|
||||
},
|
||||
},
|
||||
})
|
||||
if err != nil {
|
||||
writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
writeSuccessNoContent(w)
|
||||
}
|
||||
|
||||
@ -1286,6 +1360,16 @@ func (a adminAPIHandlers) RemoveCannedPolicy(w http.ResponseWriter, r *http.Requ
|
||||
logger.LogIf(ctx, nerr.Err)
|
||||
}
|
||||
}
|
||||
|
||||
// Call cluster-replication policy creation hook to replicate policy deletion to
|
||||
// other minio clusters.
|
||||
if err := globalSiteReplicationSys.IAMChangeHook(ctx, madmin.SRIAMItem{
|
||||
Type: madmin.SRIAMItemPolicy,
|
||||
Name: policyName,
|
||||
}); err != nil {
|
||||
writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
// AddCannedPolicy - PUT /minio/admin/v3/add-canned-policy?name=<policy_name>
|
||||
@ -1314,7 +1398,13 @@ func (a adminAPIHandlers) AddCannedPolicy(w http.ResponseWriter, r *http.Request
|
||||
return
|
||||
}
|
||||
|
||||
iamPolicy, err := iampolicy.ParseConfig(io.LimitReader(r.Body, r.ContentLength))
|
||||
iamPolicyBytes, err := ioutil.ReadAll(io.LimitReader(r.Body, r.ContentLength))
|
||||
if err != nil {
|
||||
writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL)
|
||||
return
|
||||
}
|
||||
|
||||
iamPolicy, err := iampolicy.ParseConfig(bytes.NewReader(iamPolicyBytes))
|
||||
if err != nil {
|
||||
writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL)
|
||||
return
|
||||
@ -1338,6 +1428,17 @@ func (a adminAPIHandlers) AddCannedPolicy(w http.ResponseWriter, r *http.Request
|
||||
logger.LogIf(ctx, nerr.Err)
|
||||
}
|
||||
}
|
||||
|
||||
// Call cluster-replication policy creation hook to replicate policy to
|
||||
// other minio clusters.
|
||||
if err := globalSiteReplicationSys.IAMChangeHook(ctx, madmin.SRIAMItem{
|
||||
Type: madmin.SRIAMItemPolicy,
|
||||
Name: policyName,
|
||||
Policy: iamPolicyBytes,
|
||||
}); err != nil {
|
||||
writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
// SetPolicyForUserOrGroup - PUT /minio/admin/v3/set-policy?policy=xxx&user-or-group=?[&is-group]
|
||||
@ -1380,4 +1481,16 @@ func (a adminAPIHandlers) SetPolicyForUserOrGroup(w http.ResponseWriter, r *http
|
||||
logger.LogIf(ctx, nerr.Err)
|
||||
}
|
||||
}
|
||||
|
||||
if err := globalSiteReplicationSys.IAMChangeHook(ctx, madmin.SRIAMItem{
|
||||
Type: madmin.SRIAMItemPolicyMapping,
|
||||
PolicyMapping: &madmin.SRPolicyMapping{
|
||||
UserOrGroup: entityName,
|
||||
IsGroup: isGroup,
|
||||
Policy: policyName,
|
||||
},
|
||||
}); err != nil {
|
||||
writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
@ -191,6 +191,16 @@ func registerAdminRouter(router *mux.Router, enableConfigOps bool) {
|
||||
adminRouter.Methods(http.MethodPut).Path(adminVersion + "/tier").HandlerFunc(gz(httpTraceHdrs(adminAPI.AddTierHandler)))
|
||||
adminRouter.Methods(http.MethodPost).Path(adminVersion + "/tier/{tier}").HandlerFunc(gz(httpTraceHdrs(adminAPI.EditTierHandler)))
|
||||
adminRouter.Methods(http.MethodGet).Path(adminVersion + "/tier").HandlerFunc(gz(httpTraceHdrs(adminAPI.ListTierHandler)))
|
||||
|
||||
// Cluster Replication APIs
|
||||
adminRouter.Methods(http.MethodPut).Path(adminVersion + "/site-replication/add").HandlerFunc(gz(httpTraceHdrs(adminAPI.SiteReplicationAdd)))
|
||||
adminRouter.Methods(http.MethodPut).Path(adminVersion + "/site-replication/disable").HandlerFunc(gz(httpTraceHdrs(adminAPI.SiteReplicationDisable)))
|
||||
adminRouter.Methods(http.MethodGet).Path(adminVersion + "/site-replication/info").HandlerFunc(gz(httpTraceHdrs(adminAPI.SiteReplicationInfo)))
|
||||
adminRouter.Methods(http.MethodPut).Path(adminVersion + "/site-replication/peer/join").HandlerFunc(gz(httpTraceHdrs(adminAPI.SRInternalJoin)))
|
||||
adminRouter.Methods(http.MethodPut).Path(adminVersion+"/site-replication/peer/bucket-ops").HandlerFunc(gz(httpTraceHdrs(adminAPI.SRInternalBucketOps))).Queries("bucket", "{bucket:.*}").Queries("operation", "{operation:.*}")
|
||||
adminRouter.Methods(http.MethodPut).Path(adminVersion + "/site-replication/peer/iam-item").HandlerFunc(gz(httpTraceHdrs(adminAPI.SRInternalReplicateIAMItem)))
|
||||
adminRouter.Methods(http.MethodPut).Path(adminVersion + "/site-replication/peer/bucket-meta").HandlerFunc(gz(httpTraceHdrs(adminAPI.SRInternalReplicateBucketItem)))
|
||||
adminRouter.Methods(http.MethodGet).Path(adminVersion + "/site-replication/peer/idp-settings").HandlerFunc(gz(httpTraceHdrs(adminAPI.SRInternalGetIDPSettings)))
|
||||
}
|
||||
|
||||
if globalIsDistErasure {
|
||||
|
@ -267,6 +267,16 @@ const (
|
||||
ErrAdminCredentialsMismatch
|
||||
ErrInsecureClientRequest
|
||||
ErrObjectTampered
|
||||
|
||||
// Site-Replication errors
|
||||
ErrSiteReplicationInvalidRequest
|
||||
ErrSiteReplicationPeerResp
|
||||
ErrSiteReplicationBackendIssue
|
||||
ErrSiteReplicationServiceAccountError
|
||||
ErrSiteReplicationBucketConfigError
|
||||
ErrSiteReplicationBucketMetaError
|
||||
ErrSiteReplicationIAMError
|
||||
|
||||
// Bucket Quota error codes
|
||||
ErrAdminBucketQuotaExceeded
|
||||
ErrAdminNoSuchQuotaConfiguration
|
||||
@ -1269,6 +1279,43 @@ var errorCodes = errorCodeMap{
|
||||
Description: errObjectTampered.Error(),
|
||||
HTTPStatusCode: http.StatusPartialContent,
|
||||
},
|
||||
|
||||
ErrSiteReplicationInvalidRequest: {
|
||||
Code: "XMinioSiteReplicationInvalidRequest",
|
||||
Description: "Invalid site-replication request",
|
||||
HTTPStatusCode: http.StatusBadRequest,
|
||||
},
|
||||
ErrSiteReplicationPeerResp: {
|
||||
Code: "XMinioSiteReplicationPeerResp",
|
||||
Description: "Error received when contacting a peer site",
|
||||
HTTPStatusCode: http.StatusServiceUnavailable,
|
||||
},
|
||||
ErrSiteReplicationBackendIssue: {
|
||||
Code: "XMinioSiteReplicationBackendIssue",
|
||||
Description: "Error when requesting object layer backend",
|
||||
HTTPStatusCode: http.StatusServiceUnavailable,
|
||||
},
|
||||
ErrSiteReplicationServiceAccountError: {
|
||||
Code: "XMinioSiteReplicationServiceAccountError",
|
||||
Description: "Site replication related service account error",
|
||||
HTTPStatusCode: http.StatusServiceUnavailable,
|
||||
},
|
||||
ErrSiteReplicationBucketConfigError: {
|
||||
Code: "XMinioSiteReplicationBucketConfigError",
|
||||
Description: "Error while configuring replication on a bucket",
|
||||
HTTPStatusCode: http.StatusServiceUnavailable,
|
||||
},
|
||||
ErrSiteReplicationBucketMetaError: {
|
||||
Code: "XMinioSiteReplicationBucketMetaError",
|
||||
Description: "Error while replicating bucket metadata",
|
||||
HTTPStatusCode: http.StatusServiceUnavailable,
|
||||
},
|
||||
ErrSiteReplicationIAMError: {
|
||||
Code: "XMinioSiteReplicationIAMError",
|
||||
Description: "Error while replicating an IAM item",
|
||||
HTTPStatusCode: http.StatusServiceUnavailable,
|
||||
},
|
||||
|
||||
ErrMaximumExpires: {
|
||||
Code: "AuthorizationQueryParametersError",
|
||||
Description: "X-Amz-Expires must be less than a week (in seconds); that is, the given X-Amz-Expires must be less than 604800 seconds",
|
||||
|
File diff suppressed because one or more lines are too long
@ -18,12 +18,14 @@
|
||||
package cmd
|
||||
|
||||
import (
|
||||
"encoding/base64"
|
||||
"encoding/xml"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
|
||||
"github.com/gorilla/mux"
|
||||
"github.com/minio/madmin-go"
|
||||
"github.com/minio/minio/internal/logger"
|
||||
"github.com/minio/pkg/bucket/policy"
|
||||
)
|
||||
@ -95,6 +97,20 @@ func (api objectAPIHandlers) PutBucketEncryptionHandler(w http.ResponseWriter, r
|
||||
return
|
||||
}
|
||||
|
||||
// Call site replication hook.
|
||||
//
|
||||
// We encode the xml bytes as base64 to ensure there are no encoding
|
||||
// errors.
|
||||
cfgStr := base64.StdEncoding.EncodeToString(configData)
|
||||
if err = globalSiteReplicationSys.BucketMetaHook(ctx, madmin.SRBucketMeta{
|
||||
Type: madmin.SRBucketMetaTypeSSEConfig,
|
||||
Bucket: bucket,
|
||||
SSEConfig: &cfgStr,
|
||||
}); err != nil {
|
||||
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
|
||||
return
|
||||
}
|
||||
|
||||
writeSuccessResponseHeadersOnly(w)
|
||||
}
|
||||
|
||||
|
@ -38,6 +38,7 @@ import (
|
||||
"github.com/google/uuid"
|
||||
"github.com/gorilla/mux"
|
||||
|
||||
"github.com/minio/madmin-go"
|
||||
"github.com/minio/minio-go/v7/pkg/set"
|
||||
"github.com/minio/minio-go/v7/pkg/tags"
|
||||
sse "github.com/minio/minio/internal/bucket/encryption"
|
||||
@ -771,6 +772,17 @@ func (api objectAPIHandlers) PutBucketHandler(w http.ResponseWriter, r *http.Req
|
||||
|
||||
// Proceed to creating a bucket.
|
||||
err := objectAPI.MakeBucketWithLocation(ctx, bucket, opts)
|
||||
if _, ok := err.(BucketExists); ok {
|
||||
// Though bucket exists locally, we send the site-replication
|
||||
// hook to ensure all sites have this bucket. If the hook
|
||||
// succeeds, the client will still receive a bucket exists
|
||||
// message.
|
||||
err2 := globalSiteReplicationSys.MakeBucketHook(ctx, bucket, opts)
|
||||
if err2 != nil {
|
||||
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
|
||||
return
|
||||
}
|
||||
}
|
||||
if err != nil {
|
||||
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
|
||||
return
|
||||
@ -779,6 +791,13 @@ func (api objectAPIHandlers) PutBucketHandler(w http.ResponseWriter, r *http.Req
|
||||
// Load updated bucket metadata into memory.
|
||||
globalNotificationSys.LoadBucketMetadata(GlobalContext, bucket)
|
||||
|
||||
// Call site replication hook
|
||||
err = globalSiteReplicationSys.MakeBucketHook(ctx, bucket, opts)
|
||||
if err != nil {
|
||||
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
|
||||
return
|
||||
}
|
||||
|
||||
// Make sure to add Location information here only for bucket
|
||||
if cp := pathClean(r.URL.Path); cp != "" {
|
||||
w.Header().Set(xhttp.Location, cp) // Clean any trailing slashes.
|
||||
@ -1259,6 +1278,12 @@ func (api objectAPIHandlers) DeleteBucketHandler(w http.ResponseWriter, r *http.
|
||||
|
||||
globalNotificationSys.DeleteBucketMetadata(ctx, bucket)
|
||||
|
||||
// Call site replication hook.
|
||||
if err := globalSiteReplicationSys.DeleteBucketHook(ctx, bucket, forceDelete); err != nil {
|
||||
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
|
||||
return
|
||||
}
|
||||
|
||||
// Write success response.
|
||||
writeSuccessNoContent(w)
|
||||
|
||||
@ -1324,6 +1349,20 @@ func (api objectAPIHandlers) PutBucketObjectLockConfigHandler(w http.ResponseWri
|
||||
return
|
||||
}
|
||||
|
||||
// Call site replication hook.
|
||||
//
|
||||
// We encode the xml bytes as base64 to ensure there are no encoding
|
||||
// errors.
|
||||
cfgStr := base64.StdEncoding.EncodeToString(configData)
|
||||
if err = globalSiteReplicationSys.BucketMetaHook(ctx, madmin.SRBucketMeta{
|
||||
Type: madmin.SRBucketMetaTypeObjectLockConfig,
|
||||
Bucket: bucket,
|
||||
ObjectLockConfig: &cfgStr,
|
||||
}); err != nil {
|
||||
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
|
||||
return
|
||||
}
|
||||
|
||||
// Write success response.
|
||||
writeSuccessResponseHeadersOnly(w)
|
||||
}
|
||||
@ -1415,6 +1454,20 @@ func (api objectAPIHandlers) PutBucketTaggingHandler(w http.ResponseWriter, r *h
|
||||
return
|
||||
}
|
||||
|
||||
// Call site replication hook.
|
||||
//
|
||||
// We encode the xml bytes as base64 to ensure there are no encoding
|
||||
// errors.
|
||||
cfgStr := base64.StdEncoding.EncodeToString(configData)
|
||||
if err = globalSiteReplicationSys.BucketMetaHook(ctx, madmin.SRBucketMeta{
|
||||
Type: madmin.SRBucketMetaTypeTags,
|
||||
Bucket: bucket,
|
||||
Tags: &cfgStr,
|
||||
}); err != nil {
|
||||
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
|
||||
return
|
||||
}
|
||||
|
||||
// Write success response.
|
||||
writeSuccessResponseHeadersOnly(w)
|
||||
}
|
||||
@ -1483,6 +1536,14 @@ func (api objectAPIHandlers) DeleteBucketTaggingHandler(w http.ResponseWriter, r
|
||||
return
|
||||
}
|
||||
|
||||
if err := globalSiteReplicationSys.BucketMetaHook(ctx, madmin.SRBucketMeta{
|
||||
Type: madmin.SRBucketMetaTypeTags,
|
||||
Bucket: bucket,
|
||||
}); err != nil {
|
||||
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
|
||||
return
|
||||
}
|
||||
|
||||
// Write success response.
|
||||
writeSuccessResponseHeadersOnly(w)
|
||||
}
|
||||
|
@ -18,12 +18,15 @@
|
||||
package cmd
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
|
||||
humanize "github.com/dustin/go-humanize"
|
||||
"github.com/gorilla/mux"
|
||||
"github.com/minio/madmin-go"
|
||||
"github.com/minio/minio/internal/logger"
|
||||
"github.com/minio/pkg/bucket/policy"
|
||||
)
|
||||
@ -76,7 +79,13 @@ func (api objectAPIHandlers) PutBucketPolicyHandler(w http.ResponseWriter, r *ht
|
||||
return
|
||||
}
|
||||
|
||||
bucketPolicy, err := policy.ParseConfig(io.LimitReader(r.Body, r.ContentLength), bucket)
|
||||
bucketPolicyBytes, err := ioutil.ReadAll(io.LimitReader(r.Body, r.ContentLength))
|
||||
if err != nil {
|
||||
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
|
||||
return
|
||||
}
|
||||
|
||||
bucketPolicy, err := policy.ParseConfig(bytes.NewReader(bucketPolicyBytes), bucket)
|
||||
if err != nil {
|
||||
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
|
||||
return
|
||||
@ -99,6 +108,16 @@ func (api objectAPIHandlers) PutBucketPolicyHandler(w http.ResponseWriter, r *ht
|
||||
return
|
||||
}
|
||||
|
||||
// Call site replication hook.
|
||||
if err = globalSiteReplicationSys.BucketMetaHook(ctx, madmin.SRBucketMeta{
|
||||
Type: madmin.SRBucketMetaTypePolicy,
|
||||
Bucket: bucket,
|
||||
Policy: bucketPolicyBytes,
|
||||
}); err != nil {
|
||||
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
|
||||
return
|
||||
}
|
||||
|
||||
// Success.
|
||||
writeSuccessNoContent(w)
|
||||
}
|
||||
@ -134,6 +153,15 @@ func (api objectAPIHandlers) DeleteBucketPolicyHandler(w http.ResponseWriter, r
|
||||
return
|
||||
}
|
||||
|
||||
// Call site replication hook.
|
||||
if err := globalSiteReplicationSys.BucketMetaHook(ctx, madmin.SRBucketMeta{
|
||||
Type: madmin.SRBucketMetaTypePolicy,
|
||||
Bucket: bucket,
|
||||
}); err != nil {
|
||||
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
|
||||
return
|
||||
}
|
||||
|
||||
// Success.
|
||||
writeSuccessNoContent(w)
|
||||
}
|
||||
|
@ -256,6 +256,9 @@ var (
|
||||
// Allocated etcd endpoint for config and bucket DNS.
|
||||
globalEtcdClient *etcd.Client
|
||||
|
||||
// Cluster replication manager.
|
||||
globalSiteReplicationSys SiteReplicationSys
|
||||
|
||||
// Is set to true when Bucket federation is requested
|
||||
// and is 'true' when etcdConfig.PathPrefix is empty
|
||||
globalBucketFederation bool
|
||||
|
26
cmd/iam.go
26
cmd/iam.go
@ -514,6 +514,7 @@ func (sys *IAMSys) Load(ctx context.Context, store IAMStorageAPI) error {
|
||||
return err
|
||||
}
|
||||
|
||||
// load service accounts
|
||||
if err := store.loadUsers(ctx, svcUser, iamUsersMap); err != nil {
|
||||
return err
|
||||
}
|
||||
@ -1427,6 +1428,31 @@ func (sys *IAMSys) GetServiceAccount(ctx context.Context, accessKey string) (aut
|
||||
return sa, embeddedPolicy, nil
|
||||
}
|
||||
|
||||
// GetClaimsForSvcAcc - gets the claims associated with the service account.
|
||||
func (sys *IAMSys) GetClaimsForSvcAcc(ctx context.Context, accessKey string) (map[string]interface{}, error) {
|
||||
if !sys.Initialized() {
|
||||
return nil, errServerNotInitialized
|
||||
}
|
||||
|
||||
if sys.usersSysType != LDAPUsersSysType {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
sys.store.rlock()
|
||||
defer sys.store.runlock()
|
||||
|
||||
sa, ok := sys.iamUsersMap[accessKey]
|
||||
if !ok || !sa.IsServiceAccount() {
|
||||
return nil, errNoSuchServiceAccount
|
||||
}
|
||||
|
||||
jwtClaims, err := auth.ExtractClaims(sa.SessionToken, globalActiveCred.SecretKey)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return jwtClaims.Map(), nil
|
||||
}
|
||||
|
||||
// DeleteServiceAccount - delete a service account
|
||||
func (sys *IAMSys) DeleteServiceAccount(ctx context.Context, accessKey string) error {
|
||||
if !sys.Initialized() {
|
||||
|
@ -1557,3 +1557,23 @@ func (sys *NotificationSys) Speedtest(ctx context.Context, size int, concurrent
|
||||
|
||||
return results
|
||||
}
|
||||
|
||||
// ReloadSiteReplicationConfig - tells all peer minio nodes to reload the
|
||||
// site-replication configuration.
|
||||
func (sys *NotificationSys) ReloadSiteReplicationConfig(ctx context.Context) []error {
|
||||
errs := make([]error, len(sys.allPeerClients))
|
||||
var wg sync.WaitGroup
|
||||
for index := range sys.peerClients {
|
||||
if sys.peerClients[index] == nil {
|
||||
continue
|
||||
}
|
||||
wg.Add(1)
|
||||
go func(index int) {
|
||||
defer wg.Done()
|
||||
errs[index] = sys.peerClients[index].ReloadSiteReplicationConfig(ctx)
|
||||
}(index)
|
||||
}
|
||||
|
||||
wg.Wait()
|
||||
return errs
|
||||
}
|
||||
|
@ -1029,3 +1029,12 @@ func (client *peerRESTClient) Speedtest(ctx context.Context, size, concurrent in
|
||||
}
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func (client *peerRESTClient) ReloadSiteReplicationConfig(ctx context.Context) error {
|
||||
respBody, err := client.callWithContext(context.Background(), peerRESTMethodReloadSiteReplicationConfig, nil, nil, -1)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer http.DrainBody(respBody)
|
||||
return nil
|
||||
}
|
||||
|
@ -25,47 +25,48 @@ const (
|
||||
)
|
||||
|
||||
const (
|
||||
peerRESTMethodHealth = "/health"
|
||||
peerRESTMethodServerInfo = "/serverinfo"
|
||||
peerRESTMethodDriveInfo = "/driveinfo"
|
||||
peerRESTMethodNetInfo = "/netinfo"
|
||||
peerRESTMethodCPUInfo = "/cpuinfo"
|
||||
peerRESTMethodDiskHwInfo = "/diskhwinfo"
|
||||
peerRESTMethodOsInfo = "/osinfo"
|
||||
peerRESTMethodMemInfo = "/meminfo"
|
||||
peerRESTMethodProcInfo = "/procinfo"
|
||||
peerRESTMethodSysErrors = "/syserrors"
|
||||
peerRESTMethodSysServices = "/sysservices"
|
||||
peerRESTMethodSysConfig = "/sysconfig"
|
||||
peerRESTMethodDispatchNetInfo = "/dispatchnetinfo"
|
||||
peerRESTMethodDeleteBucketMetadata = "/deletebucketmetadata"
|
||||
peerRESTMethodLoadBucketMetadata = "/loadbucketmetadata"
|
||||
peerRESTMethodGetBucketStats = "/getbucketstats"
|
||||
peerRESTMethodServerUpdate = "/serverupdate"
|
||||
peerRESTMethodSignalService = "/signalservice"
|
||||
peerRESTMethodBackgroundHealStatus = "/backgroundhealstatus"
|
||||
peerRESTMethodGetLocks = "/getlocks"
|
||||
peerRESTMethodLoadUser = "/loaduser"
|
||||
peerRESTMethodLoadServiceAccount = "/loadserviceaccount"
|
||||
peerRESTMethodDeleteUser = "/deleteuser"
|
||||
peerRESTMethodDeleteServiceAccount = "/deleteserviceaccount"
|
||||
peerRESTMethodLoadPolicy = "/loadpolicy"
|
||||
peerRESTMethodLoadPolicyMapping = "/loadpolicymapping"
|
||||
peerRESTMethodDeletePolicy = "/deletepolicy"
|
||||
peerRESTMethodLoadGroup = "/loadgroup"
|
||||
peerRESTMethodStartProfiling = "/startprofiling"
|
||||
peerRESTMethodDownloadProfilingData = "/downloadprofilingdata"
|
||||
peerRESTMethodCycleBloom = "/cyclebloom"
|
||||
peerRESTMethodTrace = "/trace"
|
||||
peerRESTMethodListen = "/listen"
|
||||
peerRESTMethodLog = "/log"
|
||||
peerRESTMethodGetLocalDiskIDs = "/getlocaldiskids"
|
||||
peerRESTMethodGetBandwidth = "/bandwidth"
|
||||
peerRESTMethodGetMetacacheListing = "/getmetacache"
|
||||
peerRESTMethodUpdateMetacacheListing = "/updatemetacache"
|
||||
peerRESTMethodGetPeerMetrics = "/peermetrics"
|
||||
peerRESTMethodLoadTransitionTierConfig = "/loadtransitiontierconfig"
|
||||
peerRESTMethodSpeedtest = "/speedtest"
|
||||
peerRESTMethodHealth = "/health"
|
||||
peerRESTMethodServerInfo = "/serverinfo"
|
||||
peerRESTMethodDriveInfo = "/driveinfo"
|
||||
peerRESTMethodNetInfo = "/netinfo"
|
||||
peerRESTMethodCPUInfo = "/cpuinfo"
|
||||
peerRESTMethodDiskHwInfo = "/diskhwinfo"
|
||||
peerRESTMethodOsInfo = "/osinfo"
|
||||
peerRESTMethodMemInfo = "/meminfo"
|
||||
peerRESTMethodProcInfo = "/procinfo"
|
||||
peerRESTMethodSysErrors = "/syserrors"
|
||||
peerRESTMethodSysServices = "/sysservices"
|
||||
peerRESTMethodSysConfig = "/sysconfig"
|
||||
peerRESTMethodDispatchNetInfo = "/dispatchnetinfo"
|
||||
peerRESTMethodDeleteBucketMetadata = "/deletebucketmetadata"
|
||||
peerRESTMethodLoadBucketMetadata = "/loadbucketmetadata"
|
||||
peerRESTMethodGetBucketStats = "/getbucketstats"
|
||||
peerRESTMethodServerUpdate = "/serverupdate"
|
||||
peerRESTMethodSignalService = "/signalservice"
|
||||
peerRESTMethodBackgroundHealStatus = "/backgroundhealstatus"
|
||||
peerRESTMethodGetLocks = "/getlocks"
|
||||
peerRESTMethodLoadUser = "/loaduser"
|
||||
peerRESTMethodLoadServiceAccount = "/loadserviceaccount"
|
||||
peerRESTMethodDeleteUser = "/deleteuser"
|
||||
peerRESTMethodDeleteServiceAccount = "/deleteserviceaccount"
|
||||
peerRESTMethodLoadPolicy = "/loadpolicy"
|
||||
peerRESTMethodLoadPolicyMapping = "/loadpolicymapping"
|
||||
peerRESTMethodDeletePolicy = "/deletepolicy"
|
||||
peerRESTMethodLoadGroup = "/loadgroup"
|
||||
peerRESTMethodStartProfiling = "/startprofiling"
|
||||
peerRESTMethodDownloadProfilingData = "/downloadprofilingdata"
|
||||
peerRESTMethodCycleBloom = "/cyclebloom"
|
||||
peerRESTMethodTrace = "/trace"
|
||||
peerRESTMethodListen = "/listen"
|
||||
peerRESTMethodLog = "/log"
|
||||
peerRESTMethodGetLocalDiskIDs = "/getlocaldiskids"
|
||||
peerRESTMethodGetBandwidth = "/bandwidth"
|
||||
peerRESTMethodGetMetacacheListing = "/getmetacache"
|
||||
peerRESTMethodUpdateMetacacheListing = "/updatemetacache"
|
||||
peerRESTMethodGetPeerMetrics = "/peermetrics"
|
||||
peerRESTMethodLoadTransitionTierConfig = "/loadtransitiontierconfig"
|
||||
peerRESTMethodSpeedtest = "/speedtest"
|
||||
peerRESTMethodReloadSiteReplicationConfig = "/reloadsitereplicationconfig"
|
||||
)
|
||||
|
||||
const (
|
||||
|
@ -514,7 +514,6 @@ func (s *peerRESTServer) GetMemInfoHandler(w http.ResponseWriter, r *http.Reques
|
||||
// (only the config that are of concern to minio)
|
||||
func (s *peerRESTServer) GetSysConfigHandler(w http.ResponseWriter, r *http.Request) {
|
||||
if !s.IsValid(w, r) {
|
||||
fmt.Println("Invalid request")
|
||||
s.writeErrorResponse(w, errors.New("Invalid request"))
|
||||
return
|
||||
}
|
||||
@ -532,7 +531,6 @@ func (s *peerRESTServer) GetSysConfigHandler(w http.ResponseWriter, r *http.Requ
|
||||
// (only the services that are of concern to minio)
|
||||
func (s *peerRESTServer) GetSysServicesHandler(w http.ResponseWriter, r *http.Request) {
|
||||
if !s.IsValid(w, r) {
|
||||
fmt.Println("Invalid request")
|
||||
s.writeErrorResponse(w, errors.New("Invalid request"))
|
||||
return
|
||||
}
|
||||
@ -584,6 +582,24 @@ func (s *peerRESTServer) DeleteBucketMetadataHandler(w http.ResponseWriter, r *h
|
||||
}
|
||||
}
|
||||
|
||||
// ReloadSiteReplicationConfigHandler - reloads site replication configuration from the disks
|
||||
func (s *peerRESTServer) ReloadSiteReplicationConfigHandler(w http.ResponseWriter, r *http.Request) {
|
||||
if !s.IsValid(w, r) {
|
||||
s.writeErrorResponse(w, errors.New("Invalid request"))
|
||||
return
|
||||
}
|
||||
|
||||
ctx := newContext(r, w, "LoadSiteReplication")
|
||||
|
||||
objAPI := newObjectLayerFn()
|
||||
if objAPI == nil {
|
||||
s.writeErrorResponse(w, errServerNotInitialized)
|
||||
return
|
||||
}
|
||||
|
||||
logger.LogIf(r.Context(), globalSiteReplicationSys.Init(ctx, objAPI))
|
||||
}
|
||||
|
||||
// GetBucketStatsHandler - fetches current in-memory bucket stats, currently only
|
||||
// returns BucketReplicationStatus
|
||||
func (s *peerRESTServer) GetBucketStatsHandler(w http.ResponseWriter, r *http.Request) {
|
||||
@ -1385,4 +1401,5 @@ func registerPeerRESTHandlers(router *mux.Router) {
|
||||
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodGetPeerMetrics).HandlerFunc(httpTraceHdrs(server.GetPeerMetrics))
|
||||
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodLoadTransitionTierConfig).HandlerFunc(httpTraceHdrs(server.LoadTransitionTierConfigHandler))
|
||||
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodSpeedtest).HandlerFunc(httpTraceHdrs(server.SpeedtestHandler))
|
||||
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodReloadSiteReplicationConfig).HandlerFunc(httpTraceHdrs(server.ReloadSiteReplicationConfigHandler))
|
||||
}
|
||||
|
@ -414,6 +414,9 @@ func initAllSubsystems(ctx context.Context, newObject ObjectLayer) (err error) {
|
||||
// Initialize bucket notification sub-system.
|
||||
globalNotificationSys.Init(ctx, buckets, newObject)
|
||||
|
||||
// Initialize site replication manager.
|
||||
globalSiteReplicationSys.Init(ctx, newObject)
|
||||
|
||||
if globalIsErasure {
|
||||
// Initialize transition tier configuration manager
|
||||
if err = globalTierConfigMgr.Init(ctx, newObject); err != nil {
|
||||
|
1652
cmd/site-replication.go
Normal file
1652
cmd/site-replication.go
Normal file
File diff suppressed because it is too large
Load Diff
@ -29,6 +29,7 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/gorilla/mux"
|
||||
"github.com/minio/madmin-go"
|
||||
"github.com/minio/minio/internal/auth"
|
||||
"github.com/minio/minio/internal/config/identity/openid"
|
||||
xhttp "github.com/minio/minio/internal/http"
|
||||
@ -651,6 +652,19 @@ func (sts *stsAPIHandlers) AssumeRoleWithLDAPIdentity(w http.ResponseWriter, r *
|
||||
}
|
||||
}
|
||||
|
||||
// Call hook for cluster-replication.
|
||||
if err := globalSiteReplicationSys.IAMChangeHook(ctx, madmin.SRIAMItem{
|
||||
Type: madmin.SRIAMItemSTSAcc,
|
||||
STSCredential: &madmin.SRSTSCredential{
|
||||
AccessKey: cred.AccessKey,
|
||||
SecretKey: cred.SecretKey,
|
||||
SessionToken: cred.SessionToken,
|
||||
},
|
||||
}); err != nil {
|
||||
writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL)
|
||||
return
|
||||
}
|
||||
|
||||
ldapIdentityResponse := &AssumeRoleWithLDAPResponse{
|
||||
Result: LDAPIdentityResult{
|
||||
Credentials: cred,
|
||||
|
40
docs/site-replication/README.md
Normal file
40
docs/site-replication/README.md
Normal file
@ -0,0 +1,40 @@
|
||||
# Site Replication Guide #
|
||||
|
||||
This feature allows multiple independent MinIO sites (or clusters) that are using the same external IDentity Provider (IDP) to be configured as replicas. In this situation the set of replica sites are referred to as peer sites or just peers. This means that:
|
||||
|
||||
- when a bucket is created/deleted at a site, it is created/deleted on the other peer sites as well
|
||||
- each bucket is automatically configured with versioning enabled and to replicate its data on the corresponding bucket in each of the remaining peer sites
|
||||
- bucket policies, bucket tags, bucket object-lock configuration and bucket encryption settings are also replicated to all other peers
|
||||
- all IAM policies are replicated to all other peers
|
||||
- all service accounts belonging to users authenticated via the external IDP are replicated to all other peers
|
||||
|
||||
This feature is built on top of multi-site bucket replication feature.
|
||||
|
||||
## Configuring Site Replication ##
|
||||
|
||||
To configure site replication, ensure that all MinIO sites are using the same external IDP.
|
||||
|
||||
1. Configure an alias in `mc` for each of the sites. For example if you have three MinIO sites, you may run:
|
||||
|
||||
```shell
|
||||
$ mc alias set minio1 https://minio1.example.com:9000 minio1 minio1123
|
||||
$ mc alias set minio2 https://minio2.example.com:9000 minio2 minio2123
|
||||
$ mc alias set minio3 https://minio3.example.com:9000 minio3 minio3123
|
||||
```
|
||||
|
||||
NOTE: When configuring site replication, each site except the first one is required to be empty.
|
||||
|
||||
2. Add site replication configuration with:
|
||||
|
||||
```shell
|
||||
$ mc admin replicate add minio1 minio2 minio3
|
||||
```
|
||||
|
||||
3. Once the above command returns success, you may query site replication configuration with:
|
||||
|
||||
```shell
|
||||
$ mc admin replicate info minio1
|
||||
```
|
||||
|
||||
*NOTE*:
|
||||
Site replication enables bucket versioning automatically for each bucket: it must not be modified by the cluster operator.
|
4
go.mod
4
go.mod
@ -45,10 +45,10 @@ require (
|
||||
github.com/minio/csvparser v1.0.0
|
||||
github.com/minio/highwayhash v1.0.2
|
||||
github.com/minio/kes v0.14.0
|
||||
github.com/minio/madmin-go v1.1.6
|
||||
github.com/minio/madmin-go v1.1.7
|
||||
github.com/minio/minio-go/v7 v7.0.15-0.20210928020726-a58653d41dd8
|
||||
github.com/minio/parquet-go v1.0.0
|
||||
github.com/minio/pkg v1.1.3
|
||||
github.com/minio/pkg v1.1.4
|
||||
github.com/minio/selfupdate v0.3.1
|
||||
github.com/minio/sha256-simd v1.0.0
|
||||
github.com/minio/simdjson-go v0.2.1
|
||||
|
6
go.sum
6
go.sum
@ -1019,8 +1019,9 @@ github.com/minio/kes v0.11.0/go.mod h1:mTF1Bv8YVEtQqF/B7Felp4tLee44Pp+dgI0rhCvgN
|
||||
github.com/minio/kes v0.14.0 h1:plCGm4LwR++T1P1sXsJbyFRX54CE1WRuo9PAPj6MC3Q=
|
||||
github.com/minio/kes v0.14.0/go.mod h1:OUensXz2BpgMfiogslKxv7Anyx/wj+6bFC6qA7BQcfA=
|
||||
github.com/minio/madmin-go v1.0.12/go.mod h1:BK+z4XRx7Y1v8SFWXsuLNqQqnq5BO/axJ8IDJfgyvfs=
|
||||
github.com/minio/madmin-go v1.1.6 h1:L53ALIbAilaEvuvMMT4XkJpd6mtaorkMBwCQ+zraYBA=
|
||||
github.com/minio/madmin-go v1.1.6/go.mod h1:vw+c3/u+DeVKqReEavo///Cl2OO8nt5s4ee843hJeLs=
|
||||
github.com/minio/madmin-go v1.1.7 h1:vZCnIfPlb40sBap+bmwvxG4/dSfkwF8QCRUHZL16Ylg=
|
||||
github.com/minio/madmin-go v1.1.7/go.mod h1:Iu0OnrMWNBYx1lqJTW+BFjBMx0Hi0wjw8VmqhiOs2Jo=
|
||||
github.com/minio/mc v0.0.0-20210626002108-cebf3318546f h1:hyFvo5hSFw2K417YvDr/vAKlgCG69uTuhZW/5LNdL0U=
|
||||
github.com/minio/mc v0.0.0-20210626002108-cebf3318546f/go.mod h1:tuaonkPjVApCXkbtKENHBtsqUf7YTV33qmFrC+Pgp5g=
|
||||
github.com/minio/md5-simd v1.1.0/go.mod h1:XpBqgZULrMYD3R+M28PcmP0CkI7PEMzB3U77ZrKZ0Gw=
|
||||
@ -1041,8 +1042,9 @@ github.com/minio/parquet-go v1.0.0/go.mod h1:aQlkSOfOq2AtQKkuou3mosNVMwNokd+faTa
|
||||
github.com/minio/pkg v1.0.3/go.mod h1:obU54TZ9QlMv0TRaDgQ/JTzf11ZSXxnSfLrm4tMtBP8=
|
||||
github.com/minio/pkg v1.0.4/go.mod h1:obU54TZ9QlMv0TRaDgQ/JTzf11ZSXxnSfLrm4tMtBP8=
|
||||
github.com/minio/pkg v1.0.8/go.mod h1:32x/3OmGB0EOi1N+3ggnp+B5VFkSBBB9svPMVfpnf14=
|
||||
github.com/minio/pkg v1.1.3 h1:J4vGnlNSxc/o9gDOQMZ3k0L3koA7ZgBQ7GRMrUpt/OY=
|
||||
github.com/minio/pkg v1.1.3/go.mod h1:32x/3OmGB0EOi1N+3ggnp+B5VFkSBBB9svPMVfpnf14=
|
||||
github.com/minio/pkg v1.1.4 h1:VxdPYpXAi1xseavq2WAJLr9BaKn5OP0TC1/2R4Ad9Oc=
|
||||
github.com/minio/pkg v1.1.4/go.mod h1:32x/3OmGB0EOi1N+3ggnp+B5VFkSBBB9svPMVfpnf14=
|
||||
github.com/minio/selfupdate v0.3.1 h1:BWEFSNnrZVMUWXbXIgLDNDjbejkmpAmZvy/nCz1HlEs=
|
||||
github.com/minio/selfupdate v0.3.1/go.mod h1:b8ThJzzH7u2MkF6PcIra7KaXO9Khf6alWPvMSyTDCFM=
|
||||
github.com/minio/sha256-simd v0.1.1/go.mod h1:B5e1o+1/KgNmWrSQK08Y6Z1Vb5pwIktudl0J58iy0KM=
|
||||
|
Loading…
Reference in New Issue
Block a user