From 3a7c79e2c71f7b09974ff1a1c70e4d149a83a6a9 Mon Sep 17 00:00:00 2001 From: Aditya Manthramurthy Date: Wed, 6 Oct 2021 16:36:31 -0700 Subject: [PATCH] Add new site replication feature (#13311) This change allows a set of MinIO sites (clusters) to be configured for mutual replication of all buckets (including bucket policies, tags, object-lock configuration and bucket encryption), IAM policies, LDAP service accounts and LDAP STS accounts. --- cmd/admin-handlers-site-replication.go | 315 +++++ cmd/admin-handlers-users.go | 117 +- cmd/admin-router.go | 10 + cmd/api-errors.go | 47 + cmd/apierrorcode_string.go | 213 +-- cmd/bucket-encryption-handlers.go | 16 + cmd/bucket-handlers.go | 61 + cmd/bucket-policy-handlers.go | 30 +- cmd/globals.go | 3 + cmd/iam.go | 26 + cmd/notification.go | 20 + cmd/peer-rest-client.go | 9 + cmd/peer-rest-common.go | 83 +- cmd/peer-rest-server.go | 21 +- cmd/server-main.go | 3 + cmd/site-replication.go | 1652 ++++++++++++++++++++++++ cmd/sts-handlers.go | 14 + docs/site-replication/README.md | 40 + go.mod | 4 +- go.sum | 6 +- 20 files changed, 2537 insertions(+), 153 deletions(-) create mode 100644 cmd/admin-handlers-site-replication.go create mode 100644 cmd/site-replication.go create mode 100644 docs/site-replication/README.md diff --git a/cmd/admin-handlers-site-replication.go b/cmd/admin-handlers-site-replication.go new file mode 100644 index 000000000..80cf66093 --- /dev/null +++ b/cmd/admin-handlers-site-replication.go @@ -0,0 +1,315 @@ +// Copyright (c) 2015-2021 MinIO, Inc. +// +// This file is part of MinIO Object Storage stack +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. 
+// +// This program is distributed in the hope that it will be useful +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package cmd + +import ( + "bytes" + "context" + "encoding/json" + "io" + "io/ioutil" + "net/http" + + "github.com/gorilla/mux" + "github.com/minio/madmin-go" + + "github.com/minio/minio/internal/logger" + "github.com/minio/pkg/bucket/policy" + iampolicy "github.com/minio/pkg/iam/policy" +) + +// SiteReplicationAdd - PUT /minio/admin/v3/site-replication/add +func (a adminAPIHandlers) SiteReplicationAdd(w http.ResponseWriter, r *http.Request) { + ctx := newContext(r, w, "SiteReplicationAdd") + + defer logger.AuditLog(ctx, w, r, mustGetClaimsFromToken(r)) + + objectAPI, cred := validateAdminReq(ctx, w, r, iampolicy.SiteReplicationAddAction) + if objectAPI == nil { + return + } + + var sites []madmin.PeerSite + errCode := readJSONBody(ctx, r.Body, &sites, cred.SecretKey) + if errCode != ErrNone { + writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(errCode), r.URL) + return + } + + status, errInfo := globalSiteReplicationSys.AddPeerClusters(ctx, sites) + if errInfo.Code != ErrNone { + logger.LogIf(ctx, errInfo) + writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErrWithErr(errInfo.Code, errInfo.Cause), r.URL) + return + } + + body, err := json.Marshal(status) + if err != nil { + writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL) + return + } + + writeSuccessResponseJSON(w, body) +} + +// SRInternalJoin - PUT /minio/admin/v3/site-replication/join +// +// used internally to tell current cluster to enable SR with +// the provided peer clusters and service account. 
+func (a adminAPIHandlers) SRInternalJoin(w http.ResponseWriter, r *http.Request) { + ctx := newContext(r, w, "SRInternalJoin") + + defer logger.AuditLog(ctx, w, r, mustGetClaimsFromToken(r)) + + objectAPI, cred := validateAdminReq(ctx, w, r, iampolicy.SiteReplicationAddAction) + if objectAPI == nil { + return + } + + var joinArg madmin.SRInternalJoinReq + errCode := readJSONBody(ctx, r.Body, &joinArg, cred.SecretKey) + if errCode != ErrNone { + writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(errCode), r.URL) + return + } + + errInfo := globalSiteReplicationSys.InternalJoinReq(ctx, joinArg) + if errInfo.Code != ErrNone { + logger.LogIf(ctx, errInfo) + writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErrWithErr(errInfo.Code, errInfo.Cause), r.URL) + return + } +} + +// SRInternalBucketOps - PUT /minio/admin/v3/site-replication/bucket-ops?bucket=x&operation=y +func (a adminAPIHandlers) SRInternalBucketOps(w http.ResponseWriter, r *http.Request) { + ctx := newContext(r, w, "SRInternalBucketOps") + + defer logger.AuditLog(ctx, w, r, mustGetClaimsFromToken(r)) + + objectAPI, _ := validateAdminReq(ctx, w, r, iampolicy.SiteReplicationOperationAction) + if objectAPI == nil { + return + } + + vars := mux.Vars(r) + bucket := vars["bucket"] + operation := madmin.BktOp(vars["operation"]) + + var err error + switch operation { + case madmin.MakeWithVersioningBktOp: + _, isLockEnabled := r.Form["lockEnabled"] + _, isVersioningEnabled := r.Form["versioningEnabled"] + opts := BucketOptions{ + Location: r.Form.Get("location"), + LockEnabled: isLockEnabled, + VersioningEnabled: isVersioningEnabled, + } + err = globalSiteReplicationSys.PeerBucketMakeWithVersioningHandler(ctx, bucket, opts) + case madmin.ConfigureReplBktOp: + err = globalSiteReplicationSys.PeerBucketConfigureReplHandler(ctx, bucket) + case madmin.DeleteBucketBktOp: + err = globalSiteReplicationSys.PeerBucketDeleteHandler(ctx, bucket, false) + case madmin.ForceDeleteBucketBktOp: + err = 
globalSiteReplicationSys.PeerBucketDeleteHandler(ctx, bucket, true) + default: + writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrAdminInvalidArgument), r.URL) + return + } + if err != nil { + logger.LogIf(ctx, err) + writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrInternalError), r.URL) + return + } + +} + +// SRInternalReplicateIAMItem - PUT /minio/admin/v3/site-replication/iam-item +func (a adminAPIHandlers) SRInternalReplicateIAMItem(w http.ResponseWriter, r *http.Request) { + ctx := newContext(r, w, "SRInternalReplicateIAMItem") + + defer logger.AuditLog(ctx, w, r, mustGetClaimsFromToken(r)) + + objectAPI, _ := validateAdminReq(ctx, w, r, iampolicy.SiteReplicationOperationAction) + if objectAPI == nil { + return + } + + var item madmin.SRIAMItem + errCode := readJSONBody(ctx, r.Body, &item, "") + if errCode != ErrNone { + writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(errCode), r.URL) + return + } + + var err error + switch item.Type { + case madmin.SRIAMItemPolicy: + var policy *iampolicy.Policy + if len(item.Policy) > 0 { + policy, err = iampolicy.ParseConfig(bytes.NewReader(item.Policy)) + if err != nil { + writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL) + return + } + } + err = globalSiteReplicationSys.PeerAddPolicyHandler(ctx, item.Name, policy) + case madmin.SRIAMItemSvcAcc: + err = globalSiteReplicationSys.PeerSvcAccChangeHandler(ctx, *item.SvcAccChange) + case madmin.SRIAMItemPolicyMapping: + err = globalSiteReplicationSys.PeerPolicyMappingHandler(ctx, *item.PolicyMapping) + case madmin.SRIAMItemSTSAcc: + err = globalSiteReplicationSys.PeerSTSAccHandler(ctx, *item.STSCredential) + + default: + writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrAdminInvalidArgument), r.URL) + return + } + if err != nil { + logger.LogIf(ctx, err) + writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrInternalError), r.URL) + return + } +} + +// SRInternalReplicateBucketItem - PUT /minio/admin/v3/site-replication/bucket-meta +func (a 
adminAPIHandlers) SRInternalReplicateBucketItem(w http.ResponseWriter, r *http.Request) { + ctx := newContext(r, w, "SRInternalReplicateIAMItem") + + defer logger.AuditLog(ctx, w, r, mustGetClaimsFromToken(r)) + + objectAPI, _ := validateAdminReq(ctx, w, r, iampolicy.SiteReplicationOperationAction) + if objectAPI == nil { + return + } + + var item madmin.SRBucketMeta + errCode := readJSONBody(ctx, r.Body, &item, "") + if errCode != ErrNone { + writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(errCode), r.URL) + return + } + + var err error + switch item.Type { + case madmin.SRBucketMetaTypePolicy: + var bktPolicy *policy.Policy + if len(item.Policy) > 0 { + bktPolicy, err = policy.ParseConfig(bytes.NewReader(item.Policy), item.Bucket) + if err != nil { + writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL) + return + } + } + err = globalSiteReplicationSys.PeerBucketPolicyHandler(ctx, item.Bucket, bktPolicy) + case madmin.SRBucketMetaTypeTags: + err = globalSiteReplicationSys.PeerBucketTaggingHandler(ctx, item.Bucket, item.Tags) + case madmin.SRBucketMetaTypeObjectLockConfig: + err = globalSiteReplicationSys.PeerBucketObjectLockConfigHandler(ctx, item.Bucket, item.ObjectLockConfig) + case madmin.SRBucketMetaTypeSSEConfig: + err = globalSiteReplicationSys.PeerBucketSSEConfigHandler(ctx, item.Bucket, item.SSEConfig) + + default: + writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrAdminInvalidArgument), r.URL) + return + } + if err != nil { + logger.LogIf(ctx, err) + writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrInternalError), r.URL) + return + } +} + +// SiteReplicationDisable - PUT /minio/admin/v3/site-replication/disable +func (a adminAPIHandlers) SiteReplicationDisable(w http.ResponseWriter, r *http.Request) { + ctx := newContext(r, w, "SiteReplicationDisable") + + defer logger.AuditLog(ctx, w, r, mustGetClaimsFromToken(r)) + + objectAPI, _ := validateAdminReq(ctx, w, r, iampolicy.SiteReplicationDisableAction) + if objectAPI == nil { + 
return + } +} + +// SiteReplicationInfo - GET /minio/admin/v3/site-replication/info +func (a adminAPIHandlers) SiteReplicationInfo(w http.ResponseWriter, r *http.Request) { + ctx := newContext(r, w, "SiteReplicationInfo") + + defer logger.AuditLog(ctx, w, r, mustGetClaimsFromToken(r)) + + objectAPI, _ := validateAdminReq(ctx, w, r, iampolicy.SiteReplicationInfoAction) + if objectAPI == nil { + return + } + + info, err := globalSiteReplicationSys.GetClusterInfo(ctx) + if err != nil { + writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL) + return + } + + if err = json.NewEncoder(w).Encode(info); err != nil { + writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL) + return + } + + w.(http.Flusher).Flush() +} + +func (a adminAPIHandlers) SRInternalGetIDPSettings(w http.ResponseWriter, r *http.Request) { + ctx := newContext(r, w, "SiteReplicationGetIDPSettings") + + defer logger.AuditLog(ctx, w, r, mustGetClaimsFromToken(r)) + + objectAPI, _ := validateAdminReq(ctx, w, r, iampolicy.SiteReplicationAddAction) + if objectAPI == nil { + return + } + + idpSettings := globalSiteReplicationSys.GetIDPSettings(ctx) + if err := json.NewEncoder(w).Encode(idpSettings); err != nil { + writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL) + return + } + + w.(http.Flusher).Flush() +} + +func readJSONBody(ctx context.Context, body io.Reader, v interface{}, encryptionKey string) APIErrorCode { + data, err := ioutil.ReadAll(body) + if err != nil { + return ErrInvalidRequest + } + + if encryptionKey != "" { + data, err = madmin.DecryptData(encryptionKey, bytes.NewReader(data)) + if err != nil { + logger.LogIf(ctx, err) + return ErrInvalidRequest + } + } + + err = json.Unmarshal(data, v) + if err != nil { + return ErrAdminConfigBadJSON + } + + return ErrNone +} diff --git a/cmd/admin-handlers-users.go b/cmd/admin-handlers-users.go index 34c24caee..866cecc4a 100644 --- a/cmd/admin-handlers-users.go +++ b/cmd/admin-handlers-users.go @@ -28,6 +28,7 @@ import ( 
"github.com/gorilla/mux" "github.com/minio/madmin-go" + "github.com/minio/minio/internal/auth" "github.com/minio/minio/internal/config/dns" "github.com/minio/minio/internal/logger" iampolicy "github.com/minio/pkg/iam/policy" @@ -216,7 +217,6 @@ func (a adminAPIHandlers) UpdateGroupMembers(w http.ResponseWriter, r *http.Requ return } - defer r.Body.Close() data, err := ioutil.ReadAll(r.Body) if err != nil { writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrInvalidRequest), r.URL) @@ -630,6 +630,34 @@ func (a adminAPIHandlers) AddServiceAccount(w http.ResponseWriter, r *http.Reque } } + // Call hook for cluster-replication. + // + // FIXME: This wont work in an OpenID situation as the parent credential + // may not be present on peer clusters to provide inherited policies. + // Also, we should not be replicating root user's service account - as + // they are not authenticated by a common external IDP, so we skip when + // opts.ldapUser == "". + if _, isLDAPAccount := opts.claims[ldapUserN]; isLDAPAccount { + err = globalSiteReplicationSys.IAMChangeHook(ctx, madmin.SRIAMItem{ + Type: madmin.SRIAMItemSvcAcc, + SvcAccChange: &madmin.SRSvcAccChange{ + Create: &madmin.SRSvcAccCreate{ + Parent: newCred.ParentUser, + AccessKey: newCred.AccessKey, + SecretKey: newCred.SecretKey, + Groups: newCred.Groups, + Claims: opts.claims, + SessionPolicy: createReq.Policy, + Status: auth.AccountOn, + }, + }, + }) + if err != nil { + writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL) + return + } + } + var createResp = madmin.AddServiceAccountResp{ Credentials: madmin.Credentials{ AccessKey: newCred.AccessKey, @@ -741,6 +769,30 @@ func (a adminAPIHandlers) UpdateServiceAccount(w http.ResponseWriter, r *http.Re } } + // Call site replication hook. Only LDAP accounts are supported for + // replication operations. 
+ svcAccClaims, err := globalIAMSys.GetClaimsForSvcAcc(ctx, accessKey) + if err != nil { + writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL) + return + } + if _, isLDAPAccount := svcAccClaims[ldapUserN]; isLDAPAccount { + err = globalSiteReplicationSys.IAMChangeHook(ctx, madmin.SRIAMItem{ + Type: madmin.SRIAMItemSvcAcc, + SvcAccChange: &madmin.SRSvcAccChange{ + Update: &madmin.SRSvcAccUpdate{ + AccessKey: accessKey, + SecretKey: opts.secretKey, + Status: opts.status, + SessionPolicy: updateReq.NewPolicy, + }, + }, + }) + if err != nil { + writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL) + return + } + } writeSuccessNoContent(w) } @@ -975,6 +1027,28 @@ func (a adminAPIHandlers) DeleteServiceAccount(w http.ResponseWriter, r *http.Re } } + // Call site replication hook. Only LDAP accounts are supported for + // replication operations. + svcAccClaims, err := globalIAMSys.GetClaimsForSvcAcc(ctx, serviceAccount) + if err != nil { + writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL) + return + } + if _, isLDAPAccount := svcAccClaims[ldapUserN]; isLDAPAccount { + err = globalSiteReplicationSys.IAMChangeHook(ctx, madmin.SRIAMItem{ + Type: madmin.SRIAMItemSvcAcc, + SvcAccChange: &madmin.SRSvcAccChange{ + Delete: &madmin.SRSvcAccDelete{ + AccessKey: serviceAccount, + }, + }, + }) + if err != nil { + writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL) + return + } + } + writeSuccessNoContent(w) } @@ -1286,6 +1360,16 @@ func (a adminAPIHandlers) RemoveCannedPolicy(w http.ResponseWriter, r *http.Requ logger.LogIf(ctx, nerr.Err) } } + + // Call cluster-replication policy creation hook to replicate policy deletion to + // other minio clusters. 
+ if err := globalSiteReplicationSys.IAMChangeHook(ctx, madmin.SRIAMItem{ + Type: madmin.SRIAMItemPolicy, + Name: policyName, + }); err != nil { + writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL) + return + } } // AddCannedPolicy - PUT /minio/admin/v3/add-canned-policy?name= @@ -1314,7 +1398,13 @@ func (a adminAPIHandlers) AddCannedPolicy(w http.ResponseWriter, r *http.Request return } - iamPolicy, err := iampolicy.ParseConfig(io.LimitReader(r.Body, r.ContentLength)) + iamPolicyBytes, err := ioutil.ReadAll(io.LimitReader(r.Body, r.ContentLength)) + if err != nil { + writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL) + return + } + + iamPolicy, err := iampolicy.ParseConfig(bytes.NewReader(iamPolicyBytes)) if err != nil { writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL) return @@ -1338,6 +1428,17 @@ func (a adminAPIHandlers) AddCannedPolicy(w http.ResponseWriter, r *http.Request logger.LogIf(ctx, nerr.Err) } } + + // Call cluster-replication policy creation hook to replicate policy to + // other minio clusters. 
+ if err := globalSiteReplicationSys.IAMChangeHook(ctx, madmin.SRIAMItem{ + Type: madmin.SRIAMItemPolicy, + Name: policyName, + Policy: iamPolicyBytes, + }); err != nil { + writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL) + return + } } // SetPolicyForUserOrGroup - PUT /minio/admin/v3/set-policy?policy=xxx&user-or-group=?[&is-group] @@ -1380,4 +1481,16 @@ func (a adminAPIHandlers) SetPolicyForUserOrGroup(w http.ResponseWriter, r *http logger.LogIf(ctx, nerr.Err) } } + + if err := globalSiteReplicationSys.IAMChangeHook(ctx, madmin.SRIAMItem{ + Type: madmin.SRIAMItemPolicyMapping, + PolicyMapping: &madmin.SRPolicyMapping{ + UserOrGroup: entityName, + IsGroup: isGroup, + Policy: policyName, + }, + }); err != nil { + writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL) + return + } } diff --git a/cmd/admin-router.go b/cmd/admin-router.go index bd41756d4..1df120dbf 100644 --- a/cmd/admin-router.go +++ b/cmd/admin-router.go @@ -191,6 +191,16 @@ func registerAdminRouter(router *mux.Router, enableConfigOps bool) { adminRouter.Methods(http.MethodPut).Path(adminVersion + "/tier").HandlerFunc(gz(httpTraceHdrs(adminAPI.AddTierHandler))) adminRouter.Methods(http.MethodPost).Path(adminVersion + "/tier/{tier}").HandlerFunc(gz(httpTraceHdrs(adminAPI.EditTierHandler))) adminRouter.Methods(http.MethodGet).Path(adminVersion + "/tier").HandlerFunc(gz(httpTraceHdrs(adminAPI.ListTierHandler))) + + // Cluster Replication APIs + adminRouter.Methods(http.MethodPut).Path(adminVersion + "/site-replication/add").HandlerFunc(gz(httpTraceHdrs(adminAPI.SiteReplicationAdd))) + adminRouter.Methods(http.MethodPut).Path(adminVersion + "/site-replication/disable").HandlerFunc(gz(httpTraceHdrs(adminAPI.SiteReplicationDisable))) + adminRouter.Methods(http.MethodGet).Path(adminVersion + "/site-replication/info").HandlerFunc(gz(httpTraceHdrs(adminAPI.SiteReplicationInfo))) + adminRouter.Methods(http.MethodPut).Path(adminVersion + 
"/site-replication/peer/join").HandlerFunc(gz(httpTraceHdrs(adminAPI.SRInternalJoin))) + adminRouter.Methods(http.MethodPut).Path(adminVersion+"/site-replication/peer/bucket-ops").HandlerFunc(gz(httpTraceHdrs(adminAPI.SRInternalBucketOps))).Queries("bucket", "{bucket:.*}").Queries("operation", "{operation:.*}") + adminRouter.Methods(http.MethodPut).Path(adminVersion + "/site-replication/peer/iam-item").HandlerFunc(gz(httpTraceHdrs(adminAPI.SRInternalReplicateIAMItem))) + adminRouter.Methods(http.MethodPut).Path(adminVersion + "/site-replication/peer/bucket-meta").HandlerFunc(gz(httpTraceHdrs(adminAPI.SRInternalReplicateBucketItem))) + adminRouter.Methods(http.MethodGet).Path(adminVersion + "/site-replication/peer/idp-settings").HandlerFunc(gz(httpTraceHdrs(adminAPI.SRInternalGetIDPSettings))) } if globalIsDistErasure { diff --git a/cmd/api-errors.go b/cmd/api-errors.go index 357b9307e..7bbf3bfb3 100644 --- a/cmd/api-errors.go +++ b/cmd/api-errors.go @@ -267,6 +267,16 @@ const ( ErrAdminCredentialsMismatch ErrInsecureClientRequest ErrObjectTampered + + // Site-Replication errors + ErrSiteReplicationInvalidRequest + ErrSiteReplicationPeerResp + ErrSiteReplicationBackendIssue + ErrSiteReplicationServiceAccountError + ErrSiteReplicationBucketConfigError + ErrSiteReplicationBucketMetaError + ErrSiteReplicationIAMError + // Bucket Quota error codes ErrAdminBucketQuotaExceeded ErrAdminNoSuchQuotaConfiguration @@ -1269,6 +1279,43 @@ var errorCodes = errorCodeMap{ Description: errObjectTampered.Error(), HTTPStatusCode: http.StatusPartialContent, }, + + ErrSiteReplicationInvalidRequest: { + Code: "XMinioSiteReplicationInvalidRequest", + Description: "Invalid site-replication request", + HTTPStatusCode: http.StatusBadRequest, + }, + ErrSiteReplicationPeerResp: { + Code: "XMinioSiteReplicationPeerResp", + Description: "Error received when contacting a peer site", + HTTPStatusCode: http.StatusServiceUnavailable, + }, + ErrSiteReplicationBackendIssue: { + Code: 
"XMinioSiteReplicationBackendIssue", + Description: "Error when requesting object layer backend", + HTTPStatusCode: http.StatusServiceUnavailable, + }, + ErrSiteReplicationServiceAccountError: { + Code: "XMinioSiteReplicationServiceAccountError", + Description: "Site replication related service account error", + HTTPStatusCode: http.StatusServiceUnavailable, + }, + ErrSiteReplicationBucketConfigError: { + Code: "XMinioSiteReplicationBucketConfigError", + Description: "Error while configuring replication on a bucket", + HTTPStatusCode: http.StatusServiceUnavailable, + }, + ErrSiteReplicationBucketMetaError: { + Code: "XMinioSiteReplicationBucketMetaError", + Description: "Error while replicating bucket metadata", + HTTPStatusCode: http.StatusServiceUnavailable, + }, + ErrSiteReplicationIAMError: { + Code: "XMinioSiteReplicationIAMError", + Description: "Error while replicating an IAM item", + HTTPStatusCode: http.StatusServiceUnavailable, + }, + ErrMaximumExpires: { Code: "AuthorizationQueryParametersError", Description: "X-Amz-Expires must be less than a week (in seconds); that is, the given X-Amz-Expires must be less than 604800 seconds", diff --git a/cmd/apierrorcode_string.go b/cmd/apierrorcode_string.go index 04043d3ca..98d61d18e 100644 --- a/cmd/apierrorcode_string.go +++ b/cmd/apierrorcode_string.go @@ -183,112 +183,119 @@ func _() { _ = x[ErrAdminCredentialsMismatch-172] _ = x[ErrInsecureClientRequest-173] _ = x[ErrObjectTampered-174] - _ = x[ErrAdminBucketQuotaExceeded-175] - _ = x[ErrAdminNoSuchQuotaConfiguration-176] - _ = x[ErrHealNotImplemented-177] - _ = x[ErrHealNoSuchProcess-178] - _ = x[ErrHealInvalidClientToken-179] - _ = x[ErrHealMissingBucket-180] - _ = x[ErrHealAlreadyRunning-181] - _ = x[ErrHealOverlappingPaths-182] - _ = x[ErrIncorrectContinuationToken-183] - _ = x[ErrEmptyRequestBody-184] - _ = x[ErrUnsupportedFunction-185] - _ = x[ErrInvalidExpressionType-186] - _ = x[ErrBusy-187] - _ = x[ErrUnauthorizedAccess-188] - _ = 
x[ErrExpressionTooLong-189] - _ = x[ErrIllegalSQLFunctionArgument-190] - _ = x[ErrInvalidKeyPath-191] - _ = x[ErrInvalidCompressionFormat-192] - _ = x[ErrInvalidFileHeaderInfo-193] - _ = x[ErrInvalidJSONType-194] - _ = x[ErrInvalidQuoteFields-195] - _ = x[ErrInvalidRequestParameter-196] - _ = x[ErrInvalidDataType-197] - _ = x[ErrInvalidTextEncoding-198] - _ = x[ErrInvalidDataSource-199] - _ = x[ErrInvalidTableAlias-200] - _ = x[ErrMissingRequiredParameter-201] - _ = x[ErrObjectSerializationConflict-202] - _ = x[ErrUnsupportedSQLOperation-203] - _ = x[ErrUnsupportedSQLStructure-204] - _ = x[ErrUnsupportedSyntax-205] - _ = x[ErrUnsupportedRangeHeader-206] - _ = x[ErrLexerInvalidChar-207] - _ = x[ErrLexerInvalidOperator-208] - _ = x[ErrLexerInvalidLiteral-209] - _ = x[ErrLexerInvalidIONLiteral-210] - _ = x[ErrParseExpectedDatePart-211] - _ = x[ErrParseExpectedKeyword-212] - _ = x[ErrParseExpectedTokenType-213] - _ = x[ErrParseExpected2TokenTypes-214] - _ = x[ErrParseExpectedNumber-215] - _ = x[ErrParseExpectedRightParenBuiltinFunctionCall-216] - _ = x[ErrParseExpectedTypeName-217] - _ = x[ErrParseExpectedWhenClause-218] - _ = x[ErrParseUnsupportedToken-219] - _ = x[ErrParseUnsupportedLiteralsGroupBy-220] - _ = x[ErrParseExpectedMember-221] - _ = x[ErrParseUnsupportedSelect-222] - _ = x[ErrParseUnsupportedCase-223] - _ = x[ErrParseUnsupportedCaseClause-224] - _ = x[ErrParseUnsupportedAlias-225] - _ = x[ErrParseUnsupportedSyntax-226] - _ = x[ErrParseUnknownOperator-227] - _ = x[ErrParseMissingIdentAfterAt-228] - _ = x[ErrParseUnexpectedOperator-229] - _ = x[ErrParseUnexpectedTerm-230] - _ = x[ErrParseUnexpectedToken-231] - _ = x[ErrParseUnexpectedKeyword-232] - _ = x[ErrParseExpectedExpression-233] - _ = x[ErrParseExpectedLeftParenAfterCast-234] - _ = x[ErrParseExpectedLeftParenValueConstructor-235] - _ = x[ErrParseExpectedLeftParenBuiltinFunctionCall-236] - _ = x[ErrParseExpectedArgumentDelimiter-237] - _ = x[ErrParseCastArity-238] - _ = x[ErrParseInvalidTypeParam-239] 
- _ = x[ErrParseEmptySelect-240] - _ = x[ErrParseSelectMissingFrom-241] - _ = x[ErrParseExpectedIdentForGroupName-242] - _ = x[ErrParseExpectedIdentForAlias-243] - _ = x[ErrParseUnsupportedCallWithStar-244] - _ = x[ErrParseNonUnaryAgregateFunctionCall-245] - _ = x[ErrParseMalformedJoin-246] - _ = x[ErrParseExpectedIdentForAt-247] - _ = x[ErrParseAsteriskIsNotAloneInSelectList-248] - _ = x[ErrParseCannotMixSqbAndWildcardInSelectList-249] - _ = x[ErrParseInvalidContextForWildcardInSelectList-250] - _ = x[ErrIncorrectSQLFunctionArgumentType-251] - _ = x[ErrValueParseFailure-252] - _ = x[ErrEvaluatorInvalidArguments-253] - _ = x[ErrIntegerOverflow-254] - _ = x[ErrLikeInvalidInputs-255] - _ = x[ErrCastFailed-256] - _ = x[ErrInvalidCast-257] - _ = x[ErrEvaluatorInvalidTimestampFormatPattern-258] - _ = x[ErrEvaluatorInvalidTimestampFormatPatternSymbolForParsing-259] - _ = x[ErrEvaluatorTimestampFormatPatternDuplicateFields-260] - _ = x[ErrEvaluatorTimestampFormatPatternHourClockAmPmMismatch-261] - _ = x[ErrEvaluatorUnterminatedTimestampFormatPatternToken-262] - _ = x[ErrEvaluatorInvalidTimestampFormatPatternToken-263] - _ = x[ErrEvaluatorInvalidTimestampFormatPatternSymbol-264] - _ = x[ErrEvaluatorBindingDoesNotExist-265] - _ = x[ErrMissingHeaders-266] - _ = x[ErrInvalidColumnIndex-267] - _ = x[ErrAdminConfigNotificationTargetsFailed-268] - _ = x[ErrAdminProfilerNotEnabled-269] - _ = x[ErrInvalidDecompressedSize-270] - _ = x[ErrAddUserInvalidArgument-271] - _ = x[ErrAdminAccountNotEligible-272] - _ = x[ErrAccountNotEligible-273] - _ = x[ErrAdminServiceAccountNotFound-274] - _ = x[ErrPostPolicyConditionInvalidFormat-275] + _ = x[ErrSiteReplicationInvalidRequest-175] + _ = x[ErrSiteReplicationPeerResp-176] + _ = x[ErrSiteReplicationBackendIssue-177] + _ = x[ErrSiteReplicationServiceAccountError-178] + _ = x[ErrSiteReplicationBucketConfigError-179] + _ = x[ErrSiteReplicationBucketMetaError-180] + _ = x[ErrSiteReplicationIAMError-181] + _ = x[ErrAdminBucketQuotaExceeded-182] 
+ _ = x[ErrAdminNoSuchQuotaConfiguration-183] + _ = x[ErrHealNotImplemented-184] + _ = x[ErrHealNoSuchProcess-185] + _ = x[ErrHealInvalidClientToken-186] + _ = x[ErrHealMissingBucket-187] + _ = x[ErrHealAlreadyRunning-188] + _ = x[ErrHealOverlappingPaths-189] + _ = x[ErrIncorrectContinuationToken-190] + _ = x[ErrEmptyRequestBody-191] + _ = x[ErrUnsupportedFunction-192] + _ = x[ErrInvalidExpressionType-193] + _ = x[ErrBusy-194] + _ = x[ErrUnauthorizedAccess-195] + _ = x[ErrExpressionTooLong-196] + _ = x[ErrIllegalSQLFunctionArgument-197] + _ = x[ErrInvalidKeyPath-198] + _ = x[ErrInvalidCompressionFormat-199] + _ = x[ErrInvalidFileHeaderInfo-200] + _ = x[ErrInvalidJSONType-201] + _ = x[ErrInvalidQuoteFields-202] + _ = x[ErrInvalidRequestParameter-203] + _ = x[ErrInvalidDataType-204] + _ = x[ErrInvalidTextEncoding-205] + _ = x[ErrInvalidDataSource-206] + _ = x[ErrInvalidTableAlias-207] + _ = x[ErrMissingRequiredParameter-208] + _ = x[ErrObjectSerializationConflict-209] + _ = x[ErrUnsupportedSQLOperation-210] + _ = x[ErrUnsupportedSQLStructure-211] + _ = x[ErrUnsupportedSyntax-212] + _ = x[ErrUnsupportedRangeHeader-213] + _ = x[ErrLexerInvalidChar-214] + _ = x[ErrLexerInvalidOperator-215] + _ = x[ErrLexerInvalidLiteral-216] + _ = x[ErrLexerInvalidIONLiteral-217] + _ = x[ErrParseExpectedDatePart-218] + _ = x[ErrParseExpectedKeyword-219] + _ = x[ErrParseExpectedTokenType-220] + _ = x[ErrParseExpected2TokenTypes-221] + _ = x[ErrParseExpectedNumber-222] + _ = x[ErrParseExpectedRightParenBuiltinFunctionCall-223] + _ = x[ErrParseExpectedTypeName-224] + _ = x[ErrParseExpectedWhenClause-225] + _ = x[ErrParseUnsupportedToken-226] + _ = x[ErrParseUnsupportedLiteralsGroupBy-227] + _ = x[ErrParseExpectedMember-228] + _ = x[ErrParseUnsupportedSelect-229] + _ = x[ErrParseUnsupportedCase-230] + _ = x[ErrParseUnsupportedCaseClause-231] + _ = x[ErrParseUnsupportedAlias-232] + _ = x[ErrParseUnsupportedSyntax-233] + _ = x[ErrParseUnknownOperator-234] + _ = 
x[ErrParseMissingIdentAfterAt-235] + _ = x[ErrParseUnexpectedOperator-236] + _ = x[ErrParseUnexpectedTerm-237] + _ = x[ErrParseUnexpectedToken-238] + _ = x[ErrParseUnexpectedKeyword-239] + _ = x[ErrParseExpectedExpression-240] + _ = x[ErrParseExpectedLeftParenAfterCast-241] + _ = x[ErrParseExpectedLeftParenValueConstructor-242] + _ = x[ErrParseExpectedLeftParenBuiltinFunctionCall-243] + _ = x[ErrParseExpectedArgumentDelimiter-244] + _ = x[ErrParseCastArity-245] + _ = x[ErrParseInvalidTypeParam-246] + _ = x[ErrParseEmptySelect-247] + _ = x[ErrParseSelectMissingFrom-248] + _ = x[ErrParseExpectedIdentForGroupName-249] + _ = x[ErrParseExpectedIdentForAlias-250] + _ = x[ErrParseUnsupportedCallWithStar-251] + _ = x[ErrParseNonUnaryAgregateFunctionCall-252] + _ = x[ErrParseMalformedJoin-253] + _ = x[ErrParseExpectedIdentForAt-254] + _ = x[ErrParseAsteriskIsNotAloneInSelectList-255] + _ = x[ErrParseCannotMixSqbAndWildcardInSelectList-256] + _ = x[ErrParseInvalidContextForWildcardInSelectList-257] + _ = x[ErrIncorrectSQLFunctionArgumentType-258] + _ = x[ErrValueParseFailure-259] + _ = x[ErrEvaluatorInvalidArguments-260] + _ = x[ErrIntegerOverflow-261] + _ = x[ErrLikeInvalidInputs-262] + _ = x[ErrCastFailed-263] + _ = x[ErrInvalidCast-264] + _ = x[ErrEvaluatorInvalidTimestampFormatPattern-265] + _ = x[ErrEvaluatorInvalidTimestampFormatPatternSymbolForParsing-266] + _ = x[ErrEvaluatorTimestampFormatPatternDuplicateFields-267] + _ = x[ErrEvaluatorTimestampFormatPatternHourClockAmPmMismatch-268] + _ = x[ErrEvaluatorUnterminatedTimestampFormatPatternToken-269] + _ = x[ErrEvaluatorInvalidTimestampFormatPatternToken-270] + _ = x[ErrEvaluatorInvalidTimestampFormatPatternSymbol-271] + _ = x[ErrEvaluatorBindingDoesNotExist-272] + _ = x[ErrMissingHeaders-273] + _ = x[ErrInvalidColumnIndex-274] + _ = x[ErrAdminConfigNotificationTargetsFailed-275] + _ = x[ErrAdminProfilerNotEnabled-276] + _ = x[ErrInvalidDecompressedSize-277] + _ = x[ErrAddUserInvalidArgument-278] + _ = 
x[ErrAdminAccountNotEligible-279] + _ = x[ErrAccountNotEligible-280] + _ = x[ErrAdminServiceAccountNotFound-281] + _ = x[ErrPostPolicyConditionInvalidFormat-282] } -const _APIErrorCode_name = "NoneAccessDeniedBadDigestEntityTooSmallEntityTooLargePolicyTooLargeIncompleteBodyInternalErrorInvalidAccessKeyIDInvalidBucketNameInvalidDigestInvalidRangeInvalidRangePartNumberInvalidCopyPartRangeInvalidCopyPartRangeSourceInvalidMaxKeysInvalidEncodingMethodInvalidMaxUploadsInvalidMaxPartsInvalidPartNumberMarkerInvalidPartNumberInvalidRequestBodyInvalidCopySourceInvalidMetadataDirectiveInvalidCopyDestInvalidPolicyDocumentInvalidObjectStateMalformedXMLMissingContentLengthMissingContentMD5MissingRequestBodyErrorMissingSecurityHeaderNoSuchBucketNoSuchBucketPolicyNoSuchBucketLifecycleNoSuchLifecycleConfigurationNoSuchBucketSSEConfigNoSuchCORSConfigurationNoSuchWebsiteConfigurationReplicationConfigurationNotFoundErrorRemoteDestinationNotFoundErrorReplicationDestinationMissingLockRemoteTargetNotFoundErrorReplicationRemoteConnectionErrorReplicationBandwidthLimitErrorBucketRemoteIdenticalToSourceBucketRemoteAlreadyExistsBucketRemoteLabelInUseBucketRemoteArnTypeInvalidBucketRemoteArnInvalidBucketRemoteRemoveDisallowedRemoteTargetNotVersionedErrorReplicationSourceNotVersionedErrorReplicationNeedsVersioningErrorReplicationBucketNeedsVersioningErrorReplicationNoMatchingRuleErrorObjectRestoreAlreadyInProgressNoSuchKeyNoSuchUploadInvalidVersionIDNoSuchVersionNotImplementedPreconditionFailedRequestTimeTooSkewedSignatureDoesNotMatchMethodNotAllowedInvalidPartInvalidPartOrderAuthorizationHeaderMalformedMalformedPOSTRequestPOSTFileRequiredSignatureVersionNotSupportedBucketNotEmptyAllAccessDisabledMalformedPolicyMissingFieldsMissingCredTagCredMalformedInvalidRegionInvalidServiceS3InvalidServiceSTSInvalidRequestVersionMissingSignTagMissingSignHeadersTagMalformedDateMalformedPresignedDateMalformedCredentialDateMalformedCredentialRegionMalformedExpiresNegativeExpiresAuthHeaderEmptyExpiredPresignRequ
estRequestNotReadyYetUnsignedHeadersMissingDateHeaderInvalidQuerySignatureAlgoInvalidQueryParamsBucketAlreadyOwnedByYouInvalidDurationBucketAlreadyExistsMetadataTooLargeUnsupportedMetadataMaximumExpiresSlowDownInvalidPrefixMarkerBadRequestKeyTooLongErrorInvalidBucketObjectLockConfigurationObjectLockConfigurationNotFoundObjectLockConfigurationNotAllowedNoSuchObjectLockConfigurationObjectLockedInvalidRetentionDatePastObjectLockRetainDateUnknownWORMModeDirectiveBucketTaggingNotFoundObjectLockInvalidHeadersInvalidTagDirectiveInvalidEncryptionMethodInsecureSSECustomerRequestSSEMultipartEncryptedSSEEncryptedObjectInvalidEncryptionParametersInvalidSSECustomerAlgorithmInvalidSSECustomerKeyMissingSSECustomerKeyMissingSSECustomerKeyMD5SSECustomerKeyMD5MismatchInvalidSSECustomerParametersIncompatibleEncryptionMethodKMSNotConfiguredNoAccessKeyInvalidTokenEventNotificationARNNotificationRegionNotificationOverlappingFilterNotificationFilterNameInvalidFilterNamePrefixFilterNameSuffixFilterValueInvalidOverlappingConfigsUnsupportedNotificationContentSHA256MismatchReadQuorumWriteQuorumStorageFullRequestBodyParseObjectExistsAsDirectoryInvalidObjectNameInvalidObjectNamePrefixSlashInvalidResourceNameServerNotInitializedOperationTimedOutClientDisconnectedOperationMaxedOutInvalidRequestTransitionStorageClassNotFoundErrorInvalidStorageClassBackendDownMalformedJSONAdminNoSuchUserAdminNoSuchGroupAdminGroupNotEmptyAdminNoSuchPolicyAdminInvalidArgumentAdminInvalidAccessKeyAdminInvalidSecretKeyAdminConfigNoQuorumAdminConfigTooLargeAdminConfigBadJSONAdminConfigDuplicateKeysAdminCredentialsMismatchInsecureClientRequestObjectTamperedAdminBucketQuotaExceededAdminNoSuchQuotaConfigurationHealNotImplementedHealNoSuchProcessHealInvalidClientTokenHealMissingBucketHealAlreadyRunningHealOverlappingPathsIncorrectContinuationTokenEmptyRequestBodyUnsupportedFunctionInvalidExpressionTypeBusyUnauthorizedAccessExpressionTooLongIllegalSQLFunctionArgumentInvalidKeyPathInvalidCompressionFormatInvalidFileHeaderInfo
InvalidJSONTypeInvalidQuoteFieldsInvalidRequestParameterInvalidDataTypeInvalidTextEncodingInvalidDataSourceInvalidTableAliasMissingRequiredParameterObjectSerializationConflictUnsupportedSQLOperationUnsupportedSQLStructureUnsupportedSyntaxUnsupportedRangeHeaderLexerInvalidCharLexerInvalidOperatorLexerInvalidLiteralLexerInvalidIONLiteralParseExpectedDatePartParseExpectedKeywordParseExpectedTokenTypeParseExpected2TokenTypesParseExpectedNumberParseExpectedRightParenBuiltinFunctionCallParseExpectedTypeNameParseExpectedWhenClauseParseUnsupportedTokenParseUnsupportedLiteralsGroupByParseExpectedMemberParseUnsupportedSelectParseUnsupportedCaseParseUnsupportedCaseClauseParseUnsupportedAliasParseUnsupportedSyntaxParseUnknownOperatorParseMissingIdentAfterAtParseUnexpectedOperatorParseUnexpectedTermParseUnexpectedTokenParseUnexpectedKeywordParseExpectedExpressionParseExpectedLeftParenAfterCastParseExpectedLeftParenValueConstructorParseExpectedLeftParenBuiltinFunctionCallParseExpectedArgumentDelimiterParseCastArityParseInvalidTypeParamParseEmptySelectParseSelectMissingFromParseExpectedIdentForGroupNameParseExpectedIdentForAliasParseUnsupportedCallWithStarParseNonUnaryAgregateFunctionCallParseMalformedJoinParseExpectedIdentForAtParseAsteriskIsNotAloneInSelectListParseCannotMixSqbAndWildcardInSelectListParseInvalidContextForWildcardInSelectListIncorrectSQLFunctionArgumentTypeValueParseFailureEvaluatorInvalidArgumentsIntegerOverflowLikeInvalidInputsCastFailedInvalidCastEvaluatorInvalidTimestampFormatPatternEvaluatorInvalidTimestampFormatPatternSymbolForParsingEvaluatorTimestampFormatPatternDuplicateFieldsEvaluatorTimestampFormatPatternHourClockAmPmMismatchEvaluatorUnterminatedTimestampFormatPatternTokenEvaluatorInvalidTimestampFormatPatternTokenEvaluatorInvalidTimestampFormatPatternSymbolEvaluatorBindingDoesNotExistMissingHeadersInvalidColumnIndexAdminConfigNotificationTargetsFailedAdminProfilerNotEnabledInvalidDecompressedSizeAddUserInvalidArgumentAdminAccountNotEligibleAccountNotE
ligibleAdminServiceAccountNotFoundPostPolicyConditionInvalidFormat" +const _APIErrorCode_name = "NoneAccessDeniedBadDigestEntityTooSmallEntityTooLargePolicyTooLargeIncompleteBodyInternalErrorInvalidAccessKeyIDInvalidBucketNameInvalidDigestInvalidRangeInvalidRangePartNumberInvalidCopyPartRangeInvalidCopyPartRangeSourceInvalidMaxKeysInvalidEncodingMethodInvalidMaxUploadsInvalidMaxPartsInvalidPartNumberMarkerInvalidPartNumberInvalidRequestBodyInvalidCopySourceInvalidMetadataDirectiveInvalidCopyDestInvalidPolicyDocumentInvalidObjectStateMalformedXMLMissingContentLengthMissingContentMD5MissingRequestBodyErrorMissingSecurityHeaderNoSuchBucketNoSuchBucketPolicyNoSuchBucketLifecycleNoSuchLifecycleConfigurationNoSuchBucketSSEConfigNoSuchCORSConfigurationNoSuchWebsiteConfigurationReplicationConfigurationNotFoundErrorRemoteDestinationNotFoundErrorReplicationDestinationMissingLockRemoteTargetNotFoundErrorReplicationRemoteConnectionErrorReplicationBandwidthLimitErrorBucketRemoteIdenticalToSourceBucketRemoteAlreadyExistsBucketRemoteLabelInUseBucketRemoteArnTypeInvalidBucketRemoteArnInvalidBucketRemoteRemoveDisallowedRemoteTargetNotVersionedErrorReplicationSourceNotVersionedErrorReplicationNeedsVersioningErrorReplicationBucketNeedsVersioningErrorReplicationNoMatchingRuleErrorObjectRestoreAlreadyInProgressNoSuchKeyNoSuchUploadInvalidVersionIDNoSuchVersionNotImplementedPreconditionFailedRequestTimeTooSkewedSignatureDoesNotMatchMethodNotAllowedInvalidPartInvalidPartOrderAuthorizationHeaderMalformedMalformedPOSTRequestPOSTFileRequiredSignatureVersionNotSupportedBucketNotEmptyAllAccessDisabledMalformedPolicyMissingFieldsMissingCredTagCredMalformedInvalidRegionInvalidServiceS3InvalidServiceSTSInvalidRequestVersionMissingSignTagMissingSignHeadersTagMalformedDateMalformedPresignedDateMalformedCredentialDateMalformedCredentialRegionMalformedExpiresNegativeExpiresAuthHeaderEmptyExpiredPresignRequestRequestNotReadyYetUnsignedHeadersMissingDateHeaderInvalidQuerySignatureAlgoInvalidQueryParams
BucketAlreadyOwnedByYouInvalidDurationBucketAlreadyExistsMetadataTooLargeUnsupportedMetadataMaximumExpiresSlowDownInvalidPrefixMarkerBadRequestKeyTooLongErrorInvalidBucketObjectLockConfigurationObjectLockConfigurationNotFoundObjectLockConfigurationNotAllowedNoSuchObjectLockConfigurationObjectLockedInvalidRetentionDatePastObjectLockRetainDateUnknownWORMModeDirectiveBucketTaggingNotFoundObjectLockInvalidHeadersInvalidTagDirectiveInvalidEncryptionMethodInsecureSSECustomerRequestSSEMultipartEncryptedSSEEncryptedObjectInvalidEncryptionParametersInvalidSSECustomerAlgorithmInvalidSSECustomerKeyMissingSSECustomerKeyMissingSSECustomerKeyMD5SSECustomerKeyMD5MismatchInvalidSSECustomerParametersIncompatibleEncryptionMethodKMSNotConfiguredNoAccessKeyInvalidTokenEventNotificationARNNotificationRegionNotificationOverlappingFilterNotificationFilterNameInvalidFilterNamePrefixFilterNameSuffixFilterValueInvalidOverlappingConfigsUnsupportedNotificationContentSHA256MismatchReadQuorumWriteQuorumStorageFullRequestBodyParseObjectExistsAsDirectoryInvalidObjectNameInvalidObjectNamePrefixSlashInvalidResourceNameServerNotInitializedOperationTimedOutClientDisconnectedOperationMaxedOutInvalidRequestTransitionStorageClassNotFoundErrorInvalidStorageClassBackendDownMalformedJSONAdminNoSuchUserAdminNoSuchGroupAdminGroupNotEmptyAdminNoSuchPolicyAdminInvalidArgumentAdminInvalidAccessKeyAdminInvalidSecretKeyAdminConfigNoQuorumAdminConfigTooLargeAdminConfigBadJSONAdminConfigDuplicateKeysAdminCredentialsMismatchInsecureClientRequestObjectTamperedSiteReplicationInvalidRequestSiteReplicationPeerRespSiteReplicationBackendIssueSiteReplicationServiceAccountErrorSiteReplicationBucketConfigErrorSiteReplicationBucketMetaErrorSiteReplicationIAMErrorAdminBucketQuotaExceededAdminNoSuchQuotaConfigurationHealNotImplementedHealNoSuchProcessHealInvalidClientTokenHealMissingBucketHealAlreadyRunningHealOverlappingPathsIncorrectContinuationTokenEmptyRequestBodyUnsupportedFunctionInvalidExpressionTypeBusyUnauthorizedAccess
ExpressionTooLongIllegalSQLFunctionArgumentInvalidKeyPathInvalidCompressionFormatInvalidFileHeaderInfoInvalidJSONTypeInvalidQuoteFieldsInvalidRequestParameterInvalidDataTypeInvalidTextEncodingInvalidDataSourceInvalidTableAliasMissingRequiredParameterObjectSerializationConflictUnsupportedSQLOperationUnsupportedSQLStructureUnsupportedSyntaxUnsupportedRangeHeaderLexerInvalidCharLexerInvalidOperatorLexerInvalidLiteralLexerInvalidIONLiteralParseExpectedDatePartParseExpectedKeywordParseExpectedTokenTypeParseExpected2TokenTypesParseExpectedNumberParseExpectedRightParenBuiltinFunctionCallParseExpectedTypeNameParseExpectedWhenClauseParseUnsupportedTokenParseUnsupportedLiteralsGroupByParseExpectedMemberParseUnsupportedSelectParseUnsupportedCaseParseUnsupportedCaseClauseParseUnsupportedAliasParseUnsupportedSyntaxParseUnknownOperatorParseMissingIdentAfterAtParseUnexpectedOperatorParseUnexpectedTermParseUnexpectedTokenParseUnexpectedKeywordParseExpectedExpressionParseExpectedLeftParenAfterCastParseExpectedLeftParenValueConstructorParseExpectedLeftParenBuiltinFunctionCallParseExpectedArgumentDelimiterParseCastArityParseInvalidTypeParamParseEmptySelectParseSelectMissingFromParseExpectedIdentForGroupNameParseExpectedIdentForAliasParseUnsupportedCallWithStarParseNonUnaryAgregateFunctionCallParseMalformedJoinParseExpectedIdentForAtParseAsteriskIsNotAloneInSelectListParseCannotMixSqbAndWildcardInSelectListParseInvalidContextForWildcardInSelectListIncorrectSQLFunctionArgumentTypeValueParseFailureEvaluatorInvalidArgumentsIntegerOverflowLikeInvalidInputsCastFailedInvalidCastEvaluatorInvalidTimestampFormatPatternEvaluatorInvalidTimestampFormatPatternSymbolForParsingEvaluatorTimestampFormatPatternDuplicateFieldsEvaluatorTimestampFormatPatternHourClockAmPmMismatchEvaluatorUnterminatedTimestampFormatPatternTokenEvaluatorInvalidTimestampFormatPatternTokenEvaluatorInvalidTimestampFormatPatternSymbolEvaluatorBindingDoesNotExistMissingHeadersInvalidColumnIndexAdminConfigNotificationTargetsFailed
AdminProfilerNotEnabledInvalidDecompressedSizeAddUserInvalidArgumentAdminAccountNotEligibleAccountNotEligibleAdminServiceAccountNotFoundPostPolicyConditionInvalidFormat" -var _APIErrorCode_index = [...]uint16{0, 4, 16, 25, 39, 53, 67, 81, 94, 112, 129, 142, 154, 176, 196, 222, 236, 257, 274, 289, 312, 329, 347, 364, 388, 403, 424, 442, 454, 474, 491, 514, 535, 547, 565, 586, 614, 635, 658, 684, 721, 751, 784, 809, 841, 871, 900, 925, 947, 973, 995, 1023, 1052, 1086, 1117, 1154, 1184, 1214, 1223, 1235, 1251, 1264, 1278, 1296, 1316, 1337, 1353, 1364, 1380, 1408, 1428, 1444, 1472, 1486, 1503, 1518, 1531, 1545, 1558, 1571, 1587, 1604, 1625, 1639, 1660, 1673, 1695, 1718, 1743, 1759, 1774, 1789, 1810, 1828, 1843, 1860, 1885, 1903, 1926, 1941, 1960, 1976, 1995, 2009, 2017, 2036, 2046, 2061, 2097, 2128, 2161, 2190, 2202, 2222, 2246, 2270, 2291, 2315, 2334, 2357, 2383, 2404, 2422, 2449, 2476, 2497, 2518, 2542, 2567, 2595, 2623, 2639, 2650, 2662, 2679, 2694, 2712, 2741, 2758, 2774, 2790, 2808, 2826, 2849, 2870, 2880, 2891, 2902, 2918, 2941, 2958, 2986, 3005, 3025, 3042, 3060, 3077, 3091, 3126, 3145, 3156, 3169, 3184, 3200, 3218, 3235, 3255, 3276, 3297, 3316, 3335, 3353, 3377, 3401, 3422, 3436, 3460, 3489, 3507, 3524, 3546, 3563, 3581, 3601, 3627, 3643, 3662, 3683, 3687, 3705, 3722, 3748, 3762, 3786, 3807, 3822, 3840, 3863, 3878, 3897, 3914, 3931, 3955, 3982, 4005, 4028, 4045, 4067, 4083, 4103, 4122, 4144, 4165, 4185, 4207, 4231, 4250, 4292, 4313, 4336, 4357, 4388, 4407, 4429, 4449, 4475, 4496, 4518, 4538, 4562, 4585, 4604, 4624, 4646, 4669, 4700, 4738, 4779, 4809, 4823, 4844, 4860, 4882, 4912, 4938, 4966, 4999, 5017, 5040, 5075, 5115, 5157, 5189, 5206, 5231, 5246, 5263, 5273, 5284, 5322, 5376, 5422, 5474, 5522, 5565, 5609, 5637, 5651, 5669, 5705, 5728, 5751, 5773, 5796, 5814, 5841, 5873} +var _APIErrorCode_index = [...]uint16{0, 4, 16, 25, 39, 53, 67, 81, 94, 112, 129, 142, 154, 176, 196, 222, 236, 257, 274, 289, 312, 329, 347, 364, 388, 403, 424, 442, 454, 474, 491, 514, 
535, 547, 565, 586, 614, 635, 658, 684, 721, 751, 784, 809, 841, 871, 900, 925, 947, 973, 995, 1023, 1052, 1086, 1117, 1154, 1184, 1214, 1223, 1235, 1251, 1264, 1278, 1296, 1316, 1337, 1353, 1364, 1380, 1408, 1428, 1444, 1472, 1486, 1503, 1518, 1531, 1545, 1558, 1571, 1587, 1604, 1625, 1639, 1660, 1673, 1695, 1718, 1743, 1759, 1774, 1789, 1810, 1828, 1843, 1860, 1885, 1903, 1926, 1941, 1960, 1976, 1995, 2009, 2017, 2036, 2046, 2061, 2097, 2128, 2161, 2190, 2202, 2222, 2246, 2270, 2291, 2315, 2334, 2357, 2383, 2404, 2422, 2449, 2476, 2497, 2518, 2542, 2567, 2595, 2623, 2639, 2650, 2662, 2679, 2694, 2712, 2741, 2758, 2774, 2790, 2808, 2826, 2849, 2870, 2880, 2891, 2902, 2918, 2941, 2958, 2986, 3005, 3025, 3042, 3060, 3077, 3091, 3126, 3145, 3156, 3169, 3184, 3200, 3218, 3235, 3255, 3276, 3297, 3316, 3335, 3353, 3377, 3401, 3422, 3436, 3465, 3488, 3515, 3549, 3581, 3611, 3634, 3658, 3687, 3705, 3722, 3744, 3761, 3779, 3799, 3825, 3841, 3860, 3881, 3885, 3903, 3920, 3946, 3960, 3984, 4005, 4020, 4038, 4061, 4076, 4095, 4112, 4129, 4153, 4180, 4203, 4226, 4243, 4265, 4281, 4301, 4320, 4342, 4363, 4383, 4405, 4429, 4448, 4490, 4511, 4534, 4555, 4586, 4605, 4627, 4647, 4673, 4694, 4716, 4736, 4760, 4783, 4802, 4822, 4844, 4867, 4898, 4936, 4977, 5007, 5021, 5042, 5058, 5080, 5110, 5136, 5164, 5197, 5215, 5238, 5273, 5313, 5355, 5387, 5404, 5429, 5444, 5461, 5471, 5482, 5520, 5574, 5620, 5672, 5720, 5763, 5807, 5835, 5849, 5867, 5903, 5926, 5949, 5971, 5994, 6012, 6039, 6071} func (i APIErrorCode) String() string { if i < 0 || i >= APIErrorCode(len(_APIErrorCode_index)-1) { diff --git a/cmd/bucket-encryption-handlers.go b/cmd/bucket-encryption-handlers.go index 00bff63dc..74e5f5a7f 100644 --- a/cmd/bucket-encryption-handlers.go +++ b/cmd/bucket-encryption-handlers.go @@ -18,12 +18,14 @@ package cmd import ( + "encoding/base64" "encoding/xml" "fmt" "io" "net/http" "github.com/gorilla/mux" + "github.com/minio/madmin-go" "github.com/minio/minio/internal/logger" 
"github.com/minio/pkg/bucket/policy" ) @@ -95,6 +97,20 @@ func (api objectAPIHandlers) PutBucketEncryptionHandler(w http.ResponseWriter, r return } + // Call site replication hook. + // + // We encode the xml bytes as base64 to ensure there are no encoding + // errors. + cfgStr := base64.StdEncoding.EncodeToString(configData) + if err = globalSiteReplicationSys.BucketMetaHook(ctx, madmin.SRBucketMeta{ + Type: madmin.SRBucketMetaTypeSSEConfig, + Bucket: bucket, + SSEConfig: &cfgStr, + }); err != nil { + writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL) + return + } + writeSuccessResponseHeadersOnly(w) } diff --git a/cmd/bucket-handlers.go b/cmd/bucket-handlers.go index eba688e69..8d2ad96fe 100644 --- a/cmd/bucket-handlers.go +++ b/cmd/bucket-handlers.go @@ -38,6 +38,7 @@ import ( "github.com/google/uuid" "github.com/gorilla/mux" + "github.com/minio/madmin-go" "github.com/minio/minio-go/v7/pkg/set" "github.com/minio/minio-go/v7/pkg/tags" sse "github.com/minio/minio/internal/bucket/encryption" @@ -771,6 +772,17 @@ func (api objectAPIHandlers) PutBucketHandler(w http.ResponseWriter, r *http.Req // Proceed to creating a bucket. err := objectAPI.MakeBucketWithLocation(ctx, bucket, opts) + if _, ok := err.(BucketExists); ok { + // Though bucket exists locally, we send the site-replication + // hook to ensure all sites have this bucket. If the hook + // succeeds, the client will still receive a bucket exists + // message. + err2 := globalSiteReplicationSys.MakeBucketHook(ctx, bucket, opts) + if err2 != nil { + writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL) + return + } + } if err != nil { writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL) return @@ -779,6 +791,13 @@ func (api objectAPIHandlers) PutBucketHandler(w http.ResponseWriter, r *http.Req // Load updated bucket metadata into memory. 
globalNotificationSys.LoadBucketMetadata(GlobalContext, bucket) + // Call site replication hook + err = globalSiteReplicationSys.MakeBucketHook(ctx, bucket, opts) + if err != nil { + writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL) + return + } + // Make sure to add Location information here only for bucket if cp := pathClean(r.URL.Path); cp != "" { w.Header().Set(xhttp.Location, cp) // Clean any trailing slashes. @@ -1259,6 +1278,12 @@ func (api objectAPIHandlers) DeleteBucketHandler(w http.ResponseWriter, r *http. globalNotificationSys.DeleteBucketMetadata(ctx, bucket) + // Call site replication hook. + if err := globalSiteReplicationSys.DeleteBucketHook(ctx, bucket, forceDelete); err != nil { + writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL) + return + } + // Write success response. writeSuccessNoContent(w) @@ -1324,6 +1349,20 @@ func (api objectAPIHandlers) PutBucketObjectLockConfigHandler(w http.ResponseWri return } + // Call site replication hook. + // + // We encode the xml bytes as base64 to ensure there are no encoding + // errors. + cfgStr := base64.StdEncoding.EncodeToString(configData) + if err = globalSiteReplicationSys.BucketMetaHook(ctx, madmin.SRBucketMeta{ + Type: madmin.SRBucketMetaTypeObjectLockConfig, + Bucket: bucket, + ObjectLockConfig: &cfgStr, + }); err != nil { + writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL) + return + } + // Write success response. writeSuccessResponseHeadersOnly(w) } @@ -1415,6 +1454,20 @@ func (api objectAPIHandlers) PutBucketTaggingHandler(w http.ResponseWriter, r *h return } + // Call site replication hook. + // + // We encode the xml bytes as base64 to ensure there are no encoding + // errors. 
+ cfgStr := base64.StdEncoding.EncodeToString(configData) + if err = globalSiteReplicationSys.BucketMetaHook(ctx, madmin.SRBucketMeta{ + Type: madmin.SRBucketMetaTypeTags, + Bucket: bucket, + Tags: &cfgStr, + }); err != nil { + writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL) + return + } + // Write success response. writeSuccessResponseHeadersOnly(w) } @@ -1483,6 +1536,14 @@ func (api objectAPIHandlers) DeleteBucketTaggingHandler(w http.ResponseWriter, r return } + if err := globalSiteReplicationSys.BucketMetaHook(ctx, madmin.SRBucketMeta{ + Type: madmin.SRBucketMetaTypeTags, + Bucket: bucket, + }); err != nil { + writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL) + return + } + // Write success response. writeSuccessResponseHeadersOnly(w) } diff --git a/cmd/bucket-policy-handlers.go b/cmd/bucket-policy-handlers.go index 45c6af3a9..b0952a61a 100644 --- a/cmd/bucket-policy-handlers.go +++ b/cmd/bucket-policy-handlers.go @@ -18,12 +18,15 @@ package cmd import ( + "bytes" "encoding/json" "io" + "io/ioutil" "net/http" humanize "github.com/dustin/go-humanize" "github.com/gorilla/mux" + "github.com/minio/madmin-go" "github.com/minio/minio/internal/logger" "github.com/minio/pkg/bucket/policy" ) @@ -76,7 +79,13 @@ func (api objectAPIHandlers) PutBucketPolicyHandler(w http.ResponseWriter, r *ht return } - bucketPolicy, err := policy.ParseConfig(io.LimitReader(r.Body, r.ContentLength), bucket) + bucketPolicyBytes, err := ioutil.ReadAll(io.LimitReader(r.Body, r.ContentLength)) + if err != nil { + writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL) + return + } + + bucketPolicy, err := policy.ParseConfig(bytes.NewReader(bucketPolicyBytes), bucket) if err != nil { writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL) return @@ -99,6 +108,16 @@ func (api objectAPIHandlers) PutBucketPolicyHandler(w http.ResponseWriter, r *ht return } + // Call site replication hook. 
+ if err = globalSiteReplicationSys.BucketMetaHook(ctx, madmin.SRBucketMeta{ + Type: madmin.SRBucketMetaTypePolicy, + Bucket: bucket, + Policy: bucketPolicyBytes, + }); err != nil { + writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL) + return + } + // Success. writeSuccessNoContent(w) } @@ -134,6 +153,15 @@ func (api objectAPIHandlers) DeleteBucketPolicyHandler(w http.ResponseWriter, r return } + // Call site replication hook. + if err := globalSiteReplicationSys.BucketMetaHook(ctx, madmin.SRBucketMeta{ + Type: madmin.SRBucketMetaTypePolicy, + Bucket: bucket, + }); err != nil { + writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL) + return + } + // Success. writeSuccessNoContent(w) } diff --git a/cmd/globals.go b/cmd/globals.go index 74eae64f9..df7bfda9e 100644 --- a/cmd/globals.go +++ b/cmd/globals.go @@ -256,6 +256,9 @@ var ( // Allocated etcd endpoint for config and bucket DNS. globalEtcdClient *etcd.Client + // Cluster replication manager. + globalSiteReplicationSys SiteReplicationSys + // Is set to true when Bucket federation is requested // and is 'true' when etcdConfig.PathPrefix is empty globalBucketFederation bool diff --git a/cmd/iam.go b/cmd/iam.go index 845c1a716..6a1a54721 100644 --- a/cmd/iam.go +++ b/cmd/iam.go @@ -514,6 +514,7 @@ func (sys *IAMSys) Load(ctx context.Context, store IAMStorageAPI) error { return err } + // load service accounts if err := store.loadUsers(ctx, svcUser, iamUsersMap); err != nil { return err } @@ -1427,6 +1428,31 @@ func (sys *IAMSys) GetServiceAccount(ctx context.Context, accessKey string) (aut return sa, embeddedPolicy, nil } +// GetClaimsForSvcAcc - gets the claims associated with the service account. 
+func (sys *IAMSys) GetClaimsForSvcAcc(ctx context.Context, accessKey string) (map[string]interface{}, error) { + if !sys.Initialized() { + return nil, errServerNotInitialized + } + + if sys.usersSysType != LDAPUsersSysType { + return nil, nil + } + + sys.store.rlock() + defer sys.store.runlock() + + sa, ok := sys.iamUsersMap[accessKey] + if !ok || !sa.IsServiceAccount() { + return nil, errNoSuchServiceAccount + } + + jwtClaims, err := auth.ExtractClaims(sa.SessionToken, globalActiveCred.SecretKey) + if err != nil { + return nil, err + } + return jwtClaims.Map(), nil +} + // DeleteServiceAccount - delete a service account func (sys *IAMSys) DeleteServiceAccount(ctx context.Context, accessKey string) error { if !sys.Initialized() { diff --git a/cmd/notification.go b/cmd/notification.go index 4266f0058..3d132a5bd 100644 --- a/cmd/notification.go +++ b/cmd/notification.go @@ -1557,3 +1557,23 @@ func (sys *NotificationSys) Speedtest(ctx context.Context, size int, concurrent return results } + +// ReloadSiteReplicationConfig - tells all peer minio nodes to reload the +// site-replication configuration. 
+func (sys *NotificationSys) ReloadSiteReplicationConfig(ctx context.Context) []error { + errs := make([]error, len(sys.allPeerClients)) + var wg sync.WaitGroup + for index := range sys.peerClients { + if sys.peerClients[index] == nil { + continue + } + wg.Add(1) + go func(index int) { + defer wg.Done() + errs[index] = sys.peerClients[index].ReloadSiteReplicationConfig(ctx) + }(index) + } + + wg.Wait() + return errs +} diff --git a/cmd/peer-rest-client.go b/cmd/peer-rest-client.go index ad4edadf6..517a837f3 100644 --- a/cmd/peer-rest-client.go +++ b/cmd/peer-rest-client.go @@ -1029,3 +1029,12 @@ func (client *peerRESTClient) Speedtest(ctx context.Context, size, concurrent in } return result, nil } + +func (client *peerRESTClient) ReloadSiteReplicationConfig(ctx context.Context) error { + respBody, err := client.callWithContext(context.Background(), peerRESTMethodReloadSiteReplicationConfig, nil, nil, -1) + if err != nil { + return err + } + defer http.DrainBody(respBody) + return nil +} diff --git a/cmd/peer-rest-common.go b/cmd/peer-rest-common.go index 9f545f875..dfb954682 100644 --- a/cmd/peer-rest-common.go +++ b/cmd/peer-rest-common.go @@ -25,47 +25,48 @@ const ( ) const ( - peerRESTMethodHealth = "/health" - peerRESTMethodServerInfo = "/serverinfo" - peerRESTMethodDriveInfo = "/driveinfo" - peerRESTMethodNetInfo = "/netinfo" - peerRESTMethodCPUInfo = "/cpuinfo" - peerRESTMethodDiskHwInfo = "/diskhwinfo" - peerRESTMethodOsInfo = "/osinfo" - peerRESTMethodMemInfo = "/meminfo" - peerRESTMethodProcInfo = "/procinfo" - peerRESTMethodSysErrors = "/syserrors" - peerRESTMethodSysServices = "/sysservices" - peerRESTMethodSysConfig = "/sysconfig" - peerRESTMethodDispatchNetInfo = "/dispatchnetinfo" - peerRESTMethodDeleteBucketMetadata = "/deletebucketmetadata" - peerRESTMethodLoadBucketMetadata = "/loadbucketmetadata" - peerRESTMethodGetBucketStats = "/getbucketstats" - peerRESTMethodServerUpdate = "/serverupdate" - peerRESTMethodSignalService = "/signalservice" - 
peerRESTMethodBackgroundHealStatus = "/backgroundhealstatus" - peerRESTMethodGetLocks = "/getlocks" - peerRESTMethodLoadUser = "/loaduser" - peerRESTMethodLoadServiceAccount = "/loadserviceaccount" - peerRESTMethodDeleteUser = "/deleteuser" - peerRESTMethodDeleteServiceAccount = "/deleteserviceaccount" - peerRESTMethodLoadPolicy = "/loadpolicy" - peerRESTMethodLoadPolicyMapping = "/loadpolicymapping" - peerRESTMethodDeletePolicy = "/deletepolicy" - peerRESTMethodLoadGroup = "/loadgroup" - peerRESTMethodStartProfiling = "/startprofiling" - peerRESTMethodDownloadProfilingData = "/downloadprofilingdata" - peerRESTMethodCycleBloom = "/cyclebloom" - peerRESTMethodTrace = "/trace" - peerRESTMethodListen = "/listen" - peerRESTMethodLog = "/log" - peerRESTMethodGetLocalDiskIDs = "/getlocaldiskids" - peerRESTMethodGetBandwidth = "/bandwidth" - peerRESTMethodGetMetacacheListing = "/getmetacache" - peerRESTMethodUpdateMetacacheListing = "/updatemetacache" - peerRESTMethodGetPeerMetrics = "/peermetrics" - peerRESTMethodLoadTransitionTierConfig = "/loadtransitiontierconfig" - peerRESTMethodSpeedtest = "/speedtest" + peerRESTMethodHealth = "/health" + peerRESTMethodServerInfo = "/serverinfo" + peerRESTMethodDriveInfo = "/driveinfo" + peerRESTMethodNetInfo = "/netinfo" + peerRESTMethodCPUInfo = "/cpuinfo" + peerRESTMethodDiskHwInfo = "/diskhwinfo" + peerRESTMethodOsInfo = "/osinfo" + peerRESTMethodMemInfo = "/meminfo" + peerRESTMethodProcInfo = "/procinfo" + peerRESTMethodSysErrors = "/syserrors" + peerRESTMethodSysServices = "/sysservices" + peerRESTMethodSysConfig = "/sysconfig" + peerRESTMethodDispatchNetInfo = "/dispatchnetinfo" + peerRESTMethodDeleteBucketMetadata = "/deletebucketmetadata" + peerRESTMethodLoadBucketMetadata = "/loadbucketmetadata" + peerRESTMethodGetBucketStats = "/getbucketstats" + peerRESTMethodServerUpdate = "/serverupdate" + peerRESTMethodSignalService = "/signalservice" + peerRESTMethodBackgroundHealStatus = "/backgroundhealstatus" + 
peerRESTMethodGetLocks = "/getlocks" + peerRESTMethodLoadUser = "/loaduser" + peerRESTMethodLoadServiceAccount = "/loadserviceaccount" + peerRESTMethodDeleteUser = "/deleteuser" + peerRESTMethodDeleteServiceAccount = "/deleteserviceaccount" + peerRESTMethodLoadPolicy = "/loadpolicy" + peerRESTMethodLoadPolicyMapping = "/loadpolicymapping" + peerRESTMethodDeletePolicy = "/deletepolicy" + peerRESTMethodLoadGroup = "/loadgroup" + peerRESTMethodStartProfiling = "/startprofiling" + peerRESTMethodDownloadProfilingData = "/downloadprofilingdata" + peerRESTMethodCycleBloom = "/cyclebloom" + peerRESTMethodTrace = "/trace" + peerRESTMethodListen = "/listen" + peerRESTMethodLog = "/log" + peerRESTMethodGetLocalDiskIDs = "/getlocaldiskids" + peerRESTMethodGetBandwidth = "/bandwidth" + peerRESTMethodGetMetacacheListing = "/getmetacache" + peerRESTMethodUpdateMetacacheListing = "/updatemetacache" + peerRESTMethodGetPeerMetrics = "/peermetrics" + peerRESTMethodLoadTransitionTierConfig = "/loadtransitiontierconfig" + peerRESTMethodSpeedtest = "/speedtest" + peerRESTMethodReloadSiteReplicationConfig = "/reloadsitereplicationconfig" ) const ( diff --git a/cmd/peer-rest-server.go b/cmd/peer-rest-server.go index 536533767..4b5024d6b 100644 --- a/cmd/peer-rest-server.go +++ b/cmd/peer-rest-server.go @@ -514,7 +514,6 @@ func (s *peerRESTServer) GetMemInfoHandler(w http.ResponseWriter, r *http.Reques // (only the config that are of concern to minio) func (s *peerRESTServer) GetSysConfigHandler(w http.ResponseWriter, r *http.Request) { if !s.IsValid(w, r) { - fmt.Println("Invalid request") s.writeErrorResponse(w, errors.New("Invalid request")) return } @@ -532,7 +531,6 @@ func (s *peerRESTServer) GetSysConfigHandler(w http.ResponseWriter, r *http.Requ // (only the services that are of concern to minio) func (s *peerRESTServer) GetSysServicesHandler(w http.ResponseWriter, r *http.Request) { if !s.IsValid(w, r) { - fmt.Println("Invalid request") s.writeErrorResponse(w, errors.New("Invalid 
request")) return } @@ -584,6 +582,24 @@ func (s *peerRESTServer) DeleteBucketMetadataHandler(w http.ResponseWriter, r *h } } +// ReloadSiteReplicationConfigHandler - reloads site replication configuration from the disks +func (s *peerRESTServer) ReloadSiteReplicationConfigHandler(w http.ResponseWriter, r *http.Request) { + if !s.IsValid(w, r) { + s.writeErrorResponse(w, errors.New("Invalid request")) + return + } + + ctx := newContext(r, w, "LoadSiteReplication") + + objAPI := newObjectLayerFn() + if objAPI == nil { + s.writeErrorResponse(w, errServerNotInitialized) + return + } + + logger.LogIf(r.Context(), globalSiteReplicationSys.Init(ctx, objAPI)) +} + // GetBucketStatsHandler - fetches current in-memory bucket stats, currently only // returns BucketReplicationStatus func (s *peerRESTServer) GetBucketStatsHandler(w http.ResponseWriter, r *http.Request) { @@ -1385,4 +1401,5 @@ func registerPeerRESTHandlers(router *mux.Router) { subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodGetPeerMetrics).HandlerFunc(httpTraceHdrs(server.GetPeerMetrics)) subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodLoadTransitionTierConfig).HandlerFunc(httpTraceHdrs(server.LoadTransitionTierConfigHandler)) subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodSpeedtest).HandlerFunc(httpTraceHdrs(server.SpeedtestHandler)) + subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodReloadSiteReplicationConfig).HandlerFunc(httpTraceHdrs(server.ReloadSiteReplicationConfigHandler)) } diff --git a/cmd/server-main.go b/cmd/server-main.go index f5b0a565b..eeef4a9be 100644 --- a/cmd/server-main.go +++ b/cmd/server-main.go @@ -414,6 +414,9 @@ func initAllSubsystems(ctx context.Context, newObject ObjectLayer) (err error) { // Initialize bucket notification sub-system. globalNotificationSys.Init(ctx, buckets, newObject) + // Initialize site replication manager. 
+ globalSiteReplicationSys.Init(ctx, newObject) + if globalIsErasure { // Initialize transition tier configuration manager if err = globalTierConfigMgr.Init(ctx, newObject); err != nil { diff --git a/cmd/site-replication.go b/cmd/site-replication.go new file mode 100644 index 000000000..b38eb7965 --- /dev/null +++ b/cmd/site-replication.go @@ -0,0 +1,1652 @@ +// Copyright (c) 2015-2021 MinIO, Inc. +// +// This file is part of MinIO Object Storage stack +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . 
package cmd

import (
	"bytes"
	"context"
	"crypto/rand"
	"crypto/tls"
	"encoding/base64"
	"encoding/json"
	"encoding/xml"
	"errors"
	"fmt"
	"net/http"
	"net/url"
	"sort"
	"strings"
	"sync"
	"time"

	"github.com/minio/madmin-go"
	minioClient "github.com/minio/minio-go/v7"
	"github.com/minio/minio-go/v7/pkg/credentials"
	"github.com/minio/minio-go/v7/pkg/replication"
	"github.com/minio/minio-go/v7/pkg/set"
	"github.com/minio/minio/internal/auth"
	sreplication "github.com/minio/minio/internal/bucket/replication"
	"github.com/minio/minio/internal/bucket/versioning"
	"github.com/minio/minio/internal/logger"
	"github.com/minio/pkg/bucket/policy"
	iampolicy "github.com/minio/pkg/iam/policy"
)

// Location of the persisted site-replication state under the MinIO
// config prefix on the backend.
const (
	srStatePrefix = minioConfigPrefix + "/site-replication"

	srStateFile = "state.json"
)

const (
	// Version of the on-disk state format; bumped if the schema of
	// srStateData ever changes.
	srStateFormatVersion1 = 1
)

// Sentinel errors returned by site-replication validation paths.
var (
	errSRCannotJoin     = errors.New("this site is already configured for site-replication")
	errSRDuplicateSites = errors.New("duplicate sites provided for site-replication")
	errSRSelfNotFound   = errors.New("none of the given sites correspond to the current one")
	errSRPeerNotFound   = errors.New("peer not found")
	errSRNotEnabled     = errors.New("site replication is not enabled")
)

// The errSR* helpers below wrap an underlying cause with the API error
// code that should be reported to the client for that failure class.

func errSRInvalidRequest(err error) SRError {
	return SRError{
		Cause: err,
		Code:  ErrSiteReplicationInvalidRequest,
	}
}

func errSRPeerResp(err error) SRError {
	return SRError{
		Cause: err,
		Code:  ErrSiteReplicationPeerResp,
	}
}

func errSRBackendIssue(err error) SRError {
	return SRError{
		Cause: err,
		Code:  ErrSiteReplicationBackendIssue,
	}
}

func errSRServiceAccount(err error) SRError {
	return SRError{
		Cause: err,
		Code:  ErrSiteReplicationServiceAccountError,
	}
}

func errSRBucketConfigError(err error) SRError {
	return SRError{
		Cause: err,
		Code:  ErrSiteReplicationBucketConfigError,
	}
}

func errSRBucketMetaError(err error) SRError {
	return SRError{
		Cause: err,
		Code:  ErrSiteReplicationBucketMetaError,
	}
}

func errSRIAMError(err error) SRError {
	return SRError{
		Cause: err,
		Code:  ErrSiteReplicationIAMError,
	}
}

var (
	errSRObjectLayerNotReady = SRError{
		Cause: fmt.Errorf("object layer not ready"),
		Code:  ErrServerNotInitialized,
	}
)

// getSRStateFilePath returns the backend path of the persisted
// site-replication state file.
func getSRStateFilePath() string {
	return srStatePrefix + SlashSeparator + srStateFile
}

// SRError - wrapped error for site replication.
type SRError struct {
	Cause error
	Code  APIErrorCode
}

// Error reports the underlying cause's message.
// NOTE(review): this panics if Cause is nil — callers should only call
// Error() after checking Cause != nil or Code != ErrNone.
func (c SRError) Error() string {
	return c.Cause.Error()
}

// wrapSRErr wraps a generic error as an internal-error SRError.
func wrapSRErr(err error) SRError {
	return SRError{Cause: err, Code: ErrInternalError}
}

// SiteReplicationSys - manages cluster-level replication.
type SiteReplicationSys struct {
	sync.RWMutex

	// enabled is true once a site-replication state has been loaded or
	// saved; guarded by the embedded RWMutex.
	enabled bool

	// In-memory and persisted multi-site replication state.
	state srState
}

// srState is the in-memory representation of the replication state; it is
// currently identical to the persisted v1 format.
type srState srStateV1

// srStateV1 represents version 1 of the site replication state persistence
// format.
type srStateV1 struct {
	Name string `json:"name"`

	// Peers maps peers by their deploymentID
	Peers                   map[string]madmin.PeerInfo `json:"peers"`
	ServiceAccountAccessKey string                     `json:"serviceAccountAccessKey"`
}

// srStateData represents the format of the current `srStateFile`.
type srStateData struct {
	Version int `json:"version"`

	SRState srStateV1 `json:"srState"`
}

// Init - initialize the site replication manager.
+func (c *SiteReplicationSys) Init(ctx context.Context, objAPI ObjectLayer) error { + err := c.loadFromDisk(ctx, objAPI) + if err == errConfigNotFound { + return nil + } + + c.RLock() + defer c.RUnlock() + if c.enabled { + logger.Info("Cluster Replication initialized.") + } + + return err +} + +func (c *SiteReplicationSys) loadFromDisk(ctx context.Context, objAPI ObjectLayer) error { + buf, err := readConfig(ctx, objAPI, getSRStateFilePath()) + if err != nil { + return err + } + + // attempt to read just the version key in the state file to ensure we + // are reading a compatible version. + var ver struct { + Version int `json:"version"` + } + err = json.Unmarshal(buf, &ver) + if err != nil { + return err + } + if ver.Version != srStateFormatVersion1 { + return fmt.Errorf("Unexpected ClusterRepl state version: %d", ver.Version) + } + + var sdata srStateData + err = json.Unmarshal(buf, &sdata) + if err != nil { + return err + } + + c.Lock() + defer c.Unlock() + c.state = srState(sdata.SRState) + c.enabled = true + return nil +} + +func (c *SiteReplicationSys) saveToDisk(ctx context.Context, state srState) error { + sdata := srStateData{ + Version: srStateFormatVersion1, + SRState: srStateV1(state), + } + buf, err := json.Marshal(sdata) + if err != nil { + return err + } + + objAPI := newObjectLayerFn() + if objAPI == nil { + return errServerNotInitialized + } + err = saveConfig(ctx, objAPI, getSRStateFilePath(), buf) + if err != nil { + return err + } + + for _, e := range globalNotificationSys.ReloadSiteReplicationConfig(ctx) { + logger.LogIf(ctx, e) + } + + c.Lock() + defer c.Unlock() + c.state = state + c.enabled = true + return nil +} + +const ( + // Access key of service account used for perform cluster-replication + // operations. + siteReplicatorSvcAcc = "site-replicator-0" +) + +// AddPeerClusters - add cluster sites for replication configuration. 
func (c *SiteReplicationSys) AddPeerClusters(ctx context.Context, sites []madmin.PeerSite) (madmin.ReplicateAddStatus, SRError) {
	// If current cluster is already SR enabled, we fail.
	// NOTE(review): this reads c.enabled without holding c's lock -
	// concurrent add requests could race here; confirm callers serialize.
	if c.enabled {
		return madmin.ReplicateAddStatus{}, errSRInvalidRequest(errSRCannotJoin)
	}

	// Only one of the clusters being added, can have any buckets (i.e. self
	// here) - others must be empty.
	selfIdx := -1
	localHasBuckets := false
	nonLocalPeerWithBuckets := ""
	deploymentIDs := make([]string, 0, len(sites))
	deploymentIDsSet := set.NewStringSet()
	for i, v := range sites {
		admClient, err := getAdminClient(v.Endpoint, v.AccessKey, v.SecretKey)
		if err != nil {
			return madmin.ReplicateAddStatus{}, errSRPeerResp(fmt.Errorf("unable to create admin client for %s: %w", v.Name, err))
		}
		info, err := admClient.ServerInfo(ctx)
		if err != nil {
			return madmin.ReplicateAddStatus{}, errSRPeerResp(fmt.Errorf("unable to fetch server info for %s: %w", v.Name, err))
		}

		deploymentID := info.DeploymentID
		if deploymentID == "" {
			return madmin.ReplicateAddStatus{}, errSRPeerResp(fmt.Errorf("unable to fetch deploymentID for %s: value was empty!", v.Name))
		}

		deploymentIDs = append(deploymentIDs, deploymentID)

		// deploymentIDs must be unique
		if deploymentIDsSet.Contains(deploymentID) {
			return madmin.ReplicateAddStatus{}, errSRInvalidRequest(errSRDuplicateSites)
		}
		deploymentIDsSet.Add(deploymentID)

		if deploymentID == globalDeploymentID {
			// This site is the current cluster - check local buckets
			// directly via the object layer.
			selfIdx = i
			objAPI := newObjectLayerFn()
			if objAPI == nil {
				return madmin.ReplicateAddStatus{}, errSRObjectLayerNotReady
			}
			res, err := objAPI.ListBuckets(ctx)
			if err != nil {
				return madmin.ReplicateAddStatus{}, errSRBackendIssue(err)
			}
			if len(res) > 0 {
				localHasBuckets = true
			}
			continue
		}

		// Remote site - check for buckets over S3.
		s3Client, err := getS3Client(v)
		if err != nil {
			return madmin.ReplicateAddStatus{}, errSRPeerResp(fmt.Errorf("unable to create s3 client for %s: %w", v.Name, err))
		}
		buckets, err := s3Client.ListBuckets(ctx)
		if err != nil {
			return madmin.ReplicateAddStatus{}, errSRPeerResp(fmt.Errorf("unable to list buckets for %s: %v", v.Name, err))
		}

		if len(buckets) > 0 {
			nonLocalPeerWithBuckets = v.Name
		}
	}

	// For this `add` API, either all clusters must be empty or the local
	// cluster must be the only one having some buckets.

	if localHasBuckets && nonLocalPeerWithBuckets != "" {
		return madmin.ReplicateAddStatus{}, errSRInvalidRequest(errors.New("Only one cluster may have data when configuring site replication"))
	}

	if !localHasBuckets && nonLocalPeerWithBuckets != "" {
		return madmin.ReplicateAddStatus{}, errSRInvalidRequest(fmt.Errorf("Please send your request to the cluster containing data/buckets: %s", nonLocalPeerWithBuckets))
	}

	// validate that all clusters are using the same (LDAP based)
	// external IDP.
	pass, verr := c.validateIDPSettings(ctx, sites, selfIdx)
	if verr.Cause != nil {
		return madmin.ReplicateAddStatus{}, verr
	}
	if !pass {
		return madmin.ReplicateAddStatus{}, errSRInvalidRequest(fmt.Errorf("All cluster sites must have the same (LDAP) IDP settings."))
	}

	// FIXME: Ideally, we also need to check if there are any global IAM
	// policies and any (LDAP user created) service accounts on the other
	// peer clusters, and if so, reject the cluster replicate add request.
	// This is not yet implemented.

	// VALIDATIONS COMPLETE.

	// Create a common service account for all clusters, with root
	// permissions.

	// Create a local service account.

	// Generate a secret key for the service account.
	// The key is the base64 encoding of 40 random bytes, truncated to 40
	// characters, with '/' mapped to '+' so the key stays in a
	// path/URL-friendly alphabet.
	var secretKey string
	{
		secretKeyBuf := make([]byte, 40)
		n, err := rand.Read(secretKeyBuf)
		if err == nil && n != 40 {
			err = fmt.Errorf("Unable to read 40 random bytes to generate secret key")
		}
		if err != nil {
			return madmin.ReplicateAddStatus{}, SRError{
				Cause: err,
				Code:  ErrInternalError,
			}
		}
		secretKey = strings.Replace(string([]byte(base64.StdEncoding.EncodeToString(secretKeyBuf))[:40]),
			"/", "+", -1)
	}

	svcCred, err := globalIAMSys.NewServiceAccount(ctx, sites[selfIdx].AccessKey, nil, newServiceAccountOpts{
		accessKey: siteReplicatorSvcAcc,
		secretKey: secretKey,
	})
	if err != nil {
		return madmin.ReplicateAddStatus{}, errSRServiceAccount(fmt.Errorf("unable to create local service account: %w", err))
	}

	// Notify all other Minio peers to reload the service account
	for _, nerr := range globalNotificationSys.LoadServiceAccount(svcCred.AccessKey) {
		if nerr.Err != nil {
			logger.GetReqInfo(ctx).SetTags("peerAddress", nerr.Host.String())
			logger.LogIf(ctx, nerr.Err)
		}
	}

	joinReq := madmin.SRInternalJoinReq{
		SvcAcctAccessKey: svcCred.AccessKey,
		SvcAcctSecretKey: svcCred.SecretKey,
		Peers:            make(map[string]madmin.PeerInfo),
	}
	for i, v := range sites {
		joinReq.Peers[deploymentIDs[i]] = madmin.PeerInfo{
			Endpoint:     v.Endpoint,
			Name:         v.Name,
			DeploymentID: deploymentIDs[i],
		}
	}

	// Ask each remote peer (sequentially) to join; stop on first failure.
	addedCount := 0
	var peerAddErr SRError
	for i, v := range sites {
		if i == selfIdx {
			continue
		}
		admClient, err := getAdminClient(v.Endpoint, v.AccessKey, v.SecretKey)
		if err != nil {
			peerAddErr = errSRPeerResp(fmt.Errorf("unable to create admin client for %s: %w", v.Name, err))
			break
		}
		// The service account on each peer is parented under that
		// peer's own admin credentials.
		joinReq.SvcAcctParent = v.AccessKey
		err = admClient.SRInternalJoin(ctx, joinReq)
		if err != nil {
			peerAddErr = errSRPeerResp(fmt.Errorf("unable to link with peer %s: %w", v.Name, err))
			break
		}
		addedCount++
	}

	if peerAddErr.Cause != nil {
		if addedCount == 0 {
			return madmin.ReplicateAddStatus{}, peerAddErr
		}
		// In this case, it means at least one cluster was added
		// successfully, we need to send a response to the client with
		// some details - FIXME: the disks on this cluster would need to
		// be cleaned to recover.
		partial := madmin.ReplicateAddStatus{
			Status:    madmin.ReplicateAddStatusPartial,
			ErrDetail: peerAddErr.Error(),
		}
		return partial, SRError{}
	}

	// Other than handling existing buckets, we can now save the cluster
	// replication configuration state.
	state := srState{
		Name:                    sites[selfIdx].Name,
		Peers:                   joinReq.Peers,
		ServiceAccountAccessKey: svcCred.AccessKey,
	}
	err = c.saveToDisk(ctx, state)
	if err != nil {
		return madmin.ReplicateAddStatus{
			Status:    madmin.ReplicateAddStatusPartial,
			ErrDetail: fmt.Sprintf("unable to save cluster-replication state on local: %v", err),
		}, SRError{}
	}

	result := madmin.ReplicateAddStatus{
		Success: true,
		Status:  madmin.ReplicateAddStatusSuccess,
	}
	// Push existing local buckets/config to the new peers; a failure here
	// is reported in the response but does not undo the configuration.
	initialSyncErr := c.syncLocalToPeers(ctx)
	if initialSyncErr.Code != ErrNone {
		result.InitialSyncErrorMessage = initialSyncErr.Error()
	}

	return result, SRError{}
}

// InternalJoinReq - internal API handler to respond to a peer cluster's request
// to join.
+func (c *SiteReplicationSys) InternalJoinReq(ctx context.Context, arg madmin.SRInternalJoinReq) SRError { + if c.enabled { + return errSRInvalidRequest(errSRCannotJoin) + } + + var ourName string + for d, p := range arg.Peers { + if d == globalDeploymentID { + ourName = p.Name + break + } + } + if ourName == "" { + return errSRInvalidRequest(errSRSelfNotFound) + } + + svcCred, err := globalIAMSys.NewServiceAccount(ctx, arg.SvcAcctParent, nil, newServiceAccountOpts{ + accessKey: arg.SvcAcctAccessKey, + secretKey: arg.SvcAcctSecretKey, + }) + if err != nil { + return errSRServiceAccount(fmt.Errorf("unable to create service account on %s: %v", ourName, err)) + } + + // Notify all other Minio peers to reload the service account + for _, nerr := range globalNotificationSys.LoadServiceAccount(svcCred.AccessKey) { + if nerr.Err != nil { + logger.GetReqInfo(ctx).SetTags("peerAddress", nerr.Host.String()) + logger.LogIf(ctx, nerr.Err) + } + } + + state := srState{ + Name: ourName, + Peers: arg.Peers, + ServiceAccountAccessKey: arg.SvcAcctAccessKey, + } + err = c.saveToDisk(ctx, state) + if err != nil { + return errSRBackendIssue(fmt.Errorf("unable to save cluster-replication state to disk on %s: %v", ourName, err)) + } + return SRError{} +} + +// GetIDPSettings returns info about the configured identity provider. It is +// used to validate that all peers have the same IDP. 
+func (c *SiteReplicationSys) GetIDPSettings(ctx context.Context) madmin.IDPSettings { + return madmin.IDPSettings{ + IsLDAPEnabled: globalLDAPConfig.Enabled, + LDAPUserDNSearchBase: globalLDAPConfig.UserDNSearchBaseDN, + LDAPUserDNSearchFilter: globalLDAPConfig.UserDNSearchFilter, + LDAPGroupSearchBase: globalLDAPConfig.GroupSearchBaseDistName, + LDAPGroupSearchFilter: globalLDAPConfig.GroupSearchFilter, + } +} + +func (c *SiteReplicationSys) validateIDPSettings(ctx context.Context, peers []madmin.PeerSite, selfIdx int) (bool, SRError) { + s := make([]madmin.IDPSettings, 0, len(peers)) + for i, v := range peers { + if i == selfIdx { + s = append(s, c.GetIDPSettings(ctx)) + continue + } + + admClient, err := getAdminClient(v.Endpoint, v.AccessKey, v.SecretKey) + if err != nil { + return false, errSRPeerResp(fmt.Errorf("unable to create admin client for %s: %w", v.Name, err)) + } + + is, err := admClient.SRInternalGetIDPSettings(ctx) + if err != nil { + return false, errSRPeerResp(fmt.Errorf("unable to fetch IDP settings from %s: %v", v.Name, err)) + } + s = append(s, is) + } + + for _, v := range s { + if !v.IsLDAPEnabled { + return false, SRError{} + } + } + for i := 1; i < len(s); i++ { + if s[i] != s[0] { + return false, SRError{} + } + } + return true, SRError{} +} + +// GetClusterInfo - returns site replication information. 
func (c *SiteReplicationSys) GetClusterInfo(ctx context.Context) (info madmin.SiteReplicationInfo, err error) {
	c.RLock()
	defer c.RUnlock()
	if !c.enabled {
		return info, nil
	}

	info.Enabled = true
	info.Name = c.state.Name
	// Return peers sorted by name for stable output (map order is random).
	info.Sites = make([]madmin.PeerInfo, 0, len(c.state.Peers))
	for _, peer := range c.state.Peers {
		info.Sites = append(info.Sites, peer)
	}
	sort.SliceStable(info.Sites, func(i, j int) bool {
		return info.Sites[i].Name < info.Sites[j].Name
	})

	info.ServiceAccountAccessKey = c.state.ServiceAccountAccessKey
	return info, nil
}

// MakeBucketHook - called during a regular make bucket call when cluster
// replication is enabled. It is responsible for the creation of the same bucket
// on remote clusters, and creating replication rules on local and peer
// clusters.
func (c *SiteReplicationSys) MakeBucketHook(ctx context.Context, bucket string, opts BucketOptions) error {
	// At this point, the local bucket is created.

	c.RLock()
	defer c.RUnlock()
	if !c.enabled {
		return nil
	}

	optsMap := make(map[string]string)
	if opts.Location != "" {
		optsMap["location"] = opts.Location
	}
	if opts.LockEnabled {
		optsMap["lockEnabled"] = ""
	}

	// Create bucket and enable versioning on all peers.
	makeBucketConcErr := c.concDo(
		func() error {
			err := c.PeerBucketMakeWithVersioningHandler(ctx, bucket, opts)
			logger.LogIf(ctx, c.annotateErr("MakeWithVersioning", err))
			return err
		},
		func(deploymentID string, p madmin.PeerInfo) error {
			admClient, err := c.getAdminClient(ctx, deploymentID)
			if err != nil {
				return err
			}

			err = admClient.SRInternalBucketOps(ctx, bucket, madmin.MakeWithVersioningBktOp, optsMap)
			logger.LogIf(ctx, c.annotatePeerErr(p.Name, "MakeWithVersioning", err))
			return err
		},
	)
	// If all make-bucket-and-enable-versioning operations failed, nothing
	// more to do.
	if makeBucketConcErr.allFailed() {
		return makeBucketConcErr
	}

	// Log any errors in make-bucket operations.
	// Partial failures in the first phase are tolerated here; the second
	// phase still attempts to wire up replication on the sites that
	// succeeded.
	logger.LogIf(ctx, makeBucketConcErr.summaryErr)

	// Create bucket remotes and add replication rules for the bucket on
	// self and peers.
	makeRemotesConcErr := c.concDo(
		func() error {
			err := c.PeerBucketConfigureReplHandler(ctx, bucket)
			logger.LogIf(ctx, c.annotateErr("ConfigureRepl", err))
			return err
		},
		func(deploymentID string, p madmin.PeerInfo) error {
			admClient, err := c.getAdminClient(ctx, deploymentID)
			if err != nil {
				return err
			}

			err = admClient.SRInternalBucketOps(ctx, bucket, madmin.ConfigureReplBktOp, nil)
			logger.LogIf(ctx, c.annotatePeerErr(p.Name, "ConfigureRepl", err))
			return err
		},
	)
	err := makeRemotesConcErr.summaryErr
	if err != nil {
		return err
	}

	return nil
}

// DeleteBucketHook - called during a regular delete bucket call when cluster
// replication is enabled. It is responsible for the deletion of the same bucket
// on remote clusters.
func (c *SiteReplicationSys) DeleteBucketHook(ctx context.Context, bucket string, forceDelete bool) error {
	// At this point, the local bucket is deleted.

	c.RLock()
	defer c.RUnlock()
	if !c.enabled {
		return nil
	}

	op := madmin.DeleteBucketBktOp
	if forceDelete {
		op = madmin.ForceDeleteBucketBktOp
	}

	// Send bucket delete to other clusters.
	cErr := c.concDo(nil, func(deploymentID string, p madmin.PeerInfo) error {
		admClient, err := c.getAdminClient(ctx, deploymentID)
		if err != nil {
			return wrapSRErr(err)
		}

		err = admClient.SRInternalBucketOps(ctx, bucket, op, nil)
		logger.LogIf(ctx, c.annotatePeerErr(p.Name, "DeleteBucket", err))
		return err
	})
	return cErr.summaryErr
}

// PeerBucketMakeWithVersioningHandler - creates bucket and enables versioning.
func (c *SiteReplicationSys) PeerBucketMakeWithVersioningHandler(ctx context.Context, bucket string, opts BucketOptions) error {
	objAPI := newObjectLayerFn()
	if objAPI == nil {
		return errServerNotInitialized
	}
	err := objAPI.MakeBucketWithLocation(ctx, bucket, opts)
	if err != nil {
		// Check if this is a bucket exists error.
		// An already-existing bucket is fine here - this handler must be
		// idempotent across repeated peer calls.
		_, ok1 := err.(BucketExists)
		_, ok2 := err.(BucketAlreadyExists)
		if !ok1 && !ok2 {
			logger.LogIf(ctx, c.annotateErr("MakeBucketErr on peer call", err))
			return wrapSRErr(err)
		}
	} else {
		// Load updated bucket metadata into memory as new
		// bucket was created.
		globalNotificationSys.LoadBucketMetadata(GlobalContext, bucket)
	}

	// Enable versioning on the bucket.
	config, err := globalBucketVersioningSys.Get(bucket)
	if err != nil {
		return wrapSRErr(err)
	}
	if !config.Enabled() {
		verConf := versioning.Versioning{
			Status: versioning.Enabled,
		}
		// FIXME: need to confirm if skipping object lock and
		// versioning-suspended state checks are valid here.
		cfgData, err := xml.Marshal(verConf)
		if err != nil {
			return wrapSRErr(err)
		}
		err = globalBucketMetadataSys.Update(bucket, bucketVersioningConfig, cfgData)
		if err != nil {
			logger.LogIf(ctx, c.annotateErr("Versioning enabling error on peer call", err))
			return wrapSRErr(err)
		}
	}
	return nil
}

// PeerBucketConfigureReplHandler - configures replication remote and
// replication rules to all other peers for the local bucket.
func (c *SiteReplicationSys) PeerBucketConfigureReplHandler(ctx context.Context, bucket string) error {
	creds, err := c.getPeerCreds()
	if err != nil {
		return wrapSRErr(err)
	}

	// The following function, creates a bucket remote and sets up a bucket
	// replication rule for the given peer.
	configurePeerFn := func(d string, peer madmin.PeerInfo) error {
		ep, _ := url.Parse(peer.Endpoint)
		// Reuse an existing replication target for this peer if one
		// matches; otherwise create one below.
		targets := globalBucketTargetSys.ListTargets(ctx, bucket, string(madmin.ReplicationService))
		targetARN := ""
		for _, target := range targets {
			if target.SourceBucket == bucket &&
				target.TargetBucket == bucket &&
				target.Endpoint == ep.Host &&
				target.Secure == (ep.Scheme == "https") &&
				target.Type == madmin.ReplicationService {
				targetARN = target.Arn
				break
			}
		}
		if targetARN == "" {
			bucketTarget := madmin.BucketTarget{
				SourceBucket: bucket,
				Endpoint:     ep.Host,
				Credentials: &madmin.Credentials{
					AccessKey: creds.AccessKey,
					SecretKey: creds.SecretKey,
				},
				TargetBucket:    bucket,
				Secure:          ep.Scheme == "https",
				API:             "s3v4",
				Type:            madmin.ReplicationService,
				Region:          "",
				ReplicationSync: false,
			}
			bucketTarget.Arn = globalBucketTargetSys.getRemoteARN(bucket, &bucketTarget)
			err := globalBucketTargetSys.SetTarget(ctx, bucket, &bucketTarget, false)
			if err != nil {
				logger.LogIf(ctx, c.annotatePeerErr(peer.Name, "Bucket target creation error", err))
				return err
			}
			// Persist the updated target list into bucket metadata.
			targets, err := globalBucketTargetSys.ListBucketTargets(ctx, bucket)
			if err != nil {
				return err
			}
			tgtBytes, err := json.Marshal(&targets)
			if err != nil {
				return err
			}
			if err = globalBucketMetadataSys.Update(bucket, bucketTargetsFile, tgtBytes); err != nil {
				return err
			}
			targetARN = bucketTarget.Arn
		}

		// Create bucket replication rule to this peer.

		// To add the bucket replication rule, we fetch the current
		// server configuration, and convert it to minio-go's
		// replication configuration type (by converting to xml and
		// parsing it back), use minio-go's add rule function, and
		// finally convert it back to the server type (again via xml).
		// This is needed as there is no add-rule function in the server
		// yet.

		// Though we do not check if the rule already exists, this is
		// not a problem as we are always using the same replication
		// rule ID - if the rule already exists, it is just replaced.
		replicationConfigS, err := globalBucketMetadataSys.GetReplicationConfig(ctx, bucket)
		if err != nil {
			_, ok := err.(BucketReplicationConfigNotFound)
			if !ok {
				return err
			}
		}
		var replicationConfig replication.Config
		if replicationConfigS != nil {
			replCfgSBytes, err := xml.Marshal(replicationConfigS)
			if err != nil {
				return err
			}
			err = xml.Unmarshal(replCfgSBytes, &replicationConfig)
			if err != nil {
				return err
			}
		}
		err = replicationConfig.AddRule(replication.Options{
			// Set the ID so we can identify the rule as being
			// created for site-replication and include the
			// destination cluster's deployment ID.
			ID: fmt.Sprintf("site-repl-%s", d),

			// Use a helper to generate unique priority numbers.
			Priority: fmt.Sprintf("%d", getPriorityHelper(replicationConfig)),

			Op:         replication.AddOption,
			RuleStatus: "enable",
			DestBucket: targetARN,

			// Replicate everything!
			ReplicateDeletes:        "enable",
			ReplicateDeleteMarkers:  "enable",
			ReplicaSync:             "enable",
			ExistingObjectReplicate: "enable",
		})
		if err != nil {
			logger.LogIf(ctx, c.annotatePeerErr(peer.Name, "Error adding bucket replication rule", err))
			return err
		}
		// Now convert the configuration back to server's type so we can
		// do some validation.
		newReplCfgBytes, err := xml.Marshal(replicationConfig)
		if err != nil {
			return err
		}
		newReplicationConfig, err := sreplication.ParseConfig(bytes.NewReader(newReplCfgBytes))
		if err != nil {
			return err
		}
		sameTarget, apiErr := validateReplicationDestination(ctx, bucket, newReplicationConfig)
		if apiErr != noError {
			return fmt.Errorf("bucket replication config validation error: %#v", apiErr)
		}
		err = newReplicationConfig.Validate(bucket, sameTarget)
		if err != nil {
			return err
		}
		// Config looks good, so we save it.
		replCfgData, err := xml.Marshal(newReplicationConfig)
		if err != nil {
			return err
		}
		err = globalBucketMetadataSys.Update(bucket, bucketReplicationConfig, replCfgData)
		logger.LogIf(ctx, c.annotatePeerErr(peer.Name, "Error updating replication configuration", err))
		return err
	}

	// Configure each peer sequentially; each iteration re-reads the
	// replication config saved by the previous one, so do not parallelize.
	errMap := make(map[string]error, len(c.state.Peers))
	for d, peer := range c.state.Peers {
		if d == globalDeploymentID {
			continue
		}
		if err := configurePeerFn(d, peer); err != nil {
			errMap[d] = err
		}
	}
	return c.toErrorFromErrMap(errMap)
}

// PeerBucketDeleteHandler - deletes bucket on local in response to a delete
// bucket request from a peer.
func (c *SiteReplicationSys) PeerBucketDeleteHandler(ctx context.Context, bucket string, forceDelete bool) error {
	c.RLock()
	defer c.RUnlock()
	if !c.enabled {
		return errSRNotEnabled
	}

	// FIXME: need to handle cases where globalDNSConfig is set.

	objAPI := newObjectLayerFn()
	if objAPI == nil {
		return errServerNotInitialized
	}
	err := objAPI.DeleteBucket(ctx, bucket, DeleteBucketOptions{Force: forceDelete})
	if err != nil {
		return err
	}

	globalNotificationSys.DeleteBucketMetadata(ctx, bucket)

	return nil
}

// IAMChangeHook - called when IAM items need to be replicated to peer clusters.
// This includes named policy creation, policy mapping changes and service
// account changes.
//
// All policies are replicated.
//
// Policy mappings are only replicated when they are for LDAP users or groups
// (as an external IDP is always assumed when SR is used). In the case of
// OpenID, such mappings are provided from the IDP directly and so are not
// applicable here.
//
// Only certain service accounts can be replicated:
//
// Service accounts created for STS credentials using an external IDP: such STS
// credentials would be valid on the peer clusters as they are assumed to be
// using the same external IDP. Service accounts when using internal IDP or for
// root user will not be replicated.
//
// STS accounts are replicated, but only if the session token is verifiable
// using the local cluster's root credential.
func (c *SiteReplicationSys) IAMChangeHook(ctx context.Context, item madmin.SRIAMItem) error {
	// The IAM item has already been applied to the local cluster at this
	// point, and only needs to be updated on all remote peer clusters.

	c.RLock()
	defer c.RUnlock()
	if !c.enabled {
		return nil
	}

	cErr := c.concDo(nil, func(d string, p madmin.PeerInfo) error {
		admClient, err := c.getAdminClient(ctx, d)
		if err != nil {
			return wrapSRErr(err)
		}

		err = admClient.SRInternalReplicateIAMItem(ctx, item)
		logger.LogIf(ctx, c.annotatePeerErr(p.Name, "SRInternalReplicateIAMItem", err))
		return err
	})
	return cErr.summaryErr
}

// PeerAddPolicyHandler - copies IAM policy to local. A nil policy argument,
// causes the named policy to be deleted.
func (c *SiteReplicationSys) PeerAddPolicyHandler(ctx context.Context, policyName string, p *iampolicy.Policy) error {
	var err error
	if p == nil {
		err = globalIAMSys.DeletePolicy(policyName)
	} else {
		err = globalIAMSys.SetPolicy(policyName, *p)
	}
	if err != nil {
		return wrapSRErr(err)
	}

	if p != nil {
		// Notify all other MinIO peers to reload policy
		for _, nerr := range globalNotificationSys.LoadPolicy(policyName) {
			if nerr.Err != nil {
				logger.GetReqInfo(ctx).SetTags("peerAddress", nerr.Host.String())
				logger.LogIf(ctx, nerr.Err)
			}
		}
		return nil
	}

	// Notify all other MinIO peers to delete policy
	for _, nerr := range globalNotificationSys.DeletePolicy(policyName) {
		if nerr.Err != nil {
			logger.GetReqInfo(ctx).SetTags("peerAddress", nerr.Host.String())
			logger.LogIf(ctx, nerr.Err)
		}
	}
	return nil
}

// PeerSvcAccChangeHandler - copies service-account change to local.
// Exactly one of change.Create/Update/Delete is expected to be non-nil.
func (c *SiteReplicationSys) PeerSvcAccChangeHandler(ctx context.Context, change madmin.SRSvcAccChange) error {
	switch {
	case change.Create != nil:
		var sp *iampolicy.Policy
		var err error
		if len(change.Create.SessionPolicy) > 0 {
			sp, err = iampolicy.ParseConfig(bytes.NewReader(change.Create.SessionPolicy))
			if err != nil {
				return wrapSRErr(err)
			}
		}

		opts := newServiceAccountOpts{
			accessKey:     change.Create.AccessKey,
			secretKey:     change.Create.SecretKey,
			sessionPolicy: sp,
			claims:        change.Create.Claims,
		}
		newCred, err := globalIAMSys.NewServiceAccount(ctx, change.Create.Parent, change.Create.Groups, opts)
		if err != nil {
			return wrapSRErr(err)
		}

		// Notify all other Minio peers to reload the service account
		for _, nerr := range globalNotificationSys.LoadServiceAccount(newCred.AccessKey) {
			if nerr.Err != nil {
				logger.GetReqInfo(ctx).SetTags("peerAddress", nerr.Host.String())
				logger.LogIf(ctx, nerr.Err)
			}
		}
	case change.Update != nil:
		var sp *iampolicy.Policy
		var err error
		if len(change.Update.SessionPolicy) > 0 {
			sp, err = iampolicy.ParseConfig(bytes.NewReader(change.Update.SessionPolicy))
			if err != nil {
				return wrapSRErr(err)
			}
		}
		opts := updateServiceAccountOpts{
			secretKey:     change.Update.SecretKey,
			status:        change.Update.Status,
			sessionPolicy: sp,
		}

		err = globalIAMSys.UpdateServiceAccount(ctx, change.Update.AccessKey, opts)
		if err != nil {
			return wrapSRErr(err)
		}

		// Notify all other Minio peers to reload the service account
		for _, nerr := range globalNotificationSys.LoadServiceAccount(change.Update.AccessKey) {
			if nerr.Err != nil {
				logger.GetReqInfo(ctx).SetTags("peerAddress", nerr.Host.String())
				logger.LogIf(ctx, nerr.Err)
			}
		}

	case change.Delete != nil:
		err := globalIAMSys.DeleteServiceAccount(ctx, change.Delete.AccessKey)
		if err != nil {
			return wrapSRErr(err)
		}

		// Notify all other Minio peers to drop the deleted account.
		for _, nerr := range globalNotificationSys.DeleteServiceAccount(change.Delete.AccessKey) {
			if nerr.Err != nil {
				logger.GetReqInfo(ctx).SetTags("peerAddress", nerr.Host.String())
				logger.LogIf(ctx, nerr.Err)
			}
		}

	}

	return nil
}

// PeerPolicyMappingHandler - copies policy mapping to local.
func (c *SiteReplicationSys) PeerPolicyMappingHandler(ctx context.Context, mapping madmin.SRPolicyMapping) error {
	err := globalIAMSys.PolicyDBSet(mapping.UserOrGroup, mapping.Policy, mapping.IsGroup)
	if err != nil {
		return wrapSRErr(err)
	}

	// Notify all other MinIO peers to reload policy
	for _, nerr := range globalNotificationSys.LoadPolicyMapping(mapping.UserOrGroup, mapping.IsGroup) {
		if nerr.Err != nil {
			logger.GetReqInfo(ctx).SetTags("peerAddress", nerr.Host.String())
			logger.LogIf(ctx, nerr.Err)
		}
	}

	return nil
}

// PeerSTSAccHandler - replicates STS credential locally.
func (c *SiteReplicationSys) PeerSTSAccHandler(ctx context.Context, stsCred madmin.SRSTSCredential) error {
	// Verify the session token of the stsCred
	// (it must be signed by this cluster's root secret key).
	claims, err := auth.ExtractClaims(stsCred.SessionToken, globalActiveCred.SecretKey)
	if err != nil {
		logger.LogIf(ctx, err)
		return fmt.Errorf("STS credential could not be verified")
	}

	mapClaims := claims.Map()
	expiry, err := auth.ExpToInt64(mapClaims["exp"])
	if err != nil {
		return fmt.Errorf("Expiry claim was not found")
	}

	// Extract the username and lookup DN and groups in LDAP.
	ldapUser, ok := claims.Lookup(ldapUserN)
	if !ok {
		return fmt.Errorf("Could not find LDAP username in claims")
	}

	// Need to lookup the groups from LDAP.
	ldapUserDN, ldapGroups, err := globalLDAPConfig.LookupUserDN(ldapUser)
	if err != nil {
		return fmt.Errorf("unable to query LDAP server for %s: %v", ldapUser, err)
	}

	cred := auth.Credentials{
		AccessKey:    stsCred.AccessKey,
		SecretKey:    stsCred.SecretKey,
		Expiration:   time.Unix(expiry, 0).UTC(),
		SessionToken: stsCred.SessionToken,
		Status:       auth.AccountOn,
		ParentUser:   ldapUserDN,
		Groups:       ldapGroups,
	}

	// Set these credentials to IAM.
	if err := globalIAMSys.SetTempUser(cred.AccessKey, cred, ""); err != nil {
		return fmt.Errorf("unable to save STS credential: %v", err)
	}

	// Notify in-cluster peers to reload temp users.
	for _, nerr := range globalNotificationSys.LoadUser(cred.AccessKey, true) {
		if nerr.Err != nil {
			logger.GetReqInfo(ctx).SetTags("peerAddress", nerr.Host.String())
			logger.LogIf(ctx, nerr.Err)
		}
	}

	return nil
}

// BucketMetaHook - called when bucket meta changes happen and need to be
// replicated to peer clusters.
func (c *SiteReplicationSys) BucketMetaHook(ctx context.Context, item madmin.SRBucketMeta) error {
	// The change has already been applied to the local cluster at this
	// point, and only needs to be updated on all remote peer clusters.

	c.RLock()
	defer c.RUnlock()
	if !c.enabled {
		return nil
	}

	cErr := c.concDo(nil, func(d string, p madmin.PeerInfo) error {
		admClient, err := c.getAdminClient(ctx, d)
		if err != nil {
			return wrapSRErr(err)
		}

		err = admClient.SRInternalReplicateBucketMeta(ctx, item)
		logger.LogIf(ctx, c.annotatePeerErr(p.Name, "SRInternalReplicateBucketMeta", err))
		return err
	})
	return cErr.summaryErr
}

// PeerBucketPolicyHandler - copies/deletes policy to local cluster.
func (c *SiteReplicationSys) PeerBucketPolicyHandler(ctx context.Context, bucket string, policy *policy.Policy) error {
	if policy != nil {
		configData, err := json.Marshal(policy)
		if err != nil {
			return wrapSRErr(err)
		}

		err = globalBucketMetadataSys.Update(bucket, bucketPolicyConfig, configData)
		if err != nil {
			return wrapSRErr(err)
		}
		return nil
	}

	// Delete the bucket policy
	err := globalBucketMetadataSys.Update(bucket, bucketPolicyConfig, nil)
	if err != nil {
		return wrapSRErr(err)
	}

	return nil
}

// PeerBucketTaggingHandler - copies/deletes tags to local cluster.
// A non-nil tags value is the base64-encoded tagging configuration XML.
func (c *SiteReplicationSys) PeerBucketTaggingHandler(ctx context.Context, bucket string, tags *string) error {
	if tags != nil {
		configData, err := base64.StdEncoding.DecodeString(*tags)
		if err != nil {
			return wrapSRErr(err)
		}
		err = globalBucketMetadataSys.Update(bucket, bucketTaggingConfig, configData)
		if err != nil {
			return wrapSRErr(err)
		}
		return nil
	}

	// Delete the tags
	err := globalBucketMetadataSys.Update(bucket, bucketTaggingConfig, nil)
	if err != nil {
		return wrapSRErr(err)
	}

	return nil
}

// PeerBucketObjectLockConfigHandler - sets object lock on local bucket.
+func (c *SiteReplicationSys) PeerBucketObjectLockConfigHandler(ctx context.Context, bucket string, objectLockData *string) error { + if objectLockData != nil { + configData, err := base64.StdEncoding.DecodeString(*objectLockData) + if err != nil { + return wrapSRErr(err) + } + err = globalBucketMetadataSys.Update(bucket, objectLockConfig, configData) + if err != nil { + return wrapSRErr(err) + } + return nil + } + + return nil +} + +// PeerBucketSSEConfigHandler - copies/deletes SSE config to local cluster. +func (c *SiteReplicationSys) PeerBucketSSEConfigHandler(ctx context.Context, bucket string, sseConfig *string) error { + if sseConfig != nil { + configData, err := base64.StdEncoding.DecodeString(*sseConfig) + if err != nil { + return wrapSRErr(err) + } + err = globalBucketMetadataSys.Update(bucket, bucketSSEConfig, configData) + if err != nil { + return wrapSRErr(err) + } + return nil + } + + // Delete sse config + err := globalBucketMetadataSys.Update(bucket, bucketSSEConfig, nil) + if err != nil { + return wrapSRErr(err) + } + return nil +} + +// getAdminClient - NOTE: ensure to take at least a read lock on SiteReplicationSys +// before calling this. 
+func (c *SiteReplicationSys) getAdminClient(ctx context.Context, deploymentID string) (*madmin.AdminClient, error) { + creds, err := c.getPeerCreds() + if err != nil { + return nil, err + } + + peer, ok := c.state.Peers[deploymentID] + if !ok { + return nil, errSRPeerNotFound + } + + return getAdminClient(peer.Endpoint, creds.AccessKey, creds.SecretKey) +} + +func (c *SiteReplicationSys) getPeerCreds() (*auth.Credentials, error) { + globalIAMSys.store.rlock() + defer globalIAMSys.store.runlock() + creds, ok := globalIAMSys.iamUsersMap[c.state.ServiceAccountAccessKey] + if !ok { + return nil, errors.New("site replication service account not found!") + } + return &creds, nil +} + +// syncLocalToPeers is used when initially configuring site replication, to +// copy existing buckets, their settings, service accounts and policies to all +// new peers. +func (c *SiteReplicationSys) syncLocalToPeers(ctx context.Context) SRError { + // If local has buckets, enable versioning on them, create them on peers + // and setup replication rules. + objAPI := newObjectLayerFn() + if objAPI == nil { + return errSRObjectLayerNotReady + } + buckets, err := objAPI.ListBuckets(ctx) + if err != nil { + return errSRBackendIssue(err) + } + for _, bucketInfo := range buckets { + bucket := bucketInfo.Name + + // MinIO does not store bucket location - so we just check if + // object locking is enabled. + lockConfig, err := globalBucketMetadataSys.GetObjectLockConfig(bucket) + if err != nil { + if _, ok := err.(BucketObjectLockConfigNotFound); !ok { + return errSRBackendIssue(err) + } + } + + var opts BucketOptions + if lockConfig != nil { + opts.LockEnabled = lockConfig.ObjectLockEnabled == "Enabled" + } + + // Now call the MakeBucketHook on existing bucket - this will + // create buckets and replication rules on peer clusters. + err = c.MakeBucketHook(ctx, bucket, opts) + if err != nil { + return errSRBucketConfigError(err) + } + + // Replicate bucket policy if present. 
+ policy, err := globalPolicySys.Get(bucket) + found := true + if _, ok := err.(BucketPolicyNotFound); ok { + found = false + } else if err != nil { + return errSRBackendIssue(err) + } + if found { + policyJSON, err := json.Marshal(policy) + if err != nil { + return wrapSRErr(err) + } + err = c.BucketMetaHook(ctx, madmin.SRBucketMeta{ + Type: madmin.SRBucketMetaTypePolicy, + Bucket: bucket, + Policy: policyJSON, + }) + if err != nil { + return errSRBucketMetaError(err) + } + } + + // Replicate bucket tags if present. + tags, err := globalBucketMetadataSys.GetTaggingConfig(bucket) + found = true + if _, ok := err.(BucketTaggingNotFound); ok { + found = false + } else if err != nil { + return errSRBackendIssue(err) + } + if found { + tagCfg, err := xml.Marshal(tags) + if err != nil { + return wrapSRErr(err) + } + tagCfgStr := base64.StdEncoding.EncodeToString(tagCfg) + err = c.BucketMetaHook(ctx, madmin.SRBucketMeta{ + Type: madmin.SRBucketMetaTypeTags, + Bucket: bucket, + Tags: &tagCfgStr, + }) + if err != nil { + return errSRBucketMetaError(err) + } + } + + // Replicate object-lock config if present. 
+ objLockCfg, err := globalBucketMetadataSys.GetObjectLockConfig(bucket) + found = true + if _, ok := err.(BucketObjectLockConfigNotFound); ok { + found = false + } else if err != nil { + return errSRBackendIssue(err) + } + if found { + objLockCfgData, err := xml.Marshal(objLockCfg) + if err != nil { + return wrapSRErr(err) + } + objLockStr := base64.StdEncoding.EncodeToString(objLockCfgData) + err = c.BucketMetaHook(ctx, madmin.SRBucketMeta{ + Type: madmin.SRBucketMetaTypeObjectLockConfig, + Bucket: bucket, + Tags: &objLockStr, + }) + if err != nil { + return errSRBucketMetaError(err) + } + } + + // Replicate existing bucket bucket encryption settings + sseConfig, err := globalBucketMetadataSys.GetSSEConfig(bucket) + found = true + if _, ok := err.(BucketSSEConfigNotFound); ok { + found = false + } else if err != nil { + return errSRBackendIssue(err) + } + if found { + sseConfigData, err := xml.Marshal(sseConfig) + if err != nil { + return wrapSRErr(err) + } + sseConfigStr := base64.StdEncoding.EncodeToString(sseConfigData) + err = c.BucketMetaHook(ctx, madmin.SRBucketMeta{ + Type: madmin.SRBucketMetaTypeSSEConfig, + Bucket: bucket, + SSEConfig: &sseConfigStr, + }) + if err != nil { + return errSRBucketMetaError(err) + } + } + } + + { + // Replicate IAM policies on local to all peers. + allPolicies, err := globalIAMSys.ListPolicies("") + if err != nil { + return errSRBackendIssue(err) + } + + for pname, policy := range allPolicies { + policyJSON, err := json.Marshal(policy) + if err != nil { + return wrapSRErr(err) + } + err = c.IAMChangeHook(ctx, madmin.SRIAMItem{ + Type: madmin.SRIAMItemPolicy, + Name: pname, + Policy: policyJSON, + }) + if err != nil { + return errSRIAMError(err) + } + } + } + + { + // Replicate policy mappings on local to all peers. 
+ userPolicyMap := make(map[string]MappedPolicy) + groupPolicyMap := make(map[string]MappedPolicy) + globalIAMSys.store.rlock() + errU := globalIAMSys.store.loadMappedPolicies(ctx, stsUser, false, userPolicyMap) + errG := globalIAMSys.store.loadMappedPolicies(ctx, stsUser, true, groupPolicyMap) + globalIAMSys.store.runlock() + if errU != nil { + return errSRBackendIssue(errU) + } + if errG != nil { + return errSRBackendIssue(errG) + } + + for user, mp := range userPolicyMap { + err := c.IAMChangeHook(ctx, madmin.SRIAMItem{ + Type: madmin.SRIAMItemPolicyMapping, + PolicyMapping: &madmin.SRPolicyMapping{ + UserOrGroup: user, + IsGroup: false, + Policy: mp.Policies, + }, + }) + if err != nil { + return errSRIAMError(err) + } + } + + for group, mp := range groupPolicyMap { + err := c.IAMChangeHook(ctx, madmin.SRIAMItem{ + Type: madmin.SRIAMItemPolicyMapping, + PolicyMapping: &madmin.SRPolicyMapping{ + UserOrGroup: group, + IsGroup: true, + Policy: mp.Policies, + }, + }) + if err != nil { + return errSRIAMError(err) + } + } + } + + { + // Check for service accounts and replicate them. Only LDAP user + // owned service accounts are supported for this operation. 
+ serviceAccounts := make(map[string]auth.Credentials) + globalIAMSys.store.rlock() + err := globalIAMSys.store.loadUsers(ctx, svcUser, serviceAccounts) + globalIAMSys.store.runlock() + if err != nil { + return errSRBackendIssue(err) + } + for user, acc := range serviceAccounts { + claims, err := globalIAMSys.GetClaimsForSvcAcc(ctx, acc.AccessKey) + if err != nil { + return errSRBackendIssue(err) + } + if _, isLDAPAccount := claims[ldapUserN]; !isLDAPAccount { + continue + } + _, policy, err := globalIAMSys.GetServiceAccount(ctx, acc.AccessKey) + if err != nil { + return errSRBackendIssue(err) + } + var policyJSON []byte + if policy != nil { + policyJSON, err = json.Marshal(policy) + if err != nil { + return wrapSRErr(err) + } + } + err = c.IAMChangeHook(ctx, madmin.SRIAMItem{ + Type: madmin.SRIAMItemSvcAcc, + SvcAccChange: &madmin.SRSvcAccChange{ + Create: &madmin.SRSvcAccCreate{ + Parent: acc.ParentUser, + AccessKey: user, + SecretKey: acc.SecretKey, + Groups: acc.Groups, + Claims: claims, + SessionPolicy: json.RawMessage(policyJSON), + Status: acc.Status, + }, + }, + }) + if err != nil { + return errSRIAMError(err) + } + } + } + + return SRError{} +} + +// Concurrency helpers + +type concErr struct { + numActions int + errMap map[string]error + summaryErr error +} + +func (c concErr) Error() string { + return c.summaryErr.Error() +} + +func (c concErr) allFailed() bool { + return len(c.errMap) == c.numActions +} + +func (c *SiteReplicationSys) toErrorFromErrMap(errMap map[string]error) error { + if len(errMap) == 0 { + return nil + } + + msgs := []string{} + for d, err := range errMap { + name := c.state.Peers[d].Name + msgs = append(msgs, fmt.Sprintf("Site %s (%s): %v", name, d, err)) + } + return fmt.Errorf("Site replication error(s): %s", strings.Join(msgs, "; ")) +} + +func (c *SiteReplicationSys) newConcErr(numActions int, errMap map[string]error) concErr { + return concErr{ + numActions: numActions, + errMap: errMap, + summaryErr: 
c.toErrorFromErrMap(errMap), + } +} + +// concDo calls actions concurrently. selfActionFn is run for the current +// cluster and peerActionFn is run for each peer replication cluster. +func (c *SiteReplicationSys) concDo(selfActionFn func() error, peerActionFn func(deploymentID string, p madmin.PeerInfo) error) concErr { + depIDs := make([]string, 0, len(c.state.Peers)) + for d := range c.state.Peers { + depIDs = append(depIDs, d) + } + errs := make([]error, len(c.state.Peers)) + var wg sync.WaitGroup + for i := range depIDs { + wg.Add(1) + go func(i int) { + if depIDs[i] == globalDeploymentID { + if selfActionFn != nil { + errs[i] = selfActionFn() + } + } else { + errs[i] = peerActionFn(depIDs[i], c.state.Peers[depIDs[i]]) + } + wg.Done() + }(i) + } + wg.Wait() + errMap := make(map[string]error, len(c.state.Peers)) + for i, depID := range depIDs { + if errs[i] != nil { + errMap[depID] = errs[i] + } + } + numActions := len(c.state.Peers) - 1 + if selfActionFn != nil { + numActions++ + } + return c.newConcErr(numActions, errMap) +} + +func (c *SiteReplicationSys) annotateErr(annotation string, err error) error { + if err == nil { + return nil + } + return fmt.Errorf("%s: %s: %v", c.state.Name, annotation, err) +} + +func (c *SiteReplicationSys) annotatePeerErr(dstPeer string, annotation string, err error) error { + if err == nil { + return nil + } + return fmt.Errorf("%s->%s: %s: %v", c.state.Name, dstPeer, annotation, err) +} + +// Other helpers + +// newRemoteClusterHTTPTransport returns a new http configuration +// used while communicating with the remote cluster. 
+func newRemoteClusterHTTPTransport() *http.Transport { + tr := &http.Transport{ + Proxy: http.ProxyFromEnvironment, + TLSClientConfig: &tls.Config{ + RootCAs: globalRootCAs, + }, + } + return tr +} + +func getAdminClient(endpoint, accessKey, secretKey string) (*madmin.AdminClient, error) { + epURL, _ := url.Parse(endpoint) + client, err := madmin.New(epURL.Host, accessKey, secretKey, epURL.Scheme == "https") + if err != nil { + return nil, err + } + client.SetCustomTransport(newRemoteClusterHTTPTransport()) + return client, nil +} + +func getS3Client(pc madmin.PeerSite) (*minioClient.Client, error) { + ep, _ := url.Parse(pc.Endpoint) + return minioClient.New(ep.Host, &minioClient.Options{ + Creds: credentials.NewStaticV4(pc.AccessKey, pc.SecretKey, ""), + Secure: ep.Scheme == "https", + Transport: newRemoteClusterHTTPTransport(), + }) +} + +func getPriorityHelper(replicationConfig replication.Config) int { + maxPrio := 0 + for _, rule := range replicationConfig.Rules { + if rule.Priority > maxPrio { + maxPrio = rule.Priority + } + } + + // leave some gaps in priority numbers for flexibility + return maxPrio + 10 +} diff --git a/cmd/sts-handlers.go b/cmd/sts-handlers.go index 6cec751dc..cc225d8b7 100644 --- a/cmd/sts-handlers.go +++ b/cmd/sts-handlers.go @@ -29,6 +29,7 @@ import ( "time" "github.com/gorilla/mux" + "github.com/minio/madmin-go" "github.com/minio/minio/internal/auth" "github.com/minio/minio/internal/config/identity/openid" xhttp "github.com/minio/minio/internal/http" @@ -651,6 +652,19 @@ func (sts *stsAPIHandlers) AssumeRoleWithLDAPIdentity(w http.ResponseWriter, r * } } + // Call hook for cluster-replication. 
+	if err := globalSiteReplicationSys.IAMChangeHook(ctx, madmin.SRIAMItem{
+		Type: madmin.SRIAMItemSTSAcc,
+		STSCredential: &madmin.SRSTSCredential{
+			AccessKey:    cred.AccessKey,
+			SecretKey:    cred.SecretKey,
+			SessionToken: cred.SessionToken,
+		},
+	}); err != nil {
+		writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL)
+		return
+	}
+
 	ldapIdentityResponse := &AssumeRoleWithLDAPResponse{
 		Result: LDAPIdentityResult{
 			Credentials: cred,
diff --git a/docs/site-replication/README.md b/docs/site-replication/README.md
new file mode 100644
index 000000000..fdf9a3289
--- /dev/null
+++ b/docs/site-replication/README.md
@@ -0,0 +1,40 @@
+# Site Replication Guide #
+
+This feature allows multiple independent MinIO sites (or clusters) that are using the same external IDentity Provider (IDP) to be configured as replicas. In this situation the set of replica sites is referred to as peer sites or just peers. This means that:
+
+- when a bucket is created/deleted at a site, it is created/deleted on the other peer sites as well
+- each bucket is automatically configured with versioning enabled and to replicate its data on the corresponding bucket in each of the remaining peer sites
+- bucket policies, bucket tags, bucket object-lock configuration and bucket encryption settings are also replicated to all other peers
+- all IAM policies are replicated to all other peers
+- all service accounts belonging to users authenticated via the external IDP are replicated to all other peers
+
+This feature is built on top of the multi-site bucket replication feature.
+
+## Configuring Site Replication ##
+
+To configure site replication, ensure that all MinIO sites are using the same external IDP.
+
+1. Configure an alias in `mc` for each of the sites.
For example if you have three MinIO sites, you may run: + +```shell +$ mc alias set minio1 https://minio1.example.com:9000 minio1 minio1123 +$ mc alias set minio2 https://minio2.example.com:9000 minio2 minio2123 +$ mc alias set minio3 https://minio3.example.com:9000 minio3 minio3123 +``` + +NOTE: When configuring site replication, each site except the first one is required to be empty. + +2. Add site replication configuration with: + +```shell +$ mc admin replicate add minio1 minio2 minio3 +``` + +3. Once the above command returns success, you may query site replication configuration with: + +```shell +$ mc admin replicate info minio1 +``` + +*NOTE*: +Site replication enables bucket versioning automatically for each bucket: it must not be modified by the cluster operator. diff --git a/go.mod b/go.mod index 571c71804..43797e1f2 100644 --- a/go.mod +++ b/go.mod @@ -45,10 +45,10 @@ require ( github.com/minio/csvparser v1.0.0 github.com/minio/highwayhash v1.0.2 github.com/minio/kes v0.14.0 - github.com/minio/madmin-go v1.1.6 + github.com/minio/madmin-go v1.1.7 github.com/minio/minio-go/v7 v7.0.15-0.20210928020726-a58653d41dd8 github.com/minio/parquet-go v1.0.0 - github.com/minio/pkg v1.1.3 + github.com/minio/pkg v1.1.4 github.com/minio/selfupdate v0.3.1 github.com/minio/sha256-simd v1.0.0 github.com/minio/simdjson-go v0.2.1 diff --git a/go.sum b/go.sum index b8e24aa58..d8d91db28 100644 --- a/go.sum +++ b/go.sum @@ -1019,8 +1019,9 @@ github.com/minio/kes v0.11.0/go.mod h1:mTF1Bv8YVEtQqF/B7Felp4tLee44Pp+dgI0rhCvgN github.com/minio/kes v0.14.0 h1:plCGm4LwR++T1P1sXsJbyFRX54CE1WRuo9PAPj6MC3Q= github.com/minio/kes v0.14.0/go.mod h1:OUensXz2BpgMfiogslKxv7Anyx/wj+6bFC6qA7BQcfA= github.com/minio/madmin-go v1.0.12/go.mod h1:BK+z4XRx7Y1v8SFWXsuLNqQqnq5BO/axJ8IDJfgyvfs= -github.com/minio/madmin-go v1.1.6 h1:L53ALIbAilaEvuvMMT4XkJpd6mtaorkMBwCQ+zraYBA= github.com/minio/madmin-go v1.1.6/go.mod h1:vw+c3/u+DeVKqReEavo///Cl2OO8nt5s4ee843hJeLs= +github.com/minio/madmin-go v1.1.7 
h1:vZCnIfPlb40sBap+bmwvxG4/dSfkwF8QCRUHZL16Ylg= +github.com/minio/madmin-go v1.1.7/go.mod h1:Iu0OnrMWNBYx1lqJTW+BFjBMx0Hi0wjw8VmqhiOs2Jo= github.com/minio/mc v0.0.0-20210626002108-cebf3318546f h1:hyFvo5hSFw2K417YvDr/vAKlgCG69uTuhZW/5LNdL0U= github.com/minio/mc v0.0.0-20210626002108-cebf3318546f/go.mod h1:tuaonkPjVApCXkbtKENHBtsqUf7YTV33qmFrC+Pgp5g= github.com/minio/md5-simd v1.1.0/go.mod h1:XpBqgZULrMYD3R+M28PcmP0CkI7PEMzB3U77ZrKZ0Gw= @@ -1041,8 +1042,9 @@ github.com/minio/parquet-go v1.0.0/go.mod h1:aQlkSOfOq2AtQKkuou3mosNVMwNokd+faTa github.com/minio/pkg v1.0.3/go.mod h1:obU54TZ9QlMv0TRaDgQ/JTzf11ZSXxnSfLrm4tMtBP8= github.com/minio/pkg v1.0.4/go.mod h1:obU54TZ9QlMv0TRaDgQ/JTzf11ZSXxnSfLrm4tMtBP8= github.com/minio/pkg v1.0.8/go.mod h1:32x/3OmGB0EOi1N+3ggnp+B5VFkSBBB9svPMVfpnf14= -github.com/minio/pkg v1.1.3 h1:J4vGnlNSxc/o9gDOQMZ3k0L3koA7ZgBQ7GRMrUpt/OY= github.com/minio/pkg v1.1.3/go.mod h1:32x/3OmGB0EOi1N+3ggnp+B5VFkSBBB9svPMVfpnf14= +github.com/minio/pkg v1.1.4 h1:VxdPYpXAi1xseavq2WAJLr9BaKn5OP0TC1/2R4Ad9Oc= +github.com/minio/pkg v1.1.4/go.mod h1:32x/3OmGB0EOi1N+3ggnp+B5VFkSBBB9svPMVfpnf14= github.com/minio/selfupdate v0.3.1 h1:BWEFSNnrZVMUWXbXIgLDNDjbejkmpAmZvy/nCz1HlEs= github.com/minio/selfupdate v0.3.1/go.mod h1:b8ThJzzH7u2MkF6PcIra7KaXO9Khf6alWPvMSyTDCFM= github.com/minio/sha256-simd v0.1.1/go.mod h1:B5e1o+1/KgNmWrSQK08Y6Z1Vb5pwIktudl0J58iy0KM=