2021-04-18 15:41:13 -04:00
// Copyright (c) 2015-2021 MinIO, Inc.
//
// This file is part of MinIO Object Storage stack
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
2020-07-21 20:49:56 -04:00
package cmd
import (
"context"
2020-09-15 23:44:48 -04:00
"fmt"
2021-06-30 10:44:24 -04:00
"io"
2021-11-19 17:46:14 -05:00
"math"
2020-07-21 20:49:56 -04:00
"net/http"
2021-02-03 23:41:33 -05:00
"reflect"
2020-08-12 20:32:24 -04:00
"strings"
2021-03-09 05:56:42 -05:00
"sync"
2020-07-21 20:49:56 -04:00
"time"
2021-05-06 11:52:02 -04:00
"github.com/minio/madmin-go"
2021-08-23 11:16:18 -04:00
"github.com/minio/minio-go/v7"
2020-07-21 20:49:56 -04:00
miniogo "github.com/minio/minio-go/v7"
"github.com/minio/minio-go/v7/pkg/encrypt"
"github.com/minio/minio-go/v7/pkg/tags"
2021-06-01 17:59:40 -04:00
"github.com/minio/minio/internal/bucket/bandwidth"
"github.com/minio/minio/internal/bucket/replication"
"github.com/minio/minio/internal/config/storageclass"
"github.com/minio/minio/internal/crypto"
"github.com/minio/minio/internal/event"
2021-06-30 10:44:24 -04:00
"github.com/minio/minio/internal/hash"
2021-06-01 17:59:40 -04:00
xhttp "github.com/minio/minio/internal/http"
"github.com/minio/minio/internal/logger"
2020-07-21 20:49:56 -04:00
)
2021-09-18 16:31:35 -04:00
// Replication tuning knobs and the names of the internal metadata entries
// used to track replication state of an object version.
const (
	// NOTE(review): throttleDeadline is not referenced in the visible part of
	// this file; presumably it bounds how long throttled work may wait - confirm.
	throttleDeadline = time.Hour

	// ReplicationReset has reset id and timestamp of last reset operation
	ReplicationReset = "replication-reset"

	// ReplicationStatus has internal replication status - stringified representation
	// of target's replication status for all replication activity initiated from
	// this cluster
	ReplicationStatus = "replication-status"

	// ReplicationTimestamp - the last time replication was initiated on this
	// cluster for this object version
	ReplicationTimestamp = "replication-timestamp"

	// ReplicaStatus - this header is present if a replica was received by this
	// cluster for this object version
	ReplicaStatus = "replica-status"

	// ReplicaTimestamp - the last time a replica was received by this cluster
	// for this object version
	ReplicaTimestamp = "replica-timestamp"

	// TaggingTimestamp - the last time a tag metadata modification happened on
	// this cluster for this object version
	TaggingTimestamp = "tagging-timestamp"

	// ObjectLockRetentionTimestamp - the last time an object lock metadata
	// modification happened on this cluster for this object version
	ObjectLockRetentionTimestamp = "objectlock-retention-timestamp"

	// ObjectLockLegalHoldTimestamp - the last time a legal hold metadata
	// modification happened on this cluster for this object version
	ObjectLockLegalHoldTimestamp = "objectlock-legalhold-timestamp"

	// ReplicationWorkerMultiplier is suggested worker multiplier if traffic
	// exceeds replication worker capacity
	ReplicationWorkerMultiplier = 1.5
)
2021-07-28 18:20:01 -04:00
2020-07-30 22:55:22 -04:00
// gets replication config associated to a given bucket name.
func getReplicationConfig ( ctx context . Context , bucketName string ) ( rc * replication . Config , err error ) {
2020-07-21 20:49:56 -04:00
if globalIsGateway {
2020-10-09 12:59:52 -04:00
objAPI := newObjectLayerFn ( )
2020-07-21 20:49:56 -04:00
if objAPI == nil {
2021-09-18 16:31:35 -04:00
return rc , errServerNotInitialized
2020-07-21 20:49:56 -04:00
}
2021-09-18 16:31:35 -04:00
return rc , BucketReplicationConfigNotFound { Bucket : bucketName }
2020-07-21 20:49:56 -04:00
}
return globalBucketMetadataSys . GetReplicationConfig ( ctx , bucketName )
}
2020-07-30 22:55:22 -04:00
// validateReplicationDestination returns error if replication destination bucket missing or not configured
2020-07-21 20:49:56 -04:00
// It also returns true if replication destination is same as this server.
2021-09-21 16:03:20 -04:00
func validateReplicationDestination ( ctx context . Context , bucket string , rCfg * replication . Config ) ( bool , APIError ) {
2021-09-18 16:31:35 -04:00
var arns [ ] string
if rCfg . RoleArn != "" {
arns = append ( arns , rCfg . RoleArn )
} else {
for _ , rule := range rCfg . Rules {
arns = append ( arns , rule . Destination . String ( ) )
}
2020-07-21 20:49:56 -04:00
}
2021-09-18 16:31:35 -04:00
for _ , arnStr := range arns {
arn , err := madmin . ParseARN ( arnStr )
if err != nil {
2021-09-21 16:03:20 -04:00
return false , errorCodes . ToAPIErrWithErr ( ErrBucketRemoteArnInvalid , err )
2021-09-18 16:31:35 -04:00
}
if arn . Type != madmin . ReplicationService {
2021-09-21 16:03:20 -04:00
return false , toAPIError ( ctx , BucketRemoteArnTypeInvalid { Bucket : bucket } )
2021-09-18 16:31:35 -04:00
}
clnt := globalBucketTargetSys . GetRemoteTargetClient ( ctx , arnStr )
if clnt == nil {
2021-09-21 16:03:20 -04:00
return false , toAPIError ( ctx , BucketRemoteTargetNotFound { Bucket : bucket } )
2021-09-18 16:31:35 -04:00
}
2021-09-21 16:03:20 -04:00
if found , err := clnt . BucketExists ( ctx , arn . Bucket ) ; ! found {
return false , errorCodes . ToAPIErrWithErr ( ErrRemoteDestinationNotFoundError , err )
2021-09-18 16:31:35 -04:00
}
if ret , err := globalBucketObjectLockSys . Get ( bucket ) ; err == nil {
if ret . LockEnabled {
lock , _ , _ , _ , err := clnt . GetObjectLockConfig ( ctx , arn . Bucket )
if err != nil || lock != "Enabled" {
2021-09-21 16:03:20 -04:00
return false , errorCodes . ToAPIErrWithErr ( ErrReplicationDestinationMissingLock , err )
2021-09-18 16:31:35 -04:00
}
2020-08-05 02:02:27 -04:00
}
}
2021-09-18 16:31:35 -04:00
// validate replication ARN against target endpoint
c , ok := globalBucketTargetSys . arnRemotesMap [ arnStr ]
if ok {
if c . EndpointURL ( ) . String ( ) == clnt . EndpointURL ( ) . String ( ) {
sameTarget , _ := isLocalHost ( clnt . EndpointURL ( ) . Hostname ( ) , clnt . EndpointURL ( ) . Port ( ) , globalMinioPort )
2021-09-21 16:03:20 -04:00
return sameTarget , toAPIError ( ctx , nil )
2021-09-18 16:31:35 -04:00
}
2020-07-21 20:49:56 -04:00
}
}
2021-09-21 16:03:20 -04:00
return false , toAPIError ( ctx , BucketRemoteTargetNotFound { Bucket : bucket } )
2020-07-21 20:49:56 -04:00
}
2021-06-01 22:59:11 -04:00
type mustReplicateOptions struct {
2021-09-18 16:31:35 -04:00
meta map [ string ] string
status replication . StatusType
opType replication . Type
replicationRequest bool // incoming request is a replication request
2021-06-01 22:59:11 -04:00
}
// ReplicationStatus returns the replication status stored in the metadata
// under xhttp.AmzBucketReplicationStatus, or the zero StatusType when the
// metadata has no such entry.
func (o mustReplicateOptions) ReplicationStatus() (s replication.StatusType) {
	raw, present := o.meta[xhttp.AmzBucketReplicationStatus]
	if !present {
		return s
	}
	return replication.StatusType(raw)
}
// isExistingObjectReplication reports whether the operation type is
// replication.ExistingObjectReplicationType.
func (o mustReplicateOptions) isExistingObjectReplication() bool {
	switch o.opType {
	case replication.ExistingObjectReplicationType:
		return true
	default:
		return false
	}
}
// isMetadataReplication reports whether the operation type is
// replication.MetadataReplicationType.
func (o mustReplicateOptions) isMetadataReplication() bool {
	switch o.opType {
	case replication.MetadataReplicationType:
		return true
	default:
		return false
	}
}
2021-09-18 16:31:35 -04:00
func getMustReplicateOptions ( o ObjectInfo , op replication . Type , opts ObjectOptions ) mustReplicateOptions {
2021-06-01 22:59:11 -04:00
if ! op . Valid ( ) {
op = replication . ObjectReplicationType
if o . metadataOnly {
op = replication . MetadataReplicationType
}
}
meta := cloneMSS ( o . UserDefined )
if o . UserTags != "" {
meta [ xhttp . AmzObjectTagging ] = o . UserTags
}
2021-09-18 16:31:35 -04:00
2021-06-01 22:59:11 -04:00
return mustReplicateOptions {
2021-09-18 16:31:35 -04:00
meta : meta ,
status : o . ReplicationStatus ,
opType : op ,
replicationRequest : opts . ReplicationRequest ,
2021-06-01 22:59:11 -04:00
}
}
2021-04-29 22:01:43 -04:00
2021-01-12 01:36:51 -05:00
// mustReplicate returns 2 booleans - true if object meets replication criteria and true if replication is to be done in
// a synchronous manner.
2021-09-18 16:31:35 -04:00
func mustReplicate ( ctx context . Context , bucket , object string , mopts mustReplicateOptions ) ( dsc ReplicateDecision ) {
2020-07-21 20:49:56 -04:00
if globalIsGateway {
2021-09-18 16:31:35 -04:00
return
2020-07-21 20:49:56 -04:00
}
2021-09-18 16:31:35 -04:00
2021-06-01 22:59:11 -04:00
replStatus := mopts . ReplicationStatus ( )
if replStatus == replication . Replica && ! mopts . isMetadataReplication ( ) {
2021-09-18 16:31:35 -04:00
return
}
if mopts . replicationRequest { // incoming replication request on target cluster
return
2020-07-21 20:49:56 -04:00
}
2020-07-30 22:55:22 -04:00
cfg , err := getReplicationConfig ( ctx , bucket )
2020-07-21 20:49:56 -04:00
if err != nil {
2021-09-18 16:31:35 -04:00
return
2020-07-21 20:49:56 -04:00
}
opts := replication . ObjectOpts {
2021-06-01 22:59:11 -04:00
Name : object ,
SSEC : crypto . SSEC . IsEncrypted ( mopts . meta ) ,
Replica : replStatus == replication . Replica ,
ExistingObject : mopts . isExistingObjectReplication ( ) ,
2020-07-21 20:49:56 -04:00
}
2021-06-01 22:59:11 -04:00
tagStr , ok := mopts . meta [ xhttp . AmzObjectTagging ]
2020-07-21 20:49:56 -04:00
if ok {
opts . UserTags = tagStr
}
2021-09-18 16:31:35 -04:00
tgtArns := cfg . FilterTargetArns ( opts )
for _ , tgtArn := range tgtArns {
tgt := globalBucketTargetSys . GetRemoteTargetClient ( ctx , tgtArn )
// the target online status should not be used here while deciding
// whether to replicate as the target could be temporarily down
opts . TargetArn = tgtArn
replicate := cfg . Replicate ( opts )
var synchronous bool
if tgt != nil {
synchronous = tgt . replicateSync
}
dsc . Set ( newReplicateTargetDecision ( tgtArn , replicate , synchronous ) )
2021-01-12 01:36:51 -05:00
}
2021-09-18 16:31:35 -04:00
return dsc
2021-01-12 01:36:51 -05:00
}
// Standard headers that needs to be extracted from User metadata.
var standardHeaders = [ ] string {
2021-01-27 14:22:34 -05:00
xhttp . ContentType ,
xhttp . CacheControl ,
xhttp . ContentEncoding ,
xhttp . ContentLanguage ,
xhttp . ContentDisposition ,
2021-01-12 01:36:51 -05:00
xhttp . AmzStorageClass ,
xhttp . AmzObjectTagging ,
xhttp . AmzBucketReplicationStatus ,
2021-01-27 14:22:34 -05:00
xhttp . AmzObjectLockMode ,
xhttp . AmzObjectLockRetainUntilDate ,
xhttp . AmzObjectLockLegalHold ,
xhttp . AmzTagCount ,
xhttp . AmzServerSideEncryption ,
2020-07-21 20:49:56 -04:00
}
2020-11-19 21:43:58 -05:00
// hasReplicationRules reports whether at least one of the objects being
// deleted is covered by an active rule of the bucket's replication
// configuration. It returns false when the configuration cannot be loaded.
func hasReplicationRules(ctx context.Context, bucket string, objects []ObjectToDelete) bool {
	cfg, err := getReplicationConfig(ctx, bucket)
	if err != nil || cfg == nil {
		return false
	}
	for i := range objects {
		if cfg.HasActiveRules(objects[i].ObjectName, true) {
			return true
		}
	}
	return false
}
2021-01-12 01:36:51 -05:00
// isStandardHeader returns true if header is a supported header and not a custom header
2021-02-03 23:41:33 -05:00
func isStandardHeader ( matchHeaderKey string ) bool {
return equals ( matchHeaderKey , standardHeaders ... )
2021-01-12 01:36:51 -05:00
}
2020-11-19 21:43:58 -05:00
// returns whether object version is a deletemarker and if object qualifies for replication
2021-09-18 16:31:35 -04:00
func checkReplicateDelete ( ctx context . Context , bucket string , dobj ObjectToDelete , oi ObjectInfo , delOpts ObjectOptions , gerr error ) ( dsc ReplicateDecision ) {
2020-11-19 21:43:58 -05:00
rcfg , err := getReplicationConfig ( ctx , bucket )
if err != nil || rcfg == nil {
2021-09-18 16:31:35 -04:00
return
}
// If incoming request is a replication request, it does not need to be re-replicated.
if delOpts . ReplicationRequest {
return
2020-11-19 21:43:58 -05:00
}
2021-02-18 19:35:37 -05:00
opts := replication . ObjectOpts {
Name : dobj . ObjectName ,
SSEC : crypto . SSEC . IsEncrypted ( oi . UserDefined ) ,
UserTags : oi . UserTags ,
DeleteMarker : oi . DeleteMarker ,
VersionID : dobj . VersionID ,
2021-03-13 13:28:35 -05:00
OpType : replication . DeleteReplicationType ,
2021-02-18 19:35:37 -05:00
}
2021-09-18 16:31:35 -04:00
tgtArns := rcfg . FilterTargetArns ( opts )
if len ( tgtArns ) > 0 {
dsc . targetsMap = make ( map [ string ] replicateTargetDecision , len ( tgtArns ) )
var sync , replicate bool
for _ , tgtArn := range tgtArns {
opts . TargetArn = tgtArn
replicate = rcfg . Replicate ( opts )
// when incoming delete is removal of a delete marker( a.k.a versioned delete),
// GetObjectInfo returns extra information even though it returns errFileNotFound
if gerr != nil {
validReplStatus := false
switch oi . TargetReplicationStatus ( tgtArn ) {
case replication . Pending , replication . Completed , replication . Failed :
validReplStatus = true
}
if oi . DeleteMarker && ( validReplStatus || replicate ) {
dsc . Set ( newReplicateTargetDecision ( tgtArn , replicate , sync ) )
continue
} else {
// can be the case that other cluster is down and duplicate `mc rm --vid`
// is issued - this still needs to be replicated back to the other target
replicate = oi . VersionPurgeStatus == Pending || oi . VersionPurgeStatus == Failed
dsc . Set ( newReplicateTargetDecision ( tgtArn , replicate , sync ) )
continue
}
}
tgt := globalBucketTargetSys . GetRemoteTargetClient ( ctx , tgtArn )
// the target online status should not be used here while deciding
// whether to replicate deletes as the target could be temporarily down
tgtDsc := newReplicateTargetDecision ( tgtArn , false , false )
if tgt != nil {
tgtDsc = newReplicateTargetDecision ( tgtArn , replicate , tgt . replicateSync )
}
dsc . Set ( tgtDsc )
}
}
return dsc
2020-11-19 21:43:58 -05:00
}
// replicate deletes to the designated replication target if replication configuration
// has delete marker replication or delete replication (MinIO extension to allow deletes where version id
// is specified) enabled.
// Similar to bucket replication for PUT operation, soft delete (a.k.a setting delete marker) and
// permanent deletes (by specifying a version ID in the delete operation) have three states "Pending", "Complete"
// and "Failed" to mark the status of the replication of "DELETE" operation. All failed operations can
// then be retried by healing. In the case of permanent deletes, until the replication is completed on the
// target cluster, the object version is marked deleted on the source and hidden from listing. It is permanently
// deleted from the source when the VersionPurgeStatus changes to "Complete", i.e after replication succeeds
// on target.
2021-07-01 17:02:44 -04:00
func replicateDelete ( ctx context . Context , dobj DeletedObjectReplicationInfo , objectAPI ObjectLayer , trigger string ) {
2021-09-18 16:31:35 -04:00
var replicationStatus replication . StatusType
2020-11-19 21:43:58 -05:00
bucket := dobj . Bucket
2021-02-03 23:41:33 -05:00
versionID := dobj . DeleteMarkerVersionID
if versionID == "" {
versionID = dobj . VersionID
}
2021-07-01 17:02:44 -04:00
defer func ( ) {
2021-09-18 16:31:35 -04:00
replStatus := string ( replicationStatus )
2021-07-01 17:02:44 -04:00
auditLogInternal ( context . Background ( ) , bucket , dobj . ObjectName , AuditLogOptions {
Trigger : trigger ,
APIName : ReplicateDeleteAPI ,
VersionID : versionID ,
Status : replStatus ,
} )
} ( )
2020-11-19 21:43:58 -05:00
rcfg , err := getReplicationConfig ( ctx , bucket )
if err != nil || rcfg == nil {
2021-01-27 14:22:34 -05:00
logger . LogIf ( ctx , err )
2021-02-03 23:41:33 -05:00
sendEvent ( eventArgs {
BucketName : bucket ,
Object : ObjectInfo {
Bucket : bucket ,
Name : dobj . ObjectName ,
VersionID : versionID ,
DeleteMarker : dobj . DeleteMarker ,
} ,
Host : "Internal: [Replication]" ,
EventName : event . ObjectReplicationNotTracked ,
} )
2020-11-19 21:43:58 -05:00
return
}
2021-09-18 16:31:35 -04:00
dsc , err := parseReplicateDecision ( dobj . ReplicationState . ReplicateDecisionStr )
if err != nil {
logger . LogIf ( ctx , err )
2021-02-03 23:41:33 -05:00
sendEvent ( eventArgs {
BucketName : bucket ,
Object : ObjectInfo {
Bucket : bucket ,
Name : dobj . ObjectName ,
VersionID : versionID ,
DeleteMarker : dobj . DeleteMarker ,
} ,
Host : "Internal: [Replication]" ,
EventName : event . ObjectReplicationNotTracked ,
} )
2020-11-19 21:43:58 -05:00
return
}
2021-02-03 23:41:33 -05:00
2021-08-23 11:16:18 -04:00
// Lock the object name before starting replication operation.
// Use separate lock that doesn't collide with regular objects.
lk := objectAPI . NewNSLock ( bucket , "/[replicate]/" + dobj . ObjectName )
lkctx , err := lk . GetLock ( ctx , globalOperationTimeout )
if err != nil {
logger . LogIf ( ctx , fmt . Errorf ( "failed to get lock for object: %s bucket:%s arn:%s" , dobj . ObjectName , bucket , rcfg . RoleArn ) )
sendEvent ( eventArgs {
BucketName : bucket ,
Object : ObjectInfo {
Bucket : bucket ,
Name : dobj . ObjectName ,
VersionID : versionID ,
DeleteMarker : dobj . DeleteMarker ,
} ,
Host : "Internal: [Replication]" ,
EventName : event . ObjectReplicationNotTracked ,
} )
return
}
ctx = lkctx . Context ( )
defer lk . Unlock ( lkctx . Cancel )
2020-11-19 21:43:58 -05:00
2021-09-18 16:31:35 -04:00
var wg sync . WaitGroup
var rinfos replicatedInfos
rinfos . Targets = make ( [ ] replicatedTargetInfo , len ( dsc . targetsMap ) )
idx := - 1
for tgtArn := range dsc . targetsMap {
idx ++
tgt := globalBucketTargetSys . GetRemoteTargetClient ( ctx , tgtArn )
if tgt == nil {
logger . LogIf ( ctx , fmt . Errorf ( "failed to get target for bucket:%s arn:%s" , bucket , tgtArn ) )
sendEvent ( eventArgs {
BucketName : bucket ,
Object : ObjectInfo {
Bucket : bucket ,
Name : dobj . ObjectName ,
VersionID : versionID ,
DeleteMarker : dobj . DeleteMarker ,
} ,
Host : "Internal: [Replication]" ,
EventName : event . ObjectReplicationNotTracked ,
} )
continue
2020-11-19 21:43:58 -05:00
}
2021-09-18 16:31:35 -04:00
if tgt := dsc . targetsMap [ tgtArn ] ; ! tgt . Replicate {
continue
2020-11-19 21:43:58 -05:00
}
2021-09-18 16:31:35 -04:00
// if dobj.TargetArn is not empty string, this is a case of specific target being re-synced.
if dobj . TargetArn != "" && dobj . TargetArn != tgt . ARN {
continue
}
wg . Add ( 1 )
go func ( index int , tgt * TargetClient ) {
defer wg . Done ( )
rinfo := replicateDeleteToTarget ( ctx , dobj , objectAPI , tgt )
rinfos . Targets [ index ] = rinfo
} ( idx , tgt )
2020-11-19 21:43:58 -05:00
}
2021-09-18 16:31:35 -04:00
wg . Wait ( )
replicationStatus = rinfos . ReplicationStatus ( )
prevStatus := dobj . DeleteMarkerReplicationStatus ( )
2021-04-03 12:03:42 -04:00
if dobj . VersionID != "" {
2021-09-18 16:31:35 -04:00
prevStatus = replication . StatusType ( dobj . VersionPurgeStatus ( ) )
replicationStatus = replication . StatusType ( rinfos . VersionPurgeStatus ( ) )
2021-04-03 12:03:42 -04:00
}
2021-09-18 16:31:35 -04:00
2021-04-04 18:34:33 -04:00
// to decrement pending count later.
2021-09-18 16:31:35 -04:00
for _ , rinfo := range rinfos . Targets {
if rinfo . ReplicationStatus != rinfo . PrevReplicationStatus {
2021-11-17 15:10:57 -05:00
globalReplicationStats . Update ( dobj . Bucket , rinfo . Arn , 0 , 0 , replicationStatus ,
2021-09-18 16:31:35 -04:00
prevStatus , replication . DeleteReplicationType )
}
}
2021-01-25 17:04:41 -05:00
2020-11-22 02:48:50 -05:00
var eventName = event . ObjectReplicationComplete
2021-09-18 16:31:35 -04:00
if replicationStatus == replication . Failed {
2020-11-22 02:48:50 -05:00
eventName = event . ObjectReplicationFailed
}
2021-09-18 16:31:35 -04:00
drs := getReplicationState ( rinfos , dobj . ReplicationState , dobj . VersionID )
2021-02-03 23:41:33 -05:00
dobjInfo , err := objectAPI . DeleteObject ( ctx , bucket , dobj . ObjectName , ObjectOptions {
2021-09-18 16:31:35 -04:00
VersionID : versionID ,
MTime : dobj . DeleteMarkerMTime . Time ,
DeleteReplication : drs ,
Versioned : globalBucketVersioningSys . Enabled ( bucket ) ,
VersionSuspended : globalBucketVersioningSys . Suspended ( bucket ) ,
2021-01-25 17:04:41 -05:00
} )
2021-02-09 18:11:43 -05:00
if err != nil && ! isErrVersionNotFound ( err ) { // VersionNotFound would be reported by pool that object version is missing on.
logger . LogIf ( ctx , fmt . Errorf ( "Unable to update replication metadata for %s/%s(%s): %s" , bucket , dobj . ObjectName , versionID , err ) )
2021-02-03 23:41:33 -05:00
sendEvent ( eventArgs {
BucketName : bucket ,
Object : ObjectInfo {
Bucket : bucket ,
Name : dobj . ObjectName ,
VersionID : versionID ,
DeleteMarker : dobj . DeleteMarker ,
} ,
Host : "Internal: [Replication]" ,
EventName : eventName ,
} )
} else {
sendEvent ( eventArgs {
BucketName : bucket ,
Object : dobjInfo ,
Host : "Internal: [Replication]" ,
EventName : eventName ,
} )
2020-11-19 21:43:58 -05:00
}
}
2021-09-18 16:31:35 -04:00
func replicateDeleteToTarget ( ctx context . Context , dobj DeletedObjectReplicationInfo , objectAPI ObjectLayer , tgt * TargetClient ) ( rinfo replicatedTargetInfo ) {
versionID := dobj . DeleteMarkerVersionID
if versionID == "" {
versionID = dobj . VersionID
}
rinfo = dobj . ReplicationState . targetState ( tgt . ARN )
rinfo . OpType = dobj . OpType
defer func ( ) {
if rinfo . ReplicationStatus == replication . Completed && tgt . ResetID != "" && dobj . OpType == replication . ExistingObjectReplicationType {
rinfo . ResyncTimestamp = fmt . Sprintf ( "%s;%s" , UTCNow ( ) . Format ( http . TimeFormat ) , tgt . ResetID )
}
} ( )
if dobj . VersionID == "" && rinfo . PrevReplicationStatus == replication . Completed && dobj . OpType != replication . ExistingObjectReplicationType {
rinfo . ReplicationStatus = rinfo . PrevReplicationStatus
return
}
if dobj . VersionID != "" && rinfo . VersionPurgeStatus == Complete {
return
}
if tgt . IsOffline ( ) {
logger . LogIf ( ctx , fmt . Errorf ( "remote target is offline for bucket:%s arn:%s" , dobj . Bucket , tgt . ARN ) )
sendEvent ( eventArgs {
BucketName : dobj . Bucket ,
Object : ObjectInfo {
Bucket : dobj . Bucket ,
Name : dobj . ObjectName ,
VersionID : dobj . VersionID ,
DeleteMarker : dobj . DeleteMarker ,
} ,
Host : "Internal: [Replication]" ,
EventName : event . ObjectReplicationNotTracked ,
} )
if dobj . VersionID == "" {
rinfo . ReplicationStatus = replication . Failed
} else {
rinfo . VersionPurgeStatus = Failed
}
return
}
2021-12-16 18:34:55 -05:00
// early return if already replicated delete marker for existing object replication/ healing delete markers
if dobj . DeleteMarkerVersionID != "" && ( dobj . OpType == replication . ExistingObjectReplicationType || dobj . OpType == replication . HealReplicationType ) {
2021-09-18 16:31:35 -04:00
if _ , err := tgt . StatObject ( ctx , tgt . Bucket , dobj . ObjectName , miniogo . StatObjectOptions {
VersionID : versionID ,
Internal : miniogo . AdvancedGetOptions {
ReplicationProxyRequest : "false" ,
} } ) ; isErrMethodNotAllowed ( ErrorRespToObjectError ( err , dobj . Bucket , dobj . ObjectName ) ) {
if dobj . VersionID == "" {
rinfo . ReplicationStatus = replication . Completed
2021-12-16 18:34:55 -05:00
return
2021-09-18 16:31:35 -04:00
}
}
}
rmErr := tgt . RemoveObject ( ctx , tgt . Bucket , dobj . ObjectName , miniogo . RemoveObjectOptions {
VersionID : versionID ,
Internal : miniogo . AdvancedRemoveOptions {
ReplicationDeleteMarker : dobj . DeleteMarkerVersionID != "" ,
ReplicationMTime : dobj . DeleteMarkerMTime . Time ,
ReplicationStatus : miniogo . ReplicationStatusReplica ,
ReplicationRequest : true , // always set this to distinguish between `mc mirror` replication and serverside
} ,
} )
if rmErr != nil {
if dobj . VersionID == "" {
rinfo . ReplicationStatus = replication . Failed
} else {
rinfo . VersionPurgeStatus = Failed
}
logger . LogIf ( ctx , fmt . Errorf ( "Unable to replicate delete marker to %s/%s(%s): %s" , tgt . Bucket , dobj . ObjectName , versionID , rmErr ) )
} else {
if dobj . VersionID == "" {
rinfo . ReplicationStatus = replication . Completed
} else {
rinfo . VersionPurgeStatus = Complete
}
}
return
}
func getCopyObjMetadata ( oi ObjectInfo , sc string ) map [ string ] string {
2020-11-19 14:50:22 -05:00
meta := make ( map [ string ] string , len ( oi . UserDefined ) )
for k , v := range oi . UserDefined {
2021-02-03 23:41:33 -05:00
if strings . HasPrefix ( strings . ToLower ( k ) , ReservedMetadataPrefixLower ) {
2020-11-19 14:50:22 -05:00
continue
}
2021-02-03 23:41:33 -05:00
if equals ( k , xhttp . AmzBucketReplicationStatus ) {
continue
}
// https://github.com/google/security-research/security/advisories/GHSA-76wf-9vgp-pj7w
if equals ( k , xhttp . AmzMetaUnencryptedContentLength , xhttp . AmzMetaUnencryptedContentMD5 ) {
2020-11-19 14:50:22 -05:00
continue
}
meta [ k ] = v
}
2021-02-03 23:41:33 -05:00
2020-11-19 14:50:22 -05:00
if oi . ContentEncoding != "" {
meta [ xhttp . ContentEncoding ] = oi . ContentEncoding
}
2021-02-03 23:41:33 -05:00
2020-11-19 14:50:22 -05:00
if oi . ContentType != "" {
meta [ xhttp . ContentType ] = oi . ContentType
}
2021-02-03 23:41:33 -05:00
if oi . UserTags != "" {
meta [ xhttp . AmzObjectTagging ] = oi . UserTags
2020-11-19 14:50:22 -05:00
meta [ xhttp . AmzTagDirective ] = "REPLACE"
}
2021-02-03 23:41:33 -05:00
2020-11-19 14:50:22 -05:00
if sc == "" {
sc = oi . StorageClass
}
2021-04-19 13:30:42 -04:00
// drop non standard storage classes for tiering from replication
if sc != "" && ( sc == storageclass . RRS || sc == storageclass . STANDARD ) {
2021-02-03 23:41:33 -05:00
meta [ xhttp . AmzStorageClass ] = sc
2020-11-19 14:50:22 -05:00
}
2021-04-19 13:30:42 -04:00
2020-11-19 14:50:22 -05:00
meta [ xhttp . MinIOSourceETag ] = oi . ETag
2021-02-03 23:41:33 -05:00
meta [ xhttp . MinIOSourceMTime ] = oi . ModTime . Format ( time . RFC3339Nano )
2020-11-19 14:50:22 -05:00
meta [ xhttp . AmzBucketReplicationStatus ] = replication . Replica . String ( )
return meta
}
2021-02-08 21:12:28 -05:00
// caseInsensitiveMap is a string map whose entries can be looked up without
// regard to the case of the key.
type caseInsensitiveMap map[string]string

// Lookup returns the value stored under key, matching the key
// case-insensitively, and whether such an entry exists.
//
// The exact key, its lower-cased form and its canonical HTTP header form are
// tried first (in that order) as direct map lookups. If none of them is
// present, the map is scanned for a key that is equal to key under Unicode
// case folding, so that entries stored with any other casing are found too.
// When several such keys exist, the lexicographically smallest one wins,
// which keeps the result independent of map iteration order.
func (m caseInsensitiveMap) Lookup(key string) (string, bool) {
	if len(m) == 0 {
		return "", false
	}

	// Fast path: the spellings keys are normally stored under.
	for _, k := range [...]string{
		key,
		strings.ToLower(key),
		http.CanonicalHeaderKey(key),
	} {
		if v, ok := m[k]; ok {
			return v, true
		}
	}

	// Slow path: any other casing of the same key.
	var (
		match string
		found bool
	)
	for k := range m {
		if strings.EqualFold(k, key) && (!found || k < match) {
			match, found = k, true
		}
	}
	if !found {
		return "", false
	}
	return m[match], true
}
2021-09-18 16:31:35 -04:00
func putReplicationOpts ( ctx context . Context , sc string , objInfo ObjectInfo ) ( putOpts miniogo . PutObjectOptions , err error ) {
2020-07-21 20:49:56 -04:00
meta := make ( map [ string ] string )
for k , v := range objInfo . UserDefined {
2021-01-12 01:36:51 -05:00
if strings . HasPrefix ( strings . ToLower ( k ) , ReservedMetadataPrefixLower ) {
2020-07-21 20:49:56 -04:00
continue
}
2021-01-12 01:36:51 -05:00
if isStandardHeader ( k ) {
2020-08-12 20:32:24 -04:00
continue
}
2020-07-21 20:49:56 -04:00
meta [ k ] = v
}
2021-02-03 23:41:33 -05:00
2021-04-19 13:30:42 -04:00
if sc == "" && ( objInfo . StorageClass == storageclass . STANDARD || objInfo . StorageClass == storageclass . RRS ) {
2020-08-05 23:01:20 -04:00
sc = objInfo . StorageClass
}
2020-07-21 20:49:56 -04:00
putOpts = miniogo . PutObjectOptions {
2020-10-06 11:37:09 -04:00
UserMetadata : meta ,
ContentType : objInfo . ContentType ,
ContentEncoding : objInfo . ContentEncoding ,
StorageClass : sc ,
Internal : miniogo . AdvancedPutOptions {
2021-03-03 14:13:31 -05:00
SourceVersionID : objInfo . VersionID ,
ReplicationStatus : miniogo . ReplicationStatusReplica ,
SourceMTime : objInfo . ModTime ,
SourceETag : objInfo . ETag ,
ReplicationRequest : true , // always set this to distinguish between `mc mirror` replication and serverside
2020-10-06 11:37:09 -04:00
} ,
2020-07-21 20:49:56 -04:00
}
2021-02-03 23:41:33 -05:00
if objInfo . UserTags != "" {
tag , _ := tags . ParseObjectTags ( objInfo . UserTags )
if tag != nil {
putOpts . UserTags = tag . ToMap ( )
2021-09-18 16:31:35 -04:00
// set tag timestamp in opts
tagTimestamp := objInfo . ModTime
if tagTmstampStr , ok := objInfo . UserDefined [ ReservedMetadataPrefixLower + TaggingTimestamp ] ; ok {
tagTimestamp , err = time . Parse ( time . RFC3339Nano , tagTmstampStr )
if err != nil {
return putOpts , err
}
}
putOpts . Internal . TaggingTimestamp = tagTimestamp
2021-02-03 23:41:33 -05:00
}
}
2021-02-08 21:12:28 -05:00
lkMap := caseInsensitiveMap ( objInfo . UserDefined )
if lang , ok := lkMap . Lookup ( xhttp . ContentLanguage ) ; ok {
2021-01-27 14:22:34 -05:00
putOpts . ContentLanguage = lang
}
2021-02-08 21:12:28 -05:00
if disp , ok := lkMap . Lookup ( xhttp . ContentDisposition ) ; ok {
2021-01-27 14:22:34 -05:00
putOpts . ContentDisposition = disp
}
2021-02-08 21:12:28 -05:00
if cc , ok := lkMap . Lookup ( xhttp . CacheControl ) ; ok {
2021-01-27 14:22:34 -05:00
putOpts . CacheControl = cc
}
2021-02-08 21:12:28 -05:00
if mode , ok := lkMap . Lookup ( xhttp . AmzObjectLockMode ) ; ok {
2020-07-21 20:49:56 -04:00
rmode := miniogo . RetentionMode ( mode )
putOpts . Mode = rmode
}
2021-02-08 21:12:28 -05:00
if retainDateStr , ok := lkMap . Lookup ( xhttp . AmzObjectLockRetainUntilDate ) ; ok {
rdate , err := time . Parse ( time . RFC3339 , retainDateStr )
2020-07-21 20:49:56 -04:00
if err != nil {
2021-02-08 21:12:28 -05:00
return putOpts , err
2020-07-21 20:49:56 -04:00
}
putOpts . RetainUntilDate = rdate
2021-09-18 16:31:35 -04:00
// set retention timestamp in opts
retTimestamp := objInfo . ModTime
if retainTmstampStr , ok := objInfo . UserDefined [ ReservedMetadataPrefixLower + ObjectLockRetentionTimestamp ] ; ok {
retTimestamp , err = time . Parse ( time . RFC3339Nano , retainTmstampStr )
if err != nil {
return putOpts , err
}
}
putOpts . Internal . RetentionTimestamp = retTimestamp
2020-07-21 20:49:56 -04:00
}
2021-02-08 21:12:28 -05:00
if lhold , ok := lkMap . Lookup ( xhttp . AmzObjectLockLegalHold ) ; ok {
2020-07-21 20:49:56 -04:00
putOpts . LegalHold = miniogo . LegalHoldStatus ( lhold )
2021-09-18 16:31:35 -04:00
// set legalhold timestamp in opts
lholdTimestamp := objInfo . ModTime
if lholdTmstampStr , ok := objInfo . UserDefined [ ReservedMetadataPrefixLower + ObjectLockLegalHoldTimestamp ] ; ok {
lholdTimestamp , err = time . Parse ( time . RFC3339Nano , lholdTmstampStr )
if err != nil {
return putOpts , err
}
}
putOpts . Internal . LegalholdTimestamp = lholdTimestamp
2020-07-21 20:49:56 -04:00
}
if crypto . S3 . IsEncrypted ( objInfo . UserDefined ) {
putOpts . ServerSideEncryption = encrypt . NewSSE ( )
}
return
}
2020-11-19 14:50:22 -05:00
// replicationAction names what has to be replicated for an object after
// comparing the source with the target.
type replicationAction string

// The possible replication actions.
const (
	// replicateMetadata is the "metadata" action.
	replicateMetadata replicationAction = "metadata"
	// replicateNone is the "none" action.
	replicateNone replicationAction = "none"
	// replicateAll is the "all" action.
	replicateAll replicationAction = "all"
)
2021-02-03 23:41:33 -05:00
// equals reports whether k1 matches at least one of keys. The comparison is
// case-insensitive (Unicode case folding). It returns false when no keys are
// given.
func equals(k1 string, keys ...string) bool {
	for i := 0; i < len(keys); i++ {
		if strings.EqualFold(keys[i], k1) {
			return true
		}
	}
	return false
}
2020-11-19 14:50:22 -05:00
// returns replicationAction by comparing metadata between source and target
2021-09-28 13:26:12 -04:00
func getReplicationAction ( oi1 ObjectInfo , oi2 minio . ObjectInfo , opType replication . Type ) replicationAction {
// Avoid resyncing null versions created prior to enabling replication if target has a newer copy
if opType == replication . ExistingObjectReplicationType &&
oi1 . ModTime . Unix ( ) > oi2 . LastModified . Unix ( ) && oi1 . VersionID == nullVersionID {
return replicateNone
}
2020-11-19 14:50:22 -05:00
// needs full replication
if oi1 . ETag != oi2 . ETag ||
oi1 . VersionID != oi2 . VersionID ||
oi1 . Size != oi2 . Size ||
2021-01-27 14:22:34 -05:00
oi1 . DeleteMarker != oi2 . IsDeleteMarker ||
2021-02-03 23:41:33 -05:00
oi1 . ModTime . Unix ( ) != oi2 . LastModified . Unix ( ) {
2020-11-19 14:50:22 -05:00
return replicateAll
}
2021-02-03 23:41:33 -05:00
2021-01-27 14:22:34 -05:00
if oi1 . ContentType != oi2 . ContentType {
2020-11-19 14:50:22 -05:00
return replicateMetadata
}
2021-02-03 23:41:33 -05:00
2020-11-19 14:50:22 -05:00
if oi1 . ContentEncoding != "" {
2021-01-27 14:22:34 -05:00
enc , ok := oi2 . Metadata [ xhttp . ContentEncoding ]
2021-02-03 23:41:33 -05:00
if ! ok {
enc , ok = oi2 . Metadata [ strings . ToLower ( xhttp . ContentEncoding ) ]
if ! ok {
return replicateMetadata
}
}
if strings . Join ( enc , "," ) != oi1 . ContentEncoding {
2020-11-19 14:50:22 -05:00
return replicateMetadata
}
}
2021-02-03 23:41:33 -05:00
t , _ := tags . ParseObjectTags ( oi1 . UserTags )
if ! reflect . DeepEqual ( oi2 . UserTags , t . ToMap ( ) ) {
2020-11-19 14:50:22 -05:00
return replicateMetadata
}
2021-02-03 23:41:33 -05:00
// Compare only necessary headers
compareKeys := [ ] string {
"Expires" ,
"Cache-Control" ,
"Content-Language" ,
"Content-Disposition" ,
"X-Amz-Object-Lock-Mode" ,
"X-Amz-Object-Lock-Retain-Until-Date" ,
"X-Amz-Object-Lock-Legal-Hold" ,
"X-Amz-Website-Redirect-Location" ,
"X-Amz-Meta-" ,
}
// compare metadata on both maps to see if meta is identical
compareMeta1 := make ( map [ string ] string )
for k , v := range oi1 . UserDefined {
var found bool
for _ , prefix := range compareKeys {
if ! strings . HasPrefix ( strings . ToLower ( k ) , strings . ToLower ( prefix ) ) {
continue
}
found = true
break
}
if found {
compareMeta1 [ strings . ToLower ( k ) ] = v
2021-01-27 14:22:34 -05:00
}
}
2021-02-03 23:41:33 -05:00
compareMeta2 := make ( map [ string ] string )
for k , v := range oi2 . Metadata {
var found bool
for _ , prefix := range compareKeys {
if ! strings . HasPrefix ( strings . ToLower ( k ) , strings . ToLower ( prefix ) ) {
continue
}
found = true
break
2021-01-27 14:22:34 -05:00
}
2021-02-03 23:41:33 -05:00
if found {
compareMeta2 [ strings . ToLower ( k ) ] = strings . Join ( v , "," )
2020-11-19 14:50:22 -05:00
}
}
2021-02-03 23:41:33 -05:00
if ! reflect . DeepEqual ( compareMeta1 , compareMeta2 ) {
2020-11-19 14:50:22 -05:00
return replicateMetadata
}
2021-02-03 23:41:33 -05:00
2020-11-19 14:50:22 -05:00
return replicateNone
}
2020-07-21 20:49:56 -04:00
// replicateObject replicates the specified version of the object to destination bucket
// The source object is then updated to reflect the replication status.
2021-07-01 17:02:44 -04:00
func replicateObject ( ctx context . Context , ri ReplicateObjectInfo , objectAPI ObjectLayer , trigger string ) {
var replicationStatus replication . StatusType
defer func ( ) {
if replicationStatus . Empty ( ) {
// replication status is empty means
// replication was not attempted for some
// reason, notify the state of the object
// on disk.
replicationStatus = ri . ReplicationStatus
}
auditLogInternal ( ctx , ri . Bucket , ri . Name , AuditLogOptions {
Trigger : trigger ,
APIName : ReplicateObjectAPI ,
VersionID : ri . VersionID ,
Status : replicationStatus . String ( ) ,
} )
} ( )
2021-04-15 19:32:00 -04:00
objInfo := ri . ObjectInfo
2020-09-16 19:04:55 -04:00
bucket := objInfo . Bucket
object := objInfo . Name
2020-07-30 22:55:22 -04:00
cfg , err := getReplicationConfig ( ctx , bucket )
2020-07-21 20:49:56 -04:00
if err != nil {
logger . LogIf ( ctx , err )
2021-02-03 23:41:33 -05:00
sendEvent ( eventArgs {
EventName : event . ObjectReplicationNotTracked ,
BucketName : bucket ,
Object : objInfo ,
Host : "Internal: [Replication]" ,
} )
2020-07-21 20:49:56 -04:00
return
}
2021-09-18 16:31:35 -04:00
tgtArns := cfg . FilterTargetArns ( replication . ObjectOpts {
Name : object ,
SSEC : crypto . SSEC . IsEncrypted ( objInfo . UserDefined ) ,
} )
// Lock the object name before starting replication.
// Use separate lock that doesn't collide with regular objects.
lk := objectAPI . NewNSLock ( bucket , "/[replicate]/" + object )
lkctx , err := lk . GetLock ( ctx , globalOperationTimeout )
if err != nil {
2021-02-03 23:41:33 -05:00
sendEvent ( eventArgs {
EventName : event . ObjectReplicationNotTracked ,
BucketName : bucket ,
Object : objInfo ,
Host : "Internal: [Replication]" ,
} )
2021-09-18 16:31:35 -04:00
logger . LogIf ( ctx , fmt . Errorf ( "failed to get lock for object: %s bucket:%s arn:%s" , object , bucket , cfg . RoleArn ) )
2020-07-21 20:49:56 -04:00
return
}
2021-09-18 16:31:35 -04:00
ctx = lkctx . Context ( )
defer lk . Unlock ( lkctx . Cancel )
var wg sync . WaitGroup
var rinfos replicatedInfos
rinfos . Targets = make ( [ ] replicatedTargetInfo , len ( tgtArns ) )
for i , tgtArn := range tgtArns {
tgt := globalBucketTargetSys . GetRemoteTargetClient ( ctx , tgtArn )
if tgt == nil {
logger . LogIf ( ctx , fmt . Errorf ( "failed to get target for bucket:%s arn:%s" , bucket , tgtArn ) )
sendEvent ( eventArgs {
EventName : event . ObjectReplicationNotTracked ,
BucketName : bucket ,
Object : objInfo ,
Host : "Internal: [Replication]" ,
} )
continue
}
wg . Add ( 1 )
go func ( index int , tgt * TargetClient ) {
defer wg . Done ( )
rinfos . Targets [ index ] = replicateObjectToTarget ( ctx , ri , objectAPI , tgt )
} ( i , tgt )
}
wg . Wait ( )
// FIXME: add support for missing replication events
// - event.ObjectReplicationMissedThreshold
// - event.ObjectReplicationReplicatedAfterThreshold
var eventName = event . ObjectReplicationComplete
if rinfos . ReplicationStatus ( ) == replication . Failed {
eventName = event . ObjectReplicationFailed
}
newReplStatusInternal := rinfos . ReplicationStatusInternal ( )
// Note that internal replication status(es) may match for previously replicated objects - in such cases
// metadata should be updated with last resync timestamp.
if objInfo . ReplicationStatusInternal != newReplStatusInternal || rinfos . ReplicationResynced ( ) {
popts := ObjectOptions {
2021-10-30 11:22:04 -04:00
MTime : objInfo . ModTime ,
VersionID : objInfo . VersionID ,
EvalMetadataFn : func ( oi ObjectInfo ) error {
oi . UserDefined [ ReservedMetadataPrefixLower + ReplicationStatus ] = newReplStatusInternal
oi . UserDefined [ ReservedMetadataPrefixLower + ReplicationTimestamp ] = UTCNow ( ) . Format ( time . RFC3339Nano )
oi . UserDefined [ xhttp . AmzBucketReplicationStatus ] = string ( rinfos . ReplicationStatus ( ) )
for _ , rinfo := range rinfos . Targets {
if rinfo . ResyncTimestamp != "" {
oi . UserDefined [ targetResetHeader ( rinfo . Arn ) ] = rinfo . ResyncTimestamp
}
}
if objInfo . UserTags != "" {
oi . UserDefined [ xhttp . AmzObjectTagging ] = objInfo . UserTags
}
return nil
} ,
2021-09-18 16:31:35 -04:00
}
2021-10-30 11:22:04 -04:00
2021-09-18 16:31:35 -04:00
if _ , err = objectAPI . PutObjectMetadata ( ctx , bucket , object , popts ) ; err != nil {
logger . LogIf ( ctx , fmt . Errorf ( "Unable to update replication metadata for %s/%s(%s): %w" ,
bucket , objInfo . Name , objInfo . VersionID , err ) )
}
opType := replication . MetadataReplicationType
if rinfos . Action ( ) == replicateAll {
opType = replication . ObjectReplicationType
}
for _ , rinfo := range rinfos . Targets {
if rinfo . ReplicationStatus != rinfo . PrevReplicationStatus {
2021-11-17 15:10:57 -05:00
globalReplicationStats . Update ( bucket , rinfo . Arn , rinfo . Size , rinfo . Duration , rinfo . ReplicationStatus , rinfo . PrevReplicationStatus , opType )
2021-09-18 16:31:35 -04:00
}
}
}
sendEvent ( eventArgs {
EventName : eventName ,
BucketName : bucket ,
Object : objInfo ,
Host : "Internal: [Replication]" ,
} )
// re-queue failures once more - keep a retry count to avoid flooding the queue if
// the target site is down. Leave it to scanner to catch up instead.
if rinfos . ReplicationStatus ( ) != replication . Completed && ri . RetryCount < 1 {
ri . OpType = replication . HealReplicationType
ri . ReplicationStatusInternal = rinfos . ReplicationStatusInternal ( )
ri . RetryCount ++
globalReplicationPool . queueReplicaFailedTask ( ri )
}
}
// replicateObjectToTarget replicates the specified version of the object to destination bucket
// The source object is then updated to reflect the replication status.
func replicateObjectToTarget ( ctx context . Context , ri ReplicateObjectInfo , objectAPI ObjectLayer , tgt * TargetClient ) ( rinfo replicatedTargetInfo ) {
2021-11-17 15:10:57 -05:00
startTime := time . Now ( )
2021-09-22 13:48:45 -04:00
objInfo := ri . ObjectInfo . Clone ( )
2021-09-18 16:31:35 -04:00
bucket := objInfo . Bucket
object := objInfo . Name
var (
closeOnDefer bool
gr * GetObjectReader
size int64
err error
)
sz , _ := objInfo . GetActualSize ( )
// set defaults for replication action based on operation being performed - actual
// replication action can only be determined after stat on remote. This default is
// needed for updating replication metrics correctly when target is offline.
var rAction replicationAction
switch ri . OpType {
case replication . MetadataReplicationType :
rAction = replicateMetadata
default :
rAction = replicateAll
}
rinfo = replicatedTargetInfo {
Size : sz ,
Arn : tgt . ARN ,
PrevReplicationStatus : objInfo . TargetReplicationStatus ( tgt . ARN ) ,
ReplicationStatus : replication . Failed ,
OpType : ri . OpType ,
ReplicationAction : rAction ,
}
if ri . ObjectInfo . TargetReplicationStatus ( tgt . ARN ) == replication . Completed && ! ri . ExistingObjResync . Empty ( ) && ! ri . ExistingObjResync . mustResyncTarget ( tgt . ARN ) {
rinfo . ReplicationStatus = replication . Completed
rinfo . ReplicationResynced = true
2021-09-08 18:34:50 -04:00
return
}
2021-09-18 16:31:35 -04:00
if tgt . IsOffline ( ) {
logger . LogIf ( ctx , fmt . Errorf ( "remote target is offline for bucket:%s arn:%s" , bucket , tgt . ARN ) )
2021-08-23 11:16:18 -04:00
sendEvent ( eventArgs {
EventName : event . ObjectReplicationNotTracked ,
BucketName : bucket ,
Object : objInfo ,
Host : "Internal: [Replication]" ,
} )
return
}
2021-09-18 16:31:35 -04:00
gr , err = objectAPI . GetObjectNInfo ( ctx , bucket , object , nil , http . Header { } , readLock , ObjectOptions {
2020-09-16 19:04:55 -04:00
VersionID : objInfo . VersionID ,
2020-09-15 23:44:48 -04:00
} )
2020-07-21 20:49:56 -04:00
if err != nil {
2021-02-03 23:41:33 -05:00
sendEvent ( eventArgs {
EventName : event . ObjectReplicationNotTracked ,
BucketName : bucket ,
Object : objInfo ,
Host : "Internal: [Replication]" ,
} )
2021-04-03 12:03:42 -04:00
logger . LogIf ( ctx , fmt . Errorf ( "Unable to update replicate for %s/%s(%s): %w" , bucket , object , objInfo . VersionID , err ) )
2020-07-21 20:49:56 -04:00
return
}
2021-06-29 02:58:08 -04:00
defer func ( ) {
if closeOnDefer {
gr . Close ( )
}
} ( )
closeOnDefer = true
2021-02-03 23:41:33 -05:00
2020-09-16 19:04:55 -04:00
objInfo = gr . ObjInfo
2021-09-18 16:31:35 -04:00
size , err = objInfo . GetActualSize ( )
2020-07-21 20:49:56 -04:00
if err != nil {
logger . LogIf ( ctx , err )
2021-02-03 23:41:33 -05:00
sendEvent ( eventArgs {
EventName : event . ObjectReplicationNotTracked ,
BucketName : bucket ,
Object : objInfo ,
Host : "Internal: [Replication]" ,
} )
2020-07-21 20:49:56 -04:00
return
}
2021-09-18 16:31:35 -04:00
if tgt . Bucket == "" {
2021-02-03 23:41:33 -05:00
logger . LogIf ( ctx , fmt . Errorf ( "Unable to replicate object %s(%s), bucket is empty" , objInfo . Name , objInfo . VersionID ) )
sendEvent ( eventArgs {
EventName : event . ObjectReplicationNotTracked ,
BucketName : bucket ,
Object : objInfo ,
Host : "Internal: [Replication]" ,
} )
2021-09-18 16:31:35 -04:00
return rinfo
2020-07-21 20:49:56 -04:00
}
2020-09-16 19:04:55 -04:00
2021-09-18 16:31:35 -04:00
rAction = replicateAll
oi , cerr := tgt . StatObject ( ctx , tgt . Bucket , object , miniogo . StatObjectOptions {
2021-01-27 14:22:34 -05:00
VersionID : objInfo . VersionID ,
Internal : miniogo . AdvancedGetOptions {
ReplicationProxyRequest : "false" ,
} } )
2021-09-18 16:31:35 -04:00
if cerr == nil {
2021-09-28 13:26:12 -04:00
rAction = getReplicationAction ( objInfo , oi , ri . OpType )
2021-09-18 16:31:35 -04:00
rinfo . ReplicationStatus = replication . Completed
if rAction == replicateNone {
2021-09-28 13:26:12 -04:00
if ri . OpType == replication . ExistingObjectReplicationType &&
objInfo . ModTime . Unix ( ) > oi . LastModified . Unix ( ) && objInfo . VersionID == nullVersionID {
logger . LogIf ( ctx , fmt . Errorf ( "Unable to replicate %s/%s (null). Newer version exists on target" , bucket , object ) )
sendEvent ( eventArgs {
EventName : event . ObjectReplicationNotTracked ,
BucketName : bucket ,
Object : objInfo ,
Host : "Internal: [Replication]" ,
} )
}
2020-07-21 20:49:56 -04:00
// object with same VersionID already exists, replication kicked off by
2021-04-03 12:03:42 -04:00
// PutObject might have completed
2021-09-18 16:31:35 -04:00
if objInfo . TargetReplicationStatus ( tgt . ARN ) == replication . Pending || objInfo . TargetReplicationStatus ( tgt . ARN ) == replication . Failed || ri . OpType == replication . ExistingObjectReplicationType {
2021-07-01 17:02:44 -04:00
// if metadata is not updated for some reason after replication, such as
// 503 encountered while updating metadata - make sure to set ReplicationStatus
// as Completed.
//
// Note: Replication Stats would have been updated despite metadata update failure.
2021-06-29 02:58:08 -04:00
gr . Close ( )
closeOnDefer = false
2021-09-18 16:31:35 -04:00
return replicatedTargetInfo {
ReplicationStatus : replication . Completed ,
Size : sz ,
Arn : tgt . ARN ,
ReplicationAction : rAction ,
PrevReplicationStatus : objInfo . TargetReplicationStatus ( tgt . ARN ) ,
2021-04-29 19:46:26 -04:00
}
}
2020-07-21 20:49:56 -04:00
return
}
}
2021-09-18 16:31:35 -04:00
rinfo . ReplicationStatus = replication . Completed
rinfo . Size = size
rinfo . ReplicationAction = rAction
defer func ( ) {
if rinfo . ReplicationStatus == replication . Completed && ri . OpType == replication . ExistingObjectReplicationType && tgt . ResetID != "" {
rinfo . ResyncTimestamp = fmt . Sprintf ( "%s;%s" , UTCNow ( ) . Format ( http . TimeFormat ) , tgt . ResetID )
rinfo . ReplicationResynced = true
}
2021-11-17 15:10:57 -05:00
rinfo . Duration = time . Since ( startTime )
2021-09-18 16:31:35 -04:00
} ( )
2021-02-20 03:22:17 -05:00
// use core client to avoid doing multipart on PUT
c := & miniogo . Core { Client : tgt . Client }
2021-09-18 16:31:35 -04:00
if rAction != replicateAll {
2020-11-19 14:50:22 -05:00
// replicate metadata for object tagging/copy with metadata replacement
2021-02-10 20:25:04 -05:00
srcOpts := miniogo . CopySrcOptions {
2021-09-18 16:31:35 -04:00
Bucket : tgt . Bucket ,
2021-02-10 20:25:04 -05:00
Object : object ,
2021-04-03 12:03:42 -04:00
VersionID : objInfo . VersionID ,
}
2021-03-03 14:13:31 -05:00
dstOpts := miniogo . PutObjectOptions {
Internal : miniogo . AdvancedPutOptions {
SourceVersionID : objInfo . VersionID ,
ReplicationRequest : true , // always set this to distinguish between `mc mirror` replication and serverside
} }
2021-09-18 16:31:35 -04:00
if _ , err = c . CopyObject ( ctx , tgt . Bucket , object , tgt . Bucket , object , getCopyObjMetadata ( objInfo , tgt . StorageClass ) , srcOpts , dstOpts ) ; err != nil {
rinfo . ReplicationStatus = replication . Failed
2021-02-09 18:11:43 -05:00
logger . LogIf ( ctx , fmt . Errorf ( "Unable to replicate metadata for object %s/%s(%s): %s" , bucket , objInfo . Name , objInfo . VersionID , err ) )
2021-01-06 19:13:10 -05:00
}
} else {
2021-09-18 16:31:35 -04:00
var putOpts minio . PutObjectOptions
putOpts , err = putReplicationOpts ( ctx , tgt . StorageClass , objInfo )
2021-02-08 19:19:05 -05:00
if err != nil {
2021-09-18 16:31:35 -04:00
logger . LogIf ( ctx , fmt . Errorf ( "failed to get target for replication bucket:%s err:%w" , bucket , err ) )
2021-02-08 21:12:28 -05:00
sendEvent ( eventArgs {
EventName : event . ObjectReplicationNotTracked ,
BucketName : bucket ,
Object : objInfo ,
Host : "Internal: [Replication]" ,
} )
2021-02-08 19:19:05 -05:00
return
}
2021-01-06 19:13:10 -05:00
var headerSize int
for k , v := range putOpts . Header ( ) {
headerSize += len ( k ) + len ( v )
}
2021-01-08 13:12:26 -05:00
2021-04-05 19:07:53 -04:00
opts := & bandwidth . MonitorReaderOptions {
2021-06-24 21:29:30 -04:00
Bucket : objInfo . Bucket ,
HeaderSize : headerSize ,
2021-04-05 19:07:53 -04:00
}
2021-07-28 18:20:01 -04:00
newCtx := ctx
if globalBucketMonitor . IsThrottled ( bucket ) {
var cancel context . CancelFunc
newCtx , cancel = context . WithTimeout ( ctx , throttleDeadline )
defer cancel ( )
}
2021-06-24 21:29:30 -04:00
r := bandwidth . NewMonitoredReader ( newCtx , globalBucketMonitor , gr , opts )
2021-09-09 01:25:23 -04:00
if objInfo . isMultipart ( ) {
2021-09-18 16:31:35 -04:00
if err := replicateObjectWithMultipart ( ctx , c , tgt . Bucket , object ,
2021-07-29 01:11:55 -04:00
r , objInfo , putOpts ) ; err != nil {
2021-09-18 16:31:35 -04:00
rinfo . ReplicationStatus = replication . Failed
2021-06-30 10:44:24 -04:00
logger . LogIf ( ctx , fmt . Errorf ( "Unable to replicate for object %s/%s(%s): %s" , bucket , objInfo . Name , objInfo . VersionID , err ) )
}
} else {
2021-09-18 16:31:35 -04:00
if _ , err = c . PutObject ( ctx , tgt . Bucket , object , r , size , "" , "" , putOpts ) ; err != nil {
rinfo . ReplicationStatus = replication . Failed
2021-06-30 10:44:24 -04:00
logger . LogIf ( ctx , fmt . Errorf ( "Unable to replicate for object %s/%s(%s): %s" , bucket , objInfo . Name , objInfo . VersionID , err ) )
}
2021-01-06 19:13:10 -05:00
}
2020-07-21 20:49:56 -04:00
}
2021-06-29 02:58:08 -04:00
gr . Close ( )
closeOnDefer = false
2021-09-18 16:31:35 -04:00
return
2020-07-21 20:49:56 -04:00
}
2020-08-12 20:32:24 -04:00
2021-07-29 01:11:55 -04:00
func replicateObjectWithMultipart ( ctx context . Context , c * miniogo . Core , bucket , object string , r io . Reader , objInfo ObjectInfo , opts miniogo . PutObjectOptions ) ( err error ) {
2021-06-30 10:44:24 -04:00
var uploadedParts [ ] miniogo . CompletePart
2021-07-29 01:11:55 -04:00
uploadID , err := c . NewMultipartUpload ( context . Background ( ) , bucket , object , opts )
2021-06-30 10:44:24 -04:00
if err != nil {
2021-07-29 01:11:55 -04:00
return err
2021-06-30 10:44:24 -04:00
}
2021-07-29 01:11:55 -04:00
defer func ( ) {
if err != nil {
// block and abort remote upload upon failure.
if aerr := c . AbortMultipartUpload ( ctx , bucket , object , uploadID ) ; aerr != nil {
aerr = fmt . Errorf ( "Unable to cleanup failed multipart replication %s on remote %s/%s: %w" , uploadID , bucket , object , aerr )
logger . LogIf ( ctx , aerr )
}
}
} ( )
2021-06-30 10:44:24 -04:00
var (
hr * hash . Reader
pInfo miniogo . ObjectPart
)
2021-07-29 01:11:55 -04:00
2021-06-30 10:44:24 -04:00
for _ , partInfo := range objInfo . Parts {
2021-08-24 17:41:05 -04:00
hr , err = hash . NewReader ( r , partInfo . ActualSize , "" , "" , partInfo . ActualSize )
2021-06-30 10:44:24 -04:00
if err != nil {
2021-07-29 01:11:55 -04:00
return err
2021-06-30 10:44:24 -04:00
}
2021-08-24 17:41:05 -04:00
pInfo , err = c . PutObjectPart ( ctx , bucket , object , uploadID , partInfo . Number , hr , partInfo . ActualSize , "" , "" , opts . ServerSideEncryption )
2021-06-30 10:44:24 -04:00
if err != nil {
2021-07-29 01:11:55 -04:00
return err
2021-06-30 10:44:24 -04:00
}
2021-08-24 17:41:05 -04:00
if pInfo . Size != partInfo . ActualSize {
return fmt . Errorf ( "Part size mismatch: got %d, want %d" , pInfo . Size , partInfo . ActualSize )
2021-06-30 10:44:24 -04:00
}
uploadedParts = append ( uploadedParts , miniogo . CompletePart {
PartNumber : pInfo . PartNumber ,
ETag : pInfo . ETag ,
} )
}
2021-07-29 01:11:55 -04:00
_ , err = c . CompleteMultipartUpload ( ctx , bucket , object , uploadID , uploadedParts , miniogo . PutObjectOptions {
Internal : miniogo . AdvancedPutOptions {
SourceMTime : objInfo . ModTime ,
// always set this to distinguish between `mc mirror` replication and serverside
ReplicationRequest : true ,
} } )
return err
2021-06-30 10:44:24 -04:00
}
2020-08-12 20:32:24 -04:00
// filterReplicationStatusMetadata returns metadata without the
// xhttp.AmzBucketReplicationStatus entry. The input map is never modified:
// when the key is absent the input itself is returned, otherwise a copy
// without the key is returned.
func filterReplicationStatusMetadata(metadata map[string]string) map[string]string {
	if _, present := metadata[xhttp.AmzBucketReplicationStatus]; !present {
		// Nothing to strip, hand back the caller's map untouched.
		return metadata
	}

	filtered := make(map[string]string, len(metadata))
	for key, val := range metadata {
		if key == xhttp.AmzBucketReplicationStatus {
			continue
		}
		filtered[key] = val
	}
	return filtered
}
2020-09-16 19:04:55 -04:00
2021-06-01 22:59:11 -04:00
// DeletedObjectReplicationInfo has info on deleted object
type DeletedObjectReplicationInfo struct {
2020-11-19 21:43:58 -05:00
DeletedObject
2021-09-18 16:31:35 -04:00
Bucket string
OpType replication . Type
ResetID string
TargetArn string
2020-11-19 21:43:58 -05:00
}
2021-07-01 17:02:44 -04:00
// Replication specific API names, used as APIName in audit log entries.

// ReplicateObjectAPI - API name recorded when an object is replicated.
const ReplicateObjectAPI = "ReplicateObject"

// ReplicateDeleteAPI - API name for replicating a delete.
const ReplicateDeleteAPI = "ReplicateDelete"
2021-06-29 02:58:08 -04:00
// Audit trail triggers for replication activity.
const (
	// ReplicateQueued - replication being queued trail
	ReplicateQueued = "replicate:queue"

	// ReplicateExisting - audit trail for existing objects replication
	ReplicateExisting = "replicate:existing"

	// ReplicateExistingDelete - audit trail for delete replication triggered for existing delete markers
	ReplicateExistingDelete = "replicate:existing:delete"

	// ReplicateMRF - audit trail for replication from Most Recent Failures (MRF) queue
	ReplicateMRF = "replicate:mrf"

	// ReplicateIncoming - audit trail indicating replication started [could be from incoming/existing/heal activity]
	ReplicateIncoming = "replicate:incoming"

	// ReplicateHeal - audit trail for healing of failed/pending replications
	ReplicateHeal = "replicate:heal"

	// ReplicateDelete - audit trail for delete replication
	ReplicateDelete = "replicate:delete"
)
2020-09-21 16:43:29 -04:00
var (
2021-04-03 12:03:42 -04:00
globalReplicationPool * ReplicationPool
globalReplicationStats * ReplicationStats
2020-09-21 16:43:29 -04:00
)
2020-09-16 19:04:55 -04:00
2021-03-09 05:56:42 -05:00
// ReplicationPool describes replication pool
type ReplicationPool struct {
2021-06-01 22:59:11 -04:00
objLayer ObjectLayer
ctx context . Context
mrfWorkerKillCh chan struct { }
workerKillCh chan struct { }
replicaCh chan ReplicateObjectInfo
replicaDeleteCh chan DeletedObjectReplicationInfo
mrfReplicaCh chan ReplicateObjectInfo
existingReplicaCh chan ReplicateObjectInfo
existingReplicaDeleteCh chan DeletedObjectReplicationInfo
workerSize int
mrfWorkerSize int
workerWg sync . WaitGroup
mrfWorkerWg sync . WaitGroup
once sync . Once
mu sync . Mutex
2021-03-09 05:56:42 -05:00
}
// NewReplicationPool creates a pool of replication workers of specified size
2021-04-24 00:58:45 -04:00
func NewReplicationPool ( ctx context . Context , o ObjectLayer , opts replicationPoolOpts ) * ReplicationPool {
2021-03-09 05:56:42 -05:00
pool := & ReplicationPool {
2021-06-01 22:59:11 -04:00
replicaCh : make ( chan ReplicateObjectInfo , 100000 ) ,
replicaDeleteCh : make ( chan DeletedObjectReplicationInfo , 100000 ) ,
mrfReplicaCh : make ( chan ReplicateObjectInfo , 100000 ) ,
workerKillCh : make ( chan struct { } , opts . Workers ) ,
mrfWorkerKillCh : make ( chan struct { } , opts . FailedWorkers ) ,
existingReplicaCh : make ( chan ReplicateObjectInfo , 100000 ) ,
existingReplicaDeleteCh : make ( chan DeletedObjectReplicationInfo , 100000 ) ,
ctx : ctx ,
objLayer : o ,
2021-04-03 12:03:42 -04:00
}
2021-05-28 16:28:37 -04:00
2021-04-24 00:58:45 -04:00
pool . ResizeWorkers ( opts . Workers )
pool . ResizeFailedWorkers ( opts . FailedWorkers )
2021-06-01 22:59:11 -04:00
go pool . AddExistingObjectReplicateWorker ( )
2021-03-09 05:56:42 -05:00
return pool
2020-09-16 19:04:55 -04:00
}
2021-04-03 12:03:42 -04:00
// AddMRFWorker adds a pending/failed replication worker to handle requests that could not be queued
// to the other workers
func ( p * ReplicationPool ) AddMRFWorker ( ) {
for {
select {
case <- p . ctx . Done ( ) :
return
case oi , ok := <- p . mrfReplicaCh :
if ! ok {
return
}
2021-07-01 17:02:44 -04:00
replicateObject ( p . ctx , oi , p . objLayer , ReplicateMRF )
2021-05-28 16:28:37 -04:00
case <- p . mrfWorkerKillCh :
return
2021-04-03 12:03:42 -04:00
}
}
}
2021-03-09 05:56:42 -05:00
// AddWorker adds a replication worker to the pool
func ( p * ReplicationPool ) AddWorker ( ) {
2021-04-24 00:58:45 -04:00
defer p . workerWg . Done ( )
2021-03-09 05:56:42 -05:00
for {
select {
case <- p . ctx . Done ( ) :
return
case oi , ok := <- p . replicaCh :
if ! ok {
2020-09-16 19:04:55 -04:00
return
}
2021-07-01 17:02:44 -04:00
replicateObject ( p . ctx , oi , p . objLayer , ReplicateIncoming )
2021-03-09 05:56:42 -05:00
case doi , ok := <- p . replicaDeleteCh :
if ! ok {
return
}
2021-07-01 17:02:44 -04:00
replicateDelete ( p . ctx , doi , p . objLayer , ReplicateDelete )
2021-04-24 00:58:45 -04:00
case <- p . workerKillCh :
2021-03-09 05:56:42 -05:00
return
2020-09-16 19:04:55 -04:00
}
2021-03-09 05:56:42 -05:00
}
2020-09-16 19:04:55 -04:00
}
2020-09-21 16:43:29 -04:00
2021-06-01 22:59:11 -04:00
// AddExistingObjectReplicateWorker adds a worker to queue existing objects that need to be sync'd
func ( p * ReplicationPool ) AddExistingObjectReplicateWorker ( ) {
for {
select {
case <- p . ctx . Done ( ) :
return
case oi , ok := <- p . existingReplicaCh :
if ! ok {
return
}
2021-07-01 17:02:44 -04:00
replicateObject ( p . ctx , oi , p . objLayer , ReplicateExisting )
2021-06-01 22:59:11 -04:00
case doi , ok := <- p . existingReplicaDeleteCh :
if ! ok {
return
}
2021-07-01 17:02:44 -04:00
replicateDelete ( p . ctx , doi , p . objLayer , ReplicateExistingDelete )
2021-06-01 22:59:11 -04:00
}
}
}
2021-04-24 00:58:45 -04:00
// ResizeWorkers sets replication workers pool to new size
func ( p * ReplicationPool ) ResizeWorkers ( n int ) {
2021-03-09 05:56:42 -05:00
p . mu . Lock ( )
defer p . mu . Unlock ( )
2021-04-24 00:58:45 -04:00
for p . workerSize < n {
p . workerSize ++
p . workerWg . Add ( 1 )
2021-03-09 05:56:42 -05:00
go p . AddWorker ( )
}
2021-04-24 00:58:45 -04:00
for p . workerSize > n {
p . workerSize --
go func ( ) { p . workerKillCh <- struct { } { } } ( )
}
}
// ResizeFailedWorkers sets replication failed workers pool size
func ( p * ReplicationPool ) ResizeFailedWorkers ( n int ) {
p . mu . Lock ( )
defer p . mu . Unlock ( )
for p . mrfWorkerSize < n {
p . mrfWorkerSize ++
p . mrfWorkerWg . Add ( 1 )
go p . AddMRFWorker ( )
}
for p . mrfWorkerSize > n {
p . mrfWorkerSize --
go func ( ) { p . mrfWorkerKillCh <- struct { } { } } ( )
2021-03-09 05:56:42 -05:00
}
}
2021-12-13 21:22:56 -05:00
// suggestedWorkers recommends an increase in number of workers to meet replication load:
// the current worker count (MRF workers when failQueue is true, regular
// workers otherwise) scaled by ReplicationWorkerMultiplier and truncated to
// an int.
func (p *ReplicationPool) suggestedWorkers(failQueue bool) int {
	current := p.workerSize
	if failQueue {
		current = p.mrfWorkerSize
	}
	return int(float64(current) * ReplicationWorkerMultiplier)
}
2021-04-29 21:20:39 -04:00
func ( p * ReplicationPool ) queueReplicaFailedTask ( ri ReplicateObjectInfo ) {
2021-03-09 05:56:42 -05:00
if p == nil {
2020-09-21 16:43:29 -04:00
return
}
2021-03-09 05:56:42 -05:00
select {
2021-04-29 21:20:39 -04:00
case <- GlobalContext . Done ( ) :
2021-04-16 17:09:25 -04:00
p . once . Do ( func ( ) {
close ( p . replicaCh )
close ( p . mrfReplicaCh )
2021-06-01 22:59:11 -04:00
close ( p . existingReplicaCh )
2021-04-16 17:09:25 -04:00
} )
2021-04-15 19:32:00 -04:00
case p . mrfReplicaCh <- ri :
2021-03-09 05:56:42 -05:00
default :
2021-12-13 21:22:56 -05:00
logger . LogOnceIf ( GlobalContext , fmt . Errorf ( "WARNING: Replication failed workers could not keep up with healing failures - consider increasing number of replication failed workers with `mc admin config set api replication_failed_workers=%d`" , p . suggestedWorkers ( true ) ) , replicationSubsystem )
2021-03-09 05:56:42 -05:00
}
}
2020-09-21 16:43:29 -04:00
2021-04-29 21:20:39 -04:00
func ( p * ReplicationPool ) queueReplicaTask ( ri ReplicateObjectInfo ) {
if p == nil {
return
}
2021-06-01 22:59:11 -04:00
var ch chan ReplicateObjectInfo
switch ri . OpType {
case replication . ExistingObjectReplicationType :
ch = p . existingReplicaCh
2021-06-29 02:58:08 -04:00
case replication . HealReplicationType :
2021-07-01 17:02:44 -04:00
fallthrough
2021-06-01 22:59:11 -04:00
default :
ch = p . replicaCh
}
2021-04-29 21:20:39 -04:00
select {
case <- GlobalContext . Done ( ) :
p . once . Do ( func ( ) {
close ( p . replicaCh )
close ( p . mrfReplicaCh )
2021-06-01 22:59:11 -04:00
close ( p . existingReplicaCh )
2021-04-29 21:20:39 -04:00
} )
2021-06-01 22:59:11 -04:00
case ch <- ri :
2021-04-29 21:20:39 -04:00
default :
2021-12-13 21:22:56 -05:00
logger . LogOnceIf ( GlobalContext , fmt . Errorf ( "WARNING: Replication workers could not keep up with incoming traffic - consider increasing number of replication workers with `mc admin config set api replication_workers=%d`" , p . suggestedWorkers ( false ) ) , replicationSubsystem )
2021-04-29 21:20:39 -04:00
}
}
2021-09-18 16:31:35 -04:00
// queueReplicateDeletesWrapper queues doi on globalReplicationPool once for
// every target in existingObjectResync that is marked for replication,
// setting the target's ARN and reset ID on the queued copy.
func queueReplicateDeletesWrapper(doi DeletedObjectReplicationInfo, existingObjectResync ResyncDecision) {
	for arn, decision := range existingObjectResync.targets {
		if !decision.Replicate {
			continue
		}
		doi.ResetID = decision.ResetID
		doi.TargetArn = arn
		globalReplicationPool.queueReplicaDeleteTask(doi)
	}
}
2021-06-01 22:59:11 -04:00
func ( p * ReplicationPool ) queueReplicaDeleteTask ( doi DeletedObjectReplicationInfo ) {
2021-03-09 05:56:42 -05:00
if p == nil {
return
2020-09-21 16:43:29 -04:00
}
2021-06-01 22:59:11 -04:00
var ch chan DeletedObjectReplicationInfo
switch doi . OpType {
case replication . ExistingObjectReplicationType :
ch = p . existingReplicaDeleteCh
2021-06-29 02:58:08 -04:00
case replication . HealReplicationType :
2021-07-01 17:02:44 -04:00
fallthrough
2021-06-01 22:59:11 -04:00
default :
ch = p . replicaDeleteCh
}
2021-03-09 05:56:42 -05:00
select {
2021-04-29 21:20:39 -04:00
case <- GlobalContext . Done ( ) :
2021-04-16 17:09:25 -04:00
p . once . Do ( func ( ) {
close ( p . replicaDeleteCh )
2021-06-01 22:59:11 -04:00
close ( p . existingReplicaDeleteCh )
2021-04-16 17:09:25 -04:00
} )
2021-06-01 22:59:11 -04:00
case ch <- doi :
2021-03-09 05:56:42 -05:00
default :
2021-12-13 21:22:56 -05:00
logger . LogOnceIf ( GlobalContext , fmt . Errorf ( "WARNING: Replication workers could not keep up with incoming traffic - consider increasing number of replication workers with `mc admin config set api replication_workers=%d`" , p . suggestedWorkers ( false ) ) , replicationSubsystem )
2021-03-09 05:56:42 -05:00
}
}
2021-04-24 00:58:45 -04:00
// replicationPoolOpts holds the initial worker counts handed to
// NewReplicationPool.
type replicationPoolOpts struct {
	// Workers is the number of regular replication workers.
	Workers int
	// FailedWorkers is the number of MRF (failed/pending) workers.
	FailedWorkers int
}
2021-03-09 05:56:42 -05:00
func initBackgroundReplication ( ctx context . Context , objectAPI ObjectLayer ) {
2021-04-24 00:58:45 -04:00
globalReplicationPool = NewReplicationPool ( ctx , objectAPI , replicationPoolOpts {
Workers : globalAPIConfig . getReplicationWorkers ( ) ,
FailedWorkers : globalAPIConfig . getReplicationFailedWorkers ( ) ,
} )
2021-04-03 12:03:42 -04:00
globalReplicationStats = NewReplicationStats ( ctx , objectAPI )
2021-10-21 21:52:55 -04:00
go globalReplicationStats . loadInitialReplicationMetrics ( ctx )
2020-09-21 16:43:29 -04:00
}
2021-01-12 01:36:51 -05:00
// get Reader from replication target if active-active replication is in place and
// this node returns a 404
2021-09-18 16:31:35 -04:00
func proxyGetToReplicationTarget ( ctx context . Context , bucket , object string , rs * HTTPRangeSpec , h http . Header , opts ObjectOptions , proxyTargets * madmin . BucketTargets ) ( gr * GetObjectReader , proxy bool ) {
tgt , oi , proxy := proxyHeadToRepTarget ( ctx , bucket , object , opts , proxyTargets )
if ! proxy {
2021-01-12 01:36:51 -05:00
return nil , false
}
fn , off , length , err := NewGetObjectReader ( rs , oi , opts )
if err != nil {
return nil , false
}
gopts := miniogo . GetObjectOptions {
VersionID : opts . VersionID ,
ServerSideEncryption : opts . ServerSideEncryption ,
Internal : miniogo . AdvancedGetOptions {
2021-01-27 14:22:34 -05:00
ReplicationProxyRequest : "true" ,
2021-01-12 01:36:51 -05:00
} ,
}
// get correct offsets for encrypted object
if off >= 0 && length >= 0 {
if err := gopts . SetRange ( off , off + length - 1 ) ; err != nil {
return nil , false
}
}
2021-02-03 23:41:33 -05:00
// Make sure to match ETag when proxying.
if err = gopts . SetMatchETag ( oi . ETag ) ; err != nil {
return nil , false
}
2021-01-12 01:36:51 -05:00
c := miniogo . Core { Client : tgt . Client }
obj , _ , _ , err := c . GetObject ( ctx , bucket , object , gopts )
if err != nil {
return nil , false
}
closeReader := func ( ) { obj . Close ( ) }
2021-06-24 12:44:00 -04:00
reader , err := fn ( obj , h , closeReader )
2021-01-12 01:36:51 -05:00
if err != nil {
return nil , false
}
2021-02-10 20:25:04 -05:00
reader . ObjInfo = oi . Clone ( )
2021-01-12 01:36:51 -05:00
return reader , true
}
2021-09-18 16:31:35 -04:00
func getproxyTargets ( ctx context . Context , bucket , object string , opts ObjectOptions ) ( tgts * madmin . BucketTargets ) {
2021-01-12 01:36:51 -05:00
cfg , err := getReplicationConfig ( ctx , bucket )
2021-09-18 16:31:35 -04:00
if err != nil || cfg == nil {
return & madmin . BucketTargets { }
}
topts := replication . ObjectOpts { Name : object }
tgtArns := cfg . FilterTargetArns ( topts )
tgts = & madmin . BucketTargets { Targets : make ( [ ] madmin . BucketTarget , len ( tgtArns ) ) }
for i , tgtArn := range tgtArns {
tgt := globalBucketTargetSys . GetRemoteBucketTargetByArn ( ctx , bucket , tgtArn )
tgts . Targets [ i ] = tgt
2021-01-12 01:36:51 -05:00
}
2021-09-18 16:31:35 -04:00
return tgts
2021-01-12 01:36:51 -05:00
}
2021-01-27 14:22:34 -05:00
2021-09-18 16:31:35 -04:00
func proxyHeadToRepTarget ( ctx context . Context , bucket , object string , opts ObjectOptions , proxyTargets * madmin . BucketTargets ) ( tgt * TargetClient , oi ObjectInfo , proxy bool ) {
2021-01-12 01:36:51 -05:00
// this option is set when active-active replication is in place between site A -> B,
// and site B does not have the object yet.
2021-01-27 14:22:34 -05:00
if opts . ProxyRequest || ( opts . ProxyHeaderSet && ! opts . ProxyRequest ) { // true only when site B sets MinIOSourceProxyRequest header
2021-09-18 16:31:35 -04:00
return nil , oi , false
2021-01-12 01:36:51 -05:00
}
2021-09-18 16:31:35 -04:00
for _ , t := range proxyTargets . Targets {
tgt = globalBucketTargetSys . GetRemoteTargetClient ( ctx , t . Arn )
if tgt == nil || tgt . IsOffline ( ) {
continue
}
// if proxying explicitly disabled on remote target
if tgt . disableProxy {
continue
}
2021-01-12 01:36:51 -05:00
2021-09-18 16:31:35 -04:00
gopts := miniogo . GetObjectOptions {
VersionID : opts . VersionID ,
ServerSideEncryption : opts . ServerSideEncryption ,
Internal : miniogo . AdvancedGetOptions {
ReplicationProxyRequest : "true" ,
} ,
}
objInfo , err := tgt . StatObject ( ctx , t . TargetBucket , object , gopts )
if err != nil {
continue
}
tags , _ := tags . MapToObjectTags ( objInfo . UserTags )
oi = ObjectInfo {
Bucket : bucket ,
Name : object ,
ModTime : objInfo . LastModified ,
Size : objInfo . Size ,
ETag : objInfo . ETag ,
VersionID : objInfo . VersionID ,
IsLatest : objInfo . IsLatest ,
DeleteMarker : objInfo . IsDeleteMarker ,
ContentType : objInfo . ContentType ,
Expires : objInfo . Expires ,
StorageClass : objInfo . StorageClass ,
ReplicationStatusInternal : objInfo . ReplicationStatus ,
UserTags : tags . String ( ) ,
}
oi . UserDefined = make ( map [ string ] string , len ( objInfo . Metadata ) )
for k , v := range objInfo . Metadata {
oi . UserDefined [ k ] = v [ 0 ]
}
ce , ok := oi . UserDefined [ xhttp . ContentEncoding ]
if ! ok {
ce , ok = oi . UserDefined [ strings . ToLower ( xhttp . ContentEncoding ) ]
}
if ok {
oi . ContentEncoding = ce
}
return tgt , oi , true
2021-01-12 01:36:51 -05:00
}
2021-09-18 16:31:35 -04:00
return nil , oi , false
2021-01-12 01:36:51 -05:00
}
// get object info from replication target if active-active replication is in place and
// this node returns a 404
2021-09-18 16:31:35 -04:00
func proxyHeadToReplicationTarget ( ctx context . Context , bucket , object string , opts ObjectOptions , proxyTargets * madmin . BucketTargets ) ( oi ObjectInfo , proxy bool ) {
_ , oi , proxy = proxyHeadToRepTarget ( ctx , bucket , object , opts , proxyTargets )
return oi , proxy
2021-01-12 01:36:51 -05:00
}
2021-09-18 16:31:35 -04:00
func scheduleReplication ( ctx context . Context , objInfo ObjectInfo , o ObjectLayer , dsc ReplicateDecision , opType replication . Type ) {
if dsc . Synchronous ( ) {
replicateObject ( ctx , ReplicateObjectInfo { ObjectInfo : objInfo , OpType : opType , Dsc : dsc } , o , ReplicateIncoming )
2021-01-12 01:36:51 -05:00
} else {
2021-09-18 16:31:35 -04:00
globalReplicationPool . queueReplicaTask ( ReplicateObjectInfo { ObjectInfo : objInfo , OpType : opType , Dsc : dsc } )
2021-04-03 12:03:42 -04:00
}
if sz , err := objInfo . GetActualSize ( ) ; err == nil {
2021-09-18 16:31:35 -04:00
for arn := range dsc . targetsMap {
2021-11-17 15:10:57 -05:00
globalReplicationStats . Update ( objInfo . Bucket , arn , sz , 0 , objInfo . ReplicationStatus , replication . StatusType ( "" ) , opType )
2021-09-18 16:31:35 -04:00
}
2021-01-12 01:36:51 -05:00
}
}
2021-09-18 16:31:35 -04:00
func scheduleReplicationDelete ( ctx context . Context , dv DeletedObjectReplicationInfo , o ObjectLayer ) {
2021-04-29 21:20:39 -04:00
globalReplicationPool . queueReplicaDeleteTask ( dv )
2021-09-18 16:31:35 -04:00
for arn := range dv . ReplicationState . Targets {
2021-11-17 15:10:57 -05:00
globalReplicationStats . Update ( dv . Bucket , arn , 0 , 0 , replication . Pending , replication . StatusType ( "" ) , replication . DeleteReplicationType )
2021-09-18 16:31:35 -04:00
}
for arn := range dv . ReplicationState . PurgeTargets {
2021-11-17 15:10:57 -05:00
globalReplicationStats . Update ( dv . Bucket , arn , 0 , 0 , replication . Pending , replication . StatusType ( "" ) , replication . DeleteReplicationType )
2021-09-18 16:31:35 -04:00
}
2021-01-12 01:36:51 -05:00
}
2021-06-01 22:59:11 -04:00
type replicationConfig struct {
2021-09-18 16:31:35 -04:00
Config * replication . Config
remotes * madmin . BucketTargets
2021-06-01 22:59:11 -04:00
}
// Empty reports whether no replication configuration is attached.
func (c replicationConfig) Empty() bool {
	hasConfig := c.Config != nil
	return !hasConfig
}
// Replicate reports whether the attached replication configuration selects
// the object described by opts for replication. Config is used without a nil
// check here.
func (c replicationConfig) Replicate(opts replication.ObjectOpts) bool {
	cfg := c.Config
	return cfg.Replicate(opts)
}
// Resync returns true if replication reset is requested
2021-09-18 16:31:35 -04:00
func ( c replicationConfig ) Resync ( ctx context . Context , oi ObjectInfo , dsc * ReplicateDecision , tgtStatuses map [ string ] replication . StatusType ) ( r ResyncDecision ) {
2021-06-01 22:59:11 -04:00
if c . Empty ( ) {
2021-09-18 16:31:35 -04:00
return
2021-06-01 22:59:11 -04:00
}
// existing object replication does not apply to un-versioned objects
2021-09-28 13:26:12 -04:00
if oi . VersionID == "" {
2021-09-18 16:31:35 -04:00
return
2021-06-01 22:59:11 -04:00
}
2021-09-18 16:31:35 -04:00
// Now overlay existing object replication choices for target
2021-06-01 22:59:11 -04:00
if oi . DeleteMarker {
2021-09-18 16:31:35 -04:00
opts := replication . ObjectOpts {
2021-06-01 22:59:11 -04:00
Name : oi . Name ,
SSEC : crypto . SSEC . IsEncrypted ( oi . UserDefined ) ,
UserTags : oi . UserTags ,
DeleteMarker : oi . DeleteMarker ,
VersionID : oi . VersionID ,
OpType : replication . DeleteReplicationType ,
2021-09-18 16:31:35 -04:00
ExistingObject : true }
tgtArns := c . Config . FilterTargetArns ( opts )
// indicates no matching target with Existing object replication enabled.
if len ( tgtArns ) == 0 {
return
2021-06-01 22:59:11 -04:00
}
2021-09-18 16:31:35 -04:00
for _ , t := range tgtArns {
opts . TargetArn = t
// Update replication decision for target based on existing object replciation rule.
dsc . Set ( newReplicateTargetDecision ( t , c . Replicate ( opts ) , false ) )
}
return c . resync ( oi , dsc , tgtStatuses )
}
// Ignore previous replication status when deciding if object can be re-replicated
objInfo := oi . Clone ( )
objInfo . ReplicationStatusInternal = ""
objInfo . VersionPurgeStatusInternal = ""
objInfo . ReplicationStatus = ""
objInfo . VersionPurgeStatus = ""
resyncdsc := mustReplicate ( ctx , oi . Bucket , oi . Name , getMustReplicateOptions ( objInfo , replication . ExistingObjectReplicationType , ObjectOptions { } ) )
dsc = & resyncdsc
return c . resync ( oi , dsc , tgtStatuses )
2021-06-01 22:59:11 -04:00
}
// wrapper function for testability. Returns true if a new reset is requested on
// already replicated objects OR object qualifies for existing object replication
// and no reset requested.
2021-09-18 16:31:35 -04:00
func ( c replicationConfig ) resync ( oi ObjectInfo , dsc * ReplicateDecision , tgtStatuses map [ string ] replication . StatusType ) ( r ResyncDecision ) {
r = ResyncDecision {
targets : make ( map [ string ] ResyncTargetDecision ) ,
}
if c . remotes == nil {
return
}
for _ , tgt := range c . remotes . Targets {
d , ok := dsc . targetsMap [ tgt . Arn ]
if ! ok {
continue
}
if ! d . Replicate {
continue
}
r . targets [ d . Arn ] = resyncTarget ( oi , tgt . Arn , tgt . ResetID , tgt . ResetBeforeDate , tgtStatuses [ tgt . Arn ] )
}
return
}
func targetResetHeader ( arn string ) string {
return fmt . Sprintf ( "%s-%s" , ReservedMetadataPrefixLower + ReplicationReset , arn )
}
func resyncTarget ( oi ObjectInfo , arn string , resetID string , resetBeforeDate time . Time , tgtStatus replication . StatusType ) ( rd ResyncTargetDecision ) {
rd = ResyncTargetDecision {
ResetID : resetID ,
ResetBeforeDate : resetBeforeDate ,
}
rs , ok := oi . UserDefined [ targetResetHeader ( arn ) ]
if ! ok {
2021-11-16 12:28:29 -05:00
rs , ok = oi . UserDefined [ xhttp . MinIOReplicationResetStatus ] // for backward compatibility
2021-06-01 22:59:11 -04:00
}
if ! ok { // existing object replication is enabled and object version is unreplicated so far.
2021-09-18 16:31:35 -04:00
if resetID != "" && oi . ModTime . Before ( resetBeforeDate ) { // trigger replication if `mc replicate reset` requested
rd . Replicate = true
return
2021-06-01 22:59:11 -04:00
}
2021-09-18 16:31:35 -04:00
// For existing object reset - this condition is needed
rd . Replicate = tgtStatus == ""
return
2021-06-01 22:59:11 -04:00
}
2021-09-18 16:31:35 -04:00
if resetID == "" || resetBeforeDate . Equal ( timeSentinel ) { // no reset in progress
return
2021-06-01 22:59:11 -04:00
}
2021-09-18 16:31:35 -04:00
2021-06-01 22:59:11 -04:00
// if already replicated, return true if a new reset was requested.
splits := strings . SplitN ( rs , ";" , 2 )
2021-09-18 16:31:35 -04:00
if len ( splits ) != 2 {
return
}
newReset := splits [ 1 ] != resetID
if ! newReset && tgtStatus == replication . Completed {
2021-06-01 22:59:11 -04:00
// already replicated and no reset requested
2021-09-18 16:31:35 -04:00
return
2021-06-01 22:59:11 -04:00
}
2021-09-18 16:31:35 -04:00
rd . Replicate = newReset && oi . ModTime . Before ( resetBeforeDate )
return
2021-06-01 22:59:11 -04:00
}
2021-11-19 17:46:14 -05:00
// getLatestReplicationStats returns the most current view of a bucket's
// replication stats by combining the in-memory stats gathered from the
// cluster, the initial usage kept by the replication stats, and the data
// usage info u coming from the crawler.
func getLatestReplicationStats(bucket string, u BucketUsageInfo) (s BucketReplicationStats) {
	// accumulate cluster bucket stats, keyed by target ARN
	merged := make(map[string]*BucketReplicationStat)
	var replicaSize int64
	for _, nodeStat := range globalNotificationSys.GetClusterBucketStats(GlobalContext, bucket) {
		replicaSize += nodeStat.ReplicationStats.ReplicaSize
		for arn, stat := range nodeStat.ReplicationStats.Stats {
			prev := merged[arn]
			if prev == nil {
				prev = &BucketReplicationStat{}
			}
			merged[arn] = &BucketReplicationStat{
				FailedCount:    stat.FailedCount + prev.FailedCount,
				FailedSize:     stat.FailedSize + prev.FailedSize,
				ReplicatedSize: stat.ReplicatedSize + prev.ReplicatedSize,
				Latency:        stat.Latency.merge(prev.Latency),
			}
		}
	}

	// add initial usage stat to cluster stats
	initial := globalReplicationStats.GetInitialUsage(bucket)
	replicaSize += initial.ReplicaSize
	for arn, stat := range initial.Stats {
		cur := merged[arn]
		if cur == nil {
			cur = &BucketReplicationStat{}
			merged[arn] = cur
		}
		cur.ReplicatedSize += stat.ReplicatedSize
		cur.FailedSize += stat.FailedSize
		cur.FailedCount += stat.FailedCount
	}

	s = BucketReplicationStats{
		Stats: make(map[string]*BucketReplicationStat, len(merged)),
	}

	// total replicated size according to the usage info
	var usageReplicatedSize int64
	for _, tu := range u.ReplicationInfo {
		usageReplicatedSize += int64(tu.ReplicatedSize)
	}

	// normalize computed real time stats with latest usage stat
	for arn, live := range merged {
		// A missing entry yields the zero usage value.
		usage := u.ReplicationInfo[arn]

		out := &BucketReplicationStat{
			ReplicatedSize: int64(usage.ReplicatedSize),
			// Reset FailedSize and FailedCount to 0 for negative overflows
			// which can happen since data usage picture can lag behind actual
			// usage state at the time of cluster start
			FailedSize:  int64(math.Max(float64(live.FailedSize), 0)),
			FailedCount: int64(math.Max(float64(live.FailedCount), 0)),
			Latency:     live.Latency,
		}
		// use in memory replication stats if it is ahead of usage info.
		if live.ReplicatedSize >= out.ReplicatedSize {
			out.ReplicatedSize = live.ReplicatedSize
		}

		s.Stats[arn] = out
		s.ReplicatedSize += out.ReplicatedSize
		s.FailedSize += out.FailedSize
		s.FailedCount += out.FailedCount
	}

	// normalize overall stats
	s.ReplicaSize = int64(math.Max(float64(replicaSize), float64(u.ReplicaSize)))
	s.ReplicatedSize = int64(math.Max(float64(s.ReplicatedSize), float64(usageReplicatedSize)))
	return s
}