Add support for server side bucket replication (#9882)

This commit is contained in:
poornas
2020-07-21 17:49:56 -07:00
committed by GitHub
parent ca4c15bc63
commit c43da3005a
44 changed files with 2387 additions and 322 deletions

View File

@@ -0,0 +1,249 @@
/*
* MinIO Cloud Storage, (C) 2020 MinIO, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cmd
import (
"encoding/json"
"io"
"io/ioutil"
"net/http"
"github.com/gorilla/mux"
"github.com/minio/minio/cmd/config"
"github.com/minio/minio/cmd/logger"
"github.com/minio/minio/pkg/auth"
"github.com/minio/minio/pkg/env"
iampolicy "github.com/minio/minio/pkg/iam/policy"
"github.com/minio/minio/pkg/madmin"
)
const (
bucketQuotaConfigFile = "quota.json"
bucketReplicationTargetsFile = "replication-targets.json"
)
// PutBucketQuotaConfigHandler - PUT Bucket quota configuration.
// ----------
// Places a quota configuration on the specified bucket. The quota
// specified in the quota configuration will be applied by default
// to enforce total quota for the specified bucket.
func (a adminAPIHandlers) PutBucketQuotaConfigHandler(w http.ResponseWriter, r *http.Request) {
ctx := newContext(r, w, "PutBucketQuotaConfig")
defer logger.AuditLog(w, r, "PutBucketQuotaConfig", mustGetClaimsFromToken(r))
objectAPI, _ := validateAdminReq(ctx, w, r, iampolicy.SetBucketQuotaAdminAction)
if objectAPI == nil {
writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrServerNotInitialized), r.URL)
return
}
vars := mux.Vars(r)
bucket := vars["bucket"]
// Turn off quota commands if data usage info is unavailable.
if env.Get(envDataUsageCrawlConf, config.EnableOn) == config.EnableOff {
writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrAdminBucketQuotaDisabled), r.URL)
return
}
if _, err := objectAPI.GetBucketInfo(ctx, bucket); err != nil {
writeErrorResponseJSON(ctx, w, toAPIError(ctx, err), r.URL)
return
}
data, err := ioutil.ReadAll(r.Body)
if err != nil {
writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrInvalidRequest), r.URL)
return
}
if _, err = parseBucketQuota(bucket, data); err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r))
return
}
if err = globalBucketMetadataSys.Update(bucket, bucketQuotaConfigFile, data); err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r))
return
}
// Write success response.
writeSuccessResponseHeadersOnly(w)
}
// GetBucketQuotaConfigHandler - gets bucket quota configuration
func (a adminAPIHandlers) GetBucketQuotaConfigHandler(w http.ResponseWriter, r *http.Request) {
ctx := newContext(r, w, "GetBucketQuotaConfig")
defer logger.AuditLog(w, r, "GetBucketQuotaConfig", mustGetClaimsFromToken(r))
objectAPI, _ := validateAdminUsersReq(ctx, w, r, iampolicy.GetBucketQuotaAdminAction)
if objectAPI == nil {
writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrServerNotInitialized), r.URL)
return
}
vars := mux.Vars(r)
bucket := vars["bucket"]
if _, err := objectAPI.GetBucketInfo(ctx, bucket); err != nil {
writeErrorResponseJSON(ctx, w, toAPIError(ctx, err), r.URL)
return
}
config, err := globalBucketMetadataSys.GetQuotaConfig(bucket)
if err != nil {
writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL)
return
}
configData, err := json.Marshal(config)
if err != nil {
writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL)
return
}
// Write success response.
writeSuccessResponseJSON(w, configData)
}
// SetBucketReplicationTargetHandler - sets a replication target for bucket
func (a adminAPIHandlers) SetBucketReplicationTargetHandler(w http.ResponseWriter, r *http.Request) {
ctx := newContext(r, w, "SetBucketReplicationTarget")
defer logger.AuditLog(w, r, "SetBucketReplicationTarget", mustGetClaimsFromToken(r))
vars := mux.Vars(r)
bucket := vars["bucket"]
// Get current object layer instance.
objectAPI, _ := validateAdminUsersReq(ctx, w, r, iampolicy.SetBucketReplicationTargetAction)
if objectAPI == nil {
writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrServerNotInitialized), r.URL)
return
}
// Turn off replication if disk crawl is unavailable.
if env.Get(envDataUsageCrawlConf, config.EnableOn) == config.EnableOff {
writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrBucketReplicationDisabledError), r.URL)
return
}
// Check if bucket exists.
if _, err := objectAPI.GetBucketInfo(ctx, bucket); err != nil {
writeErrorResponseJSON(ctx, w, toAPIError(ctx, err), r.URL)
return
}
if versioned := globalBucketVersioningSys.Enabled(bucket); !versioned {
writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrReplicationBucketNeedsVersioningError), r.URL)
return
}
cred, _, _, s3Err := validateAdminSignature(ctx, r, "")
if s3Err != ErrNone {
writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(s3Err), r.URL)
return
}
password := cred.SecretKey
reqBytes, err := madmin.DecryptData(password, io.LimitReader(r.Body, r.ContentLength))
if err != nil {
writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErrWithErr(ErrAdminConfigBadJSON, err), r.URL)
return
}
var target madmin.BucketReplicationTarget
if err = json.Unmarshal(reqBytes, &target); err != nil {
writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErrWithErr(ErrAdminConfigBadJSON, err), r.URL)
return
}
target.Arn = globalBucketReplicationSys.getReplicationARN(target.URL())
tgtBytes, err := json.Marshal(&target)
if err != nil {
writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErrWithErr(ErrAdminConfigBadJSON, err), r.URL)
return
}
if err = globalBucketMetadataSys.Update(bucket, bucketReplicationTargetsFile, tgtBytes); err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r))
return
}
if err = globalBucketReplicationSys.SetTarget(ctx, bucket, &target); err != nil {
writeErrorResponseJSON(ctx, w, toAPIError(ctx, err), r.URL)
return
}
// Write success response.
writeSuccessResponseHeadersOnly(w)
}
// GetBucketReplicationTargetsHandler - gets bucket replication targets for a particular bucket
func (a adminAPIHandlers) GetBucketReplicationTargetsHandler(w http.ResponseWriter, r *http.Request) {
ctx := newContext(r, w, "GetBucketReplicationTarget")
defer logger.AuditLog(w, r, "GetBucketReplicationTarget", mustGetClaimsFromToken(r))
vars := mux.Vars(r)
bucket := vars["bucket"]
// Get current object layer instance.
objectAPI, _ := validateAdminUsersReq(ctx, w, r, iampolicy.GetBucketReplicationTargetAction)
if objectAPI == nil {
writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrServerNotInitialized), r.URL)
return
}
target, err := globalBucketMetadataSys.GetReplicationTargetConfig(bucket)
if err != nil {
writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL)
return
}
// remove secretKey from creds
var tgt madmin.BucketReplicationTarget
if !target.Empty() {
var creds auth.Credentials
creds.AccessKey = target.Credentials.AccessKey
tgt = madmin.BucketReplicationTarget{Endpoint: target.Endpoint, TargetBucket: target.TargetBucket, Credentials: &creds}
}
data, err := json.Marshal(tgt)
if err != nil {
writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL)
return
}
// Write success response.
writeSuccessResponseJSON(w, data)
}
// GetBucketReplicationARNHandler - gets replication ARN for a particular remote
func (a adminAPIHandlers) GetBucketReplicationARNHandler(w http.ResponseWriter, r *http.Request) {
ctx := newContext(r, w, "GetBucketReplicationARN")
defer logger.AuditLog(w, r, "GetBucketReplicationARN", mustGetClaimsFromToken(r))
vars := mux.Vars(r)
rURL := vars["url"]
// Get current object layer instance.
objectAPI, _ := validateAdminUsersReq(ctx, w, r, iampolicy.GetBucketReplicationTargetAction)
if objectAPI == nil {
writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrServerNotInitialized), r.URL)
return
}
data, err := json.Marshal(globalBucketReplicationSys.getARN(rURL))
if err != nil {
writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL)
return
}
// Write success response.
writeSuccessResponseJSON(w, data)
}

View File

@@ -1,118 +0,0 @@
/*
* MinIO Cloud Storage, (C) 2020 MinIO, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cmd
import (
"encoding/json"
"io/ioutil"
"net/http"
"github.com/gorilla/mux"
"github.com/minio/minio/cmd/config"
"github.com/minio/minio/cmd/logger"
"github.com/minio/minio/pkg/env"
iampolicy "github.com/minio/minio/pkg/iam/policy"
)
const (
bucketQuotaConfigFile = "quota.json"
)
// PutBucketQuotaConfigHandler - PUT Bucket quota configuration.
// ----------
// Places a quota configuration on the specified bucket. The quota
// specified in the quota configuration will be applied by default
// to enforce total quota for the specified bucket.
func (a adminAPIHandlers) PutBucketQuotaConfigHandler(w http.ResponseWriter, r *http.Request) {
ctx := newContext(r, w, "PutBucketQuotaConfig")
defer logger.AuditLog(w, r, "PutBucketQuotaConfig", mustGetClaimsFromToken(r))
objectAPI, _ := validateAdminReq(ctx, w, r, iampolicy.SetBucketQuotaAdminAction)
if objectAPI == nil {
writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrServerNotInitialized), r.URL)
return
}
vars := mux.Vars(r)
bucket := vars["bucket"]
// Turn off quota commands if data usage info is unavailable.
if env.Get(envDataUsageCrawlConf, config.EnableOn) == config.EnableOff {
writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrAdminBucketQuotaDisabled), r.URL)
return
}
if _, err := objectAPI.GetBucketInfo(ctx, bucket); err != nil {
writeErrorResponseJSON(ctx, w, toAPIError(ctx, err), r.URL)
return
}
data, err := ioutil.ReadAll(r.Body)
if err != nil {
writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrInvalidRequest), r.URL)
return
}
if _, err = parseBucketQuota(bucket, data); err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r))
return
}
if err = globalBucketMetadataSys.Update(bucket, bucketQuotaConfigFile, data); err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r))
return
}
// Write success response.
writeSuccessResponseHeadersOnly(w)
}
// GetBucketQuotaConfigHandler - gets bucket quota configuration
func (a adminAPIHandlers) GetBucketQuotaConfigHandler(w http.ResponseWriter, r *http.Request) {
ctx := newContext(r, w, "GetBucketQuotaConfig")
defer logger.AuditLog(w, r, "GetBucketQuotaConfig", mustGetClaimsFromToken(r))
objectAPI, _ := validateAdminUsersReq(ctx, w, r, iampolicy.GetBucketQuotaAdminAction)
if objectAPI == nil {
writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrServerNotInitialized), r.URL)
return
}
vars := mux.Vars(r)
bucket := vars["bucket"]
if _, err := objectAPI.GetBucketInfo(ctx, bucket); err != nil {
writeErrorResponseJSON(ctx, w, toAPIError(ctx, err), r.URL)
return
}
config, err := globalBucketMetadataSys.GetQuotaConfig(bucket)
if err != nil {
writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL)
return
}
configData, err := json.Marshal(config)
if err != nil {
writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL)
return
}
// Write success response.
writeSuccessResponseJSON(w, configData)
}

View File

@@ -171,8 +171,8 @@ func registerAdminRouter(router *mux.Router, enableConfigOps, enableIAMOps bool)
adminRouter.Methods(http.MethodPut).Path(adminVersion+"/set-group-status").HandlerFunc(httpTraceHdrs(adminAPI.SetGroupStatus)).Queries("group", "{group:.*}").Queries("status", "{status:.*}")
}
// Quota operations
if globalIsDistErasure || globalIsErasure {
// Quota operations
if env.Get(envDataUsageCrawlConf, config.EnableOn) == config.EnableOn {
// GetBucketQuotaConfig
adminRouter.Methods(http.MethodGet).Path(adminVersion+"/get-bucket-quota").HandlerFunc(
@@ -181,6 +181,16 @@ func registerAdminRouter(router *mux.Router, enableConfigOps, enableIAMOps bool)
adminRouter.Methods(http.MethodPut).Path(adminVersion+"/set-bucket-quota").HandlerFunc(
httpTraceHdrs(adminAPI.PutBucketQuotaConfigHandler)).Queries("bucket", "{bucket:.*}")
}
// Bucket replication operations
// GetBucketReplicationTargetHandler
adminRouter.Methods(http.MethodGet).Path(adminVersion+"/get-bucket-replication-target").HandlerFunc(
httpTraceHdrs(adminAPI.GetBucketReplicationTargetsHandler)).Queries("bucket", "{bucket:.*}")
// GetBucketReplicationARN Handler
adminRouter.Methods(http.MethodGet).Path(adminVersion+"/get-bucket-replication-arn").HandlerFunc(
httpTraceHdrs(adminAPI.GetBucketReplicationARNHandler)).Queries("url", "{url:.*}")
// SetBucketReplicationTargetHandler
adminRouter.Methods(http.MethodPut).Path(adminVersion+"/set-bucket-replication-target").HandlerFunc(
httpTraceHdrs(adminAPI.SetBucketReplicationTargetHandler)).Queries("bucket", "{bucket:.*}")
}
// -- Top APIs --

View File

@@ -34,6 +34,8 @@ import (
"github.com/minio/minio/cmd/logger"
"github.com/minio/minio/pkg/auth"
"github.com/minio/minio/pkg/bucket/lifecycle"
"github.com/minio/minio/pkg/bucket/replication"
objectlock "github.com/minio/minio/pkg/bucket/object/lock"
"github.com/minio/minio/pkg/bucket/policy"
"github.com/minio/minio/pkg/bucket/versioning"
@@ -104,6 +106,12 @@ const (
ErrNoSuchCORSConfiguration
ErrNoSuchWebsiteConfiguration
ErrReplicationConfigurationNotFoundError
ErrReplicationDestinationNotFoundError
ErrReplicationTargetNotFoundError
ErrReplicationNeedsVersioningError
ErrReplicationBucketNeedsVersioningError
ErrBucketReplicationDisabledError
ErrNoSuchKey
ErrNoSuchUpload
ErrNoSuchVersion
@@ -812,6 +820,31 @@ var errorCodes = errorCodeMap{
Description: "The replication configuration was not found",
HTTPStatusCode: http.StatusNotFound,
},
ErrReplicationDestinationNotFoundError: {
Code: "ReplicationDestinationNotFoundError",
Description: "The replication destination bucket does not exist",
HTTPStatusCode: http.StatusNotFound,
},
ErrReplicationTargetNotFoundError: {
Code: "ReplicationTargetNotFoundError",
Description: "The replication target does not exist",
HTTPStatusCode: http.StatusNotFound,
},
ErrReplicationNeedsVersioningError: {
Code: "InvalidRequest",
Description: "Versioning must be 'Enabled' on the bucket to apply a replication configuration",
HTTPStatusCode: http.StatusBadRequest,
},
ErrReplicationBucketNeedsVersioningError: {
Code: "InvalidRequest",
Description: "Versioning must be 'Enabled' on the bucket to add a replication target",
HTTPStatusCode: http.StatusBadRequest,
},
ErrBucketReplicationDisabledError: {
Code: "XMinioAdminBucketReplicationDisabled",
Description: "Replication specified but disk usage crawl is disabled on MinIO server",
HTTPStatusCode: http.StatusBadRequest,
},
ErrNoSuchObjectLockConfiguration: {
Code: "NoSuchObjectLockConfiguration",
Description: "The specified object does not have a ObjectLock configuration",
@@ -1837,6 +1870,12 @@ func toAPIErrorCode(ctx context.Context, err error) (apiErr APIErrorCode) {
apiErr = ErrObjectLockConfigurationNotFound
case BucketQuotaConfigNotFound:
apiErr = ErrAdminNoSuchQuotaConfiguration
case BucketReplicationConfigNotFound:
apiErr = ErrReplicationConfigurationNotFoundError
case BucketReplicationDestinationNotFound:
apiErr = ErrReplicationDestinationNotFoundError
case BucketReplicationTargetNotFound:
apiErr = ErrReplicationTargetNotFoundError
case BucketQuotaExceeded:
apiErr = ErrAdminBucketQuotaExceeded
case *event.ErrInvalidEventName:
@@ -1941,6 +1980,12 @@ func toAPIError(ctx context.Context, err error) APIError {
Description: e.Error(),
HTTPStatusCode: http.StatusBadRequest,
}
case replication.Error:
apiErr = APIError{
Code: "MalformedXML",
Description: e.Error(),
HTTPStatusCode: http.StatusBadRequest,
}
case tags.Error:
apiErr = APIError{
Code: e.Code(),

View File

@@ -157,7 +157,9 @@ func setObjectHeaders(w http.ResponseWriter, objInfo ObjectInfo, rs *HTTPRangeSp
if objInfo.VersionID != "" {
w.Header()[xhttp.AmzVersionID] = []string{objInfo.VersionID}
}
if objInfo.ReplicationStatus.String() != "" {
w.Header()[xhttp.AmzBucketReplicationStatus] = []string{objInfo.ReplicationStatus.String()}
}
if lc, err := globalLifecycleSys.Get(objInfo.Bucket); err == nil {
ruleID, expiryTime := lc.PredictExpiryTime(lifecycle.ObjectOpts{
Name: objInfo.Name,

View File

@@ -166,6 +166,21 @@ func registerAPIRouter(router *mux.Router, encryptionEnabled, allowSSEKMS bool)
// GetBucketEncryption
bucket.Methods(http.MethodGet).HandlerFunc(
maxClients(collectAPIStats("getbucketencryption", httpTraceAll(api.GetBucketEncryptionHandler)))).Queries("encryption", "")
// GetBucketObjectLockConfig
bucket.Methods(http.MethodGet).HandlerFunc(
maxClients(collectAPIStats("getbucketobjectlockconfiguration", httpTraceAll(api.GetBucketObjectLockConfigHandler)))).Queries("object-lock", "")
// GetBucketReplicationConfig
bucket.Methods(http.MethodGet).HandlerFunc(
maxClients(collectAPIStats("getbucketreplicationconfiguration", httpTraceAll(api.GetBucketReplicationConfigHandler)))).Queries("replication", "")
// GetBucketVersioning
bucket.Methods(http.MethodGet).HandlerFunc(
maxClients(collectAPIStats("getbucketversioning", httpTraceAll(api.GetBucketVersioningHandler)))).Queries("versioning", "")
// GetBucketNotification
bucket.Methods(http.MethodGet).HandlerFunc(
maxClients(collectAPIStats("getbucketnotification", httpTraceAll(api.GetBucketNotificationHandler)))).Queries("notification", "")
// ListenNotification
bucket.Methods(http.MethodGet).HandlerFunc(collectAPIStats("listennotification", httpTraceAll(api.ListenNotificationHandler))).Queries("events", "{events:.*}")
// Dummy Bucket Calls
// GetBucketACL -- this is a dummy call.
@@ -192,9 +207,6 @@ func registerAPIRouter(router *mux.Router, encryptionEnabled, allowSSEKMS bool)
// GetBucketLifecycleHandler - this is a dummy call.
bucket.Methods(http.MethodGet).HandlerFunc(
maxClients(collectAPIStats("getbucketlifecycle", httpTraceAll(api.GetBucketLifecycleHandler)))).Queries("lifecycle", "")
// GetBucketReplicationHandler - this is a dummy call.
bucket.Methods(http.MethodGet).HandlerFunc(
maxClients(collectAPIStats("getbucketreplication", httpTraceAll(api.GetBucketReplicationHandler)))).Queries("replication", "")
// GetBucketTaggingHandler
bucket.Methods(http.MethodGet).HandlerFunc(
maxClients(collectAPIStats("getbuckettagging", httpTraceAll(api.GetBucketTaggingHandler)))).Queries("tagging", "")
@@ -205,17 +217,6 @@ func registerAPIRouter(router *mux.Router, encryptionEnabled, allowSSEKMS bool)
bucket.Methods(http.MethodDelete).HandlerFunc(
maxClients(collectAPIStats("deletebuckettagging", httpTraceAll(api.DeleteBucketTaggingHandler)))).Queries("tagging", "")
// GetBucketObjectLockConfig
bucket.Methods(http.MethodGet).HandlerFunc(
maxClients(collectAPIStats("getbucketobjectlockconfiguration", httpTraceAll(api.GetBucketObjectLockConfigHandler)))).Queries("object-lock", "")
// GetBucketVersioning
bucket.Methods(http.MethodGet).HandlerFunc(
maxClients(collectAPIStats("getbucketversioning", httpTraceAll(api.GetBucketVersioningHandler)))).Queries("versioning", "")
// GetBucketNotification
bucket.Methods(http.MethodGet).HandlerFunc(
maxClients(collectAPIStats("getbucketnotification", httpTraceAll(api.GetBucketNotificationHandler)))).Queries("notification", "")
// ListenNotification
bucket.Methods(http.MethodGet).HandlerFunc(collectAPIStats("listennotification", httpTraceAll(api.ListenNotificationHandler))).Queries("events", "{events:.*}")
// ListMultipartUploads
bucket.Methods(http.MethodGet).HandlerFunc(
maxClients(collectAPIStats("listmultipartuploads", httpTraceAll(api.ListMultipartUploadsHandler)))).Queries("uploads", "")
@@ -234,6 +235,11 @@ func registerAPIRouter(router *mux.Router, encryptionEnabled, allowSSEKMS bool)
// PutBucketLifecycle
bucket.Methods(http.MethodPut).HandlerFunc(
maxClients(collectAPIStats("putbucketlifecycle", httpTraceAll(api.PutBucketLifecycleHandler)))).Queries("lifecycle", "")
// PutBucketReplicationConfig
bucket.Methods(http.MethodPut).HandlerFunc(
maxClients(collectAPIStats("putbucketreplicationconfiguration", httpTraceAll(api.PutBucketReplicationConfigHandler)))).Queries("replication", "")
// GetObjectRetention
// PutBucketEncryption
bucket.Methods(http.MethodPut).HandlerFunc(
maxClients(collectAPIStats("putbucketencryption", httpTraceAll(api.PutBucketEncryptionHandler)))).Queries("encryption", "")
@@ -269,6 +275,9 @@ func registerAPIRouter(router *mux.Router, encryptionEnabled, allowSSEKMS bool)
// DeleteBucketPolicy
bucket.Methods(http.MethodDelete).HandlerFunc(
maxClients(collectAPIStats("deletebucketpolicy", httpTraceAll(api.DeleteBucketPolicyHandler)))).Queries("policy", "")
// DeleteBucketReplication
bucket.Methods(http.MethodDelete).HandlerFunc(
maxClients(collectAPIStats("deletebucketreplicationconfiguration", httpTraceAll(api.DeleteBucketReplicationConfigHandler)))).Queries("replication", "")
// DeleteBucketLifecycle
bucket.Methods(http.MethodDelete).HandlerFunc(
maxClients(collectAPIStats("deletebucketlifecycle", httpTraceAll(api.DeleteBucketLifecycleHandler)))).Queries("lifecycle", "")

View File

@@ -32,12 +32,15 @@ import (
"github.com/minio/minio-go/v7/pkg/set"
"github.com/minio/minio-go/v7/pkg/tags"
"github.com/minio/minio/cmd/config"
"github.com/minio/minio/cmd/config/etcd/dns"
"github.com/minio/minio/cmd/crypto"
xhttp "github.com/minio/minio/cmd/http"
"github.com/minio/minio/cmd/logger"
objectlock "github.com/minio/minio/pkg/bucket/object/lock"
"github.com/minio/minio/pkg/bucket/policy"
"github.com/minio/minio/pkg/bucket/replication"
"github.com/minio/minio/pkg/env"
"github.com/minio/minio/pkg/event"
"github.com/minio/minio/pkg/handlers"
"github.com/minio/minio/pkg/hash"
@@ -46,8 +49,9 @@ import (
)
const (
objectLockConfig = "object-lock.xml"
bucketTaggingConfig = "tagging.xml"
objectLockConfig = "object-lock.xml"
bucketTaggingConfig = "tagging.xml"
bucketReplicationConfig = "replication.xml"
)
// Check if there are buckets on server without corresponding entry in etcd backend and
@@ -1219,3 +1223,143 @@ func (api objectAPIHandlers) DeleteBucketTaggingHandler(w http.ResponseWriter, r
// Write success response.
writeSuccessResponseHeadersOnly(w)
}
// PutBucketReplicationConfigHandler - PUT Bucket replication configuration.
// ----------
// Add a replication configuration on the specified bucket as specified in https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutBucketReplication.html
func (api objectAPIHandlers) PutBucketReplicationConfigHandler(w http.ResponseWriter, r *http.Request) {
ctx := newContext(r, w, "PutBucketReplicationConfig")
defer logger.AuditLog(w, r, "PutBucketReplicationConfig", mustGetClaimsFromToken(r))
vars := mux.Vars(r)
bucket := vars["bucket"]
objectAPI := api.ObjectAPI()
if objectAPI == nil {
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrServerNotInitialized), r.URL, guessIsBrowserReq(r))
return
}
// Turn off replication if disk crawl is unavailable.
if env.Get(envDataUsageCrawlConf, config.EnableOn) == config.EnableOff {
writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrBucketReplicationDisabledError), r.URL)
return
}
if s3Error := checkRequestAuthType(ctx, r, policy.PutReplicationConfigurationAction, bucket, ""); s3Error != ErrNone {
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Error), r.URL, guessIsBrowserReq(r))
return
}
// Check if bucket exists.
if _, err := objectAPI.GetBucketInfo(ctx, bucket); err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r))
return
}
if versioned := globalBucketVersioningSys.Enabled(bucket); !versioned {
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrReplicationNeedsVersioningError), r.URL, guessIsBrowserReq(r))
return
}
replicationConfig, err := replication.ParseConfig(io.LimitReader(r.Body, r.ContentLength))
if err != nil {
apiErr := errorCodes.ToAPIErr(ErrMalformedXML)
apiErr.Description = err.Error()
writeErrorResponse(ctx, w, apiErr, r.URL, guessIsBrowserReq(r))
return
}
sameTarget, err := globalBucketReplicationSys.validateDestination(ctx, bucket, replicationConfig)
if err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r))
return
}
// Validate the received bucket replication config
if err = replicationConfig.Validate(bucket, sameTarget); err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r))
return
}
configData, err := xml.Marshal(replicationConfig)
if err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r))
return
}
if err = globalBucketMetadataSys.Update(bucket, bucketReplicationConfig, configData); err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r))
return
}
// Write success response.
writeSuccessResponseHeadersOnly(w)
}
// GetBucketReplicationConfigHandler - GET Bucket replication configuration.
// ----------
// Gets the replication configuration for a bucket.
func (api objectAPIHandlers) GetBucketReplicationConfigHandler(w http.ResponseWriter, r *http.Request) {
ctx := newContext(r, w, "GetBucketReplicationConfig")
defer logger.AuditLog(w, r, "GetBucketReplicationConfig", mustGetClaimsFromToken(r))
vars := mux.Vars(r)
bucket := vars["bucket"]
objectAPI := api.ObjectAPI()
if objectAPI == nil {
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrServerNotInitialized), r.URL, guessIsBrowserReq(r))
return
}
// check if user has permissions to perform this operation
if s3Error := checkRequestAuthType(ctx, r, policy.GetReplicationConfigurationAction, bucket, ""); s3Error != ErrNone {
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Error), r.URL, guessIsBrowserReq(r))
return
}
// Check if bucket exists.
if _, err := objectAPI.GetBucketInfo(ctx, bucket); err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r))
return
}
config, err := globalBucketMetadataSys.GetReplicationConfig(ctx, bucket)
if err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r))
return
}
configData, err := xml.Marshal(config)
if err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r))
return
}
// Write success response.
writeSuccessResponseXML(w, configData)
}
// DeleteBucketReplicationConfigHandler - DELETE Bucket replication config.
// ----------
func (api objectAPIHandlers) DeleteBucketReplicationConfigHandler(w http.ResponseWriter, r *http.Request) {
ctx := newContext(r, w, "DeleteBucketReplicationConfig")
defer logger.AuditLog(w, r, "DeleteBucketReplicationConfig", mustGetClaimsFromToken(r))
vars := mux.Vars(r)
bucket := vars["bucket"]
objectAPI := api.ObjectAPI()
if objectAPI == nil {
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrServerNotInitialized), r.URL, guessIsBrowserReq(r))
return
}
if s3Error := checkRequestAuthType(ctx, r, policy.PutReplicationConfigurationAction, bucket, ""); s3Error != ErrNone {
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Error), r.URL, guessIsBrowserReq(r))
return
}
// Check if bucket exists.
if _, err := objectAPI.GetBucketInfo(ctx, bucket); err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r))
return
}
if err := globalBucketMetadataSys.Update(bucket, bucketReplicationConfig, nil); err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r))
return
}
// Write success response.
writeSuccessResponseHeadersOnly(w)
}

View File

@@ -28,6 +28,7 @@ import (
"github.com/minio/minio/pkg/bucket/lifecycle"
objectlock "github.com/minio/minio/pkg/bucket/object/lock"
"github.com/minio/minio/pkg/bucket/policy"
"github.com/minio/minio/pkg/bucket/replication"
"github.com/minio/minio/pkg/bucket/versioning"
"github.com/minio/minio/pkg/event"
"github.com/minio/minio/pkg/madmin"
@@ -153,6 +154,10 @@ func (sys *BucketMetadataSys) Update(bucket string, configFile string, configDat
meta.VersioningConfigXML = configData
case bucketQuotaConfigFile:
meta.QuotaConfigJSON = configData
case bucketReplicationConfig:
meta.ReplicationConfigXML = configData
case bucketReplicationTargetsFile:
meta.ReplicationTargetsConfigJSON = configData
default:
return fmt.Errorf("Unknown bucket %s metadata update requested %s", bucket, configFile)
}
@@ -318,7 +323,37 @@ func (sys *BucketMetadataSys) GetQuotaConfig(bucket string) (*madmin.BucketQuota
return meta.quotaConfig, nil
}
// GetConfig returns the current bucket metadata
// GetReplicationConfig returns configured bucket replication config
// The returned object may not be modified.
func (sys *BucketMetadataSys) GetReplicationConfig(ctx context.Context, bucket string) (*replication.Config, error) {
meta, err := sys.GetConfig(bucket)
if err != nil {
if errors.Is(err, errConfigNotFound) {
return nil, BucketReplicationConfigNotFound{Bucket: bucket}
}
return nil, err
}
if meta.replicationConfig == nil {
return nil, BucketReplicationConfigNotFound{Bucket: bucket}
}
return meta.replicationConfig, nil
}
// GetReplicationTargetConfig returns configured bucket replication target for this bucket
// The returned object may not be modified.
func (sys *BucketMetadataSys) GetReplicationTargetConfig(bucket string) (*madmin.BucketReplicationTarget, error) {
meta, err := sys.GetConfig(bucket)
if err != nil {
return nil, err
}
if meta.replicationTargetConfig == nil {
return nil, BucketReplicationTargetNotFound{Bucket: bucket}
}
return meta.replicationTargetConfig, nil
}
// GetConfig returns a specific configuration from the bucket metadata.
// The returned object may not be modified.
func (sys *BucketMetadataSys) GetConfig(bucket string) (BucketMetadata, error) {
objAPI := newObjectLayerWithoutSafeModeFn()

View File

@@ -20,6 +20,7 @@ import (
"bytes"
"context"
"encoding/binary"
"encoding/json"
"encoding/xml"
"errors"
"fmt"
@@ -32,6 +33,7 @@ import (
"github.com/minio/minio/pkg/bucket/lifecycle"
objectlock "github.com/minio/minio/pkg/bucket/object/lock"
"github.com/minio/minio/pkg/bucket/policy"
"github.com/minio/minio/pkg/bucket/replication"
"github.com/minio/minio/pkg/bucket/versioning"
"github.com/minio/minio/pkg/event"
"github.com/minio/minio/pkg/madmin"
@@ -59,27 +61,31 @@ var (
// bucketMetadataFormat refers to the format.
// bucketMetadataVersion can be used to track a rolling upgrade of a field.
type BucketMetadata struct {
Name string
Created time.Time
LockEnabled bool // legacy not used anymore.
PolicyConfigJSON []byte
NotificationConfigXML []byte
LifecycleConfigXML []byte
ObjectLockConfigXML []byte
VersioningConfigXML []byte
EncryptionConfigXML []byte
TaggingConfigXML []byte
QuotaConfigJSON []byte
Name string
Created time.Time
LockEnabled bool // legacy not used anymore.
PolicyConfigJSON []byte
NotificationConfigXML []byte
LifecycleConfigXML []byte
ObjectLockConfigXML []byte
VersioningConfigXML []byte
EncryptionConfigXML []byte
TaggingConfigXML []byte
QuotaConfigJSON []byte
ReplicationConfigXML []byte
ReplicationTargetsConfigJSON []byte
// Unexported fields. Must be updated atomically.
policyConfig *policy.Policy
notificationConfig *event.Config
lifecycleConfig *lifecycle.Lifecycle
objectLockConfig *objectlock.Config
versioningConfig *versioning.Versioning
sseConfig *bucketsse.BucketSSEConfig
taggingConfig *tags.Tags
quotaConfig *madmin.BucketQuota
policyConfig *policy.Policy
notificationConfig *event.Config
lifecycleConfig *lifecycle.Lifecycle
objectLockConfig *objectlock.Config
versioningConfig *versioning.Versioning
sseConfig *bucketsse.BucketSSEConfig
taggingConfig *tags.Tags
quotaConfig *madmin.BucketQuota
replicationConfig *replication.Config
replicationTargetConfig *madmin.BucketReplicationTarget
}
// newBucketMetadata creates BucketMetadata with the supplied name and Created to Now.
@@ -94,6 +100,7 @@ func newBucketMetadata(name string) BucketMetadata {
versioningConfig: &versioning.Versioning{
XMLNS: "http://s3.amazonaws.com/doc/2006-03-01/",
},
replicationTargetConfig: &madmin.BucketReplicationTarget{},
}
}
@@ -119,7 +126,6 @@ func (b *BucketMetadata) Load(ctx context.Context, api ObjectLayer, name string)
default:
return fmt.Errorf("loadBucketMetadata: unknown version: %d", binary.LittleEndian.Uint16(data[2:4]))
}
// OK, parse data.
_, err = b.UnmarshalMsg(data[4:])
return err
@@ -136,7 +142,6 @@ func loadBucketMetadata(ctx context.Context, objectAPI ObjectLayer, bucket strin
if err != errConfigNotFound {
return b, err
}
// Old bucket without bucket metadata. Hence we migrate existing settings.
return b, b.convertLegacyConfigs(ctx, objectAPI)
}
@@ -213,6 +218,22 @@ func (b *BucketMetadata) parseAllConfigs(ctx context.Context, objectAPI ObjectLa
}
}
if len(b.ReplicationConfigXML) != 0 {
b.replicationConfig, err = replication.ParseConfig(bytes.NewReader(b.ReplicationConfigXML))
if err != nil {
return err
}
} else {
b.replicationConfig = nil
}
if len(b.ReplicationTargetsConfigJSON) != 0 {
if err = json.Unmarshal(b.ReplicationTargetsConfigJSON, b.replicationTargetConfig); err != nil {
return err
}
} else {
b.replicationTargetConfig = &madmin.BucketReplicationTarget{}
}
return nil
}
@@ -225,6 +246,8 @@ func (b *BucketMetadata) convertLegacyConfigs(ctx context.Context, objectAPI Obj
bucketQuotaConfigFile,
bucketSSEConfig,
bucketTaggingConfig,
bucketReplicationConfig,
bucketReplicationTargetsFile,
objectLockConfig,
}
@@ -281,6 +304,10 @@ func (b *BucketMetadata) convertLegacyConfigs(ctx context.Context, objectAPI Obj
b.VersioningConfigXML = enabledBucketVersioningConfig
case bucketQuotaConfigFile:
b.QuotaConfigJSON = configData
case bucketReplicationConfig:
b.ReplicationConfigXML = configData
case bucketReplicationTargetsFile:
b.ReplicationTargetsConfigJSON = configData
}
}
@@ -315,7 +342,6 @@ func (b *BucketMetadata) Save(ctx context.Context, api ObjectLayer) error {
if err != nil {
return err
}
configFile := path.Join(bucketConfigPrefix, b.Name, bucketMetadataFile)
return saveConfig(ctx, api, configFile, data)
}

View File

@@ -90,6 +90,18 @@ func (z *BucketMetadata) DecodeMsg(dc *msgp.Reader) (err error) {
err = msgp.WrapError(err, "QuotaConfigJSON")
return
}
case "ReplicationConfigXML":
z.ReplicationConfigXML, err = dc.ReadBytes(z.ReplicationConfigXML)
if err != nil {
err = msgp.WrapError(err, "ReplicationConfigXML")
return
}
case "ReplicationTargetsConfigJSON":
z.ReplicationTargetsConfigJSON, err = dc.ReadBytes(z.ReplicationTargetsConfigJSON)
if err != nil {
err = msgp.WrapError(err, "ReplicationTargetsConfigJSON")
return
}
default:
err = dc.Skip()
if err != nil {
@@ -103,9 +115,9 @@ func (z *BucketMetadata) DecodeMsg(dc *msgp.Reader) (err error) {
// EncodeMsg implements msgp.Encodable
func (z *BucketMetadata) EncodeMsg(en *msgp.Writer) (err error) {
// map header, size 11
// map header, size 13
// write "Name"
err = en.Append(0x8b, 0xa4, 0x4e, 0x61, 0x6d, 0x65)
err = en.Append(0x8d, 0xa4, 0x4e, 0x61, 0x6d, 0x65)
if err != nil {
return
}
@@ -214,15 +226,35 @@ func (z *BucketMetadata) EncodeMsg(en *msgp.Writer) (err error) {
err = msgp.WrapError(err, "QuotaConfigJSON")
return
}
// write "ReplicationConfigXML"
err = en.Append(0xb4, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x58, 0x4d, 0x4c)
if err != nil {
return
}
err = en.WriteBytes(z.ReplicationConfigXML)
if err != nil {
err = msgp.WrapError(err, "ReplicationConfigXML")
return
}
// write "ReplicationTargetsConfigJSON"
err = en.Append(0xbc, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x54, 0x61, 0x72, 0x67, 0x65, 0x74, 0x73, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x4a, 0x53, 0x4f, 0x4e)
if err != nil {
return
}
err = en.WriteBytes(z.ReplicationTargetsConfigJSON)
if err != nil {
err = msgp.WrapError(err, "ReplicationTargetsConfigJSON")
return
}
return
}
// MarshalMsg implements msgp.Marshaler
func (z *BucketMetadata) MarshalMsg(b []byte) (o []byte, err error) {
o = msgp.Require(b, z.Msgsize())
// map header, size 11
// map header, size 13
// string "Name"
o = append(o, 0x8b, 0xa4, 0x4e, 0x61, 0x6d, 0x65)
o = append(o, 0x8d, 0xa4, 0x4e, 0x61, 0x6d, 0x65)
o = msgp.AppendString(o, z.Name)
// string "Created"
o = append(o, 0xa7, 0x43, 0x72, 0x65, 0x61, 0x74, 0x65, 0x64)
@@ -254,6 +286,12 @@ func (z *BucketMetadata) MarshalMsg(b []byte) (o []byte, err error) {
// string "QuotaConfigJSON"
o = append(o, 0xaf, 0x51, 0x75, 0x6f, 0x74, 0x61, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x4a, 0x53, 0x4f, 0x4e)
o = msgp.AppendBytes(o, z.QuotaConfigJSON)
// string "ReplicationConfigXML"
o = append(o, 0xb4, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x58, 0x4d, 0x4c)
o = msgp.AppendBytes(o, z.ReplicationConfigXML)
// string "ReplicationTargetsConfigJSON"
o = append(o, 0xbc, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x54, 0x61, 0x72, 0x67, 0x65, 0x74, 0x73, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x4a, 0x53, 0x4f, 0x4e)
o = msgp.AppendBytes(o, z.ReplicationTargetsConfigJSON)
return
}
@@ -341,6 +379,18 @@ func (z *BucketMetadata) UnmarshalMsg(bts []byte) (o []byte, err error) {
err = msgp.WrapError(err, "QuotaConfigJSON")
return
}
case "ReplicationConfigXML":
z.ReplicationConfigXML, bts, err = msgp.ReadBytesBytes(bts, z.ReplicationConfigXML)
if err != nil {
err = msgp.WrapError(err, "ReplicationConfigXML")
return
}
case "ReplicationTargetsConfigJSON":
z.ReplicationTargetsConfigJSON, bts, err = msgp.ReadBytesBytes(bts, z.ReplicationTargetsConfigJSON)
if err != nil {
err = msgp.WrapError(err, "ReplicationTargetsConfigJSON")
return
}
default:
bts, err = msgp.Skip(bts)
if err != nil {
@@ -355,6 +405,6 @@ func (z *BucketMetadata) UnmarshalMsg(bts []byte) (o []byte, err error) {
// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message
func (z *BucketMetadata) Msgsize() (s int) {
s = 1 + 5 + msgp.StringPrefixSize + len(z.Name) + 8 + msgp.TimeSize + 12 + msgp.BoolSize + 17 + msgp.BytesPrefixSize + len(z.PolicyConfigJSON) + 22 + msgp.BytesPrefixSize + len(z.NotificationConfigXML) + 19 + msgp.BytesPrefixSize + len(z.LifecycleConfigXML) + 20 + msgp.BytesPrefixSize + len(z.ObjectLockConfigXML) + 20 + msgp.BytesPrefixSize + len(z.VersioningConfigXML) + 20 + msgp.BytesPrefixSize + len(z.EncryptionConfigXML) + 17 + msgp.BytesPrefixSize + len(z.TaggingConfigXML) + 16 + msgp.BytesPrefixSize + len(z.QuotaConfigJSON)
s = 1 + 5 + msgp.StringPrefixSize + len(z.Name) + 8 + msgp.TimeSize + 12 + msgp.BoolSize + 17 + msgp.BytesPrefixSize + len(z.PolicyConfigJSON) + 22 + msgp.BytesPrefixSize + len(z.NotificationConfigXML) + 19 + msgp.BytesPrefixSize + len(z.LifecycleConfigXML) + 20 + msgp.BytesPrefixSize + len(z.ObjectLockConfigXML) + 20 + msgp.BytesPrefixSize + len(z.VersioningConfigXML) + 20 + msgp.BytesPrefixSize + len(z.EncryptionConfigXML) + 17 + msgp.BytesPrefixSize + len(z.TaggingConfigXML) + 16 + msgp.BytesPrefixSize + len(z.QuotaConfigJSON) + 21 + msgp.BytesPrefixSize + len(z.ReplicationConfigXML) + 29 + msgp.BytesPrefixSize + len(z.ReplicationTargetsConfigJSON)
return
}

350
cmd/bucket-replication.go Normal file
View File

@@ -0,0 +1,350 @@
/*
* MinIO Cloud Storage, (C) 2020 MinIO, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cmd
import (
"context"
"fmt"
"net/http"
"strings"
"sync"
"time"
miniogo "github.com/minio/minio-go/v7"
"github.com/minio/minio-go/v7/pkg/credentials"
"github.com/minio/minio-go/v7/pkg/encrypt"
"github.com/minio/minio-go/v7/pkg/tags"
"github.com/minio/minio/cmd/crypto"
xhttp "github.com/minio/minio/cmd/http"
"github.com/minio/minio/cmd/logger"
"github.com/minio/minio/pkg/bucket/replication"
"github.com/minio/minio/pkg/event"
iampolicy "github.com/minio/minio/pkg/iam/policy"
"github.com/minio/minio/pkg/madmin"
)
// BucketReplicationSys represents replication subsystem
type BucketReplicationSys struct {
sync.RWMutex
targetsMap map[string]*miniogo.Core
targetsARNMap map[string]string
}
// GetConfig - gets replication config associated to a given bucket name.
func (sys *BucketReplicationSys) GetConfig(ctx context.Context, bucketName string) (rc *replication.Config, err error) {
if globalIsGateway {
objAPI := newObjectLayerWithoutSafeModeFn()
if objAPI == nil {
return nil, errServerNotInitialized
}
return nil, BucketReplicationConfigNotFound{Bucket: bucketName}
}
return globalBucketMetadataSys.GetReplicationConfig(ctx, bucketName)
}
// SetTarget - sets a new minio-go client replication target for this bucket.
func (sys *BucketReplicationSys) SetTarget(ctx context.Context, bucket string, tgt *madmin.BucketReplicationTarget) error {
if globalIsGateway {
return nil
}
// delete replication targets that were removed
if tgt.Empty() {
sys.Lock()
if currTgt, ok := sys.targetsMap[bucket]; ok {
delete(sys.targetsARNMap, currTgt.EndpointURL().String())
}
delete(sys.targetsMap, bucket)
sys.Unlock()
return nil
}
clnt, err := getReplicationTargetClient(tgt)
if err != nil {
return BucketReplicationTargetNotFound{Bucket: tgt.TargetBucket}
}
ok, err := clnt.BucketExists(ctx, tgt.TargetBucket)
if err != nil {
return err
}
if !ok {
return BucketReplicationDestinationNotFound{Bucket: tgt.TargetBucket}
}
sys.Lock()
sys.targetsMap[bucket] = clnt
sys.targetsARNMap[tgt.URL()] = tgt.Arn
sys.Unlock()
return nil
}
// GetTargetClient returns minio-go client for target instance
func (sys *BucketReplicationSys) GetTargetClient(ctx context.Context, bucket string) *miniogo.Core {
var clnt *miniogo.Core
sys.RLock()
if c, ok := sys.targetsMap[bucket]; ok {
clnt = c
}
sys.RUnlock()
return clnt
}
// validateDestination returns error if replication destination bucket missing or not configured
// It also returns true if replication destination is same as this server.
func (sys *BucketReplicationSys) validateDestination(ctx context.Context, bucket string, rCfg *replication.Config) (bool, error) {
clnt := sys.GetTargetClient(ctx, bucket)
if clnt == nil {
return false, BucketReplicationTargetNotFound{Bucket: bucket}
}
if found, _ := clnt.BucketExists(ctx, rCfg.GetDestination().Bucket); !found {
return false, BucketReplicationDestinationNotFound{Bucket: rCfg.GetDestination().Bucket}
}
// validate replication ARN against target endpoint
for k, v := range sys.targetsARNMap {
if v == rCfg.ReplicationArn {
if k == clnt.EndpointURL().String() {
sameTarget, _ := isLocalHost(clnt.EndpointURL().Hostname(), clnt.EndpointURL().Port(), globalMinioPort)
return sameTarget, nil
}
}
}
return false, BucketReplicationTargetNotFound{Bucket: bucket}
}
// NewBucketReplicationSys - creates new replication system.
func NewBucketReplicationSys() *BucketReplicationSys {
return &BucketReplicationSys{
targetsMap: make(map[string]*miniogo.Core),
targetsARNMap: make(map[string]string),
}
}
// Init initializes the bucket replication subsystem for buckets with replication config
func (sys *BucketReplicationSys) Init(ctx context.Context, buckets []BucketInfo, objAPI ObjectLayer) error {
if objAPI == nil {
return errServerNotInitialized
}
// In gateway mode, replication is not supported.
if globalIsGateway {
return nil
}
// Load bucket replication targets once during boot.
sys.load(ctx, buckets, objAPI)
return nil
}
// create minio-go clients for buckets having replication targets
func (sys *BucketReplicationSys) load(ctx context.Context, buckets []BucketInfo, objAPI ObjectLayer) {
for _, bucket := range buckets {
tgt, err := globalBucketMetadataSys.GetReplicationTargetConfig(bucket.Name)
if err != nil {
continue
}
if tgt == nil || tgt.Empty() {
continue
}
tgtClient, err := getReplicationTargetClient(tgt)
if err != nil {
continue
}
sys.Lock()
sys.targetsMap[bucket.Name] = tgtClient
sys.targetsARNMap[tgt.URL()] = tgt.Arn
sys.Unlock()
}
}
// GetARN returns the ARN associated with replication target URL
func (sys *BucketReplicationSys) getARN(endpoint string) string {
return sys.targetsARNMap[endpoint]
}
// getReplicationTargetInstanceTransport contains a singleton roundtripper.
var getReplicationTargetInstanceTransport http.RoundTripper
var getReplicationTargetInstanceTransportOnce sync.Once
// Returns a minio-go Client configured to access remote host described in replication target config.
var getReplicationTargetClient = func(tcfg *madmin.BucketReplicationTarget) (*miniogo.Core, error) {
config := tcfg.Credentials
// if Signature version '4' use NewV4 directly.
creds := credentials.NewStaticV4(config.AccessKey, config.SecretKey, "")
// if Signature version '2' use NewV2 directly.
if strings.ToUpper(tcfg.API) == "S3V2" {
creds = credentials.NewStaticV2(config.AccessKey, config.SecretKey, "")
}
getReplicationTargetInstanceTransportOnce.Do(func() {
getReplicationTargetInstanceTransport = NewGatewayHTTPTransport()
})
core, err := miniogo.NewCore(tcfg.Endpoint, &miniogo.Options{
Creds: creds,
Secure: tcfg.IsSSL,
Transport: getReplicationTargetInstanceTransport,
})
return core, err
}
// mustReplicate returns true if object meets replication criteria.
func (sys *BucketReplicationSys) mustReplicate(ctx context.Context, r *http.Request, bucket, object string, meta map[string]string, replStatus string) bool {
if globalIsGateway {
return false
}
if rs, ok := meta[xhttp.AmzBucketReplicationStatus]; ok {
replStatus = rs
}
if replication.StatusType(replStatus) == replication.Replica {
return false
}
if s3Err := isPutActionAllowed(getRequestAuthType(r), bucket, object, r, iampolicy.GetReplicationConfigurationAction); s3Err != ErrNone {
return false
}
cfg, err := globalBucketReplicationSys.GetConfig(ctx, bucket)
if err != nil {
return false
}
opts := replication.ObjectOpts{
Name: object,
SSEC: crypto.SSEC.IsEncrypted(meta),
}
tagStr, ok := meta[xhttp.AmzObjectTagging]
if ok {
opts.UserTags = tagStr
}
return cfg.Replicate(opts)
}
func putReplicationOpts(dest replication.Destination, objInfo ObjectInfo) (putOpts miniogo.PutObjectOptions) {
meta := make(map[string]string)
for k, v := range objInfo.UserDefined {
if k == xhttp.AmzBucketReplicationStatus {
continue
}
meta[k] = v
}
tag, err := tags.ParseObjectTags(objInfo.UserTags)
if err != nil {
return
}
putOpts = miniogo.PutObjectOptions{
UserMetadata: meta,
UserTags: tag.ToMap(),
ContentType: objInfo.ContentType,
ContentEncoding: objInfo.ContentEncoding,
StorageClass: dest.StorageClass,
ReplicationVersionID: objInfo.VersionID,
ReplicationStatus: miniogo.ReplicationStatusReplica,
ReplicationMTime: objInfo.ModTime,
}
if mode, ok := objInfo.UserDefined[xhttp.AmzObjectLockMode]; ok {
rmode := miniogo.RetentionMode(mode)
putOpts.Mode = rmode
}
if retainDateStr, ok := objInfo.UserDefined[xhttp.AmzObjectLockRetainUntilDate]; ok {
rdate, err := time.Parse(time.RFC3339, retainDateStr)
if err != nil {
return
}
putOpts.RetainUntilDate = rdate
}
if lhold, ok := objInfo.UserDefined[xhttp.AmzObjectLockLegalHold]; ok {
putOpts.LegalHold = miniogo.LegalHoldStatus(lhold)
}
if crypto.S3.IsEncrypted(objInfo.UserDefined) {
putOpts.ServerSideEncryption = encrypt.NewSSE()
}
return
}
// replicateObject replicates the specified version of the object to destination bucket
// The source object is then updated to reflect the replication status.
func replicateObject(ctx context.Context, bucket, object, versionID string, objectAPI ObjectLayer, eventArg *eventArgs, healPending bool) {
cfg, err := globalBucketReplicationSys.GetConfig(ctx, bucket)
if err != nil {
logger.LogIf(ctx, err)
return
}
tgt := globalBucketReplicationSys.GetTargetClient(ctx, bucket)
if tgt == nil {
return
}
gr, err := objectAPI.GetObjectNInfo(ctx, bucket, object, nil, http.Header{}, readLock, ObjectOptions{})
if err != nil {
return
}
defer gr.Close()
objInfo := gr.ObjInfo
size, err := objInfo.GetActualSize()
if err != nil {
logger.LogIf(ctx, err)
return
}
dest := cfg.GetDestination()
if dest.Bucket == "" {
return
}
// In the rare event that replication is in pending state either due to
// server shut down/crash before replication completed or healing and PutObject
// race - do an additional stat to see if the version ID exists
if healPending {
_, err := tgt.StatObject(ctx, dest.Bucket, object, miniogo.StatObjectOptions{VersionID: objInfo.VersionID})
if err == nil {
// object with same VersionID already exists, replication kicked off by
// PutObject might have completed.
return
}
}
putOpts := putReplicationOpts(dest, objInfo)
replicationStatus := replication.Complete
_, err = tgt.PutObject(ctx, dest.Bucket, object, gr, size, "", "", putOpts)
if err != nil {
replicationStatus = replication.Failed
// Notify replication failure event.
if eventArg == nil {
eventArg = &eventArgs{
BucketName: bucket,
Object: objInfo,
Host: "Internal: [Replication]",
}
}
eventArg.EventName = event.OperationReplicationFailed
eventArg.Object.UserDefined[xhttp.AmzBucketReplicationStatus] = replicationStatus.String()
sendEvent(*eventArg)
}
objInfo.UserDefined[xhttp.AmzBucketReplicationStatus] = replicationStatus.String()
if objInfo.UserTags != "" {
objInfo.UserDefined[xhttp.AmzObjectTagging] = objInfo.UserTags
}
objInfo.metadataOnly = true // Perform only metadata updates.
if _, err = objectAPI.CopyObject(ctx, bucket, object, bucket, object, objInfo, ObjectOptions{
VersionID: objInfo.VersionID,
}, ObjectOptions{VersionID: objInfo.VersionID}); err != nil {
logger.LogIf(ctx, err)
}
}
// getReplicationARN gets existing ARN for an endpoint or generates a new one.
func (sys *BucketReplicationSys) getReplicationARN(endpoint string) string {
arn, ok := sys.targetsARNMap[endpoint]
if ok {
return arn
}
return fmt.Sprintf("arn:minio:s3::%s:*", mustGetUUID())
}

View File

@@ -30,6 +30,7 @@ import (
"github.com/minio/minio/cmd/config"
"github.com/minio/minio/cmd/logger"
"github.com/minio/minio/pkg/bucket/lifecycle"
"github.com/minio/minio/pkg/bucket/replication"
"github.com/minio/minio/pkg/color"
"github.com/minio/minio/pkg/env"
"github.com/minio/minio/pkg/event"
@@ -314,7 +315,6 @@ func (f *folderScanner) scanQueuedLevels(ctx context.Context, folders []cachedFo
filter = nil
}
}
if _, ok := f.oldCache.Cache[thisHash.Key()]; filter != nil && ok {
// If folder isn't in filter and we have data, skip it completely.
if folder.name != dataUsageRoot && !filter.containsDir(folder.name) {
@@ -637,3 +637,14 @@ func sleepDuration(d time.Duration, x float64) {
time.Sleep(d)
}
}
// healReplication will heal a scanned item that has failed replication.
func (i *crawlItem) healReplication(ctx context.Context, o ObjectLayer, meta actionMeta) {
if meta.oi.ReplicationStatus == replication.Pending ||
meta.oi.ReplicationStatus == replication.Failed {
// if heal encounters a pending replication status, either replication
// has failed due to server shutdown or crawler and PutObject replication are in contention.
healPending := meta.oi.ReplicationStatus == replication.Pending
replicateObject(ctx, meta.oi.Bucket, meta.oi.Name, meta.oi.VersionID, o, nil, healPending)
}
}

View File

@@ -160,38 +160,6 @@ func (api objectAPIHandlers) GetBucketLoggingHandler(w http.ResponseWriter, r *h
writeSuccessResponseXML(w, []byte(loggingDefaultConfig))
}
// GetBucketReplicationHandler - GET bucket replication, a dummy api
func (api objectAPIHandlers) GetBucketReplicationHandler(w http.ResponseWriter, r *http.Request) {
ctx := newContext(r, w, "GetBucketReplication")
defer logger.AuditLog(w, r, "GetBucketReplication", mustGetClaimsFromToken(r))
vars := mux.Vars(r)
bucket := vars["bucket"]
objAPI := api.ObjectAPI()
if objAPI == nil {
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrServerNotInitialized), r.URL, guessIsBrowserReq(r))
return
}
// Allow getBucketCors if policy action is set, since this is a dummy call
// we are simply re-purposing the bucketPolicyAction.
if s3Error := checkRequestAuthType(ctx, r, policy.GetBucketPolicyAction, bucket, ""); s3Error != ErrNone {
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Error), r.URL, guessIsBrowserReq(r))
return
}
// Validate if bucket exists, before proceeding further...
_, err := objAPI.GetBucketInfo(ctx, bucket)
if err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r))
return
}
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrReplicationConfigurationNotFoundError), r.URL, guessIsBrowserReq(r))
}
// DeleteBucketWebsiteHandler - DELETE bucket website, a dummy api
func (api objectAPIHandlers) DeleteBucketWebsiteHandler(w http.ResponseWriter, r *http.Request) {
writeSuccessResponseHeadersOnly(w)

View File

@@ -26,6 +26,7 @@ import (
xhttp "github.com/minio/minio/cmd/http"
"github.com/minio/minio/cmd/logger"
"github.com/minio/minio/pkg/bucket/replication"
"github.com/minio/minio/pkg/sync/errgroup"
"github.com/minio/sha256-simd"
)
@@ -131,6 +132,9 @@ func (fi FileInfo) ToObjectInfo(bucket, object string) ObjectInfo {
// Add user tags to the object info
objInfo.UserTags = fi.Metadata[xhttp.AmzObjectTagging]
// Add replication status to the object info
objInfo.ReplicationStatus = replication.StatusType(fi.Metadata[xhttp.AmzBucketReplicationStatus])
// etag/md5Sum has already been extracted. We need to
// remove to avoid it from appearing as part of
// response headers. e.g, X-Minio-* or X-Amz-*.
@@ -146,7 +150,6 @@ func (fi FileInfo) ToObjectInfo(bucket, object string) ObjectInfo {
} else {
objInfo.StorageClass = globalMinioDefaultStorageClass
}
// Success.
return objInfo
}

View File

@@ -69,12 +69,10 @@ func (er erasureObjects) CopyObject(ctx context.Context, srcBucket, srcObject, d
if !srcInfo.metadataOnly {
return oi, NotImplemented{}
}
defer ObjectPathUpdated(path.Join(dstBucket, dstObject))
// Read metadata associated with the object from all disks.
storageDisks := er.getDisks()
metaArr, errs := readAllFileInfo(ctx, storageDisks, srcBucket, srcObject, srcOpts.VersionID)
// get Quorum for this object

View File

@@ -471,12 +471,10 @@ func (l *s3Objects) PutObject(ctx context.Context, bucket string, object string,
ServerSideEncryption: opts.ServerSideEncryption,
UserTags: tagMap,
}
ui, err := l.Client.PutObject(ctx, bucket, object, data, data.Size(), data.MD5Base64String(), data.SHA256HexString(), putOpts)
if err != nil {
return objInfo, minio.ErrorRespToObjectError(err, bucket, object)
}
// On success, populate the key & metadata so they are present in the notification
oi := miniogo.ObjectInfo{
ETag: ui.ETag,
@@ -712,7 +710,6 @@ func (l *s3Objects) DeleteBucketPolicy(ctx context.Context, bucket string) error
// GetObjectTags gets the tags set on the object
func (l *s3Objects) GetObjectTags(ctx context.Context, bucket string, object string, opts minio.ObjectOptions) (*tags.Tags, error) {
var err error
if _, err = l.GetObjectInfo(ctx, bucket, object, opts); err != nil {
return nil, minio.ErrorRespToObjectError(err, bucket, object)
}

View File

@@ -406,7 +406,6 @@ var supportedDummyBucketAPIs = map[string][]string{
"website": {http.MethodGet, http.MethodDelete},
"logging": {http.MethodGet},
"accelerate": {http.MethodGet},
"replication": {http.MethodGet},
"requestPayment": {http.MethodGet},
}
@@ -418,7 +417,6 @@ var notImplementedBucketResourceNames = map[string]struct{}{
"logging": {},
"inventory": {},
"accelerate": {},
"replication": {},
"requestPayment": {},
}

View File

@@ -154,9 +154,9 @@ var (
globalPolicySys *PolicySys
globalIAMSys *IAMSys
globalLifecycleSys *LifecycleSys
globalBucketSSEConfigSys *BucketSSEConfigSys
globalLifecycleSys *LifecycleSys
globalBucketSSEConfigSys *BucketSSEConfigSys
globalBucketReplicationSys *BucketReplicationSys
// globalAPIConfig controls S3 API requests throttling,
// healthcheck readiness deadlines and cors settings.
globalAPIConfig apiConfig

View File

@@ -77,6 +77,7 @@ var supportedHeaders = []string{
xhttp.AmzStorageClass,
xhttp.AmzObjectTagging,
"expires",
xhttp.AmzBucketReplicationStatus,
// Add more supported headers here.
}

View File

@@ -81,7 +81,7 @@ const (
AmzObjectLockRetainUntilDate = "X-Amz-Object-Lock-Retain-Until-Date"
AmzObjectLockLegalHold = "X-Amz-Object-Lock-Legal-Hold"
AmzObjectLockBypassGovernance = "X-Amz-Bypass-Governance-Retention"
AmzBucketReplicationStatus = "X-Amz-Replication-Status"
// Multipart parts count
AmzMpPartsCount = "x-amz-mp-parts-count"

View File

@@ -22,6 +22,7 @@ import (
"time"
humanize "github.com/dustin/go-humanize"
"github.com/minio/minio/pkg/bucket/replication"
"github.com/minio/minio/pkg/hash"
"github.com/minio/minio/pkg/madmin"
)
@@ -181,6 +182,7 @@ type ObjectInfo struct {
// Specify object storage class
StorageClass string
ReplicationStatus replication.StatusType
// User-Defined metadata
UserDefined map[string]string

View File

@@ -348,6 +348,27 @@ func (e BucketQuotaExceeded) Error() string {
return "Bucket quota exceeded for bucket: " + e.Bucket
}
// BucketReplicationConfigNotFound - no bucket replication config found
type BucketReplicationConfigNotFound GenericError
func (e BucketReplicationConfigNotFound) Error() string {
return "The replication configuration was not found: " + e.Bucket
}
// BucketReplicationDestinationNotFound bucket does not exist.
type BucketReplicationDestinationNotFound GenericError
func (e BucketReplicationDestinationNotFound) Error() string {
return "Destination bucket does not exist: " + e.Bucket
}
// BucketReplicationTargetNotFound replication target does not exist.
type BucketReplicationTargetNotFound GenericError
func (e BucketReplicationTargetNotFound) Error() string {
return "Replication target not found: " + e.Bucket
}
/// Bucket related errors.
// BucketNameInvalid - bucketname provided is invalid.

View File

@@ -44,6 +44,7 @@ import (
"github.com/minio/minio/cmd/logger"
objectlock "github.com/minio/minio/pkg/bucket/object/lock"
"github.com/minio/minio/pkg/bucket/policy"
"github.com/minio/minio/pkg/bucket/replication"
"github.com/minio/minio/pkg/event"
"github.com/minio/minio/pkg/handlers"
"github.com/minio/minio/pkg/hash"
@@ -1153,7 +1154,6 @@ func (api objectAPIHandlers) CopyObjectHandler(w http.ResponseWriter, r *http.Re
if objTags != "" {
srcInfo.UserDefined[xhttp.AmzObjectTagging] = objTags
}
srcInfo.UserDefined = objectlock.FilterObjectLockMetadata(srcInfo.UserDefined, true, true)
retPerms := isPutActionAllowed(getRequestAuthType(r), dstBucket, dstObject, r, iampolicy.PutObjectRetentionAction)
holdPerms := isPutActionAllowed(getRequestAuthType(r), dstBucket, dstObject, r, iampolicy.PutObjectLegalHoldAction)
@@ -1176,6 +1176,9 @@ func (api objectAPIHandlers) CopyObjectHandler(w http.ResponseWriter, r *http.Re
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Err), r.URL, guessIsBrowserReq(r))
return
}
if globalBucketReplicationSys.mustReplicate(ctx, r, dstBucket, dstObject, srcInfo.UserDefined, srcInfo.ReplicationStatus.String()) {
srcInfo.UserDefined[xhttp.AmzBucketReplicationStatus] = replication.Pending.String()
}
// Store the preserved compression metadata.
for k, v := range compressMetadata {
@@ -1254,7 +1257,17 @@ func (api objectAPIHandlers) CopyObjectHandler(w http.ResponseWriter, r *http.Re
objInfo.ETag = getDecryptedETag(r.Header, objInfo, false)
response := generateCopyObjectResponse(objInfo.ETag, objInfo.ModTime)
encodedSuccessResponse := encodeResponse(response)
if globalBucketReplicationSys.mustReplicate(ctx, r, dstBucket, dstObject, objInfo.UserDefined, objInfo.ReplicationStatus.String()) {
defer replicateObject(ctx, dstBucket, dstObject, objInfo.VersionID, objectAPI, &eventArgs{
EventName: event.ObjectCreatedCopy,
BucketName: dstBucket,
Object: objInfo,
ReqParams: extractReqParams(r),
RespElements: extractRespElements(w),
UserAgent: r.UserAgent(),
Host: handlers.GetSourceIP(r),
}, false)
}
setPutObjHeaders(w, objInfo, false)
// We must not use the http.Header().Set method here because some (broken)
// clients expect the x-amz-copy-source-version-id header key to be literally
@@ -1497,7 +1510,15 @@ func (api objectAPIHandlers) PutObjectHandler(w http.ResponseWriter, r *http.Req
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Err), r.URL, guessIsBrowserReq(r))
return
}
if globalBucketReplicationSys.mustReplicate(ctx, r, bucket, object, metadata, "") {
metadata[xhttp.AmzBucketReplicationStatus] = string(replication.Pending)
}
if r.Header.Get(xhttp.AmzBucketReplicationStatus) == replication.Replica.String() {
if s3Err = isPutActionAllowed(getRequestAuthType(r), bucket, object, r, iampolicy.ReplicateObjectAction); s3Err != ErrNone {
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Err), r.URL, guessIsBrowserReq(r))
return
}
}
var objectEncryptionKey crypto.ObjectKey
if objectAPI.IsEncryptionSupported() {
if crypto.IsRequested(r.Header) && !HasSuffix(object, SlashSeparator) { // handle SSE requests
@@ -1552,7 +1573,17 @@ func (api objectAPIHandlers) PutObjectHandler(w http.ResponseWriter, r *http.Req
}
}
}
if globalBucketReplicationSys.mustReplicate(ctx, r, bucket, object, metadata, "") {
defer replicateObject(ctx, bucket, object, objInfo.VersionID, objectAPI, &eventArgs{
EventName: event.ObjectCreatedPut,
BucketName: bucket,
Object: objInfo,
ReqParams: extractReqParams(r),
RespElements: extractRespElements(w),
UserAgent: r.UserAgent(),
Host: handlers.GetSourceIP(r),
}, false)
}
setPutObjHeaders(w, objInfo, false)
writeSuccessResponseHeadersOnly(w)
@@ -1664,7 +1695,9 @@ func (api objectAPIHandlers) NewMultipartUploadHandler(w http.ResponseWriter, r
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Err), r.URL, guessIsBrowserReq(r))
return
}
if globalBucketReplicationSys.mustReplicate(ctx, r, bucket, object, metadata, "") {
metadata[xhttp.AmzBucketReplicationStatus] = string(replication.Pending)
}
// We need to preserve the encryption headers set in EncryptRequest,
// so we do not want to override them, copy them instead.
for k, v := range encMetadata {
@@ -2611,7 +2644,17 @@ func (api objectAPIHandlers) CompleteMultipartUploadHandler(w http.ResponseWrite
}
setPutObjHeaders(w, objInfo, false)
if globalBucketReplicationSys.mustReplicate(ctx, r, bucket, object, objInfo.UserDefined, objInfo.ReplicationStatus.String()) {
defer replicateObject(ctx, bucket, object, objInfo.VersionID, objectAPI, &eventArgs{
EventName: event.ObjectCreatedCompleteMultipartUpload,
BucketName: bucket,
Object: objInfo,
ReqParams: extractReqParams(r),
RespElements: extractRespElements(w),
UserAgent: r.UserAgent(),
Host: handlers.GetSourceIP(r),
}, false)
}
// Write success response.
writeSuccessResponseXML(w, encodedSuccessResponse)

View File

@@ -168,6 +168,9 @@ func newAllSubsystems() {
// Create new bucket versioning subsystem
globalBucketVersioningSys = NewBucketVersioningSys()
// Create new bucket replication subsytem
globalBucketReplicationSys = NewBucketReplicationSys()
}
func initSafeMode(ctx context.Context, newObject ObjectLayer) (err error) {
@@ -337,6 +340,10 @@ func initAllSubsystems(ctx context.Context, newObject ObjectLayer) (err error) {
return fmt.Errorf("Unable to initialize notification system: %w", err)
}
// Initialize bucket replication sub-system.
if err = globalBucketReplicationSys.Init(GlobalContext, buckets, newObject); err != nil {
return fmt.Errorf("Unable to initialize bucket replication sub-system: %w", err)
}
return nil
}

View File

@@ -414,6 +414,9 @@ func (s *xlStorage) CrawlAndGetDataUsage(ctx context.Context, cache dataUsageCac
}
}
for _, version := range fivs.Versions {
item.healReplication(ctx, objAPI, actionMeta{oi: version.ToObjectInfo(item.bucket, item.objectPath())})
}
return totalSize, nil
})