2022-06-06 05:54:39 -04:00
// Copyright (c) 2015-2022 MinIO, Inc.
//
// This file is part of MinIO Object Storage stack
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package cmd
import (
2023-07-10 23:09:20 -04:00
"bytes"
2022-06-06 05:54:39 -04:00
"encoding/json"
"encoding/xml"
2023-07-10 23:09:20 -04:00
"errors"
2022-06-06 05:54:39 -04:00
"fmt"
"io"
"net/http"
2023-07-10 23:09:20 -04:00
"path"
2022-06-06 05:54:39 -04:00
"time"
2023-07-10 23:09:20 -04:00
"github.com/minio/minio-go/v7"
objectlock "github.com/minio/minio/internal/bucket/object/lock"
2022-06-06 05:54:39 -04:00
"github.com/minio/minio/internal/bucket/replication"
xhttp "github.com/minio/minio/internal/http"
"github.com/minio/minio/internal/logger"
2023-01-23 06:12:47 -05:00
"github.com/minio/mux"
2022-06-06 05:54:39 -04:00
"github.com/minio/pkg/bucket/policy"
)
// PutBucketReplicationConfigHandler - PUT Bucket replication configuration.
// ----------
// Add a replication configuration on the specified bucket as specified in https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutBucketReplication.html
func ( api objectAPIHandlers ) PutBucketReplicationConfigHandler ( w http . ResponseWriter , r * http . Request ) {
ctx := newContext ( r , w , "PutBucketReplicationConfig" )
defer logger . AuditLog ( ctx , w , r , mustGetClaimsFromToken ( r ) )
vars := mux . Vars ( r )
bucket := vars [ "bucket" ]
objectAPI := api . ObjectAPI ( )
if objectAPI == nil {
writeErrorResponse ( ctx , w , errorCodes . ToAPIErr ( ErrServerNotInitialized ) , r . URL )
return
}
if s3Error := checkRequestAuthType ( ctx , r , policy . PutReplicationConfigurationAction , bucket , "" ) ; s3Error != ErrNone {
writeErrorResponse ( ctx , w , errorCodes . ToAPIErr ( s3Error ) , r . URL )
return
}
// Check if bucket exists.
2022-07-25 20:51:32 -04:00
if _ , err := objectAPI . GetBucketInfo ( ctx , bucket , BucketOptions { } ) ; err != nil {
2022-06-06 05:54:39 -04:00
writeErrorResponse ( ctx , w , toAPIError ( ctx , err ) , r . URL )
return
}
2022-10-24 23:24:32 -04:00
if globalSiteReplicationSys . isEnabled ( ) && logger . GetReqInfo ( ctx ) . Cred . AccessKey != globalActiveCred . AccessKey {
writeErrorResponse ( ctx , w , errorCodes . ToAPIErr ( ErrReplicationDenyEditError ) , r . URL )
return
}
2022-06-06 05:54:39 -04:00
if versioned := globalBucketVersioningSys . Enabled ( bucket ) ; ! versioned {
writeErrorResponse ( ctx , w , errorCodes . ToAPIErr ( ErrReplicationNeedsVersioningError ) , r . URL )
return
}
replicationConfig , err := replication . ParseConfig ( io . LimitReader ( r . Body , r . ContentLength ) )
if err != nil {
apiErr := errorCodes . ToAPIErr ( ErrMalformedXML )
apiErr . Description = err . Error ( )
writeErrorResponse ( ctx , w , apiErr , r . URL )
return
}
sameTarget , apiErr := validateReplicationDestination ( ctx , bucket , replicationConfig , true )
if apiErr != noError {
writeErrorResponse ( ctx , w , apiErr , r . URL )
return
}
// Validate the received bucket replication config
if err = replicationConfig . Validate ( bucket , sameTarget ) ; err != nil {
writeErrorResponse ( ctx , w , toAPIError ( ctx , err ) , r . URL )
return
}
configData , err := xml . Marshal ( replicationConfig )
if err != nil {
writeErrorResponse ( ctx , w , toAPIError ( ctx , err ) , r . URL )
return
}
2022-06-28 21:09:20 -04:00
if _ , err = globalBucketMetadataSys . Update ( ctx , bucket , bucketReplicationConfig , configData ) ; err != nil {
2022-06-06 05:54:39 -04:00
writeErrorResponse ( ctx , w , toAPIError ( ctx , err ) , r . URL )
return
}
// Write success response.
writeSuccessResponseHeadersOnly ( w )
}
// GetBucketReplicationConfigHandler - GET Bucket replication configuration.
// ----------
// Gets the replication configuration for a bucket.
func ( api objectAPIHandlers ) GetBucketReplicationConfigHandler ( w http . ResponseWriter , r * http . Request ) {
ctx := newContext ( r , w , "GetBucketReplicationConfig" )
defer logger . AuditLog ( ctx , w , r , mustGetClaimsFromToken ( r ) )
vars := mux . Vars ( r )
bucket := vars [ "bucket" ]
objectAPI := api . ObjectAPI ( )
if objectAPI == nil {
writeErrorResponse ( ctx , w , errorCodes . ToAPIErr ( ErrServerNotInitialized ) , r . URL )
return
}
// check if user has permissions to perform this operation
if s3Error := checkRequestAuthType ( ctx , r , policy . GetReplicationConfigurationAction , bucket , "" ) ; s3Error != ErrNone {
writeErrorResponse ( ctx , w , errorCodes . ToAPIErr ( s3Error ) , r . URL )
return
}
// Check if bucket exists.
2022-07-25 20:51:32 -04:00
if _ , err := objectAPI . GetBucketInfo ( ctx , bucket , BucketOptions { } ) ; err != nil {
2022-06-06 05:54:39 -04:00
writeErrorResponse ( ctx , w , toAPIError ( ctx , err ) , r . URL )
return
}
config , _ , err := globalBucketMetadataSys . GetReplicationConfig ( ctx , bucket )
if err != nil {
writeErrorResponse ( ctx , w , toAPIError ( ctx , err ) , r . URL )
return
}
configData , err := xml . Marshal ( config )
if err != nil {
writeErrorResponse ( ctx , w , toAPIError ( ctx , err ) , r . URL )
return
}
// Write success response.
writeSuccessResponseXML ( w , configData )
}
// DeleteBucketReplicationConfigHandler - DELETE Bucket replication config.
// ----------
func ( api objectAPIHandlers ) DeleteBucketReplicationConfigHandler ( w http . ResponseWriter , r * http . Request ) {
ctx := newContext ( r , w , "DeleteBucketReplicationConfig" )
defer logger . AuditLog ( ctx , w , r , mustGetClaimsFromToken ( r ) )
vars := mux . Vars ( r )
bucket := vars [ "bucket" ]
objectAPI := api . ObjectAPI ( )
if objectAPI == nil {
writeErrorResponse ( ctx , w , errorCodes . ToAPIErr ( ErrServerNotInitialized ) , r . URL )
return
}
if s3Error := checkRequestAuthType ( ctx , r , policy . PutReplicationConfigurationAction , bucket , "" ) ; s3Error != ErrNone {
writeErrorResponse ( ctx , w , errorCodes . ToAPIErr ( s3Error ) , r . URL )
return
}
// Check if bucket exists.
2022-07-25 20:51:32 -04:00
if _ , err := objectAPI . GetBucketInfo ( ctx , bucket , BucketOptions { } ) ; err != nil {
2022-06-06 05:54:39 -04:00
writeErrorResponse ( ctx , w , toAPIError ( ctx , err ) , r . URL )
return
}
if globalSiteReplicationSys . isEnabled ( ) {
writeErrorResponse ( ctx , w , errorCodes . ToAPIErr ( ErrReplicationDenyEditError ) , r . URL )
return
}
2022-10-19 20:55:09 -04:00
if _ , err := globalBucketMetadataSys . Delete ( ctx , bucket , bucketReplicationConfig ) ; err != nil {
2022-06-06 05:54:39 -04:00
writeErrorResponse ( ctx , w , toAPIError ( ctx , err ) , r . URL )
return
}
2022-12-14 06:24:06 -05:00
targets , err := globalBucketTargetSys . ListBucketTargets ( ctx , bucket )
if err != nil {
writeErrorResponse ( ctx , w , toAPIError ( ctx , err ) , r . URL )
return
}
for _ , tgt := range targets . Targets {
if err := globalBucketTargetSys . RemoveTarget ( ctx , bucket , tgt . Arn ) ; err != nil {
writeErrorResponse ( ctx , w , toAPIError ( ctx , err ) , r . URL )
return
}
}
if _ , err := globalBucketMetadataSys . Delete ( ctx , bucket , bucketTargetsFile ) ; err != nil {
writeErrorResponse ( ctx , w , toAPIError ( ctx , err ) , r . URL )
return
}
2022-06-06 05:54:39 -04:00
// Write success response.
writeSuccessResponseHeadersOnly ( w )
}
2023-08-30 04:00:59 -04:00
// GetBucketReplicationMetricsHandler - GET Bucket replication metrics. // Deprecated Aug 2023
2022-06-06 05:54:39 -04:00
// ----------
// Gets the replication metrics for a bucket.
func ( api objectAPIHandlers ) GetBucketReplicationMetricsHandler ( w http . ResponseWriter , r * http . Request ) {
ctx := newContext ( r , w , "GetBucketReplicationMetrics" )
defer logger . AuditLog ( ctx , w , r , mustGetClaimsFromToken ( r ) )
vars := mux . Vars ( r )
bucket := vars [ "bucket" ]
objectAPI := api . ObjectAPI ( )
if objectAPI == nil {
writeErrorResponse ( ctx , w , errorCodes . ToAPIErr ( ErrServerNotInitialized ) , r . URL )
return
}
// check if user has permissions to perform this operation
if s3Error := checkRequestAuthType ( ctx , r , policy . GetReplicationConfigurationAction , bucket , "" ) ; s3Error != ErrNone {
writeErrorResponse ( ctx , w , errorCodes . ToAPIErr ( s3Error ) , r . URL )
return
}
// Check if bucket exists.
2022-07-25 20:51:32 -04:00
if _ , err := objectAPI . GetBucketInfo ( ctx , bucket , BucketOptions { } ) ; err != nil {
2022-06-06 05:54:39 -04:00
writeErrorResponse ( ctx , w , toAPIError ( ctx , err ) , r . URL )
return
}
2022-07-06 12:53:33 -04:00
if _ , _ , err := globalBucketMetadataSys . GetReplicationConfig ( ctx , bucket ) ; err != nil {
writeErrorResponse ( ctx , w , toAPIError ( ctx , err ) , r . URL )
return
}
2023-08-30 04:00:59 -04:00
w . Header ( ) . Set ( xhttp . ContentType , string ( mimeJSON ) )
enc := json . NewEncoder ( w )
stats := globalReplicationStats . getLatestReplicationStats ( bucket )
bwRpt := globalNotificationSys . GetBandwidthReports ( ctx , bucket )
bwMap := bwRpt . BucketStats [ bucket ]
for arn , st := range stats . ReplicationStats . Stats {
if bwMap != nil {
if bw , ok := bwMap [ arn ] ; ok {
st . BandWidthLimitInBytesPerSecond = bw . LimitInBytesPerSecond
st . CurrentBandwidthInBytesPerSecond = bw . CurrentBandwidthInBytesPerSecond
stats . ReplicationStats . Stats [ arn ] = st
}
}
}
if err := enc . Encode ( stats . ReplicationStats ) ; err != nil {
writeErrorResponseJSON ( ctx , w , toAPIError ( ctx , err ) , r . URL )
return
}
}
// GetBucketReplicationMetricsV2Handler - GET Bucket replication metrics.
// ----------
// Gets the replication metrics for a bucket.
func ( api objectAPIHandlers ) GetBucketReplicationMetricsV2Handler ( w http . ResponseWriter , r * http . Request ) {
ctx := newContext ( r , w , "GetBucketReplicationMetricsV2" )
defer logger . AuditLog ( ctx , w , r , mustGetClaimsFromToken ( r ) )
vars := mux . Vars ( r )
bucket := vars [ "bucket" ]
objectAPI := api . ObjectAPI ( )
if objectAPI == nil {
writeErrorResponse ( ctx , w , errorCodes . ToAPIErr ( ErrServerNotInitialized ) , r . URL )
return
}
// check if user has permissions to perform this operation
if s3Error := checkRequestAuthType ( ctx , r , policy . GetReplicationConfigurationAction , bucket , "" ) ; s3Error != ErrNone {
writeErrorResponse ( ctx , w , errorCodes . ToAPIErr ( s3Error ) , r . URL )
return
}
// Check if bucket exists.
if _ , err := objectAPI . GetBucketInfo ( ctx , bucket , BucketOptions { } ) ; err != nil {
writeErrorResponse ( ctx , w , toAPIError ( ctx , err ) , r . URL )
return
}
if _ , _ , err := globalBucketMetadataSys . GetReplicationConfig ( ctx , bucket ) ; err != nil {
writeErrorResponse ( ctx , w , toAPIError ( ctx , err ) , r . URL )
return
2022-06-06 05:54:39 -04:00
}
w . Header ( ) . Set ( xhttp . ContentType , string ( mimeJSON ) )
enc := json . NewEncoder ( w )
2023-08-30 04:00:59 -04:00
stats := globalReplicationStats . getLatestReplicationStats ( bucket )
2023-01-19 08:22:16 -05:00
bwRpt := globalNotificationSys . GetBandwidthReports ( ctx , bucket )
bwMap := bwRpt . BucketStats [ bucket ]
2023-08-30 04:00:59 -04:00
for arn , st := range stats . ReplicationStats . Stats {
2023-01-19 08:22:16 -05:00
if bwMap != nil {
if bw , ok := bwMap [ arn ] ; ok {
st . BandWidthLimitInBytesPerSecond = bw . LimitInBytesPerSecond
st . CurrentBandwidthInBytesPerSecond = bw . CurrentBandwidthInBytesPerSecond
2023-08-30 04:00:59 -04:00
stats . ReplicationStats . Stats [ arn ] = st
2023-01-19 08:22:16 -05:00
}
}
}
2023-08-30 04:00:59 -04:00
stats . Uptime = UTCNow ( ) . Unix ( ) - globalBootTime . Unix ( )
if err := enc . Encode ( stats ) ; err != nil {
writeErrorResponseJSON ( ctx , w , toAPIError ( ctx , err ) , r . URL )
2022-06-06 05:54:39 -04:00
return
}
}
// ResetBucketReplicationStartHandler - starts a replication reset for all objects in a bucket which
// qualify for replication and re-sync the object(s) to target, provided ExistingObjectReplication is
// enabled for the qualifying rule. This API is a MinIO only extension provided for situations where
// remote target is entirely lost,and previously replicated objects need to be re-synced. If resync is
// already in progress it returns an error
func ( api objectAPIHandlers ) ResetBucketReplicationStartHandler ( w http . ResponseWriter , r * http . Request ) {
ctx := newContext ( r , w , "ResetBucketReplicationStart" )
defer logger . AuditLog ( ctx , w , r , mustGetClaimsFromToken ( r ) )
vars := mux . Vars ( r )
bucket := vars [ "bucket" ]
durationStr := r . URL . Query ( ) . Get ( "older-than" )
arn := r . URL . Query ( ) . Get ( "arn" )
resetID := r . URL . Query ( ) . Get ( "reset-id" )
if resetID == "" {
resetID = mustGetUUID ( )
}
var (
days time . Duration
err error
)
if durationStr != "" {
days , err = time . ParseDuration ( durationStr )
if err != nil {
writeErrorResponse ( ctx , w , toAPIError ( ctx , InvalidArgument {
Bucket : bucket ,
Err : fmt . Errorf ( "invalid query parameter older-than %s for %s : %w" , durationStr , bucket , err ) ,
} ) , r . URL )
2023-03-27 12:15:23 -04:00
return
2022-06-06 05:54:39 -04:00
}
}
resetBeforeDate := UTCNow ( ) . AddDate ( 0 , 0 , - 1 * int ( days / 24 ) )
objectAPI := api . ObjectAPI ( )
if objectAPI == nil {
writeErrorResponse ( ctx , w , errorCodes . ToAPIErr ( ErrServerNotInitialized ) , r . URL )
return
}
if s3Error := checkRequestAuthType ( ctx , r , policy . ResetBucketReplicationStateAction , bucket , "" ) ; s3Error != ErrNone {
writeErrorResponse ( ctx , w , errorCodes . ToAPIErr ( s3Error ) , r . URL )
return
}
// Check if bucket exists.
2022-07-25 20:51:32 -04:00
if _ , err := objectAPI . GetBucketInfo ( ctx , bucket , BucketOptions { } ) ; err != nil {
2022-06-06 05:54:39 -04:00
writeErrorResponse ( ctx , w , toAPIError ( ctx , err ) , r . URL )
return
}
config , _ , err := globalBucketMetadataSys . GetReplicationConfig ( ctx , bucket )
if err != nil {
writeErrorResponse ( ctx , w , toAPIError ( ctx , err ) , r . URL )
return
}
2022-10-29 02:21:33 -04:00
hasARN , hasExistingObjEnabled := config . HasExistingObjectReplication ( arn )
if ! hasARN {
writeErrorResponse ( ctx , w , errorCodes . ToAPIErr ( ErrRemoteTargetNotFoundError ) , r . URL )
return
}
if ! hasExistingObjEnabled {
2022-06-06 05:54:39 -04:00
writeErrorResponse ( ctx , w , errorCodes . ToAPIErr ( ErrReplicationNoExistingObjects ) , r . URL )
return
}
tgtArns := config . FilterTargetArns (
replication . ObjectOpts {
OpType : replication . ResyncReplicationType ,
TargetArn : arn ,
} )
if len ( tgtArns ) == 0 {
writeErrorResponseJSON ( ctx , w , errorCodes . ToAPIErrWithErr ( ErrBadRequest , InvalidArgument {
Bucket : bucket ,
Err : fmt . Errorf ( "Remote target ARN %s missing or ineligible for replication resync" , arn ) ,
} ) , r . URL )
return
}
if len ( tgtArns ) > 1 && arn == "" {
writeErrorResponseJSON ( ctx , w , errorCodes . ToAPIErrWithErr ( ErrBadRequest , InvalidArgument {
Bucket : bucket ,
Err : fmt . Errorf ( "ARN should be specified for replication reset" ) ,
} ) , r . URL )
return
}
var rinfo ResyncTargetsInfo
target := globalBucketTargetSys . GetRemoteBucketTargetByArn ( ctx , bucket , tgtArns [ 0 ] )
target . ResetBeforeDate = UTCNow ( ) . AddDate ( 0 , 0 , - 1 * int ( days / 24 ) )
target . ResetID = resetID
rinfo . Targets = append ( rinfo . Targets , ResyncTarget { Arn : tgtArns [ 0 ] , ResetID : target . ResetID } )
if err = globalBucketTargetSys . SetTarget ( ctx , bucket , & target , true ) ; err != nil {
switch err . ( type ) {
2022-08-16 20:46:22 -04:00
case RemoteTargetConnectionErr :
2022-06-06 05:54:39 -04:00
writeErrorResponseJSON ( ctx , w , errorCodes . ToAPIErrWithErr ( ErrReplicationRemoteConnectionError , err ) , r . URL )
default :
writeErrorResponseJSON ( ctx , w , toAPIError ( ctx , err ) , r . URL )
}
2023-03-27 12:15:23 -04:00
return
2022-06-06 05:54:39 -04:00
}
2022-11-18 04:39:05 -05:00
targets , err := globalBucketTargetSys . ListBucketTargets ( ctx , bucket )
if err != nil {
writeErrorResponseJSON ( ctx , w , toAPIError ( ctx , err ) , r . URL )
return
}
tgtBytes , err := json . Marshal ( & targets )
if err != nil {
writeErrorResponseJSON ( ctx , w , errorCodes . ToAPIErrWithErr ( ErrAdminConfigBadJSON , err ) , r . URL )
return
}
if _ , err = globalBucketMetadataSys . Update ( ctx , bucket , bucketTargetsFile , tgtBytes ) ; err != nil {
writeErrorResponseJSON ( ctx , w , toAPIError ( ctx , err ) , r . URL )
return
}
2022-11-14 10:16:40 -05:00
if err := globalReplicationPool . resyncer . start ( ctx , objectAPI , resyncOpts {
bucket : bucket ,
arn : arn ,
resyncID : resetID ,
resyncBefore : resetBeforeDate ,
} ) ; err != nil {
2022-06-06 05:54:39 -04:00
writeErrorResponseJSON ( ctx , w , errorCodes . ToAPIErrWithErr ( ErrBadRequest , InvalidArgument {
Bucket : bucket ,
Err : err ,
} ) , r . URL )
return
}
data , err := json . Marshal ( rinfo )
if err != nil {
writeErrorResponseJSON ( ctx , w , toAdminAPIErr ( ctx , err ) , r . URL )
return
}
// Write success response.
writeSuccessResponseJSON ( w , data )
}
// ResetBucketReplicationStatusHandler - returns the status of replication reset.
// This API is a MinIO only extension
func ( api objectAPIHandlers ) ResetBucketReplicationStatusHandler ( w http . ResponseWriter , r * http . Request ) {
ctx := newContext ( r , w , "ResetBucketReplicationStatus" )
defer logger . AuditLog ( ctx , w , r , mustGetClaimsFromToken ( r ) )
vars := mux . Vars ( r )
bucket := vars [ "bucket" ]
arn := r . URL . Query ( ) . Get ( "arn" )
var err error
objectAPI := api . ObjectAPI ( )
if objectAPI == nil {
writeErrorResponse ( ctx , w , errorCodes . ToAPIErr ( ErrServerNotInitialized ) , r . URL )
return
}
if s3Error := checkRequestAuthType ( ctx , r , policy . ResetBucketReplicationStateAction , bucket , "" ) ; s3Error != ErrNone {
writeErrorResponse ( ctx , w , errorCodes . ToAPIErr ( s3Error ) , r . URL )
return
}
// Check if bucket exists.
2022-07-25 20:51:32 -04:00
if _ , err := objectAPI . GetBucketInfo ( ctx , bucket , BucketOptions { } ) ; err != nil {
2022-06-06 05:54:39 -04:00
writeErrorResponse ( ctx , w , toAPIError ( ctx , err ) , r . URL )
return
}
if _ , _ , err := globalBucketMetadataSys . GetReplicationConfig ( ctx , bucket ) ; err != nil {
writeErrorResponse ( ctx , w , toAPIError ( ctx , err ) , r . URL )
return
}
2022-11-14 10:16:40 -05:00
var tgtStats map [ string ] TargetReplicationResyncStatus
globalReplicationPool . resyncer . RLock ( )
brs , ok := globalReplicationPool . resyncer . statusMap [ bucket ]
if ok {
tgtStats = brs . cloneTgtStats ( )
}
globalReplicationPool . resyncer . RUnlock ( )
2022-06-06 05:54:39 -04:00
if ! ok {
brs , err = loadBucketResyncMetadata ( ctx , bucket , objectAPI )
if err != nil {
writeErrorResponse ( ctx , w , errorCodes . ToAPIErrWithErr ( ErrBadRequest , InvalidArgument {
Bucket : bucket ,
Err : fmt . Errorf ( "No replication resync status available for %s" , arn ) ,
} ) , r . URL )
return
}
2022-11-14 10:16:40 -05:00
tgtStats = brs . cloneTgtStats ( )
2022-06-06 05:54:39 -04:00
}
var rinfo ResyncTargetsInfo
2022-11-14 10:16:40 -05:00
for tarn , st := range tgtStats {
2022-06-06 05:54:39 -04:00
if arn != "" && tarn != arn {
continue
}
rinfo . Targets = append ( rinfo . Targets , ResyncTarget {
Arn : tarn ,
ResetID : st . ResyncID ,
StartTime : st . StartTime ,
2022-11-14 10:16:40 -05:00
EndTime : st . LastUpdate ,
2022-06-06 05:54:39 -04:00
ResyncStatus : st . ResyncStatus . String ( ) ,
ReplicatedSize : st . ReplicatedSize ,
ReplicatedCount : st . ReplicatedCount ,
FailedSize : st . FailedSize ,
FailedCount : st . FailedCount ,
Bucket : st . Bucket ,
Object : st . Object ,
} )
}
data , err := json . Marshal ( rinfo )
if err != nil {
writeErrorResponseJSON ( ctx , w , toAdminAPIErr ( ctx , err ) , r . URL )
return
}
// Write success response.
writeSuccessResponseJSON ( w , data )
}
2023-07-10 23:09:20 -04:00
// ValidateBucketReplicationCredsHandler - validate replication credentials for a bucket.
// ----------
func ( api objectAPIHandlers ) ValidateBucketReplicationCredsHandler ( w http . ResponseWriter , r * http . Request ) {
ctx := newContext ( r , w , "ValidateBucketReplicationCreds" )
defer logger . AuditLog ( ctx , w , r , mustGetClaimsFromToken ( r ) )
vars := mux . Vars ( r )
bucket := vars [ "bucket" ]
objectAPI := api . ObjectAPI ( )
if objectAPI == nil {
writeErrorResponse ( ctx , w , errorCodes . ToAPIErr ( ErrServerNotInitialized ) , r . URL )
return
}
if s3Error := checkRequestAuthType ( ctx , r , policy . GetReplicationConfigurationAction , bucket , "" ) ; s3Error != ErrNone {
writeErrorResponse ( ctx , w , errorCodes . ToAPIErr ( s3Error ) , r . URL )
return
}
// Check if bucket exists.
if _ , err := objectAPI . GetBucketInfo ( ctx , bucket , BucketOptions { } ) ; err != nil {
writeErrorResponse ( ctx , w , errorCodes . ToAPIErrWithErr ( ErrReplicationValidationError , err ) , r . URL )
return
}
if versioned := globalBucketVersioningSys . Enabled ( bucket ) ; ! versioned {
writeErrorResponse ( ctx , w , errorCodes . ToAPIErr ( ErrReplicationNeedsVersioningError ) , r . URL )
return
}
replicationConfig , _ , err := globalBucketMetadataSys . GetReplicationConfig ( ctx , bucket )
if err != nil {
writeErrorResponse ( ctx , w , errorCodes . ToAPIErrWithErr ( ErrReplicationConfigurationNotFoundError , err ) , r . URL )
return
}
lockEnabled := false
lcfg , _ , err := globalBucketMetadataSys . GetObjectLockConfig ( bucket )
if err != nil {
if ! errors . Is ( err , BucketObjectLockConfigNotFound { Bucket : bucket } ) {
writeErrorResponse ( ctx , w , errorCodes . ToAPIErrWithErr ( ErrReplicationValidationError , err ) , r . URL )
return
}
}
if lcfg != nil {
lockEnabled = lcfg . Enabled ( )
}
sameTarget , apiErr := validateReplicationDestination ( ctx , bucket , replicationConfig , true )
if apiErr != noError {
writeErrorResponse ( ctx , w , apiErr , r . URL )
return
}
// Validate the bucket replication config
if err = replicationConfig . Validate ( bucket , sameTarget ) ; err != nil {
writeErrorResponse ( ctx , w , errorCodes . ToAPIErrWithErr ( ErrReplicationValidationError , err ) , r . URL )
return
}
buf := bytes . Repeat ( [ ] byte ( "a" ) , 8 )
for _ , rule := range replicationConfig . Rules {
if rule . Status == replication . Disabled {
continue
}
2023-08-30 04:00:59 -04:00
clnt := globalBucketTargetSys . GetRemoteTargetClient ( rule . Destination . Bucket )
2023-07-10 23:09:20 -04:00
if clnt == nil {
writeErrorResponse ( ctx , w , errorCodes . ToAPIErrWithErr ( ErrRemoteTargetNotFoundError , fmt . Errorf ( "replication config with rule ID %s has a stale target" , rule . ID ) ) , r . URL )
return
}
if lockEnabled {
lock , _ , _ , _ , err := clnt . GetObjectLockConfig ( ctx , clnt . Bucket )
if err != nil {
writeErrorResponse ( ctx , w , errorCodes . ToAPIErrWithErr ( ErrReplicationValidationError , err ) , r . URL )
return
}
if lock != objectlock . Enabled {
writeErrorResponse ( ctx , w , errorCodes . ToAPIErrWithErr ( ErrReplicationDestinationMissingLock , fmt . Errorf ( "target bucket %s is not object lock enabled" , clnt . Bucket ) ) , r . URL )
return
}
}
vcfg , err := clnt . GetBucketVersioning ( ctx , clnt . Bucket )
if err != nil {
writeErrorResponse ( ctx , w , errorCodes . ToAPIErrWithErr ( ErrReplicationValidationError , err ) , r . URL )
return
}
if ! vcfg . Enabled ( ) {
writeErrorResponse ( ctx , w , errorCodes . ToAPIErrWithErr ( ErrRemoteTargetNotVersionedError , fmt . Errorf ( "target bucket %s is not versioned" , clnt . Bucket ) ) , r . URL )
return
}
if sameTarget && bucket == clnt . Bucket {
writeErrorResponseJSON ( ctx , w , errorCodes . ToAPIErr ( ErrBucketRemoteIdenticalToSource ) , r . URL )
return
}
reader := bytes . NewReader ( buf )
// fake a PutObject and RemoveObject call to validate permissions
c := & minio . Core { Client : clnt . Client }
putOpts := minio . PutObjectOptions {
Internal : minio . AdvancedPutOptions {
SourceVersionID : mustGetUUID ( ) ,
ReplicationStatus : minio . ReplicationStatusReplica ,
SourceMTime : time . Now ( ) ,
ReplicationRequest : true , // always set this to distinguish between `mc mirror` replication and serverside
ReplicationValidityCheck : true , // set this to validate the replication config
} ,
}
obj := path . Join ( minioReservedBucket , globalLocalNodeNameHex , "deleteme" )
ui , err := c . PutObject ( ctx , clnt . Bucket , obj , reader , int64 ( len ( buf ) ) , "" , "" , putOpts )
if err != nil && ! isReplicationPermissionCheck ( ErrorRespToObjectError ( err , bucket , obj ) ) {
writeErrorResponse ( ctx , w , errorCodes . ToAPIErrWithErr ( ErrReplicationValidationError , fmt . Errorf ( "s3:ReplicateObject permissions missing for replication user: %w" , err ) ) , r . URL )
return
}
err = c . RemoveObject ( ctx , clnt . Bucket , obj , minio . RemoveObjectOptions {
VersionID : ui . VersionID ,
Internal : minio . AdvancedRemoveOptions {
ReplicationDeleteMarker : true ,
ReplicationMTime : time . Now ( ) ,
ReplicationStatus : minio . ReplicationStatusReplica ,
ReplicationRequest : true , // always set this to distinguish between `mc mirror` replication and serverside
ReplicationValidityCheck : true , // set this to validate the replication config
} ,
} )
if err != nil && ! isReplicationPermissionCheck ( ErrorRespToObjectError ( err , bucket , obj ) ) {
writeErrorResponse ( ctx , w , errorCodes . ToAPIErrWithErr ( ErrReplicationValidationError , fmt . Errorf ( "s3:ReplicateDelete permissions missing for replication user: %w" , err ) ) , r . URL )
return
}
// fake a versioned delete - to ensure deny policies are not in place
err = c . RemoveObject ( ctx , clnt . Bucket , obj , minio . RemoveObjectOptions {
VersionID : ui . VersionID ,
Internal : minio . AdvancedRemoveOptions {
ReplicationDeleteMarker : false ,
ReplicationMTime : time . Now ( ) ,
ReplicationStatus : minio . ReplicationStatusReplica ,
ReplicationRequest : true , // always set this to distinguish between `mc mirror` replication and serverside
ReplicationValidityCheck : true , // set this to validate the replication config
} ,
} )
if err != nil && ! isReplicationPermissionCheck ( ErrorRespToObjectError ( err , bucket , obj ) ) {
writeErrorResponse ( ctx , w , errorCodes . ToAPIErrWithErr ( ErrReplicationValidationError , fmt . Errorf ( "s3:ReplicateDelete/s3:DeleteObject permissions missing for replication user: %w" , err ) ) , r . URL )
return
}
}
// Write success response.
writeSuccessResponseHeadersOnly ( w )
}